关于golang:redis服务器

1次阅读

共计 35483 个字符,预计需要花费 89 分钟才能阅读完成。

这一次次要讲下 redis 中服务器这个构造体相干代码,次要从是代码层面进行解说

<!– more –>

redis 服务器

redis 服务器构造体次要代码在redis.h/redisServer,上面给出该构造体源码,能够看到源码中对该构造体定义很长,这一节咱们一点点剖析,当然有些中央可能我也了解不到位 hhh

// redis 服务器实例
struct redisServer {
    char *configfile;   /* 配置文件的绝对路径 */
    int hz;      /* serverCron() 每秒调用的次数 */
    redisDb *db; /* 数据库数组,外面寄存的是该服务器所有的数据库 */
    dict *commands;             /* 命令表(受到 rename 配置选项的作用)*/
    dict *orig_commands;        /* 命令表(无 rename 配置选项的作用)*/
    aeEventLoop *el;  /* 事件状态 */
    unsigned lruclock:REDIS_LRU_BITS; /* 最近一次应用时钟 */
    int shutdown_asap;          /* 敞开服务器的标识 */
    int activerehashing;        /* 在执行 serverCron() 时进行渐进式 rehash */
    char *requirepass;          /* 是否设置了明码 */
    char *pidfile;              /* PID 文件门路 */
    int arch_bits;              /* 架构类型 32or64 */
    int cronloops;              /* serverCron() 函数的运行次数计数器 */
    char runid[REDIS_RUN_ID_SIZE+1];  /* 本服务器的 RUN ID ID 在每秒都会变动 */
    int sentinel_mode;          /* 服务器是否运行在 SENTINEL 模式 */
    int port;                   /* TCP 监听端口 */
    int tcp_backlog;            /* TCP 连贯中已实现队列 (实现三次握手之后) 的长度 */
    char *bindaddr[REDIS_BINDADDR_MAX]; /* 绑定地址 */
    int bindaddr_count;         /* bindaddr 地址数量 */
    char *unixsocket;           /* UNIX socket 门路 */
    mode_t unixsocketperm;      /* UNIX socket permission */
    int ipfd[REDIS_BINDADDR_MAX]; /* TCP 套接字描述符 */
    int ipfd_count;             /* ipfd 中应用的套接字数量 */
    int sofd;                   /* Unix 套接字描述符 */
    int cfd[REDIS_BINDADDR_MAX];/* 集群总线监听套接字 */
    int cfd_count;              /* cfd 应用到的套接字数量 */
    list *clients;              /* 链表,保留了所有客户端状态构造 */
    list *clients_to_close;     /* 链表,保留了所有待敞开的客户端 */
    list *slaves, *monitors;    /* 链表,保留了所有从服务器,以及所有监视器 */
    redisClient *current_client; /* C 服务器的以后客户端,仅用于解体报告 */
    int clients_paused;         /* 客服端是否被 paused */
    mstime_t clients_pause_end_time; /* 执行 undo clients_paused 的工夫 */
    char neterr[ANET_ERR_LEN];   /* anet.c 网络谬误缓冲区 */
    dict *migrate_cached_sockets;/* MIGRATE 缓冲套接字 */
    int loading;                /* 服务器是否正在被载入 */
    off_t loading_total_bytes; /* 正在载入的数据的大小 */
    off_t loading_loaded_bytes; /* 已载入数据的大小 */
    time_t loading_start_time; /* 开始进行载入的工夫 */
    off_t loading_process_events_interval_bytes;
    // 常用命令的快捷连贯
    struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
                        *rpopCommand;
    time_t stat_starttime;          /* 服务器启动工夫 */
    long long stat_numcommands;     /* 已解决命令的数量 */
    long long stat_numconnections;  /* 服务器接到的连贯申请数量 */
    long long stat_expiredkeys;     /* 已过期的键数量 */
    long long stat_evictedkeys;     /* 因为回收内存而被开释的过期键的数量 */
    long long stat_keyspace_hits;   /* 胜利查找键的次数 */
    long long stat_keyspace_misses; /* 查找键失败的次数 */
    size_t stat_peak_memory;        /* 已应用内存峰值 */
    long long stat_fork_time;       /* 最初一次执行 fork() 时耗费的工夫 */
    long long stat_rejected_conn;   /* 服务器因为客户端数量过多而回绝客户端连贯的次数 */
    long long stat_sync_full;       /* 执行 full sync 的次数 */
    long long stat_sync_partial_ok; /* PSYNC 胜利执行的次数 */
    long long stat_sync_partial_err;/* PSYNC 执行失败的次数 */
    list *slowlog;                  /* 保留了所有慢查问日志的链表 */
    long long slowlog_entry_id;     /* SLOWLOG 以后条目 ID */
    long long slowlog_log_slower_than; /* 服务器配置 slowlog-log-slower-than 选项的值(SLOWLOG 工夫限度) */
    unsigned long slowlog_max_len;     /* 服务器配置 slowlog-max-len 选项的值(SLOWLOG 记录的最大项目数) */
    size_t resident_set_size;       /* serverCron()中 rss 采样次数. */
    long long ops_sec_last_sample_time; /* 最初一次进行抽样的工夫 */
    long long ops_sec_last_sample_ops;  /* 最初一次抽样时,服务器已执行命令的数量 */
    long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; /* 抽样后果 */
    int ops_sec_idx; /* 数组索引,用于保留抽样后果,并在须要时回绕到 0 */
    int verbosity;                  /* 日志等级 Redis 总共反对四个级别:debug、verbose、notice、warning,默认为 notice */
    int maxidletime;                /* 客户端超时最大工夫 */
    int tcpkeepalive;               /* 是否开启 SO_KEEPALIVE 选项 */
    int active_expire_enabled;      /* 测试时候能够禁用 */
    size_t client_max_querybuf_len; /* 客户端查问缓冲区长度限度 */
    int dbnum;                      /* 服务器初始化应该创立多少个服务器 config 中 databases 16 能够设定该选项 */
    int daemonize;                  /* 如果作为守护过程运行,则为 True */
    // 客户端输入缓冲区大小限度
    // 数组的元素有 REDIS_CLIENT_LIMIT_NUM_CLASSES 个
    // 每个代表一类客户端:一般、从服务器、pubsub,诸如此类
    clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
    int aof_state;                  /* AOF 状态(开启 / 敞开 / 可写)*/
    int aof_fsync;                  /* 所应用的 fsync 策略(每个写入 / 每秒 / 从不)*/
    char *aof_filename;             /* AOF 文件名字 */
    int aof_no_fsync_on_rewrite;    /* 如果重写是在 prog 中,请不要 fsync */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_base_size;    /* 最初一次执行 BGREWRITEAOF 时,AOF 文件的大小 */
    off_t aof_current_size;         /* AOF 文件的以后字节大小 */
    int aof_rewrite_scheduled;      /* BGSAVE 终止后重写 */
    pid_t aof_child_pid;            /* 负责进行 AOF 重写的子过程 ID */
    list *aof_rewrite_buf_blocks;   /* AOF 重写缓存链表,链接着多个缓存块 */
    sds aof_buf;      /* AOF 缓冲区 */
    int aof_fd;       /* 以后所选 AOF 文件的文件描述符 */
    int aof_selected_db; /* 以后在 AOF 中抉择的数据库 */
    time_t aof_flush_postponed_start; /* 推延 AOF flush 的 UNIX 工夫 */
    time_t aof_last_fsync;            /* 最初始终执行 fsync 的工夫 */
    time_t aof_rewrite_time_last;   /* 最初一次 AOF 重写运行所用的工夫 */

