Redis 中的跳跃表构造
每个节点都有各自的分层,后退节点,后退节点,键以及分值。
- 后退节点即用来从跳跃表尾部从后向前遍历。
- 后退节点有两局部:后退指针,以及后退的步长。这个步长能够用来计算排位,比方在查找某个节点过程中,将两头的跨度加起来,就能够失去它在跳跃表中的排位。
跳跃表节点定义:
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward; // 后退节点
struct zskiplistLevel {
struct zskiplistNode *forward; // 后退节点
unsigned long span;
} level[];} zskiplistNode;
跳跃表节点外部有一个对于层数的柔性数组,代替了传统的 up、down 指针。
跳跃表定义:
typedef struct zskiplist {
struct zskiplistNode *header, *tail; // 首尾节点
unsigned long length; // 元素个数
int level; // 层高
} zskiplist;
咱们再察看下面的 head 节点,head 节点不存放数据,并且层高固定为 32 层,32 层曾经满足对于 2 ^ 64 个元素查找的效率了,所以跳跃表最高也就是 32 层。
创立、开释跳跃表
创立跳跃表节点
创立一个指定层数的跳跃表节点,sds 字符串 ele 将会被该节点援用。sds 字符串即是跳跃表节点的键
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
创立跳跃表
创立跳跃表的时候会将 head 节点设置为 32 层
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
开释跳跃表节点
开释跳跃表节点,同时也开释 sds string
留神:开释 node 即代表该节点的所有层都被开释了
void zslFreeNode(zskiplistNode *node) {sdsfree(node->ele);
zfree(node);
}
开释跳跃表
/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) {next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
增删改
插入节点
插入节点须要先找到插入地位。因为插入时节点会随机回升,所以要从底层开始,始终到回升的最大层做解决。并且回升的层数也是随机的,所以解决时须要分外留神。
zslInsert() 函数在查找插入地位的过程中,用数组记录每一层中,最初一个排在待插入节点后面的那个,并且记录它们在每层中的排名。这样记录实现后,之后对每层的更新都很不便。
查找过程:
从最下面一层开始查找,x 作为遍历节点。当 x->forward 的 score 值大于 待插入节点的 score 值,或者 score 值相等,但键值小于 待插入节点的 ele,就阐明该层最初一个排在待插入节点后面的节点就是 x 了。
否则,x 还不是最初一个符合要求的节点,就须要将 x 的 rank 值加到 rank[i] 中。
更新跳跃表:
查找完结后,所有层的插入地位都找到了,接下来须要确认该次插入实际上须要回升多少层,通过 zslRandomLevel() 得出,记作 level
- 如果 level 大于以后跳跃表的最大层数,则 update 数组中处于 (zsl->level, level] 的节点以及 rank 值须要另外解决。高于 zsl->level 的层中,不存在其它节点,所以将这个区间中的 update 设为 head 节点,并将 head 节点在这写层的 span 设置为 zkl->length,这对之后更新待插入节点以及 update 节点的 span 值做铺垫。还有 rank 值设为 0。
之后就是理论插入过程了,创立好新的节点 x,遍历 0 – level 层,将 x 插入,批改 forward 指向 (backward 指向最初批改,只须要改第一层的即可)。还有一个 span 值咱们须要更新:
// rank[0] 示意插入节点前一的排名,rank[i] 示意第 i + 1 层中,插入节点的前一节点的排名
// rank[0]-rank[i] 就示意了 update[i] 到 update[0] 的间隔
// update[i]->level[i].span - (rank[0] - rank[i]) 就示意原跨度减去后面的间隔
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// update[i] 的跨度即为 update[i] 到插入节点在该层的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
这里是笔者认为最难了解的中央 (笔者脑子不好使,推敲了挺久才搞清楚),如果能看懂笔者的正文更好,看不懂的话,咱们还有图来帮忙了解:
假如当初在第 i 层更新,
所以如果是高于本来跳跃表最高层的层,head 的 span 值设置为 zsl->length
插入完当前,如果 level 是小于原 zkl->level 的,则须要将这些未插入 x 的 update 节点加一个跨度,因为它们的前面多出了一个 x 节点。
最初是解决第一层的 backward 节点。如果 x 是第一个插入的元素,即 update[0] == head,则 x->backward = NULL,否则就等于 update[0]。而 x 的下一个节点,也须要将 backward 指针指向 x。如果 x->forward == NULL,则阐明 x 是最初一个节点,不须要解决,但须要将 跳跃表的尾指针指向 x;否则,批改 x->forward 的 backward 指针。
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // update 保留了每一层最初一个排在新节点后面的那个节点
unsigned long rank[ZSKIPLIST_MAXLEVEL]; // rank 记录了这些节点在跳跃表中的排名
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {for (i = zsl->level; i < level; i++) {rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
// rank[0] 示意插入节点前一的排名,rank[i] 示意第 i + 1 层中,插入节点的前一节点的排名
// rank[0]-rank[i] 就示意了 update[i] 到 update[0] 的间隔
// update[i]->level[i].span - (rank[0] - rank[i]) 就示意原跨度减去后面的间隔
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// update[i] 的跨度即为 update[i] 到插入节点在该层的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
删除节点
先来看一个供其它删除函数调用的一个函数:
先看参数:x 是要删除的节点,update 数组和下面插入的含意是一样的,即每一层的 x 的前一元素。
首先是更新 update 数组中每个节点的属性,而后批改 backward 指针。最初更新跳跃表的 level 和 length。
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {if (update[i]->level[i].forward == x) {update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {x->level[0].forward->backward = x->backward;
} else {zsl->tail = x->backward;}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
而后是真正删除一个节点的函数:
下面有一段正文,意思是:从跳跃表中删除一个分值和键匹配的元素。如果节点被找到并删除了返回 1,其它状况返回 0.
如果参数 node 为空,则从跳跃表中删除的节点会被开释,否则不会被开释,只是从跳跃表中删除了,并且*node
会被设置为被删除节点的指针,因为它还有可能被调用者应用。(包含节点中的 SDS string)
上面是函数的逻辑:
与 zslInsert() 函数一样,用 update 数组保留每一层中最初一个小于 删除节点 的节点,循环完结后,x->forward 就是第一层中第一个大于等于 删除节点 的节点。
所以接下来指向判断 x->forward 是否合乎删除要求就能够将元素删除,并将每一层中节点的相干属性通过 zslDeleteNode() 更新。
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
更新节点 score 值
该函数试图在 curscore 和 ele 匹配的节点处更新该节点的 score,并且更新后节点还在原地位。如果无奈保障更新后节点地位不变,则会删除该节点,再将含有新 score 值的节点插入跳跃表
后面还是一样,找到所有层中的 update 节点,而后 serverAssert() 断言假如 x 以后是匹配 curscore 和 ele 的。之后进行判断,看新的 score 值是否在放在 x 地位并使跳跃表依然放弃有序性。如果能够,则间接更新 x 的 score 值,而后返回。
如果不能够,则须要从跳跃表删掉 x,而后再以新的 score 值与原来的 ele 创立一个新节点并插入跳跃表中。
/* Update the score of an element inside the sorted set skiplist.
* Note that the element must exist and must match 'score'.
* This function does not update the score in the hash table side, the
* caller should take care of it.
*
* Note that this function attempts to just update the node, in case after
* the score update, the node would be exactly at the same position.
* Otherwise the skiplist is modified by removing and re-adding a new
* element, which is more costly.
*
* The function returns the updated element skiplist node pointer. */
zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
/* We need to seek to element to update to start: this is useful anyway,
* we'll have to update or remove it. */
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {while (x->level[i].forward &&
(x->level[i].forward->score < curscore ||
(x->level[i].forward->score == curscore &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{x = x->level[i].forward;
}
update[i] = x;
}
/* Jump to our element: note that this function assumes that the
* element with the matching score exists. */
x = x->level[0].forward;
serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
/* If the node, after the score update, would be still exactly
* at the same position, we can just update the score without
* actually removing and re-inserting the element in the skiplist. */
if ((x->backward == NULL || x->backward->score < newscore) &&
(x->level[0].forward == NULL || x->level[0].forward->score > newscore))
{
x->score = newscore;
return x;
}
/* No way to reuse the old node: we need to remove and insert a new
* one at a different place. */
zslDeleteNode(zsl, x, update);
zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
/* We reused the old node x->ele SDS string, free the node now
* since zslInsert created a new one. */
x->ele = NULL;
zslFreeNode(x);
return newnode;
}
至于 zskiplist 的“查”笔者没有钻研,有趣味的读者能够自行钻研。