乐趣区

关于golang:redis持久化

本次次要是对 redis 中驰名的长久化策略进行代码层面形容,次要包含 RDB 长久化和 AOF 长久化

<!– more –>

因为 AOF 文件的更新频率比 RDB 高,所以如果服务器开启 AOF 长久化,redis 优先应用 AOF 文件还原,只有当 AOF 长久化敞开,才应用 RDB 文件进行还原

RDB 长久化

RDB 长久化次要有两个命令实现:SAVEBGSAVE

SAVEBGSAVE

  • SAVE会阻塞 redis 服务器,晓得 RDB 文件创建结束
void saveCommand(redisClient *c) {
    // BGSAVE 曾经在执行中,不能再执行 SAVE
    // 否则将产生竞争条件
    if (server.rdb_child_pid != -1) {addReplyError(c,"Background save already in progress");
        return;
    }
    // 执行 
    if (rdbSave(server.rdb_filename) == REDIS_OK) {addReply(c,shared.ok);
    } else {addReply(c,shared.err);
    }
}
  • BGSAVE不会阻塞,他会创立一个子过程,由子过程解决 RDB 文件保留
void bgsaveCommand(redisClient *c) {
    // 不能反复执行 BGSAVE
    if (server.rdb_child_pid != -1) {addReplyError(c,"Background save already in progress");
    // 不能在 BGREWRITEAOF 正在运行时执行
    } else if (server.aof_child_pid != -1) {addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
    // 执行 BGSAVE
    } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) {addReplyStatus(c,"Background saving started");
    } else {addReply(c,shared.err);
    }
}
int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;
    // 如果 BGSAVE 曾经在执行,那么出错
    if (server.rdb_child_pid != -1) return REDIS_ERR;
    // 记录 BGSAVE 执行前的数据库被批改次数
    server.dirty_before_bgsave = server.dirty;
    // 最近一次尝试执行 BGSAVE 的工夫
    server.lastbgsave_try = time(NULL);
    // fork() 开始前的工夫,记录 fork() 返回耗时用
    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;
        /* 子过程 */
        // 敞开网络连接 fd
        closeListeningSockets(0);
        // 设置过程的题目,不便辨认
        redisSetProcTitle("redis-rdb-bgsave");
        // 执行保留操作
        retval = rdbSave(filename);
        // 打印 copy-on-write 时应用的内存数
        if (retval == REDIS_OK) {size_t private_dirty = zmalloc_get_private_dirty();
            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }
        // 向父过程发送信号
        exitFromChild((retval == REDIS_OK) ? 0 : 1);
    } else {
        /* 父过程 */
        // 计算 fork() 执行的工夫
        server.stat_fork_time = ustime()-start;
        // 如果 fork() 出错,那么报告谬误
        if (childpid == -1) {
            server.lastbgsave_status = REDIS_ERR;
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        // 打印 BGSAVE 开始的日志
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
        // 记录数据库开始 BGSAVE 的工夫
        server.rdb_save_time_start = time(NULL);
        // 记录负责执行 BGSAVE 的子过程 ID
        server.rdb_child_pid = childpid;
        // 敞开主动 rehash
        updateDictResizePolicy();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

两个命令外部都是执行 rdbSave 函数

/* 
 * 将数据库保留到磁盘上。* 保留胜利返回 REDIS_OK,出错 / 失败返回 REDIS_ERR。*/
int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;

    // 创立临时文件
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    // 初始化 I/O
    rioInitWithFile(&rdb,fp);

    // 设置校验和函数
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;

    // 写入 RDB 版本号
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    // 写入谬误,跳转到 werr
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;

    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++) {

        // 指向数据库
        redisDb *db = server.db+j;

        // 指向数据库键空间
        dict *d = db->dict;

        // 跳过空数据库
        if (dictSize(d) == 0) continue;

        // 创立键空间迭代器
        di = dictGetSafeIterator(d);
        if (!di) {fclose(fp);
            return REDIS_ERR;
        }

        /* 
         * 写入 DB 选择器
         */
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(&rdb,j) == -1) goto werr;

        /*
         * 遍历数据库,并写入每个键值对的数据
         */
        while((de = dictNext(di)) != NULL) {sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;
            
            // 依据 keystr,在栈中创立一个 key 对象
            initStaticStringObject(key,keystr);

            // 获取键的过期工夫
            expire = getExpire(db,&key);

            // 保留键值对数据
            if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
        }
        dictReleaseIterator(di);
    }
    di = NULL; /* So that we don't release it again on error. */

    /* 
     * 写入 EOF 代码
     */
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

    /* 
     * CRC64 校验和。*
     * 如果校验和性能已敞开,那么 rdb.cksum 将为 0,* 在这种状况下,RDB 载入时会跳过校验和查看。*/
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    rioWrite(&rdb,&cksum,8);

    // 冲洗缓存,确保数据已写入磁盘
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* 
     * 应用 RENAME,原子性地对临时文件进行改名,笼罩原来的 RDB 文件。*/
    if (rename(tmpfile,filename) == -1) {redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    // 写入实现,打印日志
    redisLog(REDIS_NOTICE,"DB saved on disk");

    // 清零数据库脏状态
    server.dirty = 0;

    // 记录最初一次实现 SAVE 的工夫
    server.lastsave = time(NULL);

    // 记录最初一次执行 SAVE 的状态
    server.lastbgsave_status = REDIS_OK;

    return REDIS_OK;

werr:
    // 敞开文件
    fclose(fp);
    // 删除文件
    unlink(tmpfile);

    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));

    if (di) dictReleaseIterator(di);

    return REDIS_ERR;
}