    time_t aof_rewrite_time_start;  /* 以后 AOF 重写开始工夫 */
    int aof_lastbgrewrite_status;   /* 最初一次执行 BGREWRITEAOF 的后果 REDIS_OK 或 REDIS_ERR */
    unsigned long aof_delayed_fsync;  /* 记录 AOF 的 write 操作被推延了多少次 */
    int aof_rewrite_incremental_fsync;/* 批示是否须要每写入一定量的数据,就被动执行一次 fsync() */
    int aof_last_write_status;      /* REDIS_OK or REDIS_ERR */
    int aof_last_write_errno;       /* 如果 aof_last_write_status 是 ERR,则无效 */
    long long dirty;                /* 自从上次 SAVE 执行以来,数据库被批改的次数 */
    long long dirty_before_bgsave;  /* BGSAVE 执行前的数据库被批改次数 */
    pid_t rdb_child_pid;            /* 负责执行 BGSAVE 的子过程的 ID,没在执行 BGSAVE 时,设为 -1 */
    struct saveparam *saveparams;   /* 为 RDB 保留点数组 */
    int saveparamslen;              /* saveparams 长度 */
    char *rdb_filename;             /* RDB 文件的名称 */
    int rdb_compression;            /* 是否在 RDB 中应用压缩 */
    int rdb_checksum;               /* 是否应用 RDB 校验和 */
    time_t lastsave;                /* 最初一次实现 SAVE 的工夫 */
    time_t lastbgsave_try;          /* 最初一次尝试执行 BGSAVE 的工夫 */
    time_t rdb_save_time_last;      /* 最近一次 BGSAVE 执行消耗的工夫 */
    time_t rdb_save_time_start;     /* 数据库最近一次开始执行 BGSAVE 的工夫 */
    int lastbgsave_status;          /* 最初一次执行 SAVE 的状态 REDIS_OK or REDIS_ERR */
    int stop_writes_on_bgsave_err;  /* 如果不能 BGSAVE,不容许写入 */
    /* Propagation of commands in AOF / replication */
    redisOpArray also_propagate;    /* Additional command to propagate. */
    char *logfile;                  /* 日志文件的门路 */
    int syslog_enabled;             /* 是否启用了 syslog */
    char *syslog_ident;             /* 指定 syslog 的标示符,如果下面的 syslog-enabled no,则这个选项有效 */
    int syslog_facility;            /* 指定 syslog facility, 必须是 USER 或者 LOCAL0 到 LOCAL7 */
    int slaveseldb;                 /* Last SELECTed DB in replication output */
    long long master_repl_offset;   /* 全局复制偏移量(一个累计值)*/
    int repl_ping_slave_period;     /* Master 每 N 秒 ping 一次 slave */
    // backlog 自身
    char *repl_backlog;             /* Replication backlog for partial syncs */
    long long repl_backlog_size;    /* Backlog 循环缓冲区大小 */
    long long repl_backlog_histlen; /* backlog 中数据的长度 */
    long long repl_backlog_idx;     /* backlog 的以后索引 */
    long long repl_backlog_off;     /* backlog 中能够被还原的第一个字节的偏移量 */
    time_t repl_backlog_time_limit; /* backlog 的过期工夫 */
    time_t repl_no_slaves_since;    /* 间隔上一次有从服务器的工夫 */
    int repl_min_slaves_to_write;   /* 是否开启最小数量从服务器写入性能 */
    int repl_min_slaves_max_lag;    /* 定义最小数量从服务器的最大提早值 */
    int repl_good_slaves_count;     /* 提早良好的从服务器的数量 lag <= max_lag. */
    char *masterauth;               /* 主服务器的验证明码 */
    char *masterhost;               /* 主服务器的地址 */
    int masterport;                 /* 主服务器的端口 */
    int repl_timeout;               /* 主机闲暇 N 秒后超时 */
    redisClient *master;     /* 主服务器所对应的客户端 */
    redisClient *cached_master; /* 被缓存的主服务器,PSYNC 时应用 */
    int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
    int repl_state;          /* 复制的状态(服务器是从服务器时应用)*/
    off_t repl_transfer_size; /* 在同步期间从主机读取的 RDB 的大小 */
    off_t repl_transfer_read; /* 在同步期间从主设施读取的 RDB 字节数 */
    // 最近一次执行 fsync 时的偏移量
    // 用于 sync_file_range 函数
    off_t repl_transfer_last_fsync_off; /* 上次 fsync-ed 时偏移 */
    int repl_transfer_s;     /* 主服务器的套接字 */
    int repl_transfer_fd;    /* 保留 RDB 文件的临时文件的描述符 */
    char *repl_transfer_tmpfile; /* 保留 RDB 文件的临时文件名字 */
    time_t repl_transfer_lastio; /* 最近一次读入 RDB 内容的工夫 */
    int repl_serve_stale_data; /* Serve stale data when link is down? */
    int repl_slave_ro;          /* 从服务器是否只读 */
    time_t repl_down_since; /* 连贯断开的时长 */
    int repl_disable_tcp_nodelay;   /* 是否要在 SYNC 之后敞开 NODELAY */
    int slave_priority;             /* 从服务器优先级 */
    char repl_master_runid[REDIS_RUN_ID_SIZE+1];  /* 本服务器(从服务器)以后主服务器的 RUN ID */
    long long repl_master_initial_offset;         /* Master PSYNC offset. */

