关于算法:二-初始化Server

44次阅读

共计 34198 个字符,预计需要花费 86 分钟才能阅读完成。

二 初始化 Server

hello,又到了本期的博客了,这一期我将会给大家介绍启动时 redis 是如何初始化网络状态的,大家一起高兴的学习吧!!

先看一看初始化 server 在 main 函数被调用的代码:

int main(int argc char * argv[])
{loadServerConfig(server.configfile, config_from_stdin, options);
     /*
         *****
     */
    initServer();
   /*
           ****
     */
}

当将配置文件加载到全局变量 server 中时,这时 redis 就会依据配置文件中的内容去初始化服务端状态了,server 在全局中的定义如下:

/* Global vars */
struct redisServer server; /* Server global state */

接下来咱们来看看初始化 server 具体干了哪些事件。

1 信号

     signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();
    makeThreadKillable();

signal(SIGHUP, SIG_IGN)用于 疏忽 SIGHUP 信号 ,它是一种在 Unix 和类 Unix 操作系统中宽泛应用的信号。在 Unix 零碎中,当管制终端挂起时,会向过程组中的所有过程发送 SIGHUP 信号,通常用于从新初始化过程。通常状况下,当一个过程接管到 SIGHUP 信号时,它会终止执行,但应用signal(SIGHUP, SIG_IGN) 能够疏忽这个信号,从而防止过程被终止。在下面的代码中,当 Redis 服务器接管到 SIGHUP 信号时,它不会做任何事件。

signal(SIGPIPE, SIG_IGN) 的作用是 疏忽对于管道 / 套接字等读取端曾经敞开的写入操作而产生的 SIGPIPE 信号。在应用网络套接字进行通信时,如果对方断开连接,而以后套接字依然在写数据,那么就会产生 SIGPIPE 信号,程序默认状况下会退出。通过将 SIGPIPE 信号的处理函数设置为 SIG_IGN,程序将疏忽该信号,防止程序异样退出。


void setupSignalHandlers(void) {
    struct sigaction act;

    /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
     * Otherwise, sa_handler is used. */
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = sigShutdownHandler;
    sigaction(SIGTERM, &act, NULL);
    sigaction(SIGINT, &act, NULL);

    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
    act.sa_sigaction = sigsegvHandler;
    if(server.crashlog_enabled) {sigaction(SIGSEGV, &act, NULL);
        sigaction(SIGBUS, &act, NULL);
        sigaction(SIGFPE, &act, NULL);
        sigaction(SIGILL, &act, NULL);
        sigaction(SIGABRT, &act, NULL);
    }
    return;
}

setupSignalHandlers()函数用于设置 Redis 过程的信号处理程序。在 Unix/Linux 零碎中,信号是一种异步通信机制,用于解决过程之间的异步事件。在 Redis 中,有多种信号能够被解决,比方 SIGTERM 示意终止过程,SIGINT示意中断过程等。通过设置信号处理程序,Redis 能够对这些异步事件做出响应,例如在 SIGTERM 信号到来时进行清理工作并优雅地敞开 Redis 过程。

首先,应用 sigemptyset 函数初始化一个信号集,将其存储在 act.sa_mask 中,以便在信号处理器运行时阻塞其余信号。而后将 act.sa_flags 设置为 0,表明应用默认行为,即不应用 SA_RESTART 重新启动零碎调用。最初,将 act.sa_handler 设置为 sigShutdownHandler 函数,示意在接管到信号时调用 sigShutdownHandler 函数进行解决。此函数用于解决 SIGTERMSIGINT 信号,以使 Redis 失常退出。

首先,sigemptyset(&act.sa_mask) 会将 act.sa_mask 初始化为空集,这是为了在信号处理函数运行期间避免过程收到其余信号。

接着,act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;设置了信号处理选项。其中,SA_NODEFER 示意在信号处理函数执行期间禁止该信号被阻塞,SA_RESETHAND 示意在信号处理函数执行结束后将该信号处理形式重置为默认值,SA_SIGINFO 示意应用带有三个参数的 sa_sigaction 处理函数。

最初,act.sa_sigaction = sigsegvHandler 设置了信号处理函数。如果过程收到 SIGSEGV 信号,就会调用 sigsegvHandler 函数。

捕捉 SIGSEGV、SIGBUS、SIGFPE、SIGILL 和 SIGABRT 信号。如果服务器的 crashlog_enabled 选项设置为 true,则向这些信号注册 sigsegvHandler()函数作为信号处理程序。如果这些信号被触发,它们将调用 sigsegvHandler()函数。此函数是 Redis 的默认解体日志记录器,它将记录解体时的上下文信息并尝试将其写入服务器的日志文件。

makeThreadKillable()

makeThreadKillable()函数的作用是将以后线程标记为可被勾销的状态。在应用线程时,当线程处于某些要害代码段时,如果接管到勾销申请,可能会造成资源透露等问题,因而须要将线程标记为可被勾销的状态,以便在接管到勾销申请时,线程能够平安地进行执行。此函数通常在执行长时间操作的线程中应用,例如执行文件 I / O 或网络 I / O 的线程。

2 日志

 if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

这段代码的作用是在启用 syslog 的状况下,调用 openlog 函数关上 syslog 服务,并设置 syslog_ident、syslog_facility 等参数。syslog 是一个系统日志服务,用于记录操作系统或者应用程序的日志信息。在 Redis 中,如果启用了 syslog,Redis 就会把一些重要的日志信息输入到 syslog 中,以不便用户查看和剖析。openlog 是 syslog 服务提供的一个函数,用于关上 syslog 服务,并设置相干参数。参数 LOG_PID 示意在每条日志信息中退出以后过程 ID,参数 LOG_NDELAY 示意关上 syslog 服务时不会期待,参数 LOG_NOWAIT 示意在写入日志信息时,不会期待 syslog 过程返回。

