Redis Cluster Gossip 协定

明天来讲一下 Reids Cluster 的 Gossip 协定和集群操作,文章的思维导图如下所示。

集群模式和 Gossip 简介

对于数据存储畛域,当数据量或者申请流量到肯定水平后,就必然会引入分布式。比方 Redis,尽管其单机性能非常优良,然而因为下列起因时,也不得不引入集群。

  • 单机无奈保障高可用,须要引入多实例来提供高可用性
  • 单机可能提供高达 8W 左右的QPS,再高的QPS则须要引入多实例
  • 单机可能反对的数据量无限,解决更多的数据须要引入多实例;
  • 单机所解决的网络流量曾经超过服务器的网卡的上限值,须要引入多实例来分流。

有集群,集群往往须要保护肯定的元数据,比方实例的ip地址,缓存分片的 slots 信息等,所以须要一套分布式机制来保护元数据的一致性。这类机制个别有两个模式:分散式和集中式

分散式机制将元数据存储在局部或者所有节点上,不同节点之间进行一直的通信来保护元数据的变更和一致性。Redis Cluster,Consul 等都是该模式。

而集中式是将集群元数据集中存储在内部节点或者中间件上,比方 zookeeper。旧版本的 kafka 和 storm 等都是应用该模式。

两种模式各有优劣,具体如下表所示:

模式长处毛病集中式数据更新及时,时效好,元数据的更新和读取,时效性十分好,一旦元数据呈现了变更,立刻就更新到集中式的内部节点中,其余节点读取的时候立刻就能够感知到;较大数据更新压力,更新压力全副集中在内部节点,作为单点影响整个零碎分散式数据更新压力扩散,元数据的更新比拟扩散,不是集中某一个节点,更新申请比拟扩散,而且有不同节点解决,有肯定的延时,升高了并发压力数据更新提早,可能导致集群的感知有肯定的滞后

分散式的元数据模式有多种可选的算法进行元数据的同步,比如说 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都须要全副节点或者大多数节点(超过一半)失常运行,整个集群能力稳固运行,而 Gossip 则不须要半数以上的节点运行。

Gossip 协定,顾名思义,就像风言风语一样,利用一种随机、带有传染性的形式,将信息流传到整个网络中,并在肯定工夫内,使得零碎内的所有节点数据统一。对你来说,把握这个协定不仅能很好地了解这种最罕用的,实现最终一致性的算法,也能在后续工作中得心应手地实现数据的最终一致性。

Gossip 协定又称 epidemic 协定(epidemic protocol),是基于流行病传播方式的节点或者过程之间信息替换的协定,在P2P网络和分布式系统中利用宽泛,它的方法论也特地简略:

在一个处于有界网络的集群里,如果每个节点都随机与其余节点替换特定信息,通过足够长的工夫后,集群各个节点对该份信息的认知终将收敛到统一。

这里的“特定信息”个别就是指集群状态、各节点的状态以及其余元数据等。Gossip协定是完全符合 BASE 准则,能够用在任何要求最终一致性的畛域,比方分布式存储和注册核心。另外,它能够很不便地实现弹性集群,容许节点随时高低线,提供快捷的失败检测和动静负载平衡等。

此外,Gossip 协定的最大的益处是,即便集群节点的数量减少,每个节点的负载也不会减少很多,简直是恒定的。这就容许 Redis Cluster 或者 Consul 集群治理的节点规模能横向扩大到数千个。

Redis Cluster 的 Gossip 通信机制

Redis Cluster 是在 3.0 版本引入集群性能。为了让让集群中的每个实例都晓得其余所有实例的状态信息,Redis 集群规定各个实例之间依照 Gossip 协定来通信传递信息。

上图展现了主从架构的 Redis Cluster 示意图,其中实线示意节点间的主从复制关系,而虚线示意各个节点之间的 Gossip 通信。