    /* --------- 上面一些属性有些很难用到,对此我也没认真看 */
    /* Replication script cache. */
    // 复制脚本缓存
    // 字典
    dict *repl_scriptcache_dict;        /* SHA1 all slaves are aware of. */
    // FIFO 队列
    list *repl_scriptcache_fifo;        /* First in, first out LRU eviction. */
    // 缓存的大小
    int repl_scriptcache_size;          /* Max number of elements. */

    /* Synchronous replication. */
    list *clients_waiting_acks;         /* Clients waiting in WAIT command. */
    int get_ack_from_slaves;            /* If true we send REPLCONF GETACK. */
    int maxclients;                 /* 最大并发客户端数 */
    unsigned long long maxmemory;   /* 要应用的最大内存字节数 */
    int maxmemory_policy;           /* Policy for key eviction */
    int maxmemory_samples;          /* Pricision of random sampling */
    unsigned int bpop_blocked_clients; /* 列表阻止的客户端数量 */
    list *unblocked_clients; /* 在下一个循环之前解锁的客户端列表 */
    list *ready_keys;        /* List of readyList structures for BLPOP & co */


    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    int sort_store;


    /* Zip structure config, see redis.conf for more information  */
    size_t hash_max_ziplist_entries;
    size_t hash_max_ziplist_value;
    size_t list_max_ziplist_entries;
    size_t list_max_ziplist_value;
    size_t set_max_intset_entries;
    size_t zset_max_ziplist_entries;
    size_t zset_max_ziplist_value;
    size_t hll_sparse_max_bytes;
    time_t unixtime;        /* Unix time sampled every cron cycle. */
    long long mstime;       /* Like 'unixtime' but with milliseconds resolution. */


    /* Pubsub */
    // 字典,键为频道,值为链表
    // 链表中保留了所有订阅某个频道的客户端
    // 新客户端总是被增加到链表的表尾
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */

    // 这个链表记录了客户端订阅的所有模式的名字
    list *pubsub_patterns;  /* A list of pubsub_patterns */

    int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                                   xor of REDIS_NOTIFY... flags. */


    /* Cluster */

    int cluster_enabled;      /* 群集是否已启用 */
    mstime_t cluster_node_timeout; /* 集群节点超时工夫. */
    char *cluster_configfile; /* 集群主动生成的配置文件名 */
    struct clusterState *cluster;  /* 集群的状态 */

    int cluster_migration_barrier; /* Cluster replicas migration barrier. */
    /* Scripting */

    // Lua 环境
    lua_State *lua; /* The Lua interpreter. We use just one for all clients */
    
    // 复制执行 Lua 脚本中的 Redis 命令的伪客户端
    redisClient *lua_client;   /* The "fake client" to query Redis from Lua */

    // 以后正在执行 EVAL 命令的客户端,如果没有就是 NULL
    redisClient *lua_caller;   /* The client running EVAL right now, or NULL */

    // 一个字典,值为 Lua 脚本,键为脚本的 SHA1 校验和
    dict *lua_scripts;         /* A dictionary of SHA1 -> Lua scripts */
    // Lua 脚本的执行时限
    mstime_t lua_time_limit;  /* Script timeout in milliseconds */
    // 脚本开始执行的工夫
    mstime_t lua_time_start;  /* Start time of script, milliseconds time */

    // 脚本是否执行过写命令
    int lua_write_dirty;  /* True if a write command was called during the
                             execution of the current script. */

    // 脚本是否执行过带有随机性质的命令
    int lua_random_dirty; /* True if a random command was called during the
                             execution of the current script. */

    // 脚本是否超时
    int lua_timedout;     /* True if we reached the time limit for script
                             execution. */

    // 是否要杀死脚本
    int lua_kill;         /* Kill the script if true. */


    /* Assert & bug reporting */

    char *assert_failed;
    char *assert_file;
    int assert_line;
    int bug_report_start; /* True if bug report header was already logged. */
    int watchdog_period;  /* Software watchdog period in ms. 0 = off */
};

上面重点讲下 redis 服务器启动的流程,次要包含以下几个步骤,不懂的同学能够看下 redis.c/main 函数,就能够大抵理解其过程

  • 查看服务器是否以 Sentinel 模式启动
  • 初始化全局服务器配置initServerConfig()
  • 如果是 Sentinel 模式,则初始化相干配置initSentinelConfiginitSentinel
  • 加载配置文件loadServerConfig()
  • 将服务器过程设置为守护过程daemonize
  • 初始化服务器initServer
  • 如果服务器过程为守护过程,则创立 PID 文件createPidFile
  • 为服务器过程设置名字redisSetProcTitle
  • 打印 logoredisAsciiArt
  • 加载数据库loadDataFromDisk

    • AOF 长久化已关上,则应用loadAppendOnlyFile()
    • 否则应用加载 RDB 文件rdbLoad()
  • 运行事件处理器,始终到服务器敞开为止aeMain

上面对下面几个函数顺次进行解说

Sentinel 模式