3 初始化数据结构和变量

  server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
    server.hz = server.config_hz;
    server.pid = getpid();
    server.in_fork_child = CHILD_TYPE_NONE;
    server.main_thread_id = pthread_self();
    server.current_client = NULL;
    server.errors = raxNew();
    server.fixed_time_expire = 0;
    server.in_nested_call = 0;
    server.clients = listCreate();
    server.clients_index = raxNew();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate();
    server.clients_pending_read = listCreate();
    server.clients_timeout_table = raxNew();
    server.replication_allowed = 1;
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.tracking_pending_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.client_pause_type = CLIENT_PAUSE_OFF;
    server.client_pause_end_time = 0;
    memset(server.client_pause_per_purpose, 0,
           sizeof(server.client_pause_per_purpose));
    server.postponed_clients = listCreate();
    server.events_processed_while_blocked = 0;
    server.system_memory_size = zmalloc_get_memory_size();
    server.blocked_last_cron = 0;
    server.blocking_op_nesting = 0;
    server.thp_enabled = 0;
    server.cluster_drop_packet_filter = -1;
    server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
    server.reply_buffer_resizing_enabled = 1;
     = NULL;

这是 Redis 服务器在启动时初始化的一些变量和数据结构。以下是每个变量的简要阐明:

  • server.aof_state: 标记是否启用 AOF(Append Only File)长久化,初始化为启用或禁用状态。
  • server.hz: Redis 服务器的运行频率,即每秒执行的事件循环次数。
  • server.pid: Redis 服务器的过程 ID。
  • server.in_fork_child: 标记以后过程是否为子过程,初始为 NONE。
  • server.main_thread_id: Redis 服务器的主线程 ID。
  • server.current_client: 以后客户端,初始为 NULL。
  • server.errors: 存储 Redis 服务器产生的谬误,以及谬误产生的次数和工夫戳等信息。
  • server.fixed_time_expire: 指定某些键的过期工夫是否为固定工夫。
  • server.in_nested_call: 标记 Redis 服务器是否处于嵌套调用状态,初始为 0。
  • server.clients: 存储所有已连贯的客户端。
  • server.clients_index: 用于疾速查找客户端。
  • server.clients_to_close: 存储待敞开的客户端。
  • server.slaves: 存储所有从服务器。
  • server.monitors: 存储所有 MONITOR 客户端。
  • server.clients_pending_write: 存储须要写入数据的客户端。
  • server.clients_pending_read: 存储须要读取数据的客户端。
  • server.clients_timeout_table: 存储客户端的超时工夫。
  • server.replication_allowed: 标记是否容许复制,初始为容许。
  • server.slaveseldb: 从服务器的以后数据库,初始化为 -1。
  • server.unblocked_clients: 存储非阻塞客户端。
  • server.ready_keys: 存储已就绪的键。
  • server.tracking_pending_keys: 存储须要追踪的键。
  • server.clients_waiting_acks: 存储期待客户端应答的客户端。
  • server.get_ack_from_slaves: 标记是否期待从服务器的应答。
  • server.client_pause_type: 客户端暂停状态,初始化为未暂停。
  • server.client_pause_end_time: 客户端暂停完结工夫,初始化为 0。
  • server.client_pause_per_purpose: 存储客户端暂停的起因和工夫戳。
  • server.postponed_clients: 存储已推延解决的客户端。
  • server.events_processed_while_blocked: 存储在阻塞状态下解决的事件数。
  • server.system_memory_size: Redis 服务器可用的零碎内存大小。
  • server.blocked_last_cron: 记录 Redis 服务器最初一次被阻塞的工夫。
  • server.blocking_op_nesting: 阻塞操作的嵌套深度,初始为 0。
  • server.thp_enabled: 是否启用 Transparent Huge Pages。
  • server.cluster_drop_packet_filter: 集群节点间通信中过滤器的编号,初始化为 -1。
  • server.reply_buffer_peak_reset_time: Redis 回复缓冲区的峰值重置工夫
  • server.client_mem_usage_buckets 跟踪客户端的内存应用状况, 如果为 NULL 示意未启用内存应用统计。

4 重置服务器缓冲区

void resetReplicationBuffer(void) {
    server.repl_buffer_mem = 0;
    server.repl_buffer_blocks = listCreate();
    listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree);
}

resetReplicationBuffer()是 Redis 服务器中的一个函数,用于重置服务器的复制缓冲区。

复制缓冲区是 Redis 用来 存储复制操作期间生成的命令的缓冲区 。这些命令会被发送给从服务器,以便它们能够复制主服务器上执行的操作。 在复制期间,如果从服务器断开连接或呈现其余谬误,那么 Redis 将重置复制缓冲区以防止数据失落或混同。

5 TLS

 if ((server.tls_port || server.tls_replication || server.tls_cluster)
                && tlsConfigure(&server.tls_ctx_config) == C_ERR) {serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
        exit(1);
    }

这段代码片段是在服务器启动时查看是否须要配置 TLS,并尝试进行配置。如果服务器配置了 TLS 端口、TLS 复制或 TLS 集群,就会调用 tlsConfigure 函数进行配置。如果配置失败,会记录一条正告日志并退出服务器。这里应用了 exit 函数,因而服务器无奈持续运行。

TLS 是 Transport Layer Security(传输层平安协定)的缩写,是一种加密协议,用于保护计算机网络通信的平安。它是 SSL(Secure Sockets Layer,安全套接字层)的继任者。TLS 协定通过对数据进行加密、认证和完整性爱护等伎俩来保障通信的安全性。在网络通信中,罕用的应用层协定,如 HTTP、SMTP、POP3 等,都能够在 TLS 协定的根底上实现平安通信。

6 创立共享对象