Redis Cluster 中的每个节点都保护一份本人视角下的以后整个集群的状态,次要包含:

  1. 以后集群状态
  2. 集群中各节点所负责的 slots信息,及其migrate状态
  3. 集群中各节点的master-slave状态
  4. 集群中各节点的存活状态及狐疑Fail状态

也就是说下面的信息,就是集群中Node互相八卦流传风言风语的内容主题,而且比拟全面,既有本人的更有他人的,这么一来大家都互相传,最终信息就全面而且统一了。

Redis Cluster 的节点之间会互相发送多种音讯,较为重要的如下所示:

  • MEET:通过「cluster meet ip port」命令,已有集群的节点会向新的节点发送邀请,退出现有集群,而后新节点就会开始与其余节点进行通信;
  • PING:节点依照配置的工夫距离向集群中其余节点发送 ping 音讯,音讯中带有本人的状态,还有本人保护的集群元数据,和局部其余节点的元数据;
  • PONG: 节点用于回应 PING 和 MEET 的音讯,构造和 PING 音讯相似,也蕴含本人的状态和其余信息,也能够用于信息播送和更新;
  • FAIL: 节点 PING 不通某节点后,会向集群所有节点播送该节点挂掉的音讯。其余节点收到音讯后标记已下线。

Redis 的源码中 cluster.h 文件定义了全副的音讯类型,代码为 redis 4.0版本。

// 留神,PING 、 PONG 和 MEET 实际上是同一种音讯。// PONG 是对 PING 的回复,它的理论格局也为 PING 音讯,// 而 MEET 则是一种非凡的 PING 音讯,用于强制音讯的接收者将音讯的发送者增加到集群中(如果节点尚未在节点列表中的话)#define CLUSTERMSG_TYPE_PING 0          /* Ping 音讯 */#define CLUSTERMSG_TYPE_PONG 1          /* Pong 用于回复Ping */#define CLUSTERMSG_TYPE_MEET 2          /* Meet 申请将某个节点增加到集群中 */#define CLUSTERMSG_TYPE_FAIL 3          /* Fail 将某个节点标记为 FAIL */#define CLUSTERMSG_TYPE_PUBLISH 4       /* 通过公布与订阅性能播送音讯 */#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* 申请进行故障转移操作,要求音讯的接收者通过投票来反对音讯的发送者 */#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* 音讯的接收者批准向音讯的发送者投票 */#define CLUSTERMSG_TYPE_UPDATE 7        /* slots 曾经发生变化,音讯发送者要求音讯接收者进行相应的更新 */#define CLUSTERMSG_TYPE_MFSTART 8       /* 为了进行手动故障转移,暂停各个客户端 */#define CLUSTERMSG_TYPE_COUNT 9         /* 音讯总数 */复制代码

通过上述这些音讯,集群中的每一个实例都能取得其它所有实例的状态信息。这样一来,即便有新节点退出、节点故障、Slot 变更等事件产生,实例间也能够通过 PING、PONG 音讯的传递,实现集群状态在每个实例上的同步。上面,咱们顺次来看看几种常见的场景。

定时 PING/PONG 音讯

Redis Cluster 中的节点都会定时地向其余节点发送 PING 音讯,来替换各个节点状态信息,查看各个节点状态,包含在线状态、疑似下线状态 PFAIL 和已下线状态 FAIL。

Redis 集群的定时 PING/PONG 的工作原理能够概括成两点:

  • 一是,每个实例之间会依照肯定的频率,从集群中随机筛选一些实例,把 PING 音讯发送给筛选进去的实例,用来检测这些实例是否在线,并替换彼此的状态信息。PING 音讯中封装了发送音讯的实例本身的状态信息、局部其它实例的状态信息,以及 Slot 映射表。
  • 二是,一个实例在接管到 PING 音讯后,会给发送 PING 音讯的实例,发送一个 PONG 音讯。PONG 音讯蕴含的内容和 PING 音讯一样。