Sentinel模式就是哨兵模式,上面给出该模式的一个例子

其中 server1 是主服务器,其余 server2,3,4 为从服务器。在生产环境中,未免会有意外起因导致 redis 服务器挂掉,如果此时挂掉的是一个 master 节点,主节点宕机,主从复制将不能持续进行,写数据将会阻塞,而哨兵的存在次要是为了切换掉宕机的 master,而后从 master 上面的 slave 节点中选举一个作为新的 master,并且把旧的 master 的 slave 全副转移到新的 master 下面,持续原有的主从复制。哨兵自身是一个独立的过程,自身也是有单点问题的,所以哨兵也有本身的集群,用来保障哨兵自身的容错机制。

能够将 redis 中 sentinel 想成一个非凡的 redis 服务器,然而他不会像 redis 一般服务器那样去加载 rdb 或者 aof 文件,在 initSentinel 函数中,会创立一个 sentinel 构造体 sentinelState,代码如下

/* Sentinel 的状态构造 */
struct sentinelState {
    // 以后纪元
    uint64_t current_epoch;    

    // 保留了所有被这个 sentinel 监督的主服务器
    // 字典的键是主服务器的名字
    // 字典的值则是一个指向 sentinelRedisInstance 构造的指针
    dict *masters;      

    // 是否进入了 TILT 模式?int tilt;          

    // 目前正在执行的脚本的数量
    int running_scripts;    

    // 进入 TILT 模式的工夫
    mstime_t tilt_start_time;   

    // 最初一次执行工夫处理器的工夫
    mstime_t previous_time;    

    // 一个 FIFO 队列,蕴含了所有须要执行的用户脚本
    list *scripts_queue;    

} sentinel;


// 以 Sentinel 模式初始化服务器
void initSentinel(void) {
    int j;


    // 清空 Redis 服务器的命令表(该表用于一般模式)dictEmpty(server.commands,NULL);
    // 将 SENTINEL 模式所用的命令增加进命令表
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        redisAssert(retval == DICT_OK);
    }

    /* 初始化 Sentinel 的状态 */
    // 初始化纪元
    sentinel.current_epoch = 0;

    // 初始化保留主服务器信息的字典
    sentinel.masters = dictCreate(&instancesDictType,NULL);

    // 初始化 TILT 模式的相干选项
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();

    // 初始化脚本相干选项
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();}

其中有一个 master 字典,这外面记录了记录了所有被 Sentinel 监督的主服务器的相干信息,其中:

  • 字典的键是被监督主服务器的名字。
  • 而字典的值则是被监督主服务器对应的 sentinel.c/sentinelRedisInstance 构造。

    每个 sentinelRedisInstance 构造代表一个被 Sentinel 监督的 Redis 服务器实例(instance),这个实例能够是主服务器、从服务器、或者另外一个 Sentinel。上面给出这个构造体的代码

// Sentinel 会为每个被监督的 Redis 实例创立相应的 sentinelRedisInstance 实例
//(被监督的实例能够是主服务器、从服务器、或者其余 Sentinel)typedef struct sentinelRedisInstance {
    
    // 标识值,记录了实例的类型,以及该实例的以后状态
    // 当为 SRI_MASTER 为主服务器,当为 SRI_SLAVE 为从服务器,当为 SRI_SENTINEL 为 sentinel 服务器
    int flags;      
    
    // 实例的名字
    // 主服务器的名字由用户在配置文件中设置
    // 从服务器以及 Sentinel 的名字由 Sentinel 主动设置
    // 格局为 ip:port,例如 "127.0.0.1:26379"
    char *name;    

    // 实例的运行 ID
    char *runid;    

    // 配置纪元,用于实现故障转移
    uint64_t config_epoch;  

    // 实例的地址
    sentinelAddr *addr; 

    // 用于发送命令的异步连贯
    redisAsyncContext *cc; 

    // 用于执行 SUBSCRIBE 命令、接管频道信息的异步连贯
    // 仅在实例为主服务器时应用
    redisAsyncContext *pc; 

    // 已发送但尚未回复的命令数量
    int pending_commands;   

    // cc 连贯的创立工夫
    mstime_t cc_conn_time; 
    
    // pc 连贯的创立工夫
    mstime_t pc_conn_time; 

    // 最初一次从这个实例接管信息的工夫
    mstime_t pc_last_activity; 

    // 实例最初一次返回正确的 PING 命令回复的工夫
    mstime_t last_avail_time; 
    
    // 实例最初一次发送 PING 命令的工夫
    mstime_t last_ping_time;  
    
    // 实例最初一次返回 PING 命令的工夫,无论内容正确与否
    mstime_t last_pong_time;  

    // 最初一次向频道发送问候信息的工夫
    // 只在以后实例为 sentinel 时应用
    mstime_t last_pub_time;   

    // 最初一次接管到这个 sentinel 发来的问候信息的工夫
    // 只在以后实例为 sentinel 时应用
    mstime_t last_hello_time; 

    // 最初一次回复 SENTINEL is-master-down-by-addr 命令的工夫
    // 只在以后实例为 sentinel 时应用
    mstime_t last_master_down_reply_time; 

    // 实例被判断为 SDOWN 状态的工夫
    mstime_t s_down_since_time; 

    // 实例被判断为 ODOWN 状态的工夫
    mstime_t o_down_since_time; 

    // SENTINEL down-after-milliseconds 选项所设定的值
    // 实例无响应多少毫秒之后才会被判断为主观下线(subjectively down)mstime_t down_after_period; 

    // 从实例获取 INFO 命令的回复的工夫
    mstime_t info_refresh;  


    // 实例的角色
    int role_reported;
    // 角色的更新工夫
    mstime_t role_reported_time;

    // 最初一次从服务器的主服务器地址变更的工夫
    mstime_t slave_conf_change_time; 


    /* 主服务器实例特有的属性 */

    // 其余同样监控这个主服务器的所有 sentinel
    dict *sentinels;    

    // 如果这个实例代表的是一个主服务器
    // 那么这个字典保留着主服务器属下的从服务器
    // 字典的键是从服务器的名字,字典的值是从服务器对应的 sentinelRedisInstance 构造
    dict *slaves;       

    // SENTINEL monitor <master-name> <IP> <port> <quorum> 选项中的 quorum 参数
    // 判断这个实例为主观下线(objectively down)所需的反对投票数量
    int quorum;         

    // SENTINEL parallel-syncs <master-name> <number> 选项的值
    // 在执行故障转移操作时,能够同时对新的主服务器进行同步的从服务器数量
    int parallel_syncs; 

    // 连贯主服务器和从服务器所需的明码
    char *auth_pass;    


    /* 从服务器实例特有的属性 */

    // 主从服务器连贯断开的工夫
    mstime_t master_link_down_time; 

    // 从服务器优先级
    int slave_priority; 

    // 执行故障转移操作时,从服务器发送 SLAVEOF <new-master> 命令的工夫
    mstime_t slave_reconf_sent_time; 

    // 主服务器的实例(在本实例为从服务器时应用)struct sentinelRedisInstance *master; 

    // INFO 命令的回复中记录的主服务器 IP
    char *slave_master_host;    
    
    // INFO 命令的回复中记录的主服务器端口号
    int slave_master_port;      

    // INFO 命令的回复中记录的主从服务器连贯状态
    int slave_master_link_status; 

    // 从服务器的复制偏移量
    unsigned long long slave_repl_offset; 


    /* 故障转移相干属性 */


    // 如果这是一个主服务器实例,那么 leader 将是负责进行故障转移的 Sentinel 的运行 ID。// 如果这是一个 Sentinel 实例,那么 leader 就是被选举进去的领头 Sentinel。// 这个域只在 Sentinel 实例的 flags 属性的 SRI_MASTER_DOWN 标记处于关上状态时才无效。char *leader;       
    
    // 领头的纪元
    uint64_t leader_epoch; 
    
    // 以后执行中的故障转移的纪元
    uint64_t failover_epoch; 
    // 故障转移操作的以后状态
    int failover_state; 

    // 状态扭转的工夫
    mstime_t failover_state_change_time;

    // 最初一次进行故障迁徙的工夫
    mstime_t failover_start_time;  

    // SENTINEL failover-timeout <master-name> <ms> 选项的值
    // 刷新故障迁徙状态的最大时限
    mstime_t failover_timeout;      

    mstime_t failover_delay_logged; 
    
    // 指向被晋升为新主服务器的从服务器的指针
    struct sentinelRedisInstance *promoted_slave; 

    // 一个文件门路,保留着 WARNING 级别的事件产生时执行的,// 用于告诉管理员的脚本的地址
    char *notification_script;

    // 一个文件门路,保留着故障转移执行之前、之后、或者被停止时,// 须要执行的脚本的地址
    char *client_reconfig_script;

} sentinelRedisInstance;