void createSharedObjects(void) {
    int j;

    /* Shared command responses */
    shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
    shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
    shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
    shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
    shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
    shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
    shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
    shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
    shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
    shared.space = createObject(OBJ_STRING,sdsnew(" "));
    shared.plus = createObject(OBJ_STRING,sdsnew("+"));

    /* Shared command error responses */
    shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
    shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
    shared.nokeyerr = createObject(OBJ_STRING,sdsnew("-ERR no such key\r\n"));
    shared.syntaxerr = createObject(OBJ_STRING,sdsnew("-ERR syntax error\r\n"));
    shared.sameobjecterr = createObject(OBJ_STRING,sdsnew("-ERR source and destination objects are the same\r\n"));
    shared.outofrangeerr = createObject(OBJ_STRING,sdsnew("-ERR index out of range\r\n"));
    shared.noscripterr = createObject(OBJ_STRING,sdsnew("-NOSCRIPT No matching script. Please use EVAL.\r\n"));
    shared.loadingerr = createObject(OBJ_STRING,sdsnew("-LOADING Redis is loading the dataset in memory\r\n"));
    shared.slowevalerr = createObject(OBJ_STRING,sdsnew("-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
    shared.slowscripterr = createObject(OBJ_STRING,sdsnew("-BUSY Redis is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n"));
    shared.slowmoduleerr = createObject(OBJ_STRING,sdsnew("-BUSY Redis is busy running a module command.\r\n"));
    shared.masterdownerr = createObject(OBJ_STRING,sdsnew("-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to'no'.\r\n"));
    shared.bgsaveerr = createObject(OBJ_STRING,sdsnew("-MISCONF Redis is configured to save RDB snapshots, but it's currently unable to persist to disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
    shared.roslaveerr = createObject(OBJ_STRING,sdsnew("-READONLY You can't write against a read only replica.\r\n"));
    shared.noautherr = createObject(OBJ_STRING,sdsnew("-NOAUTH Authentication required.\r\n"));
    shared.oomerr = createObject(OBJ_STRING,sdsnew("-OOM command not allowed when used memory >'maxmemory'.\r\n"));
    shared.execaborterr = createObject(OBJ_STRING,sdsnew("-EXECABORT Transaction discarded because of previous errors.\r\n"));
    shared.noreplicaserr = createObject(OBJ_STRING,sdsnew("-NOREPLICAS Not enough good replicas to write.\r\n"));
    shared.busykeyerr = createObject(OBJ_STRING,sdsnew("-BUSYKEY Target key name already exists.\r\n"));

    /* The shared NULL depends on the protocol version. */
    shared.null[0] = NULL;
    shared.null[1] = NULL;
    shared.null[2] = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
    shared.null[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));

    shared.nullarray[0] = NULL;
    shared.nullarray[1] = NULL;
    shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
    shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));

    shared.emptymap[0] = NULL;
    shared.emptymap[1] = NULL;
    shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
    shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n"));

    shared.emptyset[0] = NULL;
    shared.emptyset[1] = NULL;
    shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
    shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n"));

    for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {char dictid_str[64];
        int dictid_len;

        dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
        shared.select[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, dictid_str));
    }
    shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
    shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
    shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
    shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
    shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
    shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
    shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14);
    shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
    shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);

    /* Shared command names */
    shared.del = createStringObject("DEL",3);
    shared.unlink = createStringObject("UNLINK",6);
    shared.rpop = createStringObject("RPOP",4);
    shared.lpop = createStringObject("LPOP",4);
    shared.lpush = createStringObject("LPUSH",5);
    shared.rpoplpush = createStringObject("RPOPLPUSH",9);
    shared.lmove = createStringObject("LMOVE",5);
    shared.blmove = createStringObject("BLMOVE",6);
    shared.zpopmin = createStringObject("ZPOPMIN",7);
    shared.zpopmax = createStringObject("ZPOPMAX",7);
    shared.multi = createStringObject("MULTI",5);
    shared.exec = createStringObject("EXEC",4);
    shared.hset = createStringObject("HSET",4);
    shared.srem = createStringObject("SREM",4);
    shared.xgroup = createStringObject("XGROUP",6);
    shared.xclaim = createStringObject("XCLAIM",6);
    shared.script = createStringObject("SCRIPT",6);
    shared.replconf = createStringObject("REPLCONF",8);
    shared.pexpireat = createStringObject("PEXPIREAT",9);
    shared.pexpire = createStringObject("PEXPIRE",7);
    shared.persist = createStringObject("PERSIST",7);
    shared.set = createStringObject("SET",3);
    shared.eval = createStringObject("EVAL",4);

    /* Shared command argument */
    shared.left = createStringObject("left",4);
    shared.right = createStringObject("right",5);
    shared.pxat = createStringObject("PXAT", 4);
    shared.time = createStringObject("TIME",4);
    shared.retrycount = createStringObject("RETRYCOUNT",10);
    shared.force = createStringObject("FORCE",5);
    shared.justid = createStringObject("JUSTID",6);
    shared.entriesread = createStringObject("ENTRIESREAD",11);
    shared.lastid = createStringObject("LASTID",6);
    shared.default_username = createStringObject("default",7);
    shared.ping = createStringObject("ping",4);
    shared.setid = createStringObject("SETID",5);
    shared.keepttl = createStringObject("KEEPTTL",7);
    shared.absttl = createStringObject("ABSTTL",6);
    shared.load = createStringObject("LOAD",4);
    shared.createconsumer = createStringObject("CREATECONSUMER",14);
    shared.getack = createStringObject("GETACK",6);
    shared.special_asterick = createStringObject("*",1);
    shared.special_equals = createStringObject("=",1);
    shared.redacted = makeObjectShared(createStringObject("(redacted)",10));

    for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {shared.integers[j] =
            makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
        shared.integers[j]->encoding = OBJ_ENCODING_INT;
    }
    for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {shared.mbulkhdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"*%d\r\n",j));
        shared.bulkhdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"$%d\r\n",j));
        shared.maphdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"%%%d\r\n",j));
        shared.sethdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"~%d\r\n",j));
    }
    /* The following two shared objects, minstring and maxstring, are not
     * actually used for their value but as a special object meaning
     * respectively the minimum possible string and the maximum possible
     * string in string comparisons for the ZRANGEBYLEX command. */
    shared.minstring = sdsnew("minstring");
    shared.maxstring = sdsnew("maxstring");
}
/* Our shared "common" objects */
struct sharedObjectsStruct shared;
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4],
    *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *noscripterr, *loadingerr,
    *slowevalerr, *slowscripterr, *slowmoduleerr, *bgsaveerr,
    *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
    *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
    *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
    *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
    *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,  
    *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, 
    *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
    *lastid, *ping, *setid, *keepttl, *load, *createconsumer,
    *getack, *special_asterick, *special_equals, *default_username, *redacted,
    *ssubscribebulk,*sunsubscribebulk, *smessagebulk,
    *select[PROTO_SHARED_SELECT_CMDS],
    *integers[OBJ_SHARED_INTEGERS],
    *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
    *bulkhdr[OBJ_SHARED_BULKHDR_LEN],  /* "$<value>\r\n" */
    *maphdr[OBJ_SHARED_BULKHDR_LEN],   /* "%<value>\r\n" */
    *sethdr[OBJ_SHARED_BULKHDR_LEN];   /* "~<value>\r\n" */
    sds minstring, maxstring;
};

