共计 8435 个字符,预计需要花费 22 分钟才能阅读完成。
baiyan
命令使用
命令含义:将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功,key 保证会出现在目标实例上,而当前实例上的 key 会被删除
命令格式:
MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [KEYS key [key ...]]
命令实战:将键 key1、key2、key3 批量迁移到本机 6380 端口的 redis 实例上,并存储到目标实例的第 0 号数据库,超时时间为 1000 毫秒。可选项 COPY 如果表示不移除源实例上的 key,REPLACE 选项表示替换目标实例上已存在的 key。KEYS 选项表示可以同时批量传送多个 keys(但前面的 key 参数的位置必须设置为空)
127.0.0.1:6379> migrate 127.0.0.1 6380 "" 0 5000 KEYS key1 key2 key3
OK
返回值:迁移成功时返回 OK,否则返回错误
源码分析
migrate 命令的执行过程可分为参数校验、连接建立、组装数据、发送数据、处理返回五个阶段。同样的,migrate 命令的处理函数为 migrateCommand():
参数校验
void migrateCommand(client *c) {
migrateCachedSocket *cs; // 连接另一个实例的 socket
int copy = 0, replace = 0, j; // 是否开启 copy 及 replace 选项标记
char *password = NULL; // 密码
long timeout; // 超时时间
long dbid; // 数据库 id
robj **ov = NULL; /* 要迁移的对象 */
robj **kv = NULL; /* 键名 */
robj **newargv = NULL;
rio cmd, payload; // 重要,存储目标实例执行的命令及 DUMP 的 payload
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;
/* 支持同时传输多个 key. */
int first_key = 3; /* 第一个键参数的位置. */
int num_keys = 1; /* 默认只传送一个 key. */
/* 校验其他选项,从 COPY 选项开始校验 */
for (j = 6; j < c->argc; j++) {
int moreargs = j < c->argc-1;
if (!strcasecmp(c->argv[j]->ptr,"copy")) { // 如果命令参数等于 copy,开启 copy 选项
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 如果命令参数等于 replace,开启 replace 选项
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 如果命令参数等于 auth,开启 auth 选项
if (!moreargs) { // 参数数量超出规定数量,报错
addReply(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 如果设置了 keys 参数,表明要同时传输多个 keys 值过去
if (sdslen(c->argv[3]->ptr) != 0) { // 如果开启了 keys 选项,前面 key 参数的位置必须设置为空
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
"must be set to the empty string");
return;
}
first_key = j+1;
num_keys = c->argc - j - 1;
break; /* 现在 first_key 值指向 keys 的第一个值.,并将 num_keys 设置为 keys 的数量 */
} else {addReply(c,shared.syntaxerr);
return;
}
}
/* 选择的 db 和超时时间数据校验,看是否是合法的数字格式 */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
{return;}
if (timeout <= 0) timeout = 1000;
/* 接下来会检查是否有可以迁移的键 */
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
int oi = 0;
/* 检查所有的键,判断输入的键中,是否存在合法的键来进行迁移 */
for (j = 0; j < num_keys; j++) {if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { // 去键空间字典中查找该键,如果该键没有超时
kv[oi] = c->argv[first_key+j]; // 将未超时的键存到 kv 数组中,说明当前 key 是可以 migrate 的;否则如果超时就无法进行 migrate
oi++;
}
}
num_keys = oi; // 更新当前可 migrate 的 key 总量
if (num_keys == 0) { // 如果没有可以迁移的 key,那么给客户端返回“NOKEY" 字符串
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
刚开始执行 migrate 命令的时候,由于 migrate 参数很多,需要对其逐个做校验。尤其是在启用 keys 参数同时迁移多个 keys 的时候,需要进行参数的动态判断。同时需要判断是否有合法的键来进行迁移。只有没有过期的键才能够迁移,否则不进行迁移,最大化节省系统资源。
连接建立
假如我们要从当前 6379 端口上的 redis 实例迁移到 6380 端口上的 redis 实例,我们必然要建立一个 socket 连接:
try_again:
write_error = 0;
/* 连接建立 */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {zfree(ov); zfree(kv);
return;
}
我们看到,在主流程中调用了 migrateGetSocket()函数创建了一个 socket,这里是一个带缓存的 socket。我们暂时不跟进这个函数,后面我会以扩展的形式来跟进。
组装数据
基于这个 socket,我们可以将数据以 TCP 协议中规定的字节流形式传输到目标实例上。这就需要一个序列化的过程了。6379 实例需要将 keys 序列化,6380 需要将数据反序列化。这就需要借助我们之前讲过的 DUMP 命令和 RESTORE 命令,分别来进行序列化和反序列化了。
redis 并没有立即进行 DUMP 将 key 序列化,而是首先组装要在目标 redis 实例上所要执行的命令,比如 AUTH/SELECT/RESTORE 等命令。要想在目标实例上执行命令,那么必须同样基于之前建立的 socket 连接,以当前的 redis 实例作为客户端,往与目标 redis 实例建立的 TCP 连接中,写入按照 redis 协议封装的命令集合(如 *2 \r\n SELECT \r\n $1 \r\n 1 \r\n)。redis 使用了自己封装的 I / O 抽象层 rio,它实现了一个 I / O 缓冲区。通过读取其缓冲区中的数据,就可以往我们在建立 socket 的时候生成的 fd 中写入数据啦。首先 redis 会建立一个 rio 缓冲区,并按照 redis 数据传输协议所要求的格式,组装要在目标实例上执行的 redis 命令:
// 初始化一个 rio 缓冲区
rioInitWithBuffer(&cmd,sdsempty());
/* 组装 AUTH 命令 */
if (password) {serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 按照 redis 协议写入一条命令开始的标识 \*2。表示命令一共有 2 个参数
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); // 写入 $4\r\n AUTH \r\n
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, sdslen(password))); // 同上,按照协议格式写入密码
}
/* 在目标实例上选择数据库 */
int select = cs->last_dbid != dbid; /* 判断是否已经选择过数据库,如果选择过就不用再次执行 SELECT 命令 */
if (select) { // 如果没有选择过,需要执行 SELECT 命令选择数据库
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 同上,写入开始表示 \*2
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); // 同上,写入 $6\r\n SELECT \r\n
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); // 写入 $1\r\n 1 \r\n
}
那么接下来需要进行 DUMP 的序列化操作了。由于序列化操作耗时较久,所以可能出现这种情况:在之前第一次检测是否超时的时候没有超时,但是由于这次序列化操作时间较久,执行期间,这个键超时了,那么 redis 简单粗暴地丢弃该超时键,直接放弃迁移这个键:
int non_expired = 0; // 暂存新的未过期的键的数量
/* 如果在 DUMP 的过程中过期了,直接 continue. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
if (expireat != -1) {ttl = expireat-mstime();
if (ttl < 0) {continue;}
if (ttl < 1) ttl = 1;
}
/* 经过上面的筛选之后,都是最新的、没有过期的键,这些键可以最终被迁移了. */
kv[non_expired++] = kv[j];
然后,在目标实例上最终我们需要执行 RESTORE 命令,将之前经过 DUMP 序列化的字节流反序列化,过程和上面同理:
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 同上,写入开始表示 \* 5 或 4
if (server.cluster_enabled) // 如果集群模式开启
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 同上,写入 $7 RESTORE \r\n
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr))); // 将所有需要反序列化的 key 写入
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 写入过期时间
接下来,我们就需要最终执行 DUMP 命令,将我们需要传输的所有键等数据序列化了,这里 redis 调用了 createDumpPayload()来创建一个 DUMP 载荷,这就是最终序列化好的数据:
createDumpPayload(&payload,ov[j],kv[j]); // 序列化数据
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr))); // 将序列化数据存到 rio cmd 中等待发送
sdsfree(payload.io.buffer.ptr);
if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); // replace 选项开启
发送数据
目前,我们需要发送的、按照 redis 协议组装好的所有序列化好的命令及数据都存放在了 cmd 这个 rio 结构体变量缓存中。我们当前的 6379redis 实例仿佛就是一个客户端,而要传输的目标实例 6380 就是一个服务端。接下来就需要读取缓存并且往直前建立好的 socket 中写入数据,将数据最终传输至目标实例:
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {towrite = (towrite > (64*1024) ? (64*1024) : towrite); // 按照 64K 的块大小来发送
nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); // 往 socket fd 中写入数据(数据来源于 rio 的缓存)if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}
处理返回
在目标 redis 上分别执行 AUTH、SELECT、RESTORE 命令,RESTORE 命令会反序列化并将 key 写入目标实例。那么这几个命令执行完毕之后,我们如何知道它们是否执行成功呢?同样的,目标 redis 6380 实例在执行完命令之后,也会有相应的返回值,我们需要根据返回值来判断命令是否执行成功、是否将 key 成功迁移完成:
char buf0[1024]; /* 存储 AUTH 命令返回值. */
char buf1[1024]; /* 存储 SELECT 命令返回值 */
char buf2[1024]; /* 存储 RESTORE 命令返回值. */
/* 从 socket fd 中读取 AUTH 命令返回值. */
if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;
/* 从 socket fd 中读取 SELECT 命令返回值. */
if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;
int error_from_target = 0;
int socket_error = 0;
int del_idx = 1;
/* 迁移完成之后需要将原有实例上的 key 删除 */
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys; j++) {
/* 从 socket fd 中读取 RESTORE 命令返回值 */
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') ||
(select && buf1[0] == '-') ||
buf2[0] == '-')
{if (!error_from_target) {...} else {if (!copy) { // 没有开启 copy 选项,需要删除原有实例的键
...
/* 删除原有实例上的键 */
dbDelete(c->db,kv[j]);
...
}
}
}
...
/* 如果发生 socket 错误,关闭连接 */
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
...
sdsfree(cmd.io.buffer.ptr); // 释放 cmd 的 rio 缓冲区
zfree(ov); zfree(kv); zfree(newargv); // 释放存储 key 的 robj 结构体
return;
综上,migrate 命令就执行完成了。我们总结一下它的执行过程:
- 命令参数校验
- 按照 redis 协议组装目标实例上需要执行的命令
- 将要传输的 key 序列化
- 创建 socket 连接
- 通过 socket 连接将命令及数据传输至目标实例
- 目标实例执行命令并存储相应的 key
- 处理目标实例的返回值
- 如果失败执行重试逻辑,如果成功则执行完毕
扩展
缓存 socket 的实现
在 migrate 命令执行过程中,调用了 migrateGetSocket()创建 socket。redis 借助字典结构,实现了缓存 socket,避免了多次创建 socket 所带来的开销:
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
int fd;
sds name = sdsempty();
migrateCachedSocket *cs;
/* 查找字典中是否有相应 ip:port 的缓存 socket. */
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
// 查找字典
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (cs) { // 如果找到了,说明之前创建过 ip:port 的 socket
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs; // 直接返回缓存 socket
}
/* 如果在字典中没有找到,说明没有缓存,需要重新创建. */
/* 判断是否缓存的 socket 过多,最大为 64 个 */
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* 如果字典中缓存的 socket 过多,需要随机删除一些 */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
close(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
/* 创建 socket */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {sdsfree(name);
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return NULL;
}
anetEnableTcpNoDelay(server.neterr,fd);
/* 检查是否在超时时间内创建完成 */
if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {sdsfree(name);
addReplySds(c,
sdsnew("-IOERR error or timeout connecting to the client\r\n"));
close(fd);
return NULL;
}
/* 将新创建的 socket 加入缓存并返回给调用者 */
cs = zmalloc(sizeof(*cs));
cs->fd = fd;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime;
// 将新创建的 socket 加入字典,缓存起来等待下次使用
dictAdd(server.migrate_cached_sockets,name,cs);
return cs;
}