如果此时启动 sentinel 时候,配置文件如下

#####################
# master1 configure #
#####################
sentinel monitor master1 127.0.0.1 6379 2
sentinel down-after-milliseconds master1 30000
sentinel parallel-syncs master1 1
sentinel failover-timeout master1 900000
#####################
# master2 configure #
#####################
sentinel monitor master2 127.0.0.1 12345 5
sentinel down-after-milliseconds master2 50000
sentinel parallel-syncs master2 5
sentinel failover-timeout master2 450000

则会为 2 个服务器创立如下构造体

sentinel 构造体中 maste 字典内容如下

当一个 redis 服务器以 sentinel 模式启动,则它会主动去替换一些一般模式服务器的代码,比方一般 redis 服务器应用 redis.h/REDIS_SERVERPORT 作为端口,然而 sentinel 模式下会以 sentinel.c/REDIS_SENTINEL_PORT 作为端口,同时一般 redis 服务器的反对的命令在 redis.c/redisCommandTable 中,然而 sentinel 模式下反对的命令在sentinel.c/sentinelcmds,其中代码较少,上面给出代码

// 服务器在 sentinel 模式下可执行的命令
struct redisCommand sentinelcmds[] = {{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};

sentinel 次要就是为了利用于主服务器下线导致集群不可用状况,因而最重要的就是如何检测和如何防备,上面通过主观下线和主观下线两种形式进行阐明

  • 主观下线

默认状况下,每个 sentinel 会每秒钟向其余所有主服务器、从服务器、sentinels 发送 ping 音讯,返回后果分为无效返回(+PONG、-LOADING、-MASTERDOWN)三者之一或有效返回(上述三种其余回复或者指定工夫内没有回复),若呈现有效返回状况,则会将 sentinelRedisInstance 属性中的 flag 字段关上 SRI_S_DOWN 标记