struct sharedObjectsStruct shared; 定义了一个名为 shared 的构造体变量,该构造体用于存储 Redis 中一些共享的字符串对象,如 OKERR 等。这些共享对象在 Redis 外部被宽泛应用,能够缩小内存占用和进步性能。createSharedObjects() 函数则是初始化这些共享对象。

共享对象是 Redis 中曾经事后创立好的一些固定字符串对象,它们的值曾经提前在 Redis 启动时被设置好了。因为这些字符串对象是共享的,能够被多个客户端独特应用,所以它们在 Redis 内存中只有一份拷贝,这样就能够节俭很多内存空间。

在 Redis 中,共享对象是由 struct sharedObjectsStruct 构造体中的成员变量示意的。这个构造体蕴含了很多的共享对象,例如空字符串对象、OK 字符串对象、错误信息对象、空列表对象、空哈希对象等等。在 Redis 启动时,会调用 createSharedObjects()函数创立这些共享对象,并将它们存储在 sharedObjectsStruct 构造体中,以便后续应用。

应用共享对象能够进步 Redis 的性能和内存应用效率,因为它们不须要在每次应用时从新创立,而是能够间接援用。

7 文件描述符限度

void adjustOpenFilesLimit(void) {
    rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
    struct rlimit limit;

    if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
            strerror(errno));
        server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
    } else {
        rlim_t oldlimit = limit.rlim_cur;

        /* Set the max number of files if the current limit is not enough
         * for our needs. */
        if (oldlimit < maxfiles) {
            rlim_t bestlimit;
            int setrlimit_error = 0;

            /* Try to set the file limit to match 'maxfiles' or at least
             * to the higher value supported less than maxfiles. */
            bestlimit = maxfiles;
            while(bestlimit > oldlimit) {
                rlim_t decr_step = 16;

                limit.rlim_cur = bestlimit;
                limit.rlim_max = bestlimit;
                if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
                setrlimit_error = errno;

                /* We failed to set file limit to 'bestlimit'. Try with a
                 * smaller limit decrementing by a few FDs per iteration. */
                if (bestlimit < decr_step) {
                    bestlimit = oldlimit;
                    break;
                }
                bestlimit -= decr_step;
            }

            /* Assume that the limit we get initially is still valid if
             * our last try was even lower. */
            if (bestlimit < oldlimit) bestlimit = oldlimit;

            if (bestlimit < maxfiles) {
                unsigned int old_maxclients = server.maxclients;
                server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
                /* maxclients is unsigned so may overflow: in order
                 * to check if maxclients is now logically less than 1
                 * we test indirectly via bestlimit. */
                if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
                    serverLog(LL_WARNING,"Your current'ulimit -n'""of %llu is not enough for the server to start. ""Please increase your open file limit to at least"
                        "%llu. Exiting.",
                        (unsigned long long) oldlimit,
                        (unsigned long long) maxfiles);
                    exit(1);
                }
                serverLog(LL_WARNING,"You requested maxclients of %d"
                    "requiring at least %llu max file descriptors.",
                    old_maxclients,
                    (unsigned long long) maxfiles);
                serverLog(LL_WARNING,"Server can't set maximum open files ""to %llu because of OS error: %s.",
                    (unsigned long long) maxfiles, strerror(setrlimit_error));
                serverLog(LL_WARNING,"Current maximum open files is %llu."
                    "maxclients has been reduced to %d to compensate for"
                    "low ulimit."
                    "If you need higher maxclients increase'ulimit -n'.",
                    (unsigned long long) bestlimit, server.maxclients);
            } else {
                serverLog(LL_NOTICE,"Increased maximum number of open files"
                    "to %llu (it was originally set to %llu).",
                    (unsigned long long) maxfiles,
                    (unsigned long long) oldlimit);
            }
        }
    }
}

adjustOpenFilesLimit() 函数用于查看零碎对于 Redis 过程的关上文件数限度,如果零碎限度过低,则尝试将 Redis 过程的文件关上数限度减少到一个更高的值。

这个函数首先会调用 getrlimit() 函数获取以后零碎对于过程的文件关上数限度,而后计算出 Redis 目前曾经关上的文件数和 Redis 配置文件中指定的最大关上文件数之间的较小值。如果以后零碎的文件关上数限度比这个值小,则会将 Redis 的文件关上数限度调整为这个值。

在 Linux 零碎中,每个过程能够同时关上的文件数是有限度的,这个限度能够通过 ulimit 命令来查看和批改。当 Redis 须要同时关上大量的文件(例如在进行 RDB 长久化时),如果以后的文件关上数限度过低,就会导致 Redis 过程无奈失常工作。因而,这个函数的作用就是查看文件关上数限度是否过低,并尝试将其减少到一个足够大的值,保障 Redis 过程能够失常工作。

   rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;

这行代码的作用是依据最大客户端数和配置文件中指定的最小保留文件描述符数来计算出以后操作系统容许的最大文件描述符数。在 Redis 中,每个客户端都会应用一个文件描述符,如果超过了操作系统容许的最大文件描述符数,就会呈现文件描述符耗尽的状况,导致 Redis 无奈失常工作。因而,这个函数的作用是调整 Redis 过程的文件描述符限度,确保 Redis 过程可能反对足够数量的客户端连贯。

 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
            strerror(errno));
        server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
 }