RDB 文件内容

首先给出一个残缺的 RDB 文件的格局

后续为形容不便,大写为常量,小写为变量或者数据

  • REDIS 这个其实就是 RDB 文件的标识符
  • db_version长度 4 字节,记录 RDB 文件的版本号,redis3.0 个别应用 0006(第六版)
  • databases示意任意个数据库
  • EOF示意注释内容完结
  • check_sum校验和,8 字节,通过后面 4 局部内容计算得出

上面重点说下 databases 字段,每个 database 都是包含如下几个局部。

  • SELECTDB一字节,示意接下来要读一个数据库号码
  • db_number示意一个数据库号码,长度 1、2、5 字节,当读入该数字后,redis 会调用 select 命令进行数据库切换
  • key_value_pairs示意数据库中所有的键值对数据,其中又分为不带过期工夫的键值对,和带过期工夫的键值对

    • 不带过期的键值对,由 TYPEkeyvalue 组成

    • 带过期的键值对,由 EXPIRETIME_MSmsTYPEkeyvalue 组成

AOF 长久化

AOF 长久化是通过保留 redis 服务器在运行期间所执行的 命令进行记录数据,AOF 长久化分为命令追加、文件写入、文件同步三个步骤,上面别离对这三个步骤进行论述

命令追加

当 AOF 长久化处于关上的状态,服务器在执行一个写命令之后,会以某种协定的形式将被执行的写命令追加到服务器 redisServer 中的 aof_buf 缓冲区开端

文件写入与同步

上一次咱们说到,redis 在运行过程中,是一个事件循环,每次循环执行对应的工夫事件和文件事件,因而 AOF 长久化的写入也在每次事件循环完结后进行,执行函数flushAppendOnlyFile

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // 缓冲区中没有任何内容,间接返回
    if (sdslen(server.aof_buf) == 0) return;

    // 策略为每秒 FSYNC 
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        // 是否有 SYNC 正在后盾进行?sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    // 每秒 fsync,并且强制写入为假
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {

        /* 
         * 当 fsync 策略为每秒钟一次时,fsync 在后盾执行。* 如果后盾仍在执行 FSYNC,那么咱们能够提早写操作一两秒
         *(如果强制执行 write 的话,服务器主线程将阻塞在 write 下面)*/
        if (sync_in_progress) {

            // 有 fsync 正在后盾进行。。。if (server.aof_flush_postponed_start == 0) {
                /*
                 * 后面没有推延过 write 操作,这里将推延写操作的工夫记录下来
                 * 而后就返回,不执行 write 或者 fsync
                 */
                server.aof_flush_postponed_start = server.unixtime;
                return;

            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* 
                 * 如果之前曾经因为 fsync 而推延了 write 操作
                 * 然而推延的工夫不超过 2 秒,那么间接返回
                 * 不执行 write 或者 fsync
                 */
                return;

            }

            /*
             * 如果后盾还有 fsync 在执行,并且 write 曾经推延 >= 2 秒
             * 那么执行写操作(write 将被阻塞)*/
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* 
     * 执行到这里,程序会对 AOF 文件进行写入。* 清零提早 write 的工夫记录
     */
    server.aof_flush_postponed_start = 0;

    /* 
     * 执行单个 write 操作,如果写入设施是物理的话,那么这个操作应该是原子的
     *
     * 当然,如果呈现像电源中断这样的不可抗景象,那么 AOF 文件也是可能会呈现问题的
     * 这时就要用 redis-check-aof 程序来进行修复。*/
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {

        static time_t last_write_error_log = 0;
        int can_log = 0;

        // 将日志的记录频率限度在每行 AOF_WRITE_LOG_ERROR_RATE 秒
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        // 如果写入出错,那么尝试将该状况写入到日志外面
        if (nwritten == -1) {if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to"
                                       "the AOF file: (nwritten=%lld,"
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            // 尝试移除新追加的不残缺内容
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write"
                             "from the append-only file.  Redis may refuse"
                             "to load the AOF the next time it starts."
                             "ftruncate: %s", strerror(errno));
                }
            } else {/* If the ftrunacate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        // 解决写入 AOF 文件时呈现的谬误
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        // 写入胜利,更新最初写入状态
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }

    // 更新写入后的 AOF 文件大小
    server.aof_current_size += nwritten;

    /* 
     * 如果 AOF 缓存的大小足够小的话,那么重用这个缓存,* 否则的话,开释 AOF 缓存。*/
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        // 清空缓存中的内容,期待重用
        sdsclear(server.aof_buf);
    } else {
        // 开释缓存
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();}

    /*
     * 如果 no-appendfsync-on-rewrite 选项为开启状态,* 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话,* 那么不执行 fsync 
     */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    // 总是执行 fsnyc
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {/* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */

        // 更新最初一次执行 fsnyc 的工夫
        server.aof_last_fsync = server.unixtime;

    // 策略为每秒 fsnyc,并且间隔上次 fsync 曾经超过 1 秒
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // 放到后盾执行
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        // 更新最初一次执行 fsync 的工夫
        server.aof_last_fsync = server.unixtime;
    }
}