  • 主观下线

当一个 sentinel 对一台服务器设置为主观下线后,还须要判断是否主观下线,它会向其余监督该服务器的 sentinels 进行询问,当接管到足够数量(设置的 quorum 参数)的 sentinels 说该服务器也下线,则表明该服务器主观下线。主观下线会关上 SRI_O_DOWN 标记

当一个主服务器被断定为主观下线后,监督这个下线服务器的全副 sentinels 会进行协商,选举出一个 lead sentinel,这个 lead sentinel 会对下线服务器进行故障转移,包含三个步骤

1、在已下线主服务器的从服务器当选一个主服务器,而后向其发送 SLAVEOF no one 命令,设置为主服务器

2、让已下线主服务器上面的从服务器用刚刚选举的主服务器作为主服务器

3、将已下线的主服务器认刚刚选举的主服务器作为本人的主服务器,当这个下线服务器再次上线时,就会真的设置为本人的主服务器

初始化全局服务器配置

redis.c/initServerConfig()

void initServerConfig() {
    int j;

    // 设置服务器的运行 ID
    getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
    // 设置默认配置文件门路
    server.configfile = NULL;
    // 设置默认服务器频率
    server.hz = REDIS_DEFAULT_HZ;
    // 为运行 ID 加上结尾字符
    server.runid[REDIS_RUN_ID_SIZE] = '\0';
    // 设置服务器的运行架构
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    // 设置默认服务器端口号
    server.port = REDIS_SERVERPORT;
    server.tcp_backlog = REDIS_TCP_BACKLOG;
    server.bindaddr_count = 0;
    server.unixsocket = NULL;
    server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd_count = 0;
    server.sofd = -1;
    server.dbnum = REDIS_DEFAULT_DBNUM;
    server.verbosity = REDIS_DEFAULT_VERBOSITY;
    server.maxidletime = REDIS_MAXIDLETIME;
    server.tcpkeepalive = REDIS_DEFAULT_TCP_KEEPALIVE;
    server.active_expire_enabled = 1;
    server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN;
    server.saveparams = NULL;
    server.loading = 0;
    server.logfile = zstrdup(REDIS_DEFAULT_LOGFILE);
    server.syslog_enabled = REDIS_DEFAULT_SYSLOG_ENABLED;
    server.syslog_ident = zstrdup(REDIS_DEFAULT_SYSLOG_IDENT);
    server.syslog_facility = LOG_LOCAL0;
    server.daemonize = REDIS_DEFAULT_DAEMONIZE;
    server.aof_state = REDIS_AOF_OFF;
    server.aof_fsync = REDIS_DEFAULT_AOF_FSYNC;
    server.aof_no_fsync_on_rewrite = REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
    server.aof_rewrite_perc = REDIS_AOF_REWRITE_PERC;
    server.aof_rewrite_min_size = REDIS_AOF_REWRITE_MIN_SIZE;
    server.aof_rewrite_base_size = 0;
    server.aof_rewrite_scheduled = 0;
    server.aof_last_fsync = time(NULL);
    server.aof_rewrite_time_last = -1;
    server.aof_rewrite_time_start = -1;
    server.aof_lastbgrewrite_status = REDIS_OK;
    server.aof_delayed_fsync = 0;
    server.aof_fd = -1;
    server.aof_selected_db = -1; /* 保障不选中任意数据库 */
    server.aof_flush_postponed_start = 0;
    server.aof_rewrite_incremental_fsync = REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
    server.pidfile = zstrdup(REDIS_DEFAULT_PID_FILE);
    server.rdb_filename = zstrdup(REDIS_DEFAULT_RDB_FILENAME);
    server.aof_filename = zstrdup(REDIS_DEFAULT_AOF_FILENAME);
    server.requirepass = NULL;
    server.rdb_compression = REDIS_DEFAULT_RDB_COMPRESSION;
    server.rdb_checksum = REDIS_DEFAULT_RDB_CHECKSUM;
    server.stop_writes_on_bgsave_err = REDIS_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
    server.activerehashing = REDIS_DEFAULT_ACTIVE_REHASHING;
    server.notify_keyspace_events = 0;
    server.maxclients = REDIS_MAX_CLIENTS;
    server.bpop_blocked_clients = 0;
    server.maxmemory = REDIS_DEFAULT_MAXMEMORY;
    server.maxmemory_policy = REDIS_DEFAULT_MAXMEMORY_POLICY;
    server.maxmemory_samples = REDIS_DEFAULT_MAXMEMORY_SAMPLES;
    server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
    server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
    server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
    server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
    server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
    server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
    server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
    server.hll_sparse_max_bytes = REDIS_DEFAULT_HLL_SPARSE_MAX_BYTES;
    server.shutdown_asap = 0;
    server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
    server.repl_timeout = REDIS_REPL_TIMEOUT;
    server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE;
    server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG;
    server.cluster_enabled = 0;
    server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT;
    server.cluster_migration_barrier = REDIS_CLUSTER_DEFAULT_MIGRATION_BARRIER;
    server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE);
    server.lua_caller = NULL;
    server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
    server.lua_client = NULL;
    server.lua_timedout = 0;
    server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
    server.loading_process_events_interval_bytes = (1024*1024*2);

    // 初始化 LRU 工夫
    server.lruclock = getLRUClock();

    // 初始化并设置保留条件
    resetServerSaveParams();

    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

    // 初始化和复制相干的状态
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.cached_master = NULL;
    server.repl_master_initial_offset = -1;
    server.repl_state = REDIS_REPL_NONE;
    server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
    server.repl_serve_stale_data = REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA;
    server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
    server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
    server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
    server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
    server.master_repl_offset = 0;

    // 初始化 PSYNC 命令所应用的 backlog
    server.repl_backlog = NULL;
    server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE;
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;
    server.repl_backlog_off = 0;
    server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
    server.repl_no_slaves_since = time(NULL);

    // 设置客户端的输入缓冲区限度
    for (j = 0; j < REDIS_CLIENT_LIMIT_NUM_CLASSES; j++)
        server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];

    // 初始化浮点常量
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;

    // 初始化命令表
    // 在这里初始化是因为接下来读取 .conf 文件时可能会用到这些命令
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    server.delCommand = lookupCommandByCString("del");
    server.multiCommand = lookupCommandByCString("multi");
    server.lpushCommand = lookupCommandByCString("lpush");
    server.lpopCommand = lookupCommandByCString("lpop");
    server.rpopCommand = lookupCommandByCString("rpop");
    
    // 初始化慢查问日志
    server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
    server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN;

    // 初始化调试项
    server.assert_failed = "<no assertion failed>";
    server.assert_file = "<no file>";
    server.assert_line = 0;
    server.bug_report_start = 0;
    server.watchdog_period = 0;
}

次要包含以下几个方面

  • 网络监听相干,如绑定地址,TCP 端口等
  • 虚拟内存相干,如 swap 文件、page 大小等
  • 保留机制,多长时间内有多少次更新才进行保留
  • 复制相干,如是否是 slave,master 地址、端口
  • Hash 相干设置
  • 初始化命令表

加载配置文件

下面加载的能够设想成是一个默认配置文件,若 初始化时候,指定了配置文件,则会将其中一些字段进行批改config.c/loadServerConfig