下图显示了两个实例间进行 PING、PONG 消息传递的状况,其中实例一为发送节点,实例二是接管节点

新节点上线

Redis Cluster 退出新节点时,客户端须要执行 CLUSTER MEET 命令,如下图所示。

节点一在执行 CLUSTER MEET 命令时会首先为新节点创立一个 clusterNode 数据,并将其增加到本人保护的 clusterState 的 nodes 字典中。无关 clusterState 和 clusterNode 关系,咱们在最初一节会有详尽的示意图和源码来解说。

而后节点一会依据据 CLUSTER MEET 命令中的 IP 地址和端口号,向新节点发送一条 MEET 音讯。新节点接管到节点一发送的MEET音讯后,新节点也会为节点一创立一个 clusterNode 构造,并将该构造增加到本人保护的 clusterState 的 nodes 字典中。

接着,新节点向节点一返回一条PONG音讯。节点一接管到节点B返回的PONG音讯后,得悉新节点曾经胜利的接管了本人发送的MEET音讯。

最初,节点一还会向新节点发送一条 PING 音讯。新节点接管到该条 PING 音讯后,能够晓得节点A曾经胜利的接管到了本人返回的P ONG音讯,从而实现了新节点接入的握手操作。

MEET 操作胜利之后,节点一会通过稍早时讲的定时 PING 机制将新节点的信息发送给集群中的其余节点,让其余节点也与新节点进行握手,最终,通过一段时间后,新节点会被集群中的所有节点意识。

节点疑似下线和真正下线

Redis Cluster 中的节点会定期检查曾经发送 PING 音讯的接管方节点是否在规定工夫 ( cluster-node-timeout ) 内返回了 PONG 音讯,如果没有则会将其标记为疑似下线状态,也就是 PFAIL 状态,如下图所示。

而后,节点一会通过 PING 音讯,将节点二处于疑似下线状态的信息传递给其余节点,例如节点三。节点三接管到节点一的 PING 音讯得悉节点二进入 PFAIL 状态后,会在本人保护的 clusterState 的 nodes 字典中找到节点二所对应的 clusterNode 构造,并将主节点一的下线报告增加到 clusterNode 构造的 fail_reports 链表中。

随着工夫的推移,如果节点十 (举个例子) 也因为 PONG 超时而认为节点二疑似下线了,并且发现自己保护的节点二的 clusterNode 的 fail_reports 中有半数以上的主节点数量的未过期的将节点二标记为 PFAIL 状态报告日志,那么节点十将会把节点二将被标记为已下线 FAIL 状态,并且节点十会立即向集群其余节点播送主节点二曾经下线的 FAIL 音讯,所有收到 FAIL 音讯的节点都会立刻将节点二状态标记为已下线。如下图所示。

须要留神的是,报告疑似下线记录是由时效性的,如果超过 cluster-node-timeout *2 的工夫,这个报告就会被疏忽掉,让节点二又复原成失常状态。

Redis Cluster 通信源码实现

综上,咱们理解了 Redis Cluster 在定时 PING/PONG、新节点上线、节点疑似下线和真正下线等环节的原理和操作流程,上面咱们来真正看一下 Redis 在这些环节的源码实现和具体操作。

波及的数据结构体

首先,咱们先来解说一下其中波及的数据结构,也就是上文提到的 ClusterNode 等构造。

每个节点都会保护一个 clusterState 构造,示意以后集群的整体状态,它的定义如下所示。

typedef struct clusterState {   clusterNode *myself;  /* 以后节点的clusterNode信息 */   ....   dict *nodes;          /* name到clusterNode的字典 */   ....   clusterNode *slots[CLUSTER_SLOTS]; /* slot 和节点的对应关系*/   ....} clusterState;复制代码

它有三个比拟要害的字段,具体示意图如下所示:

  • myself 字段,是一个 clusterNode 构造,用来记录本人的状态;
  • nodes 字典,记录一个 name 到 clusterNode 构造的映射,以此来记录其余节点的状态;
  • slot 数组,记录slot 对应的节点 clusterNode构造。

