乐趣区

Redis5源码学习浅析redis命令之migrate篇

baiyan

命令使用

命令含义:将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功,key 保证会出现在目标实例上,而当前实例上的 key 会被删除
命令格式:

MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [KEYS key [key ...]]

命令实战:将键 key1、key2、key3 批量迁移到本机 6380 端口的 redis 实例上,并存储到目标实例的第 0 号数据库,超时时间为 1000 毫秒。可选项 COPY 如果表示不移除源实例上的 key,REPLACE 选项表示替换目标实例上已存在的 key。KEYS 选项表示可以同时批量传送多个 keys(但前面的 key 参数的位置必须设置为空)

127.0.0.1:6379> migrate 127.0.0.1 6380 "" 0 5000 KEYS key1 key2 key3
OK

返回值:迁移成功时返回 OK,否则返回错误

源码分析

migrate 命令的执行过程可分为参数校验、连接建立、组装数据、发送数据、处理返回五个阶段。同样的,migrate 命令的处理函数为 migrateCommand():

参数校验

void migrateCommand(client *c) {
    migrateCachedSocket *cs; // 连接另一个实例的 socket
    int copy = 0, replace = 0, j; // 是否开启 copy 及 replace 选项标记
    char *password = NULL; // 密码
    long timeout; // 超时时间
    long dbid; // 数据库 id
    robj **ov = NULL; /* 要迁移的对象 */
    robj **kv = NULL; /* 键名 */
    robj **newargv = NULL; 
    rio cmd, payload; // 重要,存储目标实例执行的命令及 DUMP 的 payload
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* 支持同时传输多个 key. */
    int first_key = 3; /* 第一个键参数的位置. */
    int num_keys = 1;  /* 默认只传送一个 key. */

    /* 校验其他选项,从 COPY 选项开始校验 */
    for (j = 6; j < c->argc; j++) {
        int moreargs = j < c->argc-1;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) { // 如果命令参数等于 copy,开启 copy 选项
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 如果命令参数等于 replace,开启 replace 选项
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 如果命令参数等于 auth,开启 auth 选项
            if (!moreargs) { // 参数数量超出规定数量,报错
                addReply(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 如果设置了 keys 参数,表明要同时传输多个 keys 值过去
            if (sdslen(c->argv[3]->ptr) != 0) { // 如果开启了 keys 选项,前面 key 参数的位置必须设置为空
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    "must be set to the empty string");
                return;
            }
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /* 现在 first_key 值指向 keys 的第一个值.,并将 num_keys 设置为 keys 的数量 */
        } else {addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* 选择的 db 和超时时间数据校验,看是否是合法的数字格式 */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {return;}
    if (timeout <= 0) timeout = 1000;

    /* 接下来会检查是否有可以迁移的键 */
    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;

   /* 检查所有的键,判断输入的键中,是否存在合法的键来进行迁移 */
    for (j = 0; j < num_keys; j++) {if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { // 去键空间字典中查找该键,如果该键没有超时
            kv[oi] = c->argv[first_key+j]; // 将未超时的键存到 kv 数组中,说明当前 key 是可以 migrate 的;否则如果超时就无法进行 migrate
            oi++;
        }
    }
    num_keys = oi; // 更新当前可 migrate 的 key 总量
    if (num_keys == 0) { // 如果没有可以迁移的 key,那么给客户端返回“NOKEY" 字符串
        zfree(ov); zfree(kv);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

刚开始执行 migrate 命令的时候,由于 migrate 参数很多,需要对其逐个做校验。尤其是在启用 keys 参数同时迁移多个 keys 的时候,需要进行参数的动态判断。同时需要判断是否有合法的键来进行迁移。只有没有过期的键才能够迁移,否则不进行迁移,最大化节省系统资源。

连接建立

假如我们要从当前 6379 端口上的 redis 实例迁移到 6380 端口上的 redis 实例,我们必然要建立一个 socket 连接:

try_again:
    write_error = 0;

    /* 连接建立 */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {zfree(ov); zfree(kv);
        return; 
    }

我们看到,在主流程中调用了 migrateGetSocket()函数创建了一个 socket,这里是一个带缓存的 socket。我们暂时不跟进这个函数,后面我会以扩展的形式来跟进。

组装数据

基于这个 socket,我们可以将数据以 TCP 协议中规定的字节流形式传输到目标实例上。这就需要一个序列化的过程了。6379 实例需要将 keys 序列化,6380 需要将数据反序列化。这就需要借助我们之前讲过的 DUMP 命令和 RESTORE 命令,分别来进行序列化和反序列化了。
redis 并没有立即进行 DUMP 将 key 序列化,而是首先组装要在目标 redis 实例上所要执行的命令,比如 AUTH/SELECT/RESTORE 等命令。要想在目标实例上执行命令,那么必须同样基于之前建立的 socket 连接,以当前的 redis 实例作为客户端,往与目标 redis 实例建立的 TCP 连接中,写入按照 redis 协议封装的命令集合(如 *2 \r\n SELECT \r\n $1 \r\n 1 \r\n)。redis 使用了自己封装的 I / O 抽象层 rio,它实现了一个 I / O 缓冲区。通过读取其缓冲区中的数据,就可以往我们在建立 socket 的时候生成的 fd 中写入数据啦。首先 redis 会建立一个 rio 缓冲区,并按照 redis 数据传输协议所要求的格式,组装要在目标实例上执行的 redis 命令:

    // 初始化一个 rio 缓冲区
    rioInitWithBuffer(&cmd,sdsempty());

    /* 组装 AUTH 命令 */
    if (password) {serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 按照 redis 协议写入一条命令开始的标识 \*2。表示命令一共有 2 个参数
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); // 写入 $4\r\n  AUTH \r\n
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, sdslen(password))); // 同上,按照协议格式写入密码
    }

    /* 在目标实例上选择数据库 */
    int select = cs->last_dbid != dbid; /* 判断是否已经选择过数据库,如果选择过就不用再次执行 SELECT 命令 */
    if (select) { // 如果没有选择过,需要执行 SELECT 命令选择数据库
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 同上,写入开始表示 \*2
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); // 同上,写入 $6\r\n SELECT \r\n
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); // 写入 $1\r\n 1 \r\n
    }