这段代码是在获取零碎关上文件描述符限度失败时,设置服务器的最大客户端数量为 1024 减去一些保留文件描述符的数量。它应用了 getrlimit 零碎调用来获取以后的限度,并将其存储在 limit 构造中。如果 getrlimit 调用失败,则会打印一条正告音讯,并将 server.maxclients 设置为默认值 1024-CONFIG_MIN_RESERVED_FDS。这是为了确保服务器不会超出零碎限度而解体。

8 初始化时钟

const char * monotonicInit() {#if defined(USE_PROCESSOR_CLOCK) && defined(__x86_64__) && defined(__linux__)
    if (getMonotonicUs == NULL) monotonicInit_x86linux();
    #endif

    #if defined(USE_PROCESSOR_CLOCK) && defined(__aarch64__)
    if (getMonotonicUs == NULL) monotonicInit_aarch64();
    #endif

    if (getMonotonicUs == NULL) monotonicInit_posix();

    return monotonic_info_string;
}

monotonicInit() 函数是 Redis 在启动时初始化时钟的函数,用于确保 Redis 在不同的操作系统上都能正确应用枯燥时钟(monotonic clock)。该函数返回一个字符串指针,指向一个字符串,用于记录时钟初始化的详细信息。在日志记录和调试时可能会用到这个信息。

函数首先通过宏定义判断以后环境是否反对 USE_PROCESSOR_CLOCK,如果是 x86_64 架构的 Linux 零碎,则会调用monotonicInit_x86linux() 函数进行初始化;如果是 aarch64 架构,则调用 monotonicInit_aarch64() 进行初始化;否则,调用 monotonicInit_posix() 进行初始化。最终返回一个字符串类型的枯燥递增工夫戳信息。

9 外围组件–事件循环

 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

这行代码创立了一个事件循环(event loop),并将其赋值给了 Redis 服务器状态中的 el 变量。事件循环是 Redis 服务器的外围组件之一,它会监听一组文件描述符上的事件(例如可读、可写或异样事件),并在这些事件产生时调用相应的回调函数。事件循环通常由零碎提供的 I/O 多路复用机制来实现,例如 Linux 中的 epoll 或 BSD 中的 kqueue。在 Redis 中,事件循环的具体实现是由第三方库 ae 提供的。

aeCreateEventLoop 函数的参数 server.maxclients+CONFIG_FDSET_INCR 示意要为事件循环调配的文件描述符汇合(fdset)的大小。文件描述符汇合是事件循环的重要组成部分,它用于记录事件循环要监听的文件描述符。在 Redis 中,fdset 的大小默认为 FD_SETSIZE,通常是 1024。但因为 Redis 的客户端连接数可能会很大,因而在这里应用了 server.maxclients+CONFIG_FDSET_INCR 来扩充 fdset 的大小,以便能够同时监听更多的文件描述符。其中,server.maxclients 是 Redis 服务器配置文件中设置的最大客户端连接数,CONFIG_FDSET_INCR 是 Redis 配置文件中的另一个参数,用于管制文件描述符汇合的增量大小,它的默认值是 32。因而,server.maxclients+CONFIG_FDSET_INCR 示意将最大客户端连接数与增量大小相加,作为事件循环所需的 fdset 大小。

该函数时 redis 事件散发的外围函数,在前面我会具体的分析她。

10 调配数据库数组

 server.db = zmalloc(sizeof(redisDb)*server.dbnum);

这行代码是在 Redis 服务器启动时调配了一个 redisDb 类型的数组,用于存储 Redis 数据库的相干信息。

在 Redis 服务器中,每个数据库对应一个 redisDb 构造体,其中蕴含了该数据库中所有的键值对信息,以及一些其余的相干配置信息,如过期工夫等。

server.dbnum变量是在 redis.conf 配置文件中指定的,示意 Redis 服务器中要应用的数据库数量。这行代码通过调用 zmalloc 函数调配了一个长度为 server.dbnumredisDb数组,用于存储所有的 Redis 数据库。这些数据库在服务器启动时被创立,并在内存中长久化存储,以便随时提供疾速的数据读写操作。

值得注意的是,每个 redisDb 构造体中都有一个 dict 类型的成员变量,用于存储键值对信息。Redis 的字典数据结构在内存中以哈希表的模式实现,提供了十分高效的键值对存储和查找操作。

11 设置监听端口

