How does Redis implement master-replica replication with a state-machine design?
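(1) Redis master-replica replication
In principle, Redis master-replica replication covers three cases: full replication, incremental replication, and synchronization over the long-lived connection.
- Full replication transfers an RDB file;
- Incremental replication transfers the commands the master received while the replica was disconnected;
- Long-lived connection synchronization forwards the requests the master receives during normal operation to the replica.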
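To make the incremental case concrete, here is a minimal C sketch of a circular buffer that keeps the most recent bytes of the replication stream, in the spirit of Redis's replication backlog. The names backlog_t, backlog_append and backlog_read_from, the 0-based byte offsets, and the tiny 16-byte size are hypothetical simplifications; Redis's real backlog lives in server.repl_backlog and is sized by the repl-backlog-size setting. A reconnecting replica reports how many bytes it already has: if the missing bytes are still in the buffer, only they need to be sent, otherwise a full RDB transfer is unavoidable.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, tiny backlog size so wrap-around is easy to see. */
#define BACKLOG_SIZE 16

/* Simplified replication backlog: a circular buffer holding the most recent
 * BACKLOG_SIZE bytes of the replication stream. Offsets count bytes from the
 * start of the stream (0-based), a simplification of Redis's own offsets. */
typedef struct {
    char buf[BACKLOG_SIZE];
    long long master_offset; /* total bytes ever appended to the stream */
    size_t idx;              /* next write position inside buf */
    size_t histlen;          /* valid bytes currently held (<= BACKLOG_SIZE) */
} backlog_t;

/* Append propagated command bytes, overwriting the oldest ones when full. */
void backlog_append(backlog_t *b, const char *p, size_t len) {
    assert(b != NULL && p != NULL);
    for (size_t i = 0; i < len; i++) {
        b->buf[b->idx] = p[i];
        b->idx = (b->idx + 1) % BACKLOG_SIZE;
        if (b->histlen < BACKLOG_SIZE) b->histlen++;
    }
    b->master_offset += (long long)len;
}

/* The replica already holds the first `replica_offset` bytes of the stream.
 * Copy the bytes it missed into out (capacity cap) and return their count,
 * or return -1 if they are no longer in the backlog (a full resync is
 * needed) or do not fit into out. */
long long backlog_read_from(const backlog_t *b, long long replica_offset,
                            char *out, size_t cap) {
    assert(b != NULL && out != NULL);
    long long oldest = b->master_offset - (long long)b->histlen;
    if (replica_offset < oldest || replica_offset > b->master_offset) return -1;
    size_t n = (size_t)(b->master_offset - replica_offset);
    if (n > cap) return -1;
    /* The missing bytes are the last n bytes written, ending just before idx. */
    size_t start = (b->idx + BACKLOG_SIZE - n) % BACKLOG_SIZE;
    for (size_t i = 0; i < n; i++) out[i] = b->buf[(start + i) % BACKLOG_SIZE];
    return (long long)n;
}
```

Because old bytes are overwritten, a long disconnection (or a backlog that is too small for the write rate) pushes the replica's offset out of the window and forces a full resync.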
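(2) Why master-replica replication is needed
When using databases such as Redis or MySQL, master-replica replication is commonly used to keep data synchronized between master and replica nodes, which improves the service's high availability.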
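(3) How Redis master-replica replication works
- The replica sets its master
- The master and replica establish a connection
- The master and replica perform a handshake
- The replication type is determined and executed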
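(3.1) The replica sets its master
The main job of this step is to obtain the master's IP address and port. Suppose instance A is to become a replica of instance B; there are three ways to do this:
Method 1: run the replication command replicaof masterip masterport on instance A, giving instance B's IP address (masterip) and port (masterport).
Method 2: set replicaof masterip masterport in instance A's configuration file; instance A obtains the master's IP address and port by parsing the file.
Method 3: pass the startup argument --replicaof [masterip] [masterport] when launching instance A; instance A obtains the master's IP address and port by parsing the argument.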
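All three methods end with the same two values. As a hedged illustration, the following C sketch parses a replicaof configuration line into a host and a port; the replicaof_cfg_t type and parse_replicaof_line function are hypothetical and are not Redis's own config parser. It also accepts the older slaveof spelling. Redis folds --option value startup arguments into configuration lines before parsing them, so method 3 ends up on the same path as method 2.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>

/* Hypothetical result of parsing a replicaof directive. */
typedef struct {
    char host[256];
    int port;
} replicaof_cfg_t;

/* Parse "replicaof <host> <port>" (or "slaveof ...", case-insensitive).
 * Returns 0 on success, -1 if the line is not a valid directive. */
int parse_replicaof_line(const char *line, replicaof_cfg_t *cfg) {
    assert(line != NULL && cfg != NULL);
    char directive[16], host[256], extra;
    long port;
    /* Exactly three tokens are allowed; `extra` catches trailing junk. */
    if (sscanf(line, "%15s %255s %ld %c", directive, host, &port, &extra) != 3)
        return -1;
    if (strcasecmp(directive, "replicaof") != 0 &&
        strcasecmp(directive, "slaveof") != 0)
        return -1;
    if (port < 1 || port > 65535) return -1; /* valid TCP port range */
    strcpy(cfg->host, host);                  /* host is at most 255 chars */
    cfg->port = (int)port;
    return 0;
}
```

The special form replicaof no one, which turns the instance back into a master, is deliberately rejected by this sketch.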
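(3.2) The master and replica establish a connection
Once the replica has the master's IP address and port, it tries to open a TCP connection to the master, and then listens on that connection for commands sent by the master.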
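(3.3) Master-replica handshake
Put simply, during the handshake the master and replica exchange PING/PONG messages, and the replica authenticates with the master according to its configuration.
The replica also sends the master its own IP address and port, and whether it supports diskless replication and the PSYNC 2 protocol.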
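The order of the handshake messages can be sketched as follows. This assumes the Redis 6.0 sequence (PING, optional AUTH, REPLCONF listening-port, optional REPLCONF ip-address, REPLCONF capa eof capa psync2, then PSYNC); the handshake_cfg_t type and handshake_steps function are hypothetical. In the REPLCONF capa message, eof advertises support for diskless (EOF-marked) RDB transfer and psync2 advertises PSYNC 2.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical view of the two settings that change the handshake. */
typedef struct {
    const char *masterauth;  /* NULL when no masterauth password is set */
    const char *announce_ip; /* NULL when replica-announce-ip is not set */
} handshake_cfg_t;

/* Write the handshake commands, in send order, into steps[] (room for at
 * least 6 entries) and return how many were written. */
size_t handshake_steps(const handshake_cfg_t *cfg, const char *steps[], size_t cap) {
    assert(cfg != NULL && steps != NULL && cap >= 6);
    size_t n = 0;
    steps[n++] = "PING";                                      /* liveness check */
    if (cfg->masterauth) steps[n++] = "AUTH";                 /* authenticate */
    steps[n++] = "REPLCONF listening-port";                   /* replica's port */
    if (cfg->announce_ip) steps[n++] = "REPLCONF ip-address"; /* announced IP */
    steps[n++] = "REPLCONF capa eof capa psync2";             /* capabilities */
    steps[n++] = "PSYNC";                                     /* ends the handshake */
    return n;
}
```

Optional steps are simply skipped, which is also how the real state machine behaves when masterauth or the announced IP is not configured.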
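(3.4) Determining and executing the replication type
Once the handshake is complete, the replica sends the PSYNC command to the master. Depending on the arguments of that command, the master gives one of three replies: perform a full resynchronization (+FULLRESYNC), perform an incremental resynchronization (+CONTINUE), or return an error.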
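How the master chooses between the first two replies can be summarized in a short C sketch. It is a simplified, hypothetical rendering of the checks Redis performs on the master side (the real logic is in masterTryPartialResynchronization() in replication.c): the requested replication ID must match the master's current ID, or its previous ID up to the offset where that history ended, and the requested offset must still be inside the backlog. The master_view_t struct and decide_psync function are illustrative names, and error replies are left out.

```c
#include <assert.h>
#include <string.h>

typedef enum { PSYNC_REPLY_FULLRESYNC, PSYNC_REPLY_CONTINUE } psync_reply_t;

/* Hypothetical snapshot of the master-side state the decision depends on. */
typedef struct {
    const char *replid;             /* current replication ID */
    const char *replid2;            /* previous ID (after a failover), or "" */
    long long second_replid_offset; /* replid2 history is valid up to here */
    int has_backlog;                /* is a backlog allocated at all? */
    long long backlog_off;          /* offset of the first byte in the backlog */
    long long backlog_histlen;      /* number of bytes held in the backlog */
} master_view_t;

/* Decide the reply to "PSYNC <req_replid> <req_offset>". */
psync_reply_t decide_psync(const master_view_t *m, const char *req_replid,
                           long long req_offset) {
    assert(m != NULL && req_replid != NULL);
    /* Same history: current ID, or previous ID up to where it stopped. */
    int same_history = strcmp(req_replid, m->replid) == 0 ||
        (m->replid2[0] != '\0' && strcmp(req_replid, m->replid2) == 0 &&
         req_offset <= m->second_replid_offset);
    if (!same_history) return PSYNC_REPLY_FULLRESYNC;
    /* The requested offset must still be covered by the backlog. */
    if (!m->has_backlog || req_offset < m->backlog_off ||
        req_offset > m->backlog_off + m->backlog_histlen)
        return PSYNC_REPLY_FULLRESYNC;
    return PSYNC_REPLY_CONTINUE;
}
```

PSYNC ? -1, which a replica sends when it has no cached master, can never match an ID and therefore always leads to a full resync.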
(4) Source code walkthrough
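// file: src/server.h
struct redisServer {
    // ...
    /* Replication related */
    char *masterauth;                    // password used to authenticate with the master
    char *masterhost;                    // master hostname
    int masterport;                      // master port
    int repl_timeout;                    // replication timeout, in seconds
    client *master;                      // on the replica: client for the connection to the master
    client *cached_master;               // on the replica: cached master info, reused to PSYNC after a reconnect
    int repl_state;                      // replica's replication state (the state machine)
    off_t repl_transfer_size;            // size of the RDB read from the master during sync
    off_t repl_transfer_read;            // bytes of the RDB read so far
    off_t repl_transfer_last_fsync_off;  // offset at the last fsync
    connection *repl_transfer_s;         // replica -> master SYNC connection
    int repl_transfer_fd;                // replica -> master SYNC temp file descriptor
    char *repl_transfer_tmpfile;         // replica -> master SYNC temp file name
    time_t repl_transfer_lastio;         // Unix time of the latest read, used for timeouts
    int repl_serve_stale_data;           // serve stale data while the link is down?
    int repl_slave_ro;                   // is the replica read-only?
    int repl_slave_ignore_maxmemory;     // if true, the replica does not evict keys
    time_t repl_down_since;              // Unix time at which the link with the master went down
    int repl_disable_tcp_nodelay;        // disable TCP_NODELAY after SYNC?
    int slave_priority;                  // reported in INFO and used by Sentinel
    int slave_announce_port;             // listening port announced to the master
    char *slave_announce_ip;             // IP address announced to the master
    // ...
};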
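(4.1) The replica sets its master
// file: src/server.c
void initServerConfig(void) {
    // ...
    // Initialize the replication state: REPL_STATE_NONE by default (not replicating)
    server.repl_state = REPL_STATE_NONE;
    // ...
}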
When an instance executes the replicaof masterip masterport command, Redis calls replicaofCommand():
// file: src/replication.c
void replicaofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if (c->flags & CLIENT_SLAVE) {
            /* If a client is already a replica they cannot run this command,
             * because it involves flushing all replicas (including this
             * client) */
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
                                "with the master we are already connected "
                                "with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified "
                                 "master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}
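/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

    /* Update oom_score_adj */
    setOOMScoreAdj(-1);

    /* Force our slaves to resync with us as well. They may hopefully be able
     * to partially resync with us, but we can notify the replid change. */
    disconnectSlaves();
    cancelReplicationHandshake();
    /* Before destroying our master state, create a cached master using
     * our own parameters, to later PSYNC with the new master. */
    if (was_master) {
        replicationDiscardCachedMaster();
        replicationCacheMasterUsingMyself();
    }

    /* Fire the role change modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                          NULL);

    /* Fire the master link modules event. */
    if (server.repl_state == REPL_STATE_CONNECTED)
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);

    // Unconditionally move the state machine to REPL_STATE_CONNECT;
    // replicationCron() will then start connecting to the new master
    server.repl_state = REPL_STATE_CONNECT;
}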
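Besides recording the master's IP address and port, replicationSetMaster() also sets the replica's state machine to REPL_STATE_CONNECT.
At this point, the replication state machine has moved from REPL_STATE_NONE to REPL_STATE_CONNECT.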
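The transitions described so far, and those covered in the following sections, can be condensed into a few lines of C. The sketch below is a simplification: the ST_* and EV_* names and repl_next() are hypothetical, and ST_HANDSHAKE stands for the whole group of handshake states Redis defines between REPL_STATE_CONNECTING and REPL_STATE_TRANSFER (REPL_STATE_RECEIVE_PONG, REPL_STATE_SEND_AUTH, and so on).

```c
#include <assert.h>

/* Hypothetical, condensed replica states (see lead-in). */
typedef enum {
    ST_NONE, ST_CONNECT, ST_CONNECTING, ST_HANDSHAKE, ST_TRANSFER, ST_CONNECTED
} repl_state_t;

/* Hypothetical events that drive the transitions. */
typedef enum {
    EV_REPLICAOF,            /* REPLICAOF host port */
    EV_REPLICAOF_NO_ONE,     /* REPLICAOF NO ONE */
    EV_CONNECT_STARTED,      /* cron: connectWithMaster() returned C_OK */
    EV_TCP_CONNECTED,        /* non-blocking connect completed */
    EV_FULLRESYNC,           /* master replied +FULLRESYNC */
    EV_CONTINUE,             /* master replied +CONTINUE */
    EV_RDB_LOADED,           /* RDB received and loaded */
    EV_TIMEOUT_OR_LINK_LOST  /* any timeout or dropped connection */
} repl_event_t;

/* Return the next state; events that do not apply leave the state as is. */
repl_state_t repl_next(repl_state_t s, repl_event_t ev) {
    switch (ev) {
    case EV_REPLICAOF:        return ST_CONNECT;
    case EV_REPLICAOF_NO_ONE: return ST_NONE;
    case EV_CONNECT_STARTED:  return s == ST_CONNECT ? ST_CONNECTING : s;
    case EV_TCP_CONNECTED:    return s == ST_CONNECTING ? ST_HANDSHAKE : s;
    case EV_FULLRESYNC:       return s == ST_HANDSHAKE ? ST_TRANSFER : s;
    case EV_CONTINUE:         return s == ST_HANDSHAKE ? ST_CONNECTED : s;
    case EV_RDB_LOADED:       return s == ST_TRANSFER ? ST_CONNECTED : s;
    case EV_TIMEOUT_OR_LINK_LOST:
        /* A master instance is not replicating, so it stays in NONE. */
        return s == ST_NONE ? ST_NONE : ST_CONNECT;
    }
    return s;
}
```

A partial resync (+CONTINUE) goes straight to the connected state, while a full resync passes through the transfer state while the RDB is loaded; any timeout or dropped link sends the replica back to CONNECT, from which replicationCron() reconnects.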
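(4.2) The master and replica establish a connection
The connection itself is driven by the replicationCron() task, which runs once every 1000 ms.
replicationCron() is called from serverCron() in server.c and implemented in replication.c. One of its key checks is the replica's replication state: if the state machine is in REPL_STATE_CONNECT, the replica starts connecting to the master by calling connectWithMaster().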
// file: src/server.c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    /* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();
    // ...
}
// file: src/replication.c
/* --------------------------- REPLICATION CRON ---------------------------- */

/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    // The key state-machine check: in REPL_STATE_CONNECT, start connecting
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* First, send PING according to ping_slave_period. */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        /* Note that we don't send the PING if the clients are paused during
         * a Redis Cluster manual failover: the PING we send will otherwise
         * alter the replication offsets of master and slave, and will no longer
         * match the one stored into 'mf_master_offset' state. */
        int manual_failover_in_progress =
            server.cluster_enabled &&
            server.cluster->mf_end &&
            clientsArePaused();

        if (!manual_failover_in_progress) {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file.
     *
     * Also send the a newline to all the chained slaves we have, if we lost
     * connection from our master, to keep the slaves aware that their
     * master is online. This is needed since sub-slaves only receive proxied
     * data from top-level masters, so there is no explicit pinging in order
     * to avoid altering the replication offsets. This special out of band
     * pings (newlines) can be sent, they will have no effect in the offset.
     *
     * The newline will be ignored by the slave but will refresh the
     * last interaction timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response). */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            connWrite(slave->conn, "\n", 1);
        }
    }

    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_ONLINE) {
                if (slave->flags & CLIENT_PRE_PSYNC)
                    continue;
                if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
                    serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
                          replicationGetSlaveName(slave));
                    freeClient(slave);
                }
            }
            /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
             * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
             * from terminating. */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
                if (slave->repl_last_partial_write != 0 &&
                    (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
                {
                    serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
                          replicationGetSlaveName(slave));
                    freeClient(slave);
                }
            }
        }
    }

    /* If this is a master without attached slaves and there is a replication
     * backlog active, in order to reclaim memory we can free it after some
     * (configured) time. Note that this cannot be done for slaves: slaves
     * without sub-slaves attached should still accumulate data into the
     * backlog, in order to reply to PSYNC queries if they are turned into
     * masters after a failover. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            /* When we free the backlog, we always use a new
             * replication ID and clear the ID2. This is needed
             * because when there is no backlog, the master_repl_offset
             * is not updated, but we would still retain our replication
             * ID, leading to the following problem:
             *
             * 1. We are a master instance.
             * 2. Our slave is promoted to master. It's repl-id-2 will
             *    be the same as our repl-id.
             * 3. We, yet as master, receive some updates, that will not
             *    increment the master_repl_offset.
             * 4. Later we are turned into a slave, connect to the new
             *    master that will accept our PSYNC request by second
             *    replication ID, but there will be data inconsistency
             *    because we received writes. */
            changeReplicationId();
            clearReplicationId2();
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected replicas.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* Start a BGSAVE good for replication if we have slaves in
     * WAIT_BGSAVE_START state.
     *
     * In case of diskless replication, we make sure to wait the specified
     * number of seconds (according to configuration) so that other slaves
     * have the time to arrive before we start streaming. */
    if (!hasActiveChildProcess()) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }

        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            /* Start the BGSAVE. The called function may start a
             * BGSAVE with socket target or disk target depending on the
             * configuration and slaves capabilities. */
            startBgsaveForReplication(mincapa);
        }
    }

    /* Remove the RDB file used for replication if Redis is not running
     * with any persistence. */
    removeRDBUsedToSyncReplicas();

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
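int connectWithMaster(void) {
    server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
    if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
                NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                connGetLastError(server.repl_transfer_s));
        connClose(server.repl_transfer_s);
        server.repl_transfer_s = NULL;
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    // The non-blocking connect has been started but not necessarily completed:
    // move the state machine to REPL_STATE_CONNECTING. syncWithMaster() runs as
    // the callback once the connection is established and drives the handshake.
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}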
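The interplay between replicationCron() and connectWithMaster() boils down to a once-per-second tick. The sketch below is a hypothetical, heavily simplified model: replica_t, replica_cron, the RS_* states and the two stub connectors are illustrative names, the stubs stand in for connectWithMaster() (returning 0 on success, like C_OK), and only the CONNECTING and TRANSFER timeouts are modeled. As in the real cron function, a timeout drops the state back to CONNECT and the connect check that follows in the same tick immediately tries again.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical subset of the replica states, for illustration only. */
typedef enum { RS_NONE, RS_CONNECT, RS_CONNECTING, RS_TRANSFER, RS_CONNECTED } rs_t;

typedef struct {
    rs_t state;
    time_t last_io;  /* plays the role of server.repl_transfer_lastio */
    time_t timeout;  /* plays the role of server.repl_timeout (seconds) */
} replica_t;

/* Stubs standing in for connectWithMaster(): 0 means success, like C_OK. */
int connect_stub_ok(void) { return 0; }
int connect_stub_fail(void) { return -1; }

/* One tick of a simplified replication cron, meant to run once per second. */
void replica_cron(replica_t *r, time_t now, int (*connect_fn)(void)) {
    assert(r != NULL && connect_fn != NULL);
    /* No I/O for longer than the timeout while connecting or transferring:
     * abandon the attempt and fall back to CONNECT. */
    if ((r->state == RS_CONNECTING || r->state == RS_TRANSFER) &&
        now - r->last_io > r->timeout) {
        r->state = RS_CONNECT;
    }
    /* In CONNECT (possibly just reset above): try to start a connection.
     * On success, record the time and move to CONNECTING. */
    if (r->state == RS_CONNECT && connect_fn() == 0) {
        r->last_io = now;
        r->state = RS_CONNECTING;
    }
}
```

Keeping the retry inside the periodic task means a failed connect needs no special error path: the state simply stays CONNECT and the next tick tries again.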
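(4.3) Handshake and replication-type determination
After the TCP connection is established, the replica does not start synchronizing data right away; it first performs a handshake with the master.
The handshake mainly serves two purposes: authenticating the replica with the master, and letting the replica send its own IP address and port to the master.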
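On the wire, the handshake messages are ordinary Redis commands encoded in RESP (the Redis serialization protocol): an array header *<count> followed by one $<length> bulk string per argument; Redis 6.0's sendSynchronousCommand() builds this format before writing to the socket. A minimal encoder, written as a hypothetical helper (resp_encode) rather than Redis's own code, looks like this:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP array of bulk strings into out (NUL-terminated).
 * Returns the encoded length, or -1 if out is too small. Hypothetical helper;
 * it is not binary-safe because it uses strlen() on each argument. */
int resp_encode(char *out, size_t cap, int argc, const char *const *argv) {
    assert(out != NULL && argv != NULL && argc > 0);
    int n = snprintf(out, cap, "*%d\r\n", argc);   /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        /* One bulk string per argument: $<len>\r\n<bytes>\r\n */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

For example, encoding the single argument PING yields the 14 bytes *1\r\n$4\r\nPING\r\n.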
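(4.4) Determining and executing the replication type
On the replica, the PSYNC exchange is implemented by slaveTryPartialResynchronization(), which syncWithMaster() calls once the handshake is done: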
/* Try a partial resynchronization with the master if we are about to reconnect.
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master replid and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd". However in case
 *    of successful partial resynchronization, the function will reuse
 *    'fd' as file descriptor of the server.master client structure.
 *
 * The function is split in two halves: if read_reply is 0, the function
 * writes the PSYNC command on the socket, and a new function call is
 * needed, with read_reply set to 1, in order to read the reply of the
 * command. This is useful in order to support non blocking operations, so
 * that we write, return into the event loop, and read when there are data.
 *
 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
 * with read_reply set to 1. However even when read_reply is set to 1
 * the function may return PSYNC_WAIT_REPLY again to signal there were
 * insufficient data to read to complete its work. We should re-enter
 * into the event loop and wait in such a case.
 *
 * The function returns:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 *                   In this case the master replid and global replication
 *                   offset is saved.
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 *                      the caller should fall back to SYNC.
 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
 * PSYNC_TRY_LATER: Master is currently in a transient error condition.
 *
 * Notable side effects:
 *
 * 1) As a side effect of the function call the function removes the readable
 *    event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
 * 2) server.master_initial_offset is set to the right value according
 *    to the master reply. This will be used to populate the 'server.master'
 *    structure replication offset.
 */
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    /* Writing half */
    if (!read_reply) {
        /* Initially set master_initial_offset to -1 to mark the current
         * master replid and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.master_initial_offset = -1;

        if (server.cached_master) {
            // Cached master available: ask to continue from our offset + 1
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        } else {
            // No cached master: send "PSYNC ? -1" to force a full resync
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            connSetReadHandler(conn, NULL);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }

    /* Reading half */
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
    if (sdslen(reply) == 0) {
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }

    connSetReadHandler(conn, NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *replid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the replid
         * and the replication offset. */
        replid = strchr(reply,' ');
        if (replid) {
            replid++;                    // skip the space: replid now points at the ID
            offset = strchr(replid,' ');
            if (offset) offset++;        // skip the space: offset points at the number
        }
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * replid to make sure next PSYNCs will fail. */
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
            server.master_initial_offset = strtoll(offset,NULL,10);
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                server.master_replid,
                server.master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted. */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set or
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start,CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';

            if (strcmp(new,server.cached_master->replid)) {
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new,sizeof(server.replid));
                memcpy(server.cached_master->replid,new,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            }
        }

        /* Setup the replication to continue. */
        sdsfree(reply);
        replicationResurrectCachedMaster(conn);

        /* If this instance was restarted and we read the metadata to
         * PSYNC from the persistence file, our replication backlog could
         * be still not initialized. Create it. */
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error (since the master does
     * not understand PSYNC or because it is in a special state and cannot
     * serve our request), or an unexpected reply from the master.
     *
     * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
     * return PSYNC_TRY_LATER if we believe this is a transient error. */

    if (!strncmp(reply,"-NOMASTERLINK",13) ||
        !strncmp(reply,"-LOADING",8))
    {
        serverLog(LL_NOTICE,
            "Master is currently unable to PSYNC "
            "but should be in the future: %s", reply);
        sdsfree(reply);
        return PSYNC_TRY_LATER;
    }

    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        serverLog(LL_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        serverLog(LL_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
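The +FULLRESYNC branch is the easiest place to get the pointer arithmetic wrong: the first strchr() lands on the space, so the pointer has to be advanced before searching for the next space. As a standalone, hypothetical sketch (parse_fullresync is an illustrative name, and RUN_ID_SIZE mirrors CONFIG_RUN_ID_SIZE, which is 40), the same parsing can be written with explicit bounds checks and strtoll() error detection:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40  /* same value as Redis's CONFIG_RUN_ID_SIZE */

/* Parse "+FULLRESYNC <40-char replid> <offset>".
 * Returns 0 on success, -1 on any syntax error. */
int parse_fullresync(const char *reply, char replid_out[RUN_ID_SIZE + 1],
                     long long *offset_out) {
    assert(reply != NULL && replid_out != NULL && offset_out != NULL);
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    const char *replid = strchr(reply, ' ');
    if (!replid) return -1;
    replid++;                                /* skip the space before the ID */
    const char *offset = strchr(replid, ' ');
    if (!offset || offset - replid != RUN_ID_SIZE) return -1;
    offset++;                                /* skip the space before the offset */
    char *end;
    long long off = strtoll(offset, &end, 10);
    if (end == offset) return -1;            /* no digits after the ID */
    memcpy(replid_out, replid, RUN_ID_SIZE);
    replid_out[RUN_ID_SIZE] = '\0';
    *offset_out = off;
    return 0;
}
```

Measuring the ID length before advancing past the second space avoids the off-by-one adjustment (offset-replid-1) that the Redis code needs.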