clusterNode 构造保留了一个节点的以后状态,比方节点的创立工夫、节点的名字、节点 以后的配置纪元、节点的IP地址和端口号等等。除此之外,clusterNode构造的 link 属性是一个clusterLink构造,该构造保留了连贯节点所需的无关信息,比方套接字描述符,输出缓冲区和输入缓冲区。clusterNode 还有一个 fail_report 的列表,用来记录疑似下线报告。具体定义如下所示。

typedef struct clusterNode {    mstime_t ctime; /* 创立节点的工夫 */    char name[CLUSTER_NAMELEN]; /* 节点的名字 */    int flags;      /* 节点标识,标记节点角色或者状态,比方主节点从节点或者在线和下线 */    uint64_t configEpoch; /* 以后节点已知的集群对立epoch */    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */    int numslots;   /* Number of slots handled by this node */    int numslaves;  /* Number of slave nodes, if this is a master */    struct clusterNode **slaves; /* pointers to slave nodes */    struct clusterNode *slaveof; /* pointer to the master node. Note that it                                    may be NULL even if the node is a slave                                    if we don't have the master node in our                                    tables. */    mstime_t ping_sent;      /* 以后节点最初一次向该节点发送 PING 音讯的工夫 */    mstime_t pong_received;  /* 以后节点最初一次收到该节点 PONG 音讯的工夫 */    mstime_t fail_time;      /* FAIL 标记位被设置的工夫 */    mstime_t voted_time;     /* Last time we voted for a slave of this master */    mstime_t repl_offset_time;  /* Unix time we received offset for this node */    mstime_t orphaned_time;     /* Starting time of orphaned master condition */    long long repl_offset;      /* 以后节点的repl便宜 */    char ip[NET_IP_STR_LEN];  /* 节点的IP 地址 */    int port;                   /* 端口 */    int cport;                  /* 通信端口,个别是端口+1000 */    clusterLink *link;          /* 和该节点的 tcp 连贯 */    list *fail_reports;         /* 下线记录列表 */} clusterNode;复制代码

clusterNodeFailReport 是记录节点下线报告的构造体, node 是报告节点的信息,而 time 则代表着报告工夫。

typedef struct clusterNodeFailReport {    struct clusterNode *node;  /* 报告以后节点曾经下线的节点 */    mstime_t time;             /* 报告工夫 */} clusterNodeFailReport;复制代码

音讯构造体

理解了 Reids 节点保护的数据结构体后,咱们再来看节点进行通信的音讯构造体。 通信音讯最外侧的构造体为 clusterMsg,它包含了很多音讯记录信息,包含 RCmb 标记位,音讯总长度,音讯协定版本,音讯类型;它还包含了发送该音讯节点的记录信息,比方节点名称,节点负责的slot信息,节点ip和端口等;最初它蕴含了一个 clusterMsgData 来携带具体类型的音讯。

typedef struct {    char sig[4];        /* 标记位,"RCmb" (Redis Cluster message bus). */    uint32_t totlen;    /* 音讯总长度 */    uint16_t ver;       /* 音讯协定版本 */    uint16_t port;      /* 端口 */    uint16_t type;      /* 音讯类型 */    uint16_t count;     /*  */    uint64_t currentEpoch;  /* 示意本节点以后记录的整个集群的对立的epoch,用来决策选举投票等,与configEpoch不同的是:configEpoch示意的是master节点的惟一标记,currentEpoch是集群的惟一标记。 */    uint64_t configEpoch;   /* 每个master节点都有一个惟一的configEpoch做标记,如果和其余master节点抵触,会强制自增使本节点在集群中惟一 */    uint64_t offset;    /* 主从复制偏移相干信息,主节点和从节点含意不同 */    char sender[CLUSTER_NAMELEN]; /* 发送节点的名称 */    unsigned char myslots[CLUSTER_SLOTS/8]; /* 本节点负责的slots信息,16384/8个char数组,一共为16384bit */    char slaveof[CLUSTER_NAMELEN]; /* master信息,如果本节点是slave节点的话,协定带有master信息 */    char myip[NET_IP_STR_LEN];    /* IP */    char notused1[34];  /* 保留字段 */    uint16_t cport;      /* 集群的通信端口 */    uint16_t flags;      /* 本节点以后的状态,比方 CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET */    unsigned char state; /* Cluster state from the POV of the sender */    unsigned char mflags[3]; /* 本条音讯的类型,目前只有两类:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */    union clusterMsgData data;} clusterMsg;复制代码