if (server.port != 0 &&
        listenToPort(server.port,&server.ipfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
        exit(1);
    }
    if (server.tls_port != 0 &&
        listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
        exit(1);
    }

这段代码是用来监听 Redis 服务器的 TCP 和 TLS 端口的。如果监听失败,则会记录日志并退出程序。具体来说,它首先查看服务器配置中的 porttls_port 是否为零,如果不为零,则别离调用 listenToPort 函数来监听对应的端口,并将对应的文件描述符别离存储在 server.ipfdserver.tlsfd 中。如果监听失败,则记录一条日志,并通过调用 exit 函数退出程序。

int listenToPort(int port, socketFds *sfd) {
    int j;
    char **bindaddr = server.bindaddr;

    /* If we have no bind address, we don't listen on a TCP socket */
    if (server.bindaddr_count == 0) return C_OK;

    for (j = 0; j < server.bindaddr_count; j++) {char* addr = bindaddr[j];
        int optional = *addr == '-';
        if (optional) addr++;
        if (strchr(addr,':')) {
            /* Bind IPv6 address. */
            sfd->fd[sfd->count] = anetTcp6Server(server.neterr,port,addr,server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            sfd->fd[sfd->count] = anetTcpServer(server.neterr,port,addr,server.tcp_backlog);
        }
        if (sfd->fd[sfd->count] == ANET_ERR) {
            int net_errno = errno;
            serverLog(LL_WARNING,
                "Warning: Could not create server TCP listening socket %s:%d: %s",
                addr, port, server.neterr);
            if (net_errno == EADDRNOTAVAIL && optional)
                continue;
            if (net_errno == ENOPROTOOPT     || net_errno == EPROTONOSUPPORT ||
                net_errno == ESOCKTNOSUPPORT || net_errno == EPFNOSUPPORT ||
                net_errno == EAFNOSUPPORT)
                continue;

            /* Rollback successful listens before exiting */
            closeSocketListeners(sfd);
            return C_ERR;
        }
        if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id);
        anetNonBlock(NULL,sfd->fd[sfd->count]);
        anetCloexec(sfd->fd[sfd->count]);
        sfd->count++;
    }
    return C_OK;
}

12 本地 unix 套接字

 if (server.unixsocket != NULL) {unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            (mode_t)server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
        anetCloexec(server.sofd);
    }

这段代码是在 Redis 服务器启动时,尝试创立一个 Unix 套接字,用于本地通信。具体实现是通过调用 anetUnixServer 函数创立一个 Unix 套接字,如果创立失败则会记录错误信息并终止服务器过程。如果创立胜利,则会将套接字设置为非阻塞模式并且设置文件描述符标记 FD_CLOEXEC。这样,服务器就能够通过 Unix 套接字与本地客户端进行通信了。须要留神的是,如果 Redis 服务器配置文件中没有指定 Unix 套接字门路,则不会执行这段代码。

13 检测套接字

  /* Abort if there are no listening sockets at all. */
    if (server.ipfd.count == 0 && server.tlsfd.count == 0 && server.sofd < 0) {serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

这段代码的作用是查看是否存在正在监听的套接字。如果没有任何一个监听套接字,则记录一个正告日志并退出程序。这是一个平安保护措施,因为如果 Redis 服务器没有关上任何端口或套接字,那么它将无奈提供服务,也无奈承受客户端连贯。

14 创立 Redis 数据库

 /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {server.db[j].dict = dictCreate(&dbDictType);
        server.db[j].expires = dictCreate(&dbExpiresDictType);
        server.db[j].expires_cursor = 0;
        server.db[j].blocking_keys = dictCreate(&keylistDictType);
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType);
        server.db[j].watched_keys = dictCreate(&keylistDictType);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
        server.db[j].defrag_later = listCreate();
        server.db[j].slots_to_keys = NULL; /* Set by clusterInit later on if necessary. */
        listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
    }

这段代码创立了 Redis 数据库,并初始化了一些外部状态。具体来说,它应用了一个循环来创立指定数量的 Redis 数据库,每个数据库都蕴含了以下属性:

  • dict:用于保留键值对的字典(底层实现为哈希表);
  • expires:用于过期键的字典(底层实现为哈希表),用于实现键的 TTL 性能;
  • expires_cursor:游标,用于在过期键字典中定期地删除过期键;
  • blocking_keys:用于阻塞操作的键的列表(底层实现为字典);
  • ready_keys:用于保留已筹备好的键(底层实现为字典);
  • watched_keys:用于保留被监督的键(底层实现为字典);
  • id:数据库的 ID;
  • avg_ttl:键的均匀 TTL(Time To Live)值;
  • defrag_later:期待进行碎片整顿操作的键的列表;
  • slots_to_keys:用于保留槽和键之间映射关系的字典(用于 Redis 集群)。

在循环内,还应用了 listCreate() 函数创立了一个 defrag_later 列表,用于保留须要进行碎片整顿的键。其中,listSetFreeMethod() 函数用于设置 defrag_later 列表的开释函数,这里应用了 sdsfree() 函数来开释 sds 字符串。

前面会具体的介绍。

15 初始化 LRU

void evictionPoolAlloc(void) {
    struct evictionPoolEntry *ep;
    int j;

    ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE);
    for (j = 0; j < EVPOOL_SIZE; j++) {ep[j].idle = 0;
        ep[j].key = NULL;
        ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);
        ep[j].dbid = 0;
    }
    EvictionPoolLRU = ep;
}

    

evictionPoolAlloc() 函数的作用是分配内存来初始化用于 LRU eviction 的 key pool。

具体地,它通过调用 zmalloc() 调配 sizeof(*ep)*EVPOOL_SIZE) 大小的内存,其中 ep 是一个 struct evictionPoolEntry 类型的数组。而后它遍历 ep 数组的每个元素,并对其进行初始化,将 idle 域设置为 0,key 域设置为 NULL,cached 域应用 sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE) 函数初始化为空的 sds 字符串,dbid 域设置为 0。最初,将 ep 赋值给全局变量 EvictionPoolLRU

这个 key pool 用于在 Redis 的 LRU eviction 中缓存一些被淘汰的 key 的信息,以便在须要时能够疾速从缓存中获取这些信息而无需从新计算。

16 外部状态初始化

server.pubsub_channels = dictCreate(&keylistDictType);
    server.pubsub_patterns = dictCreate(&keylistDictType);
    server.pubsubshard_channels = dictCreate(&keylistDictType);
    server.cronloops = 0;
    server.in_exec = 0;
    server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
    server.busy_module_yield_reply = NULL;
    server.core_propagates = 0;
    server.propagate_no_multi = 0;
    server.module_ctx_nesting = 0;
    server.client_pause_in_transaction = 0;
    server.child_pid = -1;
    server.child_type = CHILD_TYPE_NONE;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_pipe_conns = NULL;
    server.rdb_pipe_numconns = 0;
    server.rdb_pipe_numconns_writing = 0;
    server.rdb_pipe_buff = NULL;
    server.rdb_pipe_bufflen = 0;
    server.rdb_bgsave_scheduled = 0;
    server.child_info_pipe[0] = -1;
    server.child_info_pipe[1] = -1;
    server.child_info_nread = 0;
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.rdb_last_load_keys_expired = 0;
    server.rdb_last_load_keys_loaded = 0;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.stat_current_cow_peak = 0;
    server.stat_current_cow_bytes = 0;
    server.stat_current_cow_updated = 0;
    server.stat_current_save_keys_processed = 0;
    server.stat_current_save_keys_total = 0;
    server.stat_rdb_cow_bytes = 0;
    server.stat_aof_cow_bytes = 0;
    server.stat_module_cow_bytes = 0;
    server.stat_module_progress = 0;
    for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
        server.stat_clients_type_memory[j] = 0;
    server.stat_cluster_links_memory = 0;
    server.cron_malloc_stats.zmalloc_used = 0;
    server.cron_malloc_stats.process_rss = 0;
    server.cron_malloc_stats.allocator_allocated = 0;
    server.cron_malloc_stats.allocator_active = 0;
    server.cron_malloc_stats.allocator_resident = 0;
    server.lastbgsave_status = C_OK;
    server.aof_last_write_status = C_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;
    server.last_sig_received = 0;

