我之前统计过咱们线上某redis数据被拜访的工夫散布,大略90%的申请只会拜访最新15分钟的数据,99%的申请拜访最新1小时的数据,只有不到千分之一的申请会拜访超过1天的数据。咱们之前这份数据存了两天(近500g内存数据),如果算上主备的话用掉了120多个Redis实例(一个实例8g内存),光把过期工夫从2天改成1天就能省下60多个redis实例,而且对原业务也没有啥太大影响。

当然Redis曾经实现了数据过期的主动清理机制,我所要做的只是改下数据写入时的过期工夫而已。假如Redis没有数据过期的机制,咱们要怎么办? 大略一想就晓得很麻烦,认真想的话还得思考很多的细节。所以我感觉过期数据在缓存零碎中是不起眼但十分重要的性能,除了省事外,它也能帮咱们节俭很多老本。接下来咱们看下Redis中是如何实现数据过期的。

实时清理

家喻户晓,Redis外围流程是单线程执行的,它基本上是解决完一条申请再出解决另外一条申请,解决申请的过程并不仅仅是响应用户发动的申请,Redis也会做好多其余的工作,以后其中就包含数据的过期。

Redis在读写某个key的时候,它就会调用expireIfNeeded()先判断这个key是否曾经过期了,如果已过期,就会执行删除。

int expireIfNeeded(redisDb *db, robj *key) {    if (!keyIsExpired(db,key)) return 0;    /* 如果是在slave上下文中运行,间接返回1,因为slave的key过期是由master管制的,     * master会给slave发送数据删除命令。      *      * 如果返回0示意数据不须要清理,返回1示意数据这次标记为过期 */    if (server.masterhost != NULL) return 1;    if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;    /* 删除key */    server.stat_expiredkeys++;    propagateExpire(db,key,server.lazyfree_lazy_expire);    notifyKeyspaceEvent(NOTIFY_EXPIRED,        "expired",key,db->id);    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :                                               dbSyncDelete(db,key);    if (retval) signalModifiedKey(NULL,db,key);    return retval;}

判断是否过期也很简略,Redis在dictEntry中存储了上次更新的工夫戳,只须要判断以后工夫戳和上次更新工夫戳之间的gap是否超过设定的过期工夫即可。

咱们重点来关注下这行代码。

int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :                                               dbSyncDelete(db,key);

lazyfree_lazy_expire 是Redis的配置项之一,它的作用是是否开启惰性删除(默认不开启),很显然如果开启就会执行异步删除,接下来咱们具体说下Redis的惰性删除。

惰性删除

何为惰性删除,从实质上讲惰性删除就是新开一个线程异步解决数据删除的工作。为什么要有惰性删除?家喻户晓,Redis外围流程是单线程执行,如果某个一步执行特地耗时,会间接影响到Redis的性能,比方删除一个几个G的hash key,那这个实例不间接原地升天。。 针对这种状况,须要新开启一个线程去异步删除,避免阻塞出Redis的主线程,当然Redis理论实现的时候dbAsyncDelete()并不齐全是异步,咱们间接看代码。

#define LAZYFREE_THRESHOLD 64int dbAsyncDelete(redisDb *db, robj *key) {    /* 从db->expires中删除key,只是删除其指针而已,并没有删除理论值 */    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);    /* If the value is composed of a few allocations, to free in a lazy way     * is actually just slower... So under a certain limit we just free     * the object synchronously. */    /*    * 在字典中摘除这个key(没有真正删除,只是查不到而已),如果被摘除的dictEntry不为    * 空就去执行上面的开释逻辑     */    dictEntry *de = dictUnlink(db->dict,key->ptr);    if (de) {        robj *val = dictGetVal(de);        /* Tells the module that the key has been unlinked from the database. */        moduleNotifyKeyUnlink(key,val);        /* lazy_free并不是齐全异步的,而是先评估开释操作所需工作量,如果影响较小就间接在主线程中删除了 */        size_t free_effort = lazyfreeGetFreeEffort(key,val);        /* 如果开释这个对象须要做大量的工作,就把他放到异步线程里做         * 但如果这个对象是共享对象(refcount > 1)就不能间接开释了,当然这很少发送,但有可能redis         * 外围会调用incrRefCount来爱护对象,而后调用dbDelete。这我只须要间接调用dictFreeUnlinkedEntry,         * 等价于调用decrRefCount */        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {            atomicIncr(lazyfree_objects,1);            bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);            dictSetVal(db->dict,de,NULL);        }    }    /* 开释键值对所占用的内存,如果是lazyFree,val曾经是null了,只须要开释key的内存即可 */    if (de) {        dictFreeUnlinkedEntry(db->dict,de);        if (server.cluster_enabled) slotToKeyDel(key->ptr);        return 1;    } else {        return 0;    }}
  1. 首先在db->expires中把这个key给删除掉,(db->expires保留了所有带有过期工夫数据的key,不便做数据过期)
  2. 而后把这个数据节点从db中摘掉,数据理论还在内存里,只是查不到而已。
  3. 接下来就是要清理数据了,redis并不是间接把清理工作放到异步线程里做,而是调用lazyfreeGetFreeEffort()来评估清理工作对性能的影响,如果影响较小,就间接在主线程里做了。反之影响太大才会将删除的工作提交到异步线程里。
  4. 开释key和val占用的内存空间,如果是异步删除,val曾经是null,这里只须要开释key占用的空间即可。