clusterMsgData 是一个 union 构造体,它能够为 PING,MEET,PONG 或者 FAIL 等音讯体。其中当音讯为 PING、MEET 和 PONG 类型时,ping 字段是被赋值的,而是 FAIL 类型时,fail 字段是被赋值的。

// 留神这是 union 关键字union clusterMsgData {    /* PING, MEET 或者 PONG 音讯时,ping 字段被赋值 */    struct {        /* Array of N clusterMsgDataGossip structures */        clusterMsgDataGossip gossip[1];    } ping;    /*  FAIL 音讯时,fail 被赋值 */    struct {        clusterMsgDataFail about;    } fail;    // .... 省略 publish 和 update 音讯的字段};复制代码

clusterMsgDataGossip 是 PING、PONG 和 MEET 音讯的构造体,它会包含发送音讯节点保护的其余节点信息,也就是上文中 clusterState 中 nodes 字段蕴含的信息,具体代码如下所示,你也会发现二者的字段是相似的。

typedef struct {    /* 节点的名字,默认是随机的,MEET音讯发送并失去回复后,集群会为该节点设置正式的名称*/    char nodename[CLUSTER_NAMELEN];     uint32_t ping_sent; /* 发送节点最初一次给接管节点发送 PING 音讯的工夫戳,收到对应 PONG 回复后会被赋值为0 */    uint32_t pong_received; /* 发送节点最初一次收到接管节点发送 PONG 音讯的工夫戳 */    char ip[NET_IP_STR_LEN];  /* IP address last time it was seen */    uint16_t port;       /* IP*/           uint16_t cport;      /* 端口*/      uint16_t flags;      /* 标识*/     uint32_t notused1;   /* 对齐字符*/} clusterMsgDataGossip;typedef struct {    char nodename[CLUSTER_NAMELEN]; /* 下线节点的名字 */} clusterMsgDataFail;复制代码

看完了节点保护的数据结构体和发送的音讯构造体后,咱们就来看看 Redis 的具体行为源码了。

随机周期性发送PING音讯

Redis 的 clusterCron 函数会被定时调用,每被执行10次,就会筹备向随机的一个节点发送 PING 音讯。

它会先随机的选出 5 个节点,而后从中抉择最久没有与之通信的节点,调用 clusterSendPing 函数发送类型为 CLUSTERMSG_TYPE_PING 的音讯

// cluster.c 文件 // clusterCron() 每执行 10 次(至多距离一秒钟),就向一个随机节点发送 gossip 信息if (!(iteration % 10)) {    int j;    /* 随机 5 个节点,选出其中一个 */    for (j = 0; j < 5; j++) {        de = dictGetRandomKey(server.cluster->nodes);        clusterNode *this = dictGetVal(de);        /* 不要 PING 连贯断开的节点,也不要 PING 最近曾经 PING 过的节点 */        if (this->link == NULL || this->ping_sent != 0) continue;        if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))            continue;        /* 比照 pong_received 字段,选出更长时间未收到其 PONG 音讯的节点(示意良久没有承受到该节点的PONG音讯了) */        if (min_pong_node == NULL || min_pong > this->pong_received) {            min_pong_node = this;            min_pong = this->pong_received;        }    }    /* 向最久没有收到 PONG 回复的节点发送 PING 命令 */    if (min_pong_node) {        serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);        clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);    }}复制代码