那么接下来需要进行 DUMP 的序列化操作了。由于序列化操作耗时较久,所以可能出现这种情况:在之前第一次检测是否超时的时候没有超时,但是由于这次序列化操作时间较久,执行期间,这个键超时了,那么 redis 简单粗暴地丢弃该超时键,直接放弃迁移这个键:

    int non_expired = 0; // 暂存新的未过期的键的数量

    /* 如果在 DUMP 的过程中过期了,直接 continue. */
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = getExpire(c->db,kv[j]);

        if (expireat != -1) {ttl = expireat-mstime();
            if (ttl < 0) {continue;}
            if (ttl < 1) ttl = 1;
        }

        /* 经过上面的筛选之后,都是最新的、没有过期的键,这些键可以最终被迁移了. */
        kv[non_expired++] = kv[j];

然后,在目标实例上最终我们需要执行 RESTORE 命令,将之前经过 DUMP 序列化的字节流反序列化,过程和上面同理:

        serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 同上,写入开始表示 \* 5 或 4

        if (server.cluster_enabled) // 如果集群模式开启
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 同上,写入 $7 RESTORE \r\n
        serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
                sdslen(kv[j]->ptr))); // 将所有需要反序列化的 key 写入
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 写入过期时间

接下来,我们就需要最终执行 DUMP 命令,将我们需要传输的所有键等数据序列化了,这里 redis 调用了 createDumpPayload()来创建一个 DUMP 载荷,这就是最终序列化好的数据:

        createDumpPayload(&payload,ov[j],kv[j]); // 序列化数据
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr))); // 将序列化数据存到 rio cmd 中等待发送
        sdsfree(payload.io.buffer.ptr);

        if (replace)
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); // replace 选项开启

发送数据

目前,我们需要发送的、按照 redis 协议组装好的所有序列化好的命令及数据都存放在了 cmd 这个 rio 结构体变量缓存中。我们当前的 6379redis 实例仿佛就是一个客户端,而要传输的目标实例 6380 就是一个服务端。接下来就需要读取缓存并且往直前建立好的 socket 中写入数据,将数据最终传输至目标实例:

    errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {towrite = (towrite > (64*1024) ? (64*1024) : towrite); // 按照 64K 的块大小来发送
            nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); // 往 socket fd 中写入数据(数据来源于 rio 的缓存)if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err;
            }
            pos += nwritten;
        }
    }