这是 Redis 服务器在启动时初始化本身的一系列变量和数据结构,其中包含:

  • server.pubsub_channels、server.pubsub_patterns、server.pubsubshard_channels:三个字典,用于保护 Redis 的公布 / 订阅性能。
  • server.cronloops:一个计数器,记录 Redis 的定时工作曾经执行的次数。
  • server.in_exec:一个标记,示意是否正在执行 Redis 的事务操作。
  • server.busy_module_yield_flags、server.busy_module_yield_reply:在 Redis 执行长时间的阻塞操作时,用于批示是否能够中断阻塞操作。
  • server.core_propagates、server.propagate_no_multi:用于 Redis 的主从复制性能。
  • server.module_ctx_nesting:一个计数器,记录 Redis 以后嵌套的模块上下文数量。
  • server.client_pause_in_transaction:一个标记,示意是否在 Redis 事务操作中暂停了客户端。
  • server.child_pid、server.child_type、server.rdb_child_type:用于 Redis 的长久化性能,记录长久化子过程的相干信息。
  • server.rdb_pipe_conns、server.rdb_pipe_numconns、server.rdb_pipe_numconns_writing、server.rdb_pipe_buff、server.rdb_pipe_bufflen:用于 Redis 进行 RDB 长久化时,缓存数据写入管道的相干信息。
  • server.rdb_bgsave_scheduled:一个标记,示意是否曾经安顿了 RDB 长久化操作。
  • server.child_info_pipe、server.child_info_nread:用于与长久化子过程进行通信。
  • server.aof_buf:一个缓冲区,用于 Redis 进行 AOF 长久化时,缓存待写入 AOF 文件的数据。
  • server.lastsave、server.lastbgsave_try、server.rdb_save_time_last、server.rdb_save_time_start、server.rdb_last_load_keys_expired、server.rdb_last_load_keys_loaded:用于记录 Redis 的长久化状态。
  • server.dirty:一个计数器,记录 Redis 的脏键数量。
  • server.stat_starttime、server.stat_peak_memory、server.stat_current_cow_peak、server.stat_current_cow_bytes、server.stat_current_cow_updated、server.stat_current_save_keys_processed、server.stat_current_save_keys_total、server.stat_rdb_cow_bytes、server.stat_aof_cow_bytes、server.stat_module_cow_bytes、server.stat_module_progress、server.stat_clients_type_memory、server.stat_cluster_links_memory:一系列统计信息,用于记录 Redis 的运行状态。
  • server.cron_malloc_stats:一个构造体,用于记录 Redis 的内存应用状况。
  • server.lastbgsave_status、server.aof_last_write_status、server.aof_last_write_errno、server.repl_good_slaves_count、server.last_sig_received:用于 Redis 的长久化和主从复制性能。

17 工夫事件

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {serverPanic("Can't create event loop timers.");
        exit(1);
    }
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

这段代码是 Redis 服务器启动时的一部分,次要性能是创立一个定时器事件,并将其增加到事件循环中。这个定时器事件每秒会执行一次 serverCron 函数,用于执行一些周期性的工作,例如查看过期键值对、清理过期数据等。

如果创立定时器事件失败(返回 AE_ERR),那么服务器将调用 serverPanic 函数进入解体状态,并退出程序。这种状况应该极为常见,通常是因为系统资源有余或其余重大问题导致的。

aeCreateTimeEvent 函数用于在 eventLoop 上创立一个指定工夫距离后触发的工夫事件。

参数解释如下:

  • eventLoop: 事件循环。
  • milliseconds: 指定工夫距离,单位是毫秒。
  • proc: 事件处理函数。
  • clientData: 事件处理函数的参数。
  • finalizerProc: 事件完结时执行的函数。

函数的返回值是工夫事件的 ID。如果创立事件失败,返回 AE_ERR。

该函数先为工夫事件分配内存,而后初始化各个字段。其中,when 字段示意事件的触发工夫,是以后工夫加上指定的工夫距离。而后将事件退出事件循环的工夫事件链表 timeEventHead 中,并返回工夫事件的 ID。

18 监听 TCP 连贯申请

 if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }

这段代码是创立一个监听 TCP 连贯申请的套接字,并将该套接字上的事件增加到事件循环中,以便在有新的连贯申请达到时触发相应的处理函数 acceptTcpHandler()

具体来说,createSocketAcceptHandler() 函数会创立一个套接字,将其绑定到指定的 IP 地址和端口上,并将其设置为监听状态。而后,它将该套接字上的可读事件增加到事件循环中,当该事件被触发时,事件循环会调用相应的处理函数 acceptTcpHandler(),来承受新的连贯申请并进行解决