在下面代码中,咱们能够看到执行 fsync 有几种可能,这些可能性通过 appendfsync 配置进行决定

appendfsync 选项的值 flushappendonlyfile 函数行为
always 将 aof_buf 缓冲区所有内容写入并同步到 AOF 文件
everysec 将 aof buf 缓冲区中的所有内容写入到 AOF 文件,如果上次同步 AOF 文件的工夫间隔当初超过一秒钟,那么再次对 AOF 文件进行同步,并且这个同步操作是由一个线程专门负责执行的
no 将 aof_buf 缓冲区中的所有内容写入到 AOF 文件,但并不对 AOF 文件进行同步, 何时同步由操作系统来决定

AOF 重写

由 AOF 写入原理可知,每次执行命令,都会向文件中写入命令,那么这就会导致文件较大,而且对于比方这种状况:先增加一个 a 键,再删除一个 a 键,这其实最终的成果是和最后一样的,若将两次执行命令都写入,则其实是没有用的,因而 redis 采纳 AOF 重写的形式,函数为rewriteAppendOnlyFileBackground

/* 
 * 以下是后盾重写 AOF 文件(BGREWRITEAOF)的工作步骤:*
 * 1) 用户调用 BGREWRITEAOF
 *
 * 2) Redis 调用这个函数,它执行 fork():*
 *    2a) 子过程在临时文件中对 AOF 文件进行重写
 *
 *    2b) 父过程将新输出的写命令追加到 server.aof_rewrite_buf 中
 *
 * 3) 当步骤 2a 执行完之后,子过程完结
 *
 * 4) 
 *    父过程会捕获子过程的退出信号,*    如果子过程的退出状态是 OK 的话,*    那么父过程将新输出命令的缓存追加到临时文件,*    而后应用 rename(2) 对临时文件改名,用它代替旧的 AOF 文件,*    至此,后盾 AOF 重写实现。*/
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    // 曾经有过程在进行 AOF 重写了
    if (server.aof_child_pid != -1) return REDIS_ERR;

    // 记录 fork 开始前的工夫,计算 fork 耗时用
    start = ustime();

    if ((childpid = fork()) == 0) {char tmpfile[256];

        /* 子过程 */

        // 敞开网络连接 fd
        closeListeningSockets(0);

        // 为过程设置名字,不便记认
        redisSetProcTitle("redis-aof-rewrite");

        // 创立临时文件,并进行 AOF 重写
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            // 发送重写胜利信号
            exitFromChild(0);
        } else {
            // 发送重写失败信号
            exitFromChild(1);
        }
    } else {
        /* 父过程 */
        // 记录执行 fork 所耗费的工夫
        server.stat_fork_time = ustime()-start;

        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }

        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);

        // 记录 AOF 重写的信息
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;

        // 敞开字典主动 rehash
        updateDictResizePolicy();

        /* 
         * 将 aof_selected_db 设为 -1,* 强制让 feedAppendOnlyFile() 下次执行时引发一个 SELECT 命令,* 从而确保之后新增加的命令会设置到正确的数据库中
         */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */

AOF 重写的原理,其实是间接读取以后的数据库的值,最初应用一条写语句就能够实现 AOF 重写

而且 AOF 重写是放在后台子过程执行,这样能够防止效率太低,然而应用子过程执行重写形式,则在重写过程中,父过程还会执行新的写命令,因而这段事件的命令也要被记录下来,最初再次同步给子过程

本人的网址:www.shicoder.top
欢送加群聊天 452380935

本文由博客一文多发平台 OpenWrite 公布!

退出移动版