处理返回

在目标 redis 上分别执行 AUTH、SELECT、RESTORE 命令,RESTORE 命令会反序列化并将 key 写入目标实例。那么这几个命令执行完毕之后,我们如何知道它们是否执行成功呢?同样的,目标 redis 6380 实例在执行完命令之后,也会有相应的返回值,我们需要根据返回值来判断命令是否执行成功、是否将 key 成功迁移完成:

    char buf0[1024]; /* 存储 AUTH 命令返回值. */
    char buf1[1024]; /* 存储 SELECT 命令返回值 */
    char buf2[1024]; /* 存储 RESTORE 命令返回值. */

    /* 从 socket fd 中读取 AUTH 命令返回值. */
    if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;

    /*  从 socket fd 中读取 SELECT 命令返回值. */
    if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;

    int error_from_target = 0;
    int socket_error = 0;
    int del_idx = 1; 

    /* 迁移完成之后需要将原有实例上的 key 删除 */
    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

    for (j = 0; j < num_keys; j++) {
        /*  从 socket fd 中读取 RESTORE 命令返回值 */
        if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-')
        {if (!error_from_target) {...} else {if (!copy) { // 没有开启 copy 选项,需要删除原有实例的键
               ...
               /* 删除原有实例上的键 */
                dbDelete(c->db,kv[j]);
                ...
            }
        }
    }
    ...
    /* 如果发生 socket 错误,关闭连接 */
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
    ...
    sdsfree(cmd.io.buffer.ptr); // 释放 cmd 的 rio 缓冲区
    zfree(ov); zfree(kv); zfree(newargv); // 释放存储 key 的 robj 结构体
    return;

综上,migrate 命令就执行完成了。我们总结一下它的执行过程:

  • 命令参数校验
  • 按照 redis 协议组装目标实例上需要执行的命令
  • 将要传输的 key 序列化
  • 创建 socket 连接
  • 通过 socket 连接将命令及数据传输至目标实例
  • 目标实例执行命令并存储相应的 key
  • 处理目标实例的返回值
  • 如果失败执行重试逻辑,如果成功则执行完毕

扩展

缓存 socket 的实现

在 migrate 命令执行过程中,调用了 migrateGetSocket()创建 socket。redis 借助字典结构,实现了缓存 socket,避免了多次创建 socket 所带来的开销:

migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
    int fd;
    sds name = sdsempty();
    migrateCachedSocket *cs;

    /* 查找字典中是否有相应 ip:port 的缓存 socket. */
    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
    name = sdscatlen(name,":",1);
    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
    // 查找字典
    cs = dictFetchValue(server.migrate_cached_sockets,name);
    if (cs) { // 如果找到了,说明之前创建过 ip:port 的 socket
        sdsfree(name);
        cs->last_use_time = server.unixtime;
        return cs; // 直接返回缓存 socket
    }
 
    /* 如果在字典中没有找到,说明没有缓存,需要重新创建. */
    /* 判断是否缓存的 socket 过多,最大为 64 个 */
    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
        /* 如果字典中缓存的 socket 过多,需要随机删除一些 */
        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
        cs = dictGetVal(de);
        close(cs->fd);
        zfree(cs);
        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
    }

    /* 创建 socket */
    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
                                atoi(c->argv[2]->ptr));
    if (fd == -1) {sdsfree(name);
        addReplyErrorFormat(c,"Can't connect to target node: %s",
            server.neterr);
        return NULL;
    }
    anetEnableTcpNoDelay(server.neterr,fd);

    /* 检查是否在超时时间内创建完成 */
    if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {sdsfree(name);
        addReplySds(c,
            sdsnew("-IOERR error or timeout connecting to the client\r\n"));
        close(fd);
        return NULL;
    }

    /* 将新创建的 socket 加入缓存并返回给调用者 */
    cs = zmalloc(sizeof(*cs));
    cs->fd = fd;
    cs->last_dbid = -1;
    cs->last_use_time = server.unixtime;
    // 将新创建的 socket 加入字典,缓存起来等待下次使用
    dictAdd(server.migrate_cached_sockets,name,cs);
    return cs;
}
退出移动版