clusterSendPing 函数的具体行为咱们后续再理解,因为该函数在其余环节也会常常用到

节点退出集群

当节点执行 CLUSTER MEET 命令后,会在本身给新节点保护一个 clusterNode 构造体,该构造体的 link 也就是TCP连贯字段是 null,示意是新节点尚未建设连贯。

clusterCron 函数中也会解决这些未建设连贯的新节点,调用 createClusterLink 创建连贯,而后调用 clusterSendPing 函数来发送 MEET 音讯

/* cluster.c clusterCron 函数局部,为未创立连贯的节点创立连贯 */if (node->link == NULL) {    int fd;    mstime_t old_ping_sent;    clusterLink *link;    /* 和该节点建设连贯 */    fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,        node->cport, NET_FIRST_BIND_ADDR);    /* .... fd 为-1时的异样解决 */    /* 建设 link */    link = createClusterLink(node);    link->fd = fd;    node->link = link;    aeCreateFileEvent(server.el,link->fd,AE_READABLE,            clusterReadHandler,link);    /* 向新连贯的节点发送 PING 命令,避免节点被识进入下线 */    /* 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令 */    old_ping_sent = node->ping_sent;    clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?            CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);    /* .... */    /* 如果以后节点(发送者)没能收到 MEET 信息的回复,那么它将不再向指标节点发送命令。*/    /* 如果接管到回复的话,那么节点将不再处于 HANDSHAKE 状态,并持续向指标节点发送一般 PING 命令*/    node->flags &= ~CLUSTER_NODE_MEET;}复制代码

避免节点假超时及状态过期

避免节点假超时和标记疑似下线标记也是在 clusterCron 函数中,具体如下所示。它会查看以后所有的 nodes 节点列表,如果发现某个节点与本人的最初一个 PONG 通信工夫超过了预约的阈值的一半时,为了避免节点是假超时,会被动开释掉与之的 link 连贯,而后会被动向它发送一个 PING 音讯。

/* cluster.c clusterCron 函数局部,遍历节点来查看 fail 的节点*/while((de = dictNext(di)) != NULL) {    clusterNode *node = dictGetVal(de);    now = mstime(); /* Use an updated time at every iteration. */    mstime_t delay;    /* 如果等到 PONG 达到的工夫超过了 node timeout 一半的连贯 */    /* 因为只管节点仍然失常,但连贯可能曾经出问题了 */    if (node->link && /* is connected */        now - node->link->ctime >        server.cluster_node_timeout && /* 还未重连 */        node->ping_sent && /* 曾经发过ping音讯 */        node->pong_received < node->ping_sent && /* 还在期待pong音讯 */        /* 期待pong音讯超过了 timeout/2 */        now - node->ping_sent > server.cluster_node_timeout/2)    {        /* 开释连贯,下次 clusterCron() 会主动重连 */        freeClusterLink(node->link);    }    /* 如果目前没有在 PING 节点*/    /* 并且曾经有 node timeout 一半的工夫没有从节点那里收到 PONG 回复 */    /* 那么向节点发送一个 PING ,确保节点的信息不会太旧,有可能始终没有随机中 */    if (node->link &&        node->ping_sent == 0 &&        (now - node->pong_received) > server.cluster_node_timeout/2)    {        clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);        continue;    }    /* .... 解决failover和标记遗失下线 */}复制代码

解决failover和标记疑似下线

如果避免节点假超时解决后,节点仍旧未收到指标节点的 PONG 音讯,并且工夫曾经超过了 cluster_node_timeout,那么就将该节点标记为疑似下线状态。