int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    int j;

    for (j = 0; j < sfd->count; j++) {if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

该函数的作用是创立用于监听 socket 连贯申请的事件处理器,以及将处理器注册到事件循环中。该函数的输出参数 sfd 是一个构造体,用于保留多个监听 socket 的文件描述符。函数还承受一个名为 accept_handler 的回调函数指针作为输出参数,用于解决新连贯的申请。

在函数外部,应用了一个 for 循环遍历 sfd->fd 数组中的每个文件描述符,将每个文件描述符注册为可读事件,并将 accept_handler 回调函数指针作为参数传入注册函数 aeCreateFileEvent 中。如果某个文件描述符的注册失败,就会执行 for 循环的回滚操作,删除曾经注册的文件描述符事件处理器。最终,如果所有文件描述符都胜利注册了事件处理器,则函数返回 C_OK(示意执行胜利),否则返回 C_ERR(示意执行失败)。

server.ipfd 是一个 socketFds 构造体类型的变量,用于存储服务器监听的 TCP 套接字描述符。这个构造体定义如下:

typedef struct socketFds {
    int *fd; /* 监听套接字描述符数组 */
    int count; /* 监听套接字数量 */
} socketFds;

这个构造体蕴含两个字段,一个是 fd 数组用于存储监听套接字描述符,另一个是 count 示意监听套接字数量。在服务器启动时,服务器会创立并监听多个 TCP 套接字,并将所有的套接字描述符存储在 server.ipfd 变量中,以便后续操作。

19 Unix

if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

这段代码是在创立一个监听 Unix domain socket(AF_UNIX)的文件事件,用于接管客户端的连贯申请。具体来说,它应用了 aeCreateFileEvent 函数创立一个文件事件,当 server.sofd 变为可读时,就会调用 acceptUnixHandler 函数解决新的连贯申请。如果创立文件事件出错,就会触发服务器宕机,即调用 serverPanic 函数。

20 管道

 if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE,
        modulePipeReadable,NULL) == AE_ERR) {
            serverPanic("Error registering the readable event for the module pipe.");
    }

这段代码是创立一个管道的可读事件,当管道有数据可读时会触发一个回调函数modulePipeReadable

具体来说,aeCreateFileEvent函数会向事件循环 server.el 注册一个文件事件,用于监听 server.module_pipe[0] 这个文件描述符的可读事件。如果事件创立失败,那么 serverPanic 函数会输入一条错误信息并终止程序。

在这里,modulePipeReadable函数被注册为回调函数,当 server.module_pipe[0] 有数据可读时,事件循环将主动调用它。

21 事件循环睡眠前后

aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);

aeSetBeforeSleepProcaeSetAfterSleepProc 是设置事件循环在进入睡眠前和睡眠后要执行的函数。

aeSetBeforeSleepProc用于注册一个在进入事件循环睡眠之前被调用的函数,该函数能够用于在处理事件循环之前执行特定工作。例如,能够在此处调用 redisCheckAofBackgroundWriting() 查看是否须要执行 AOF 文件的后盾写入。

aeSetAfterSleepProc用于注册一个在事件循环进入睡眠后被调用的函数,该函数能够用于在进入事件循环之前执行特定工作。例如,在此处能够更新 Redis 的外部状态或执行一些清理工作。

这两个函数能够通过 aeMain 函数中的相应调用设置,并在每个事件循环周期中执行。

22 服务器位数

if (server.arch_bits == 32 && server.maxmemory == 0) {serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with'noeviction'policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
    }

这段代码是在判断服务器实例的位数是否为 32 位,如果是 32 位,则设置一个 3GB 的最大内存限度,并将最大内存策略设置为不驱赶策略。这是因为 32 位的服务器过程无奈应用大于 4GB 的内存,因而如果没有设置最大内存限度,则可能导致内存不足并导致服务器解体。为了防止这种状况,这里设置一个最大内存限度,以确保服务器过程不会超过 3GB 的内存使用量。同时,将最大内存策略设置为不驱赶策略,以确保在达到最大内存限度时,不会驱赶任何键值对,而是回绝写入新的键值对,从而防止因驱赶键值对而导致数据失落的危险。

23 其余函数

 if (server.cluster_enabled) clusterInit();
    scriptingInit(1);
    functionsInit();
    slowlogInit();
    latencyMonitorInit();

这段代码次要做了以下事件:

  1. 如果 Redis 开启了集群模式,就调用 clusterInit()函数进行集群初始化;
  2. 调用 scriptingInit()函数初始化脚本零碎;
  3. 调用 functionsInit()函数初始化 Redis 内置函数;
  4. 调用 slowlogInit()函数初始化慢查问日志零碎;
  5. 调用 latencyMonitorInit()函数初始化提早监视器。

这些初始化函数的作用是为 Redis 提供不同的性能和服务,如集群反对、脚本反对、内置函数、慢查问日志和提早监视器等。通过这些初始化操作,Redis 能够更好地服务于用户的不同需要和场景。

24 更新默认用户明码

 ACLUpdateDefaultUserPassword(server.requirepass);

ACLUpdateDefaultUserPassword 函数是 Redis 用于更新默认用户明码的函数。在 Redis 中,默认状况下,应用 requirepass 参数所设置的明码作为用户认证明码。当 requirepass 参数被批改时,如果默认用户曾经存在,那么它的明码须要被更新。ACLUpdateDefaultUserPassword 函数就是用来更新这个默认用户明码的。

25 看门狗

applyWatchdogPeriod()

applyWatchdogPeriod()函数用于启用或禁用 Redis 的看门狗程序。看门狗程序是一个定期的工作,用于查看 Redis 是否处于假死状态,如果是,则通过发送 SIGUSR1 信号重启 Redis 过程。

在该函数中,首先会查看配置文件中的 watchdog-period 参数是否被设置为 0,如果是,则禁用看门狗程序;否则,依据该参数的值来设置看门狗程序的执行周期。执行周期由 server.watchdog_period 变量来示意,以毫秒为单位。

如果在运行 Redis 的操作系统中没有实现定时器(例如 OpenVZ),则会禁用看门狗程序并打印正告信息。

该函数在 Redis 服务器启动时被调用。

26 客户端内存限度

 if (server.maxmemory_clients != 0)
        initServerClientMemUsageBuckets();

这段代码是在查看服务器是否设置了客户端的最大内存应用限度(maxmemory_clients),如果设置了,就会调用 initServerClientMemUsageBuckets() 函数来初始化一个用于记录客户端内存应用状况的数据结构。该函数会在 dict.c 文件中定义。在启用了客户端内存限度后,服务器会定期检查客户端的内存应用状况,并在客户端应用的内存超出限度时,通过断开与客户端的连贯来保障服务器的稳定性。

总结

明天呢,咱们看了看初始化服务端的代码,是不是很懵,对的。然而咱们一步一步的从内部向内剥去,最终肯定会吃透他的。加油

正文完
 0