How does Redis implement master-replica replication using a state-machine design?
(1) Redis master-replica replication
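In principle, Redis master-replica replication covers three cases: full resynchronization, incremental (partial) resynchronization, and synchronization over the long-lived connection (command propagation).
- A full resync transfers an RDB file;
- An incremental resync transfers the commands the master received while the master and replica were disconnected;
- Long-lived connection sync forwards the requests (write commands) the master receives during normal operation to the replica.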
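Incremental resync is possible because the master keeps its recent write traffic in a fixed-size replication backlog, addressed by a global replication offset. The C sketch below shows that idea. It rests on some assumptions: `backlog_t`, `backlog_feed()`, `backlog_read_from()` and the 16-byte size are hypothetical, and offsets here count the bytes the replica already holds. Redis's own PSYNC offset convention differs from this by one. Once older bytes have been overwritten, the replica's offset falls outside the window, and only a full resync (a new RDB file) can bring it up to date.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 16 /* hypothetical tiny size, just for illustration */

typedef struct {
    char buf[BACKLOG_SIZE];
    long long master_offset; /* global offset: total bytes ever fed */
    long long histlen;       /* bytes currently held, at most BACKLOG_SIZE */
} backlog_t;

/* Append propagated command bytes, overwriting the oldest data when full. */
void backlog_feed(backlog_t *b, const char *data, size_t len) {
    for (size_t i = 0; i < len; i++) {
        b->buf[b->master_offset % BACKLOG_SIZE] = data[i];
        b->master_offset++;
        if (b->histlen < BACKLOG_SIZE) b->histlen++;
    }
}

/* Copy every byte after `off` (bytes the replica already has) into `out`.
 * Returns the number of bytes copied, or -1 if `off` has left the window,
 * meaning an incremental resync is impossible and a full resync is needed. */
long long backlog_read_from(const backlog_t *b, long long off, char *out, size_t outlen) {
    long long first = b->master_offset - b->histlen; /* oldest offset still held */
    if (off < first || off > b->master_offset) return -1;
    long long n = b->master_offset - off;
    if ((size_t)n > outlen) return -1;
    for (long long i = 0; i < n; i++)
        out[i] = b->buf[(off + i) % BACKLOG_SIZE];
    return n;
}
```

A replica that disconnected after 8 bytes can catch up with the next 16 bytes from the buffer. A replica still at offset 0 cannot, because those bytes have been overwritten.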
(2) Why replication is needed
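The usual way to improve a service's partition tolerance is redundancy.
Master-replica replication is a form of redundancy, and therefore one way to improve partition tolerance.
When using databases such as Redis or MySQL, master-replica replication is commonly used to keep data in sync between the master and replica nodes, which improves the availability of the service.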
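(3) How Redis replication works
- The replica sets its master
- The master and replica establish a connection
- The master and replica perform a handshake
- The replication type is determined and executed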
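(3.1) The replica sets its master
The purpose of this step is for the replica to obtain the master's IP address and port. Suppose instance A is to become a replica of instance B.
There are three ways to configure this.
Way 1: run the replication command replicaof masterip masterport on instance A, giving instance B's IP (masterip) and port (masterport).
Way 2: put replicaof masterip masterport in instance A's configuration file; instance A obtains the master's IP and port by parsing the file.
Way 3: start instance A with the option --replicaof [masterip] [masterport]; instance A obtains the master's IP and port by parsing its startup arguments.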
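All three ways produce the same two values: the master's host and port. The following is a minimal sketch of what parsing the configuration-file form involves. It assumes whitespace-separated tokens and uses a hypothetical helper, `parse_replicaof()`. Redis's real config loader splits lines with `sdssplitargs()` and is more general. The "no one" branch mirrors the special `REPLICAOF NO ONE` form handled by replicaofCommand() in section (4.1).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

/* Hypothetical result codes for this sketch. */
enum { REPLICAOF_SET = 0, REPLICAOF_NO_ONE = 1, REPLICAOF_ERR = -1 };

/* Parse a line such as "replicaof 10.0.0.1 6379" (legacy "slaveof" also accepted).
 * On REPLICAOF_SET, the host is copied into host and the port into *port. */
int parse_replicaof(const char *line, char *host, size_t hostlen, int *port) {
    char directive[16], a[256], b[32], extra[2];
    /* Exactly three tokens are required; a fourth one makes sscanf return 4. */
    if (sscanf(line, "%15s %255s %31s %1s", directive, a, b, extra) != 3)
        return REPLICAOF_ERR;
    if (strcasecmp(directive, "replicaof") != 0 && strcasecmp(directive, "slaveof") != 0)
        return REPLICAOF_ERR;
    if (strcasecmp(a, "no") == 0 && strcasecmp(b, "one") == 0)
        return REPLICAOF_NO_ONE; /* become (or stay) a master */
    char *end;
    errno = 0;
    long p = strtol(b, &end, 10);
    if (errno != 0 || *end != '\0' || p <= 0 || p > 65535) return REPLICAOF_ERR;
    if (strlen(a) >= hostlen) return REPLICAOF_ERR;
    strcpy(host, a);
    *port = (int)p;
    return REPLICAOF_SET;
}
```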
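(3.2) The master and replica establish a connection
Once the replica has the master's IP and port, it tries to open a TCP connection to the master, and on that connection it listens for commands sent by the master.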
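Redis opens this connection without blocking its event loop. connConnect() starts a non-blocking connect, and syncWithMaster() is registered to run once the socket is ready (see connectWithMaster() in section (4.2)). Stripped of Redis's connection abstraction and event loop, the underlying POSIX pattern looks roughly like the sketch below. Its assumptions: `start_connect()` and `finish_connect()` are hypothetical helpers, only IPv4 is handled, and poll() stands in for Redis's ae event loop.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Start a non-blocking TCP connect to ip:port. Returns the fd, or -1 on immediate failure. */
int start_connect(const char *ip, int port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
    struct sockaddr_in sa;
    memset(&sa, 0, sizeof sa);
    sa.sin_family = AF_INET;
    sa.sin_port = htons((unsigned short)port);
    if (inet_pton(AF_INET, ip, &sa.sin_addr) != 1 ||
        (connect(fd, (struct sockaddr *)&sa, sizeof sa) == -1 && errno != EINPROGRESS)) {
        close(fd);
        return -1;
    }
    return fd; /* the connect completes (or fails) later */
}

/* Wait until the socket is writable, then read the result of the connect.
 * In Redis, this is roughly the point where syncWithMaster() gets called. */
int finish_connect(int fd, int timeout_ms) {
    struct pollfd p = { .fd = fd, .events = POLLOUT };
    if (poll(&p, 1, timeout_ms) != 1) return -1;
    int err = 0;
    socklen_t len = sizeof err;
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1 || err != 0) return -1;
    return 0;
}
```

Splitting the work into "start" and "finish" is what lets a single-threaded server keep serving clients while the connection to the master is still being set up.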
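(3.3) The master and replica perform a handshake
Once the connection between the replica and the master is established, the replica starts a handshake with the master.
In short, during the handshake the two sides exchange PING/PONG messages, and the replica authenticates to the master according to its configuration.
The replica also sends the master its own IP address and port, and reports whether it supports diskless replication and the PSYNC 2 protocol.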
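On the wire, each handshake message is an ordinary Redis command encoded in RESP, the Redis serialization protocol. RESP encodes an array header `*<n>`, then each argument as a bulk string `$<len>` followed by its bytes, with every line ending in CRLF. The sketch below uses a hypothetical helper, `resp_encode()`, to build commands a replica typically sends: `PING`, `REPLCONF listening-port <port>`, and `REPLCONF capa eof capa psync2`. That last command announces support for diskless (EOF-marked) transfer and PSYNC 2. The sketch only formats bytes; writing them to the socket is left out.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv[0..argc-1] as a RESP array of bulk strings into out.
 * Returns the encoded length, or -1 if out is too small. */
int resp_encode(char *out, size_t outlen, int argc, const char **argv) {
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    size_t used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, outlen - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= outlen - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

For example, `PING` encodes to the 14 bytes `*1\r\n$4\r\nPING\r\n`.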
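(3.4) The replication type is determined and executed
After the handshake completes, the replica sends the PSYNC command to the master. Depending on the arguments of that command, the master replies in one of three ways: perform a full resync (+FULLRESYNC), perform an incremental resync (+CONTINUE), or return an error.
Finally, on receiving the reply, the replica starts the replication operation that matches the replication type in the reply.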
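On the master side, the choice between the first two replies comes down to two checks. First, does the replication ID the replica sent match the master's? Second, is the requested offset still inside the backlog window? The sketch below is a simplified, hypothetical model of that decision. In Redis it lives in masterTryPartialResynchronization(), which also accepts a secondary replication ID after a failover; that case and the error replies are omitted here. A replica with no cached master sends `PSYNC ? -1`, which always leads to a full resync.

```c
#include <assert.h>
#include <string.h>

typedef enum { REPLY_FULLRESYNC, REPLY_CONTINUE } psync_reply_t;

/* Hypothetical view of the master state relevant to PSYNC. */
typedef struct {
    const char *replid;        /* master's current replication ID */
    long long backlog_off;     /* offset of the first byte held in the backlog */
    long long backlog_histlen; /* number of bytes held in the backlog */
} master_view_t;

/* Decide how to answer "PSYNC <replid> <offset>". */
psync_reply_t decide_psync(const master_view_t *m, const char *replid, long long offset) {
    if (strcmp(replid, m->replid) != 0)
        return REPLY_FULLRESYNC; /* different history (includes "?") */
    if (m->backlog_histlen == 0)
        return REPLY_FULLRESYNC; /* nothing buffered to replay */
    if (offset < m->backlog_off || offset > m->backlog_off + m->backlog_histlen)
        return REPLY_FULLRESYNC; /* requested data already overwritten */
    return REPLY_CONTINUE;       /* reply +CONTINUE, then send the missing bytes */
}
```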
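(4) Reading the replication source code
What exactly does the replication state machine correspond to in the code? It is the repl_state field of struct redisServer, which records the replica's current replication state. The replication-related fields of the struct are: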
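```c
// file: src/server.h
struct redisServer {
    // ...
    /* Replication related */
    char *masterauth;               // password used to authenticate with the master
    char *masterhost;               // master hostname
    int masterport;                 // master port
    int repl_timeout;               // replication timeout (repl-timeout)
    client *master;                 // on a replica: the client connected to the master
    client *cached_master;          // on a replica: cached master info, reused for PSYNC
    int repl_state;                 // the replica's replication state (the state machine)
    off_t repl_transfer_size;
    off_t repl_transfer_read;
    off_t repl_transfer_last_fsync_off;
    connection *repl_transfer_s;    // connection to the master used during handshake and transfer
    int repl_transfer_fd;
    char *repl_transfer_tmpfile;
    time_t repl_transfer_lastio;    // time of last I/O with the master, used for timeouts
    int repl_serve_stale_data;
    int repl_slave_ro;
    int repl_slave_ignore_maxmemory;
    time_t repl_down_since;
    int repl_disable_tcp_nodelay;
    int slave_priority;
    int slave_announce_port;
    char *slave_announce_ip;
    // ...
};
```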
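(4.1) The replica sets its master
```c
// file: src/server.c
void initServerConfig(void) {
    // ...
    // Initialize the replication state; the default is "none" (not a replica)
    server.repl_state = REPL_STATE_NONE;
    // ...
}
```
When the instance executes the replicaof masterip masterport command, it is handled by replicaofCommand(), which calls replicationSetMaster():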
```c
// file: src/replication.c
void replicaofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if (c->flags & CLIENT_SLAVE)
        {
            /* If a client is already a replica they cannot run this command,
             * because it involves flushing all replicas (including this
             * client) */
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
                                "with the master we are already connected "
                                "with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified "
                                 "master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}
```
```c
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

    /* Update oom_score_adj */
    setOOMScoreAdj(-1);

    /* Force our slaves to resync with us as well. They may hopefully be able
     * to partially resync with us, but we can notify the replid change. */
    disconnectSlaves();
    cancelReplicationHandshake();
    /* Before destroying our master state, create a cached master using
     * our own parameters, to later PSYNC with the new master. */
    if (was_master) {
        replicationDiscardCachedMaster();
        replicationCacheMasterUsingMyself();
    }

    /* Fire the role change modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                          NULL);

    /* Fire the master link modules event. */
    if (server.repl_state == REPL_STATE_CONNECTED)
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);

    server.repl_state = REPL_STATE_CONNECT;
}
```
Besides recording the master's IP and port, replicationSetMaster() sets the replica's state machine to REPL_STATE_CONNECT.
At this point, the replication state machine transitions from REPL_STATE_NONE to REPL_STATE_CONNECT.
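This transition is only the first of many. In Redis 6.0, server.h defines the replica-side states as an ordered list of REPL_STATE_* constants, with the handshake states placed next to each other. A range check is therefore enough to tell whether a replica is in the middle of a handshake. The sketch below reproduces that idea. The state names follow Redis 6.0; the enum type name `repl_state_t` and the helper `in_handshake()` are hypothetical, so treat this as an illustration rather than a copy of the source.

```c
#include <assert.h>

/* Replica-side replication states, in the order a successful sync walks through them. */
typedef enum {
    REPL_STATE_NONE,          /* not a replica */
    REPL_STATE_CONNECT,       /* master configured, must connect */
    REPL_STATE_CONNECTING,    /* non-blocking connect in progress */
    /* --- handshake states, must stay contiguous --- */
    REPL_STATE_RECEIVE_PONG,  /* wait for PING reply */
    REPL_STATE_SEND_AUTH,
    REPL_STATE_RECEIVE_AUTH,
    REPL_STATE_SEND_PORT,     /* REPLCONF listening-port */
    REPL_STATE_RECEIVE_PORT,
    REPL_STATE_SEND_IP,       /* REPLCONF ip-address */
    REPL_STATE_RECEIVE_IP,
    REPL_STATE_SEND_CAPA,     /* REPLCONF capa */
    REPL_STATE_RECEIVE_CAPA,
    REPL_STATE_SEND_PSYNC,
    REPL_STATE_RECEIVE_PSYNC,
    /* --- end of handshake states --- */
    REPL_STATE_TRANSFER,      /* receiving the RDB file */
    REPL_STATE_CONNECTED      /* streaming commands from the master */
} repl_state_t;

/* Same idea as Redis's slaveIsInHandshakeState(): a range check on the ordered enum. */
int in_handshake(repl_state_t s) {
    return s >= REPL_STATE_RECEIVE_PONG && s <= REPL_STATE_RECEIVE_PSYNC;
}
```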
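(4.2) The master and replica establish a connection
When does the replica start connecting to the master?
It happens in the replicationCron() task, which runs once every 1000 ms.
replicationCron() is implemented in replication.c and is invoked from serverCron() in server.c. One important check in this task is the replica's replication state: if it is REPL_STATE_CONNECT, the replica starts connecting to the master. The connection is set up by calling connectWithMaster().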
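The connect-and-retry behaviour can be modeled as a once-per-second tick over the state variable. The sketch below is a simplified, hypothetical model of the parts of replicationCron() that matter here. A replica in the "connect" state starts a connection. A replica stuck connecting, handshaking or transferring for longer than repl-timeout is sent back to the "connect" state, which is what cancelReplicationHandshake() does in Redis 6.0, so the same tick retries immediately. The struct, the trimmed state set and `stub_connect()` are all assumptions made for the example.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical, trimmed-down set of states for this sketch. */
typedef enum { ST_NONE, ST_CONNECT, ST_CONNECTING, ST_HANDSHAKE, ST_TRANSFER, ST_CONNECTED } st_t;

typedef struct {
    st_t state;
    time_t last_io;          /* like server.repl_transfer_lastio */
    int repl_timeout;        /* seconds, like the repl-timeout option */
    int (*connect_fn)(void); /* returns 0 when a connect was started */
} replica_t;

/* Stand-in for connectWithMaster(): pretend the connect was started. */
static int stub_connect(void) { return 0; }

/* One tick of a simplified replication cron, called about once per second. */
void cron_tick(replica_t *r, time_t now) {
    /* Timed out while connecting, handshaking or transferring: cancel. */
    if ((r->state == ST_CONNECTING || r->state == ST_HANDSHAKE || r->state == ST_TRANSFER) &&
        now - r->last_io > r->repl_timeout) {
        r->state = ST_CONNECT;
    }
    /* Configured but not connected: start a non-blocking connect. */
    if (r->state == ST_CONNECT && r->connect_fn() == 0) {
        r->state = ST_CONNECTING;
        r->last_io = now;
    }
}
```

The timeout check runs before the connect check, as in replicationCron(). Because of that ordering, a cancelled attempt is retried within the same tick rather than one second later.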
```c
// file: src/server.c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    /* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();
    // ...
    return 1000/server.hz;
}
```
```c
// file: src/replication.c
/* --------------------------- REPLICATION CRON  ---------------------------- */

/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* First, send PING according to ping_slave_period. */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        /* Note that we don't send the PING if the clients are paused during
         * a Redis Cluster manual failover: the PING we send will otherwise
         * alter the replication offsets of master and slave, and will no longer
         * match the one stored into 'mf_master_offset' state. */
        int manual_failover_in_progress =
            server.cluster_enabled &&
            server.cluster->mf_end &&
            clientsArePaused();

        if (!manual_failover_in_progress) {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file.
     *
     * Also send the a newline to all the chained slaves we have, if we lost
     * connection from our master, to keep the slaves aware that their
     * master is online. This is needed since sub-slaves only receive proxied
     * data from top-level masters, so there is no explicit pinging in order
     * to avoid altering the replication offsets. This special out of band
     * pings (newlines) can be sent, they will have no effect in the offset.
     *
     * The newline will be ignored by the slave but will refresh the
     * last interaction timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response). */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            connWrite(slave->conn, "\n", 1);
        }
    }

    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_ONLINE) {
                if (slave->flags & CLIENT_PRE_PSYNC)
                    continue;
                if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
                    serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
                          replicationGetSlaveName(slave));
                    freeClient(slave);
                    continue;
                }
            }
            /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
             * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
             * from terminating. */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
                if (slave->repl_last_partial_write != 0 &&
                    (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
                {
                    serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
                          replicationGetSlaveName(slave));
                    freeClient(slave);
                    continue;
                }
            }
        }
    }

    /* If this is a master without attached slaves and there is a replication
     * backlog active, in order to reclaim memory we can free it after some
     * (configured) time. Note that this cannot be done for slaves: slaves
     * without sub-slaves attached should still accumulate data into the
     * backlog, in order to reply to PSYNC queries if they are turned into
     * masters after a failover. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            /* When we free the backlog, we always use a new
             * replication ID and clear the ID2. This is needed
             * because when there is no backlog, the master_repl_offset
             * is not updated, but we would still retain our replication
             * ID, leading to the following problem:
             *
             * 1. We are a master instance.
             * 2. Our slave is promoted to master. It's repl-id-2 will
             *    be the same as our repl-id.
             * 3. We, yet as master, receive some updates, that will not
             *    increment the master_repl_offset.
             * 4. Later we are turned into a slave, connect to the new
             *    master that will accept our PSYNC request by second
             *    replication ID, but there will be data inconsistency
             *    because we received writes. */
            changeReplicationId();
            clearReplicationId2();
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected replicas.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* Start a BGSAVE good for replication if we have slaves in
     * WAIT_BGSAVE_START state.
     *
     * In case of diskless replication, we make sure to wait the specified
     * number of seconds (according to configuration) so that other slaves
     * have the time to arrive before we start streaming. */
    if (!hasActiveChildProcess()) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }

        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            /* Start the BGSAVE. The called function may start a
             * BGSAVE with socket target or disk target depending on the
             * configuration and slaves capabilities. */
            startBgsaveForReplication(mincapa);
        }
    }

    /* Remove the RDB file used for replication if Redis is not running
     * with any persistence. */
    removeRDBUsedToSyncReplicas();

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
```
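```c
int connectWithMaster(void) {
    // Create the connection object: TLS if tls-replication is enabled, plain TCP otherwise
    server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
    // Start a non-blocking connect; syncWithMaster() is the handler called when it completes
    if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
                NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                connGetLastError(server.repl_transfer_s));
        connClose(server.repl_transfer_s);
        server.repl_transfer_s = NULL;
        return C_ERR;
    }

    // Record the time of the last I/O; replicationCron() uses it to detect timeouts
    server.repl_transfer_lastio = server.unixtime;
    // Connect initiated: the state machine moves from REPL_STATE_CONNECT to REPL_STATE_CONNECTING
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}
```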
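(4.3) Master-replica handshake and replication type determination
After the TCP connection is established, the replica does not start synchronizing data right away; it first performs a handshake with the master.
The main purposes of the handshake are for the replica to authenticate with the master and to send the master its own IP address and port. These steps are carried out by syncWithMaster(), the handler registered in connectWithMaster(), which advances repl_state through the handshake states one reply at a time.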
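How a reply moves the handshake forward can be sketched as a small transition function. This is a simplified, hypothetical model of syncWithMaster()'s reply handling, with several simplifications. Each state below collapses Redis's separate "send" and "receive" states into one "sent the command, waiting for its reply" step. The optional ip-address step is omitted. The state names are made up for the example. In Redis 6.0, the replica accepts either "+PONG" or an authentication error after PING, because AUTH comes next. A failed AUTH aborts the handshake. Errors from REPLCONF are only logged, since older masters may not understand every option.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical, trimmed-down handshake states (send + wait collapsed into one). */
typedef enum { HS_RECV_PONG, HS_RECV_AUTH, HS_RECV_PORT, HS_RECV_CAPA, HS_SEND_PSYNC, HS_ERROR } hs_state_t;

/* Advance the handshake after one reply line from the master.
 * has_auth: whether masterauth is configured (otherwise AUTH is skipped). */
hs_state_t hs_on_reply(hs_state_t s, const char *reply, int has_auth) {
    switch (s) {
    case HS_RECV_PONG:
        /* "+PONG" is fine; an authentication error is acceptable too. */
        if (reply[0] == '-' && strncmp(reply, "-NOAUTH", 7) != 0 &&
            strncmp(reply, "-NOPERM", 7) != 0)
            return HS_ERROR;
        return has_auth ? HS_RECV_AUTH : HS_RECV_PORT;
    case HS_RECV_AUTH:
        return reply[0] == '-' ? HS_ERROR : HS_RECV_PORT; /* wrong password aborts */
    case HS_RECV_PORT:
        return HS_RECV_CAPA;  /* an error here is tolerated */
    case HS_RECV_CAPA:
        return HS_SEND_PSYNC; /* an error here is tolerated */
    default:
        return HS_ERROR;
    }
}
```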
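(4.4) Replication type determination and execution
On the replica side, sending PSYNC and handling the master's reply are implemented in slaveTryPartialResynchronization():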
```c
/* Try a partial resynchronization with the master if we are about to reconnect.
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master replid and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd". However in case
 *    of successful partial resynchronization, the function will reuse
 *    'fd' as file descriptor of the server.master client structure.
 *
 * The function is split in two halves: if read_reply is 0, the function
 * writes the PSYNC command on the socket, and a new function call is
 * needed, with read_reply set to 1, in order to read the reply of the
 * command. This is useful in order to support non blocking operations, so
 * that we write, return into the event loop, and read when there are data.
 *
 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
 * with read_reply set to 1. However even when read_reply is set to 1
 * the function may return PSYNC_WAIT_REPLY again to signal there were
 * insufficient data to read to complete its work. We should re-enter
 * into the event loop and wait in such a case.
 *
 * The function returns:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 *                   In this case the master replid and global replication
 *                   offset is saved.
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 *                      the caller should fall back to SYNC.
 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
 * PSYNC_TRY_LATER: Master is currently in a transient error condition.
 *
 * Notable side effects:
 *
 * 1) As a side effect of the function call the function removes the readable
 *    event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
 * 2) server.master_initial_offset is set to the right value according
 *    to the master reply. This will be used to populate the 'server.master'
 *    structure replication offset.
 */

#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    /* Writing half */
    if (!read_reply) {
        /* Initially set master_initial_offset to -1 to mark the current
         * master replid and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.master_initial_offset = -1;

        if (server.cached_master) {
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            connSetReadHandler(conn, NULL);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }

    /* Reading half */
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
    if (sdslen(reply) == 0) {
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }

    connSetReadHandler(conn, NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *replid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the replid
         * and the replication offset. */
        replid = strchr(reply,' ');
        if (replid) {
            replid++;
            offset = strchr(replid,' ');
            if (offset) offset++;
        }
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * replid to make sure next PSYNCs will fail. */
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
            server.master_initial_offset = strtoll(offset,NULL,10);
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                server.master_replid,
                server.master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted. */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set or
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start,CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';

            if (strcmp(new,server.cached_master->replid)) {
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new,sizeof(server.replid));
                memcpy(server.cached_master->replid,new,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            }
        }

        /* Setup the replication to continue. */
        sdsfree(reply);
        replicationResurrectCachedMaster(conn);

        /* If this instance was restarted and we read the metadata to
         * PSYNC from the persistence file, our replication backlog could
         * be still not initialized. Create it. */
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error (since the master does
     * not understand PSYNC or because it is in a special state and cannot
     * serve our request), or an unexpected reply from the master.
     *
     * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
     * return PSYNC_TRY_LATER if we believe this is a transient error. */

    if (!strncmp(reply,"-NOMASTERLINK",13) ||
        !strncmp(reply,"-LOADING",8))
    {
        serverLog(LL_NOTICE,
            "Master is currently unable to PSYNC "
            "but should be in the future: %s", reply);
        sdsfree(reply);
        return PSYNC_TRY_LATER;
    }

    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        serverLog(LL_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        serverLog(LL_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
```
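(5) References
https://weikeqin.com/tags/redis/
Redis源码分析与实战 (Redis Source Code Analysis and Practice), study notes, Day 21: Master-replica replication: state-machine-based design and implementation https://time.geekbang.org/col...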