void loadServerConfig(char *filename, char *options) {sds config = sdsempty();
    char buf[REDIS_CONFIGLINE_MAX+1];

    // 载入文件内容
    if (filename) {
        FILE *fp;

        if (filename[0] == '-' && filename[1] == '\0') {fp = stdin;} else {if ((fp = fopen(filename,"r")) == NULL) {
                redisLog(REDIS_WARNING,
                    "Fatal error, can't open config file '%s'", filename);
                exit(1);
            }
        }
        while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL)
            config = sdscat(config,buf);
        if (fp != stdin) fclose(fp);
    }

    // 追加 options 字符串到内容的开端
    if (options) {config = sdscat(config,"\n");
        config = sdscat(config,options);
    }

    // 依据字符串内容,设置服务器配置
    loadServerConfigFromString(config);

    sdsfree(config);
}

设置为守护过程

代码如下

void daemonize(void) {
    int fd;

    if (fork() != 0) exit(0); /* 父过程退出 */
    setsid(); /* 创立新会话 */

    /* 将输入定位到 /dev/null */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}

初始化服务器initServer

代码如下

void initServer() {
    int j;

    // 设置信号处理函数
    // 因为是守护过程,所以没有管制终端,屏蔽 SIGHUP
    signal(SIGHUP, SIG_IGN); 
    // SIGPIPE 是写管道发现读过程终止时产生的信号,redis 是服务器,会遇到各种 client,所以须要疏忽
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();

    // 设置 syslog
    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

    // 初始化并创立数据结构
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.slaveseldb = -1; 
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;

    // 创立共享对象
    createSharedObjects();
    adjustOpenFilesLimit();
    server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    // 关上 TCP 监听端口,用于期待客户端的命令申请
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);

    // 关上 UNIX 本地端口
    if (server.unixsocket != NULL) {unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0 && server.sofd < 0) {redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    // 创立并初始化数据库构造
    for (j = 0; j < server.dbnum; j++) {server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }

    // 创立 PUBSUB 相干构造
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);

    server.cronloops = 0;
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.resident_set_size = 0;
    server.lastbgsave_status = REDIS_OK;
    server.aof_last_write_status = REDIS_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;
    updateCachedTime();

    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    // 为 serverCron() 创立工夫事件
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {redisPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    // 为 TCP 连贯关联连贯应答(accept)处理器
    // 用于承受并应答客户端的 connect() 调用
    for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic("Unrecoverable error creating server.ipfd file event.");
            }
    }

    // 为本地套接字关联应答处理器
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

    /* Open the AOF file if needed. */
    // 如果 AOF 长久化性能曾经关上,那么关上或创立一个 AOF 文件
    if (server.aof_state == REDIS_AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    // 对于 32 位实例来说,默认将最大可用内存限度在 3 GB
    if (server.arch_bits == 32 && server.maxmemory == 0) {redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with'noeviction'policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
    }

    // 如果服务器以 cluster 模式关上,那么初始化 cluster
    if (server.cluster_enabled) clusterInit();

    // 初始化复制性能无关的脚本缓存
    replicationScriptCacheInit();

    // 初始化脚本零碎
    scriptingInit();

    // 初始化慢查问性能
    slowlogInit();

    // 初始化 BIO 零碎
    bioInit();}

下面大多数正文曾经对代码进行解说,上面对 slowlogInit 进行独自解说

/*
 * 初始化服务器慢查问性能。*
 * 这个函数只应该在服务器启动时执行一次。*/
void slowlogInit(void) {
    // 保留日志的链表,FIFO 程序
    server.slowlog = listCreate();
    // 日志数量计数器
    server.slowlog_entry_id = 0;
    // 日志链表的释构函数
    listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}

/*
 * 慢查问日志
 */
typedef struct slowlogEntry {
    // 命令与命令参数
    robj **argv;
    // 命令与命令参数的数量
    int argc;
    // 惟一标识符
    long long id;      
    // 执行命令耗费的工夫,以微秒为单位
    // 正文里说的 nanoseconds 是谬误的
    long long duration; 
    // 命令执行时的工夫,格局为 UNIX 工夫戳
    time_t time;       
} slowlogEntry;

其中还有一个函数bioInit,redis 的 BIO 零碎在 redis3.0 版本次要做两件事件:AOF 长久化和敞开文件,能够将 BIO 零碎设想成上面:创立一个队列,而后创立一些线程,来了一个工作就往队列外面增加工作,线程去工作队列外面取工作进去执行

因为在 redis3.0 中只须要做两件事件,所以工作的构造体代码如下

/* 
 * 示意后台任务的数据结构
 *
 * 这个构造只由 API 应用,不会被裸露给内部。*/
struct bio_job {

    // 工作创立时的工夫
    time_t time; 

    /* 
     * 工作的参数。参数多于三个时,能够传递数组或者构造 arg1 个别是文件描述符
     */
    void *arg1, *arg2, *arg3;
};
  • 工作初始化

首先是相干动态变量的初始化

#define REDIS_BIO_NUM_OPS       2 // 2 个工作
// 工作线程,斥互和条件变量
static pthread_t bio_threads[REDIS_BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
// 寄存工作的队列
static list *bio_jobs[REDIS_BIO_NUM_OPS];
// 初始化变量
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {pthread_mutex_init(&bio_mutex[j],NULL);
    pthread_cond_init(&bio_condvar[j],NULL);
    bio_jobs[j] = listCreate();
    bio_pending[j] = 0;
}
// 创立线程
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {void *arg = (void*)(unsigned long) j;
    // 这里的函数参数是 arg = j,也就是每个线程传入一个编号 j,0 代表敞开文件,1 代表 aof 初始化
    if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
        exit(1);
    }
    bio_threads[j] = thread;
}

// bioProcessBackgroundJobs 函数就是后盾执行工作的函数
void *bioProcessBackgroundJobs(void *arg) {
    ...
    if (type == REDIS_BIO_CLOSE_FILE) {close((long)job->arg1);
    } else if (type == REDIS_BIO_AOF_FSYNC) {aof_fsync((long)job->arg1);
    } else {redisPanic("Wrong job type in bioProcessBackgroundJobs().");
    }
    ...
}

