共计 7177 个字符,预计需要花费 18 分钟才能阅读完成。
Redis 数据 DB
Redis 是一个一个的 DB,那么这个 DB 到底是一个什么样构造的数据呢?
如下是 Redis 官网的源码(5.0)
/* Redis 数据库示意。有多个数据库标识从 0(默认数据库) 到配置的最大值的整数数据库。数据库号是构造中的“id”字段 */ | |
typedef struct redisDb { | |
dict *dict; /* 这个数据库的键空间(字典类型)*/ | |
dict *expires; /* 设置超时的键的超时 */ | |
dict *blocking_keys; /* 客户端期待数据的密钥 (BLPOP) */ | |
dict *ready_keys; /* 接管到推送的阻塞键 */ | |
dict *watched_keys; /* EXEC CAS 的监督键 */ | |
int id; /* 数据库 ID */ | |
long long avg_ttl; /* 均匀 TTL,仅用于统计 */ | |
list *defrag_later; /* 要一一进行磁盘整顿的键名列表 */ | |
} redisDb; |
咱们能够看到 Redis 的数据库次要的数据是寄存在字典中的
Redis 数据 Dict 字典
官网源码地址:https://github.com/redis/redi…
咱们找到 dict 字典的定义:
// 字典类型数据定义 | |
typedef struct dict { | |
dictType *type; /* 字典类型数组 */ | |
void *privdata; /* 公有数据 */ | |
dictht ht[2]; /* 字典 Hash 表数组 */ | |
long rehashidx; /* 如果 rehashidx == -1,示意没有进行 Rehash, 如果为负数示意有 ReHash 操作 */ | |
unsigned long iterators; /* 以后正在运行的迭代器数 */ | |
} dict; |
次要的数据是寄存在咱们的字典 Hash 表数组中的咱们在来看一下这个 dictht,字典 Hash 表
// 字典 Hash 表类型数据定义 | |
typedef struct dictht { | |
dictEntry **table; /* Hash 表,寄存一个又一个的字典元素, 实际上是一个数组 */ | |
unsigned long size; /* 哈希表大小,即哈希表数组大小 */ | |
unsigned long sizemask; /* 哈希表大小掩码,总是等于 size-1,次要用于计算索引 */ | |
unsigned long used; /* 已应用节点数,即已应用键值对数 */ | |
} dictht; |
那么更加次要的就是咱们的每一个字典的元素,示意咱们寄存的元素数据
// 字典元素类型数据定义 | |
typedef struct dictEntry { | |
// 无类型指针,Key 指向 Val 值 | |
void *key; | |
// 值,是一个专用体, 他有可能是一个指针,或者一个 64 位正整数,或者 64 位 int,浮点数 | |
union { | |
// 值指针 | |
void *val; | |
// 64 位正整数 | |
uint64_t u64; | |
// 64 位 int | |
int64_t s64; | |
// 浮点数 | |
double d; | |
} v; | |
// next 节点,每一个 dictEntry 都是一个链表,用于解决 Hash 抵触 | |
struct dictEntry *next; | |
} dictEntry; |
Redis 解决 Hash 抵触
咱们晓得上方的 dict,有两个 Hash 表,那么为什么咱们要放两个 Hash 表呢?
答案就是咱们 Redis 的 Hash 表在进行扩容的时候须要用到的,那么上面咱们来看一下源码中的解释吧。
int dictRehash(dict *d, int n);
源码地位:https://github.com/redis/redi…
首先咱们必定须要晓得咱们是在哪一步进行扩容的,必定是在咱们产生 Add 操作的时候咱们定位到 Add 的办法:
/* 增加一个元素到指标哈希表 */ | |
int dictAdd(dict *d, void *key, void *val) | |
{ | |
// 向字典中增加 key | |
dictEntry *entry = dictAddRaw(d,key,NULL); | |
if (!entry) return DICT_ERR; | |
// 而后设置节点的值 | |
dictSetVal(d, entry, val); | |
return DICT_OK; | |
} |
而后咱们定位到 dictAddRaw,这一步应用链表解决 Hash 抵触
/* 低级增加或查找: | |
* 此函数增加了元素,但不是设置值而是将 dictEntry 构造返回给用户,这将确保按需填写值字段. | |
* | |
* 此函数也间接公开给要调用的用户 API 次要是为了在哈希值外部存储非指针,例如: | |
* entry = dictAddRaw(dict,mykey,NULL); | |
* if (entry != NULL) dictSetSignedIntegerVal(entry,1000); | |
* | |
* 返回值: | |
* | |
* 如果键曾经存在,则返回 NULL,如果不存在,则应用现有条目填充“* existing”. | |
* 如果增加了键,则哈希条目将返回以由调用方进行操作。*/ | |
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) | |
{ | |
long index; | |
dictEntry *entry; | |
dictht *ht; | |
// 判断是否正在 ReHash,如果须要则调用_dictRehashStep(后续 ReHash 中的步骤),每次 ReHash 一条数据,直到实现整个 ReHash | |
if (dictIsRehashing(d)) _dictRehashStep(d); | |
/* 获取新元素的索引, 依据 Key 计算索引,并且判断是否须要进行扩容 ReHash(!!!重点)(第一次 ReHash 调用)*/ | |
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) | |
return NULL; | |
/* 解决 Hash 抵触,以及 ReHash 时效率问题 */ | |
/* 分配内存并存储新条目。假如在数据库系统中更有可能更频繁地拜访最近增加的条目,则将元素插入顶部 */ | |
// 判断是否须要 ReHash,如果是那么以后的 HashTable 为字典下的第二个,如果不须要扩容则应用原来的的 | |
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; | |
// 创立元素, 分配内存 | |
entry = zmalloc(sizeof(*entry)); | |
// 进行元素链表操作,元素的下一个节点指向 Hash 表中的相应索引,如果以前这个下标有元素则链到以后元素前面 | |
entry->next = ht->table[index]; | |
// Hash 表节点索引设置为本人,替换原来的元素 | |
ht->table[index] = entry; | |
ht->used++; | |
/* 设置这个 Hash 元素的 Key. */ | |
dictSetKey(d, entry, key); | |
return entry; | |
} |
看完了 Hash 抵触的解决形式咱们再来看一下扩容,首先咱们看一下 dictIsRehashing,是如何判断须要进行 ReHash 的
ReHash 解决扩容 ReHash 过程
// 如果字典的 rehashidx 不是 -1,那就示意须要进行 Hash 扩容 | |
dictIsRehashing(d) ((d)->rehashidx != -1) |
那么在什么中央批改了 rehashidx 呢,就是在咱们计算 Index 的时候
/* 返回可用插槽填充的索引, 依据“Key”的哈希计算,如果 Key 曾经存在,则返回 -1 | |
* 请留神,如果咱们正在从新哈希表,索引总是在第二个(新)哈希表的上下文中返回,也就是 ht[1] */ | |
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) | |
{ | |
unsigned long idx, table; | |
dictEntry *he; | |
if (existing) *existing = NULL; | |
/* 如果须要,扩容哈希表,如果失败返回 -1(ReHash 扩容机制)*/ | |
if (_dictExpandIfNeeded(d) == DICT_ERR) | |
return -1; | |
/* 从两个 Hash 表进行查问,可能这个 Key 放入了第二个哈希表 */ | |
for (table = 0; table <= 1; table++) { | |
/* 依据数组长度 - 1 而后取模计算卡槽 */ | |
idx = hash & d->ht[table].sizemask; | |
/* 依据 Hash 表获取元素,并且判断这个 Key 有没有在 Hash 表外面,如果存在返回 -1 */ | |
he = d->ht[table].table[idx]; | |
while(he) {if (key==he->key || dictCompareKeys(d, key, he->key)) {if (existing) *existing = he; | |
return -1; | |
} | |
he = he->next; | |
} | |
// 如果不在 ReHash,间接返回第一个 Hash 表的 index 卡槽,如果是 ReHash 那么就把 idx 放入第二个 Hash 表 | |
if (!dictIsRehashing(d)) break; | |
} | |
return idx; | |
} |
此处开始判断是否须要进行扩容
/* 如果须要,扩容 Hash 表 */ | |
static int _dictExpandIfNeeded(dict *d) | |
{ | |
/* 如果曾经在 ReHash 中了,间接返回 */ | |
if (dictIsRehashing(d)) return DICT_OK; | |
/* 如果哈希表为空,将其开展到初始大小。初始大小 4 */ | |
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); | |
/* 如果咱们的曾经应用的元素个数和 Hash 表数组长度达到 1:1 比率,那么就要进行扩容了(全局设置)或者咱们应该防止它,但之间的比率元素 / 存储桶超过 "平安" 阈值,咱们调整大小加倍存储桶的数量 */ | |
/* 简略来说就是咱们应用的元素等于数组的长度那么咱们就扩容 Hash 表,容量扩容一倍 */ | |
if (d->ht[0].used >= d->ht[0].size && | |
(dict_can_resize || | |
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) | |
{return dictExpand(d, d->ht[0].used*2); | |
} | |
return DICT_OK; | |
} |
将第二张 Hash 表从新初始化,后续 ReHash 中的元素都会放入第二张 Hash 表
/* 扩容或者创立 Hash 表 */ | |
int dictExpand(dict *d, unsigned long size) | |
{ | |
/* 如果正在 ReHash,或者应用数量大于原 size * 2,返回 -1 */ | |
if (dictIsRehashing(d) || d->ht[0].used > size) | |
return DICT_ERR; | |
dictht n; /* 新的哈希表 */ | |
unsigned long realsize = _dictNextPower(size); | |
/* 从新大小重为雷同的表大小没有用途,返回 -1 */ | |
if (realsize == d->ht[0].size) return DICT_ERR; | |
/* 调配新的哈希表并初始化所有指向 NULL 的指针 */ | |
n.size = realsize; | |
n.sizemask = realsize-1; | |
/* 分配内存扩容空间 */ | |
n.table = zcalloc(realsize*sizeof(dictEntry*)); | |
n.used = 0; | |
/* 这是第一次初始化吗?如果是这样,它不是真正的重述,咱们只是设置第一个哈希表,以便它能够承受键。*/ | |
if (d->ht[0].table == NULL) {d->ht[0] = n; | |
return DICT_OK; | |
} | |
/* 筹备第二个哈希表以进行增量重哈希,将第二个长期寄存的 Hash 表从新初始化,开始 ReHash 操作 */ | |
d->ht[1] = n; | |
d->rehashidx = 0; | |
return DICT_OK; | |
} |
ReHash 过程是指咱们将状态设置为了 ReHash,并且将新增的元素写入到了第二张 Hash 表,这个时候咱们就须要将第二张 Hash 表和第一张 Hash 表
/* 字典 ReHash 操作,每次第一个参数示意字典,第二个参数示意每次 ReHash 的数量,例如 100 扩容至两百,如果没有 Hash 抵触,咱们须要传入 100 能力实现 ReHash */ | |
int dictRehash(dict *d, int n) { | |
int empty_visits = n*10; /* 可拜访的最大空桶数 */ | |
/* 不在 ReHash 过程间接返回 */ | |
if (!dictIsRehashing(d)) return 0; | |
/* ReHash 第二张表时会先 */ | |
while(n-- && d->ht[0].used != 0) { | |
dictEntry *de, *nextde; | |
/* 请留神,rehashidx 不会溢出,因为咱们确信还有更多元素,因为 ht [0] .used!= 0*/ | |
assert(d->ht[0].size > (unsigned long)d->rehashidx); | |
// 如果卡槽是空的那么从 ReHashIndex 开始自增,因为须要遍历,rehashidx 从开始被默认置为 0,如果须要将原来的 Hash 表实现 ReHash,就须要从 0 遍历残缺张 Hash 表 | |
while(d->ht[0].table[d->rehashidx] == NULL) { | |
d->rehashidx++; | |
// 如果查找了 n * 10 个卡槽还是为空的话那么咱们返回 1,不执行操作 | |
if (--empty_visits == 0) return 1; | |
} | |
// 获取原来的 ht[0] 的相应卡槽的 Hash 表 | |
de = d->ht[0].table[d->rehashidx]; | |
/* 而后将卡槽中的 Key Value 都放入 ht[1] 示意将数据从 ht[0] 挪动到 ht[1]*/ | |
while(de) { | |
uint64_t h; | |
nextde = de->next; | |
/* 获取新哈希表中的索引 */ | |
h = dictHashKey(d, de->key) & d->ht[1].sizemask; | |
de->next = d->ht[1].table[h]; | |
d->ht[1].table[h] = de; | |
d->ht[0].used--; | |
d->ht[1].used++; | |
de = nextde; | |
} | |
// 如果为空那么持续 +1,晓得 ht[0] 的表变成空的 | |
d->ht[0].table[d->rehashidx] = NULL; | |
d->rehashidx++; | |
} | |
// 实现 ReHash,示意将 ht[0] 所有数据曾经挪动到 ht[1],而后将 ht[1] 赋值给 ht[0],而后清空 ht[1],一次 ReHash 操作实现 | |
/* 查看咱们是否曾经从新 ReHash 了第一张 Hash 表.. */ | |
if (d->ht[0].used == 0) {zfree(d->ht[0].table); | |
d->ht[0] = d->ht[1]; | |
_dictReset(&d->ht[1]); | |
d->rehashidx = -1; | |
return 0; | |
} | |
/* 返回数据,这一步通常因为 ReHash 没有执行完,只 ReHash 了一部分(未实现 ReHash)*/ | |
// 返回 1 示意给定时工作循环调度,while 条件,示意没有 ReHash 实现 | |
return 1; | |
} |
并且有任务调度 ReHash
在 Server 中 https://github.com/redis/redi…
/* 数据库定时工作 */ | |
void databasesCron(void) { | |
/* Rehash */ | |
if (server.activerehashing) {for (j = 0; j < dbs_per_call; j++) { | |
// 数据库 ReHash | |
int work_done = incrementallyRehash(rehash_db); | |
if (work_done) {break;} else { | |
/* If this db didn't need rehash, we'll try the next one. */ | |
rehash_db++; | |
rehash_db %= server.dbnum; | |
} | |
} | |
} | |
// 每个数据库每次执行 1 毫秒的 ReHash | |
int incrementallyRehash(int dbid) { | |
/* Keys dictionary */ | |
if (dictIsRehashing(server.db[dbid].dict)) {dictRehashMilliseconds(server.db[dbid].dict,1); | |
return 1; /* 曾经应用了毫秒作为循环周期。... */ | |
} | |
/* Expires */ | |
if (dictIsRehashing(server.db[dbid].expires)) {dictRehashMilliseconds(server.db[dbid].expires,1); | |
return 1; /* 曾经应用了毫秒作为循环周期。... */ | |
} | |
return 0; | |
} | |
/* Rehash 在 ms+"delta" 毫秒。delta 值较大, 小于 0,大多数状况下小于 1。准确上界取决于 dictRehash(d,100) 的运行工夫 */ | |
int dictRehashMilliseconds(dict *d, int ms) {if (d->iterators > 0) return 0; | |
// 记录开始同步 | |
long long start = timeInMilliseconds(); | |
// 记录 ReHash 的数量 | |
int rehashes = 0; | |
// 每次 ReHash100 条数据 | |
while(dictRehash(d,100)) { | |
rehashes += 100; | |
// 如果执行到指定工夫 例如 一毫秒,以后工夫 - 开始工夫 > 1 毫秒,则间接 Break | |
if (timeInMilliseconds()-start > ms) break; | |
} | |
return rehashes; | |
} |