这里第三步中为什么异步删除不齐全是异步? 我感觉还是得从异步工作提交bioCreateLazyFreeJob()中一窥端倪。

void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {    va_list valist;    /* Allocate memory for the job structure and all required     * arguments */    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));    job->free_fn = free_fn;    va_start(valist, arg_count);    for (int i = 0; i < arg_count; i++) {        job->free_args[i] = va_arg(valist, void *);    }    va_end(valist);    bioSubmitJob(BIO_LAZY_FREE, job);} void bioSubmitJob(int type, struct bio_job *job) {    job->time = time(NULL);    // 多线程须要加锁,把待处理的job增加到队列开端    pthread_mutex_lock(&bio_mutex[type]);    listAddNodeTail(bio_jobs[type],job);    bio_pending[type]++;    pthread_cond_signal(&bio_newjob_cond[type]);    pthread_mutex_unlock(&bio_mutex[type]);}

我了解,在异步删除的时候须要加锁将异步工作提交到队列里,如果加锁和工作提交所带来的性能影响大于间接删除的影响,那么异步删除还不如同步呢。

定期抽样删除

这里思考下另外一个问题,如果数据写入后就再也没有读写了,是不是实时清理的性能就无奈涉及到这些数据,而后这些数据就永远都会占用空间。针对这种状况,Redis也实现了定期删除的策略。家喻户晓,Redis外围流程是单线程执行,所以注定它不能长时间停下来去干某个特定的工作,所以Redis定期清理也是每次只做一点点。

/* 有两种清理模式,疾速清理和慢速清理 */void activeExpireCycle(int type) {    /* Adjust the running parameters according to the configured expire     * effort. The default effort is 1, and the maximum configurable effort     * is 10. */    unsigned long    effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */    config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +                           ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,  // 每次抽样的数据量大小     config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +                                 ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, // 每次清理的持续时间    config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +                                  2*effort,  // 最大CPU周期使用率     config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-                                    effort; // 可承受的过期数据占比     /* This function has some global state in order to continue the work     * incrementally across calls. */    static unsigned int current_db = 0; /* Last DB tested. */    static int timelimit_exit = 0;      /* Time limit hit in previous call? */    static long long last_fast_cycle = 0; /* When last fast cycle ran. */    int j, iteration = 0;    int dbs_per_call = CRON_DBS_PER_CALL;    long long start = ustime(), timelimit, elapsed;    /* When clients are paused the dataset should be static not just from the     * POV of clients not being able to write, but also from the POV of     * expires and evictions of keys not being performed. */    if (checkClientPauseTimeoutAndReturnIfPaused()) return;    // 疾速清理     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {        /* Don't start a fast cycle if the previous cycle did not exit         * for time limit, unless the percentage of estimated stale keys is         * too high. Also never repeat a fast cycle for the same period         * as the fast cycle total duration itself. */        // 如果上次执行没有触发timelimit_exit, 跳过执行        if (!timelimit_exit &&            server.stat_expired_stale_perc < config_cycle_acceptable_stale)            return;        // 两个疾速清理周期内不执行疾速清理        if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)            return;        last_fast_cycle = start;    }    /* We usually should test CRON_DBS_PER_CALL per iteration, with     * two exceptions:     *     * 1) Don't test more DBs than we have.     * 2) If last time we hit the time limit, we want to scan all DBs     * in this iteration, as there is work to do in some DB and we don't want     * expired keys to use memory for too much time. */    if (dbs_per_call > server.dbnum || timelimit_exit)        dbs_per_call = server.dbnum;    /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU     * time per iteration. Since this function gets called with a frequency of     * server.hz times per second, the following is the max amount of     * microseconds we can spend in this function.      * config_cycle_slow_time_perc是清理所能占用的CPU周期数配置,这里将周期数转化为具体的工夫 */    timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;    timelimit_exit = 0;    if (timelimit <= 0) timelimit = 1;    if (type == ACTIVE_EXPIRE_CYCLE_FAST)        timelimit = config_cycle_fast_duration; /* in microseconds. */    /* Accumulate some global stats as we expire keys, to have some idea     * about the number of keys that are already logically expired, but still     * existing inside the database. */    long total_sampled = 0;    long total_expired = 0;        for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {        /* Expired and checked in a single loop. */        unsigned long expired, sampled;                redisDb *db = server.db+(current_db % server.dbnum);        /* Increment the DB now so we are sure if we run out of time         * in the current DB we'll restart from the next. This allows to         * distribute the time evenly across DBs. */        current_db++;        /* Continue to expire if at the end of the cycle there are still         * a big percentage of keys to expire, compared to the number of keys         * we scanned. The percentage, stored in config_cycle_acceptable_stale         * is not fixed, but depends on the Redis configured "expire effort". */        do {            unsigned long num, slots;            long long now, ttl_sum;            int ttl_samples;            iteration++;            /* 如果没有可清理的,间接完结 */            if ((num = dictSize(db->expires)) == 0) {                db->avg_ttl = 0;                break;            }            slots = dictSlots(db->expires);            now = mstime();            /* 如果slot的填充率小于1%,采样的老本太高,跳过执行,期待下次适合的机会。*/            if (num && slots > DICT_HT_INITIAL_SIZE &&                (num*100/slots < 1)) break;            /* 记录本次采样的数据和其中过期的数量 */            expired = 0;            sampled = 0;            ttl_sum = 0;            ttl_samples = 0;            // 每次最多抽样num个             if (num > config_keys_per_loop)                num = config_keys_per_loop;            /* 这里因为性能考量,咱们拜访了hashcode的的底层实现,代码和dict.c有些类型,             * 但十几年内很难扭转。              *              * 留神:hashtable很多特定的中央是空的,所以咱们的终止条件须要思考到已扫描的bucket             * 数量。 但实际上扫描空bucket是很快的,因为都是在cpu 缓存行里线性扫描,所以能够多             * 扫一些bucket */            long max_buckets = num*20;            long checked_buckets = 0;            // 这里有采样数据和bucket数量的限度。             while (sampled < num && checked_buckets < max_buckets) {                for (int table = 0; table < 2; table++) {                    if (table == 1 && !dictIsRehashing(db->expires)) break;                    unsigned long idx = db->expires_cursor;                    idx &= db->expires->ht[table].sizemask;                    dictEntry *de = db->expires->ht[table].table[idx];                    long long ttl;                    /* 遍历以后bucket中的所有entry*/                    checked_buckets++;                    while(de) {                        /* Get the next entry now since this entry may get                         * deleted. */                        dictEntry *e = de;                        de = de->next;                        ttl = dictGetSignedIntegerVal(e)-now;                        if (activeExpireCycleTryExpire(db,e,now)) expired++;                        if (ttl > 0) {                            /* We want the average TTL of keys yet                             * not expired. */                            ttl_sum += ttl;                            ttl_samples++;                        }                        sampled++;                    }                }                db->expires_cursor++;            }            total_expired += expired;            total_sampled += sampled;            /* 更新ttl统计信息 */            if (ttl_samples) {                long long avg_ttl = ttl_sum/ttl_samples;                /* Do a simple running average with a few samples.                 * We just use the current estimate with a weight of 2%                 * and the previous estimate with a weight of 98%. */                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);            }            /* We can't block forever here even if there are many keys to             * expire. So after a given amount of milliseconds return to the             * caller waiting for the other active expire cycle.              * 不能始终阻塞在这里做清理工作,如果超时了要完结清理循环*/            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */                elapsed = ustime()-start;                if (elapsed > timelimit) {                    timelimit_exit = 1;                    server.stat_expired_time_cap_reached_count++;                    break;                }            }            /*             * 如果过期key数量超过采样数的10%+effort,阐明过期测数量较多,要多清理下,所以             * 持续循环做一次采样清理。                  */        } while (sampled == 0 ||                 (expired*100/sampled) > config_cycle_acceptable_stale);    }    elapsed = ustime()-start;    server.stat_expire_cycle_time_used += elapsed;    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);    /* Update our estimate of keys existing but yet to be expired.     * Running average with this sample accounting for 5%. */    double current_perc;    if (total_sampled) {        current_perc = (double)total_expired/total_sampled;    } else        current_perc = 0;    server.stat_expired_stale_perc = (current_perc*0.05)+                                     (server.stat_expired_stale_perc*0.95);}

代码有些长,大抵总结下其执行流程,细节见代码正文。

  1. 首先通过配置或者默认值计算出几个参数,这几个参数间接或间接决定了这些执行的终止条件,别离如下。
  • config_keys_per_loop: 每次循环抽样的数据量
  • config_cycle_fast_duration: 疾速清理模式下每次清理的持续时间
  • config_cycle_slow_time_perc: 慢速清理模式下每次清理最大耗费CPU周期数(cpu最大使用率)
  • config_cycle_acceptable_stale: 可承受的过期数据量占比,如果本次采样中过期数量小于这个阈值就完结本次清理。
  1. 根据上述参数计算出终止条件的具体值(最大采样数量和超时限度)。
  2. 遍历清理所有的db。
  3. 针对每个db在终止条件的限度下循环清理。

总结

Redis的数据过期策略比较简单,代码也不是特地多,但一如既然处处贯通者性能的思考。当然Redis只是提供了这样一个性能,如果想用好的话还得依据具体的业务需要和理论的数据调整过期工夫的配置,就好比我在文章结尾举的那个例子。

本文是Redis源码分析系列博文,同时也有与之对应的Redis中文正文版,有想深刻学习Redis的同学,欢送star和关注。
Redis中文注解版仓库:https://github.com/xindoo/Redis
Redis源码分析专栏:https://zxs.io/s/1h
如果感觉本文对你有用,欢送一键三连