事件处理器循环aeMain

这个循环次要就是做两件事件,beforeSleepaeProcessEvents

// 运行事件处理器,始终到服务器敞开为止
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
// 服务器敞开,进行事件循环
aeDeleteEventLoop(server.el);

/*
 * 设置处理事件前须要被执行的函数
 */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {eventLoop->beforesleep = beforesleep;}
/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有须要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 开始处理事件 其实就是一个事件调度函数,包含解决工夫事件和文件事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
/*
 * 删除事件处理器
 */
void aeDeleteEventLoop(aeEventLoop *eventLoop) {aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);
    zfree(eventLoop);
}

上面独自对这两个函数进行解说

  • beforeSleep

首先先看代码

// 每次处理事件之前执行
void beforeSleep(struct aeEventLoop *eventLoop) {REDIS_NOTUSED(eventLoop);

    // 执行一次疾速的被动过期查看
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* 如果在之前的事件循环迭代中至多有一个客户端阻塞,则向所有 slave 发送 ACK 申请 */
    if (server.get_ack_from_slaves) {robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* 解除阻塞期待同步复制的所有客户端 */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* 尝试为刚刚解除阻塞的客户端解决挂起的命令 */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    // 将 AOF 缓冲区的内容写入到 AOF 文件
    // void flushAppendOnlyFile(int force) force 参数表明是否强制刷新,当为 0 时候,若后盾有 fsync 在执行,则提早
    flushAppendOnlyFile(0);

    // 在进入下个事件循环前,执行一些集群收尾工作
    if (server.cluster_enabled) clusterBeforeSleep();}
  • aeProcessEvents

redis 中的事件次要分为两种事件:文件事件(和其余客户端连贯产生的事件)和工夫事件(定时工夫产生的事件)

redis 解决工夫事件的函数会在服务器运行期间,每隔一段事件运行,解决工夫事件,每个事件以链表模式挂在一起,每次解决时候,都是遍历该链表

/* Process time events
 *
 * 解决所有已达到的工夫事件
 */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    // 通过重置事件的运行工夫,// 避免因工夫交叉(skew)而造成的事件处理凌乱
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // 更新最初一次解决工夫事件的工夫
    eventLoop->lastTime = now;

    // 遍历链表
    // 执行那些曾经达到的事件
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        // 跳过有效事件
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        
        // 获取以后工夫
        aeGetTime(&now_sec, &now_ms);

        // 如果以后工夫等于或等于事件的执行工夫,那么阐明事件已达到,执行这个事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            // 执行事件处理器,并获取返回值
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* After an event is processed our time event list may
             * no longer be the same, so we restart from head.
             * Still we make sure to don't process events registered
             * by event handlers itself in order to don't loop forever.
             * To do so we saved the max ID we want to handle.
             *
             * FUTURE OPTIMIZATIONS:
             * Note that this is NOT great algorithmically. Redis uses
             * a single time event so it's not a problem but the right
             * way to do this is to add the new elements on head, and
             * to flag deleted elements in a special way for later
             * deletion (putting references to the nodes to delete into
             * another linked list). */

            // 记录是否有须要循环执行这个事件工夫
            if (retval != AE_NOMORE) {
                // 是的,retval 毫秒之后继续执行这个工夫事件
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // 不,将这个事件删除
                aeDeleteTimeEvent(eventLoop, id);
            }

            // 因为执行事件之后,事件列表可能曾经被扭转了
            // 因而须要将 te 放回表头,持续开始执行事件
            te = eventLoop->timeEventHead;
        } else {te = te->next;}
    }
    return processed;
}

上面的代码就是 redis 的事件调度函数

/* 
 * 事件调度函数
 * 解决所有已达到的工夫事件,以及所有已就绪的文件事件。* 如果不传入非凡 flags 的话,那么函数睡眠直到文件事件就绪,* 或者下个工夫事件达到(如果有的话)。*
 * 如果 flags 为 0,那么函数不作动作,间接返回。* 如果 flags 蕴含 AE_ALL_EVENTS,所有类型的事件都会被解决。* 如果 flags 蕴含 AE_FILE_EVENTS,那么解决文件事件。* 如果 flags 蕴含 AE_TIME_EVENTS,那么解决工夫事件。* 如果 flags 蕴含 AE_DONT_WAIT,那么函数在解决完所有不许阻塞的事件之后,即刻返回。* 函数的返回值为已处理事件的数量
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 获取最近的工夫事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果工夫事件存在的话
            // 那么依据最近可执行工夫事件和当初工夫的时间差来决定文件事件的阻塞工夫
            long now_sec, now_ms;

            // 计算距今最近的工夫事件还要多久能力达到
            // 并将该工夫距保留在 tv 构造中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 时间差小于 0,阐明事件曾经能够执行了,将秒和毫秒设为 0(不阻塞)if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            // 执行到这一步,阐明没有工夫事件
            // 那么依据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的工夫长度

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件能够阻塞直到有事件达到为止
                tvp = NULL; /* wait forever */
            }
        }

        // 解决文件事件,阻塞工夫由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读 / 写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    // 执行工夫事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;
}

/*
 * 获取可执行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 等待时间
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

    // 有至多一个事件就绪?if (retval > 0) {
        int j;
        // 为已就绪事件设置相应的模式
        // 并退出到 eventLoop 的 fired 数组中
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    
    // 返回已就绪事件个数
    return numevents;
}

由下面代码可知,因为文件事件是随机呈现的,如果期待并解决完一次文件事件之后,仍未有任何工夫事件达到,那么服务器将再次期待并解决文件事件。随着文件事件的一直执行,工夫会逐步向工夫事件所设置的达到工夫迫近,并最终来到达到工夫,这时服务器就能够开始解决达到的工夫事件了。

本人的网址:www.shicoder.top
欢送加群聊天 452380935
本文由博客一文多发平台 OpenWrite 公布!

正文完
 0