/* 如果这是一个主节点,并且有一个从服务器申请进行手动故障转移,那么向从服务器发送 PING*/if (server.cluster->mf_end &&    nodeIsMaster(myself) &&    server.cluster->mf_slave == node &&    node->link){    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);    continue;}/* 后续代码只在节点发送了 PING 命令的状况下执行*/if (node->ping_sent == 0) continue;/* 计算期待 PONG 回复的时长 */ delay = now - node->ping_sent;/* 期待 PONG 回复的时长超过了限度值,将指标节点标记为 PFAIL (疑似下线)*/if (delay > server.cluster_node_timeout) {    /* 超时了,标记为疑似下线 */    if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {        redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",            node->name);        // 关上疑似下线标记        node->flags |= REDIS_NODE_PFAIL;        update_state = 1;    }}复制代码

理论发送Gossip音讯

以下是后方屡次调用过的clusterSendPing()办法的源码,代码中有具体的正文,大家能够自行浏览。次要的操作就是将节点本身保护的 clusterState 转换为对应的音讯构造体,。

/* 向指定节点发送一条 MEET 、 PING 或者 PONG 音讯 */void clusterSendPing(clusterLink *link, int type) {    unsigned char *buf;    clusterMsg *hdr;    int gossipcount = 0; /* Number of gossip sections added so far. */    int wanted; /* Number of gossip sections we want to append if possible. */    int totlen; /* Total packet length. */    // freshnodes 是用于发送 gossip 信息的计数器    // 每次发送一条信息时,程序将 freshnodes 的值减一    // 当 freshnodes 的数值小于等于 0 时,程序进行发送 gossip 信息    // freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2     // 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点)    // 另一个是承受 gossip 信息的节点    int freshnodes = dictSize(server.cluster->nodes)-2;        /* 计算要携带多少节点的信息,起码3个,最多 1/10 集群总节点数量*/    wanted = floor(dictSize(server.cluster->nodes)/10);    if (wanted < 3) wanted = 3;    if (wanted > freshnodes) wanted = freshnodes;    /* .... 省略 totlen 的计算等*/    /* 如果发送的信息是 PING ,那么更新最初一次发送 PING 命令的工夫戳 */    if (link->node && type == CLUSTERMSG_TYPE_PING)        link->node->ping_sent = mstime();    /* 将以后节点的信息(比方名字、地址、端口号、负责解决的槽)记录到音讯外面 */    clusterBuildMessageHdr(hdr,type);    /* Populate the gossip fields */    int maxiterations = wanted*3;    /* 每个节点有 freshnodes 次发送 gossip 信息的机会       每次向指标节点发送 2 个被选中节点的 gossip 信息(gossipcount 计数) */    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {        /* 从 nodes 字典中随机选出一个节点(被选中节点) */        dictEntry *de = dictGetRandomKey(server.cluster->nodes);        clusterNode *this = dictGetVal(de);        /* 以下节点不能作为被选中节点:         * Myself:节点自身。         * PFAIL状态的节点         * 处于 HANDSHAKE 状态的节点。         * 带有 NOADDR 标识的节点         * 因为不解决任何 Slot 而被断开连接的节点          */        if (this == myself) continue;        if (this->flags & CLUSTER_NODE_PFAIL) continue;        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||            (this->link == NULL && this->numslots == 0))        {            freshnodes--; /* Tecnically not correct, but saves CPU. */            continue;        }        // 查看被选中节点是否曾经在 hdr->data.ping.gossip 数组外面        // 如果是的话阐明这个节点之前曾经被选中了        // 不要再选中它(否则就会呈现反复)        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;        /* 这个被选中节点无效,计数器减一 */        clusterSetGossipEntry(hdr,gossipcount,this);        freshnodes--;        gossipcount++;    }    /* .... 如果有 PFAIL 节点,最初增加 */    /* 计算信息长度 */    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);    /* 将被选中节点的数量(gossip 信息中蕴含了多少个节点的信息)记录在 count 属性外面*/    hdr->count = htons(gossipcount);    /* 将信息的长度记录到信息外面 */    hdr->totlen = htonl(totlen);    /* 发送网络申请 */    clusterSendMessage(link,buf,totlen);    zfree(buf);}void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {    clusterMsgDataGossip *gossip;    /* 指向 gossip 信息结构 */    gossip = &(hdr->data.ping.gossip[i]);    /* 将被选中节点的名字记录到 gossip 信息 */       memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);    /* 将被选中节点的 PING 命令发送工夫戳记录到 gossip 信息 */    gossip->ping_sent = htonl(n->ping_sent/1000);    /* 将被选中节点的 PONG 命令回复的工夫戳记录到 gossip 信息 */    gossip->pong_received = htonl(n->pong_received/1000);    /* 将被选中节点的 IP 记录到 gossip 信息 */    memcpy(gossip->ip,n->ip,sizeof(n->ip));    /* 将被选中节点的端口号记录到 gossip 信息 */    gossip->port = htons(n->port);    gossip->cport = htons(n->cport);    /* 将被选中节点的标识值记录到 gossip 信息 */    gossip->flags = htons(n->flags);    gossip->notused1 = 0;}复制代码

上面是 clusterBuildMessageHdr 函数,它次要负责填充音讯构造体中的根底信息和以后节点的状态信息。

/* 构建音讯的 header */void clusterBuildMessageHdr(clusterMsg *hdr, int type) {    int totlen = 0;    uint64_t offset;    clusterNode *master;    /* 如果以后节点是salve,则master为其主节点,如果以后节点是master节点,则master就是以后节点 */    master = (nodeIsSlave(myself) && myself->slaveof) ?              myself->slaveof : myself;    memset(hdr,0,sizeof(*hdr));    /* 初始化协定版本、标识、及类型, */    hdr->ver = htons(CLUSTER_PROTO_VER);    hdr->sig[0] = 'R';    hdr->sig[1] = 'C';    hdr->sig[2] = 'm';    hdr->sig[3] = 'b';    hdr->type = htons(type);    /* 音讯头设置以后节点id */    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);    /* 音讯头设置以后节点ip */    memset(hdr->myip,0,NET_IP_STR_LEN);    if (server.cluster_announce_ip) {        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);        hdr->myip[NET_IP_STR_LEN-1] = '0';    }    /* 根底端口及集群内节点通信端口 */    int announced_port = server.cluster_announce_port ?                         server.cluster_announce_port : server.port;    int announced_cport = server.cluster_announce_bus_port ?                          server.cluster_announce_bus_port :                          (server.port + CLUSTER_PORT_INCR);    /* 设置以后节点的槽信息 */    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));    memset(hdr->slaveof,0,CLUSTER_NAMELEN);    if (myself->slaveof != NULL)        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);    hdr->port = htons(announced_port);    hdr->cport = htons(announced_cport);    hdr->flags = htons(myself->flags);    hdr->state = server.cluster->state;    /* 设置 currentEpoch and configEpochs. */    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);    hdr->configEpoch = htonu64(master->configEpoch);    /* 设置复制偏移量 */    if (nodeIsSlave(myself))        offset = replicationGetSlaveOffset();    else        offset = server.master_repl_offset;    hdr->offset = htonu64(offset);    /* Set the message flags. */    if (nodeIsMaster(myself) && server.cluster->mf_end)        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;    /* 计算并设置音讯的总长度 */    if (type == CLUSTERMSG_TYPE_FAIL) {        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);        totlen += sizeof(clusterMsgDataFail);    } else if (type == CLUSTERMSG_TYPE_UPDATE) {        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);        totlen += sizeof(clusterMsgDataUpdate);    }    hdr->totlen = htonl(totlen);}复制代码

后记

原本只想写一下 Redis Cluster 的 Gossip 协定,没想到文章越写,内容越多,最初源码剖析也是有点头重脚轻,大家就对付看一下,也心愿大家持续关注我后续的问题。