关于c:Redis源码解读跳跃表

2次阅读

共计 8376 个字符,预计需要花费 21 分钟才能阅读完成。

Redis 中的跳跃表构造

每个节点都有各自的分层,后退节点,后退节点,键以及分值。

  • 后退节点即用来从跳跃表尾部从后向前遍历。
  • 后退节点有两局部:后退指针,以及后退的步长。这个步长能够用来计算排位,比方在查找某个节点过程中,将两头的跨度加起来,就能够失去它在跳跃表中的排位。

跳跃表节点定义:

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward; // 后退节点
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 后退节点
        unsigned long span;
    } level[];} zskiplistNode;

跳跃表节点外部有一个对于层数的柔性数组,代替了传统的 up、down 指针。

跳跃表定义:

typedef struct zskiplist {
    struct zskiplistNode *header, *tail; // 首尾节点
    unsigned long length; // 元素个数
    int level; // 层高
} zskiplist;

咱们再察看下面的 head 节点,head 节点不存放数据,并且层高固定为 32 层,32 层曾经满足对于 2 ^ 64 个元素查找的效率了,所以跳跃表最高也就是 32 层。

创立、开释跳跃表

创立跳跃表节点

创立一个指定层数的跳跃表节点,sds 字符串 ele 将会被该节点援用。sds 字符串即是跳跃表节点的键

zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}

创立跳跃表

创立跳跃表的时候会将 head 节点设置为 32 层

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

开释跳跃表节点

开释跳跃表节点,同时也开释 sds string

留神:开释 node 即代表该节点的所有层都被开释了

void zslFreeNode(zskiplistNode *node) {sdsfree(node->ele);
    zfree(node);
}

开释跳跃表

/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}

增删改

插入节点

插入节点须要先找到插入地位。因为插入时节点会随机回升,所以要从底层开始,始终到回升的最大层做解决。并且回升的层数也是随机的,所以解决时须要分外留神。

zslInsert() 函数在查找插入地位的过程中,用数组记录每一层中,最初一个排在待插入节点后面的那个,并且记录它们在每层中的排名。这样记录实现后,之后对每层的更新都很不便。

查找过程:

从最下面一层开始查找,x 作为遍历节点。当 x->forward 的 score 值大于 待插入节点的 score 值,或者 score 值相等,但键值小于 待插入节点的 ele,就阐明该层最初一个排在待插入节点后面的节点就是 x 了。

否则,x 还不是最初一个符合要求的节点,就须要将 x 的 rank 值加到 rank[i] 中。

更新跳跃表:

查找完结后,所有层的插入地位都找到了,接下来须要确认该次插入实际上须要回升多少层,通过 zslRandomLevel() 得出,记作 level

  • 如果 level 大于以后跳跃表的最大层数,则 update 数组中处于 (zsl->level, level] 的节点以及 rank 值须要另外解决。高于 zsl->level 的层中,不存在其它节点,所以将这个区间中的 update 设为 head 节点,并将 head 节点在这写层的 span 设置为 zkl->length,这对之后更新待插入节点以及 update 节点的 span 值做铺垫。还有 rank 值设为 0。

之后就是理论插入过程了,创立好新的节点 x,遍历 0 – level 层,将 x 插入,批改 forward 指向 (backward 指向最初批改,只须要改第一层的即可)。还有一个 span 值咱们须要更新:

        // rank[0] 示意插入节点前一的排名,rank[i] 示意第 i + 1 层中,插入节点的前一节点的排名
        // rank[0]-rank[i] 就示意了 update[i] 到 update[0] 的间隔
        // update[i]->level[i].span - (rank[0] - rank[i]) 就示意原跨度减去后面的间隔
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        
        // update[i] 的跨度即为 update[i] 到插入节点在该层的跨度
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;

这里是笔者认为最难了解的中央 (笔者脑子不好使,推敲了挺久才搞清楚),如果能看懂笔者的正文更好,看不懂的话,咱们还有图来帮忙了解:

假如当初在第 i 层更新,

所以如果是高于本来跳跃表最高层的层,head 的 span 值设置为 zsl->length

插入完当前,如果 level 是小于原 zkl->level 的,则须要将这些未插入 x 的 update 节点加一个跨度,因为它们的前面多出了一个 x 节点。

最初是解决第一层的 backward 节点。如果 x 是第一个插入的元素,即 update[0] == head,则 x->backward = NULL,否则就等于 update[0]。而 x 的下一个节点,也须要将 backward 指针指向 x。如果 x->forward == NULL,则阐明 x 是最初一个节点,不须要解决,但须要将 跳跃表的尾指针指向 x;否则,批改 x->forward 的 backward 指针。

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // update 保留了每一层最初一个排在新节点后面的那个节点
    unsigned long rank[ZSKIPLIST_MAXLEVEL]; // rank 记录了这些节点在跳跃表中的排名
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {for (i = zsl->level; i < level; i++) {rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        // rank[0] 示意插入节点前一的排名,rank[i] 示意第 i + 1 层中,插入节点的前一节点的排名
        // rank[0]-rank[i] 就示意了 update[i] 到 update[0] 的间隔
        // update[i]->level[i].span - (rank[0] - rank[i]) 就示意原跨度减去后面的间隔
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        
        // update[i] 的跨度即为 update[i] 到插入节点在该层的跨度
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

删除节点

先来看一个供其它删除函数调用的一个函数:

先看参数:x 是要删除的节点,update 数组和下面插入的含意是一样的,即每一层的 x 的前一元素。

首先是更新 update 数组中每个节点的属性,而后批改 backward 指针。最初更新跳跃表的 level 和 length。

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {if (update[i]->level[i].forward == x) {update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {x->level[0].forward->backward = x->backward;
    } else {zsl->tail = x->backward;}
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

而后是真正删除一个节点的函数:

下面有一段正文,意思是:从跳跃表中删除一个分值和键匹配的元素。如果节点被找到并删除了返回 1,其它状况返回 0.
如果参数 node 为空,则从跳跃表中删除的节点会被开释,否则不会被开释,只是从跳跃表中删除了,并且 *node 会被设置为被删除节点的指针,因为它还有可能被调用者应用。(包含节点中的 SDS string)

上面是函数的逻辑:

与 zslInsert() 函数一样,用 update 数组保留每一层中最初一个小于 删除节点 的节点,循环完结后,x->forward 就是第一层中第一个大于等于 删除节点 的节点。

所以接下来指向判断 x->forward 是否合乎删除要求就能够将元素删除,并将每一层中节点的相干属性通过 zslDeleteNode() 更新。

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

更新节点 score 值

该函数试图在 curscore 和 ele 匹配的节点处更新该节点的 score,并且更新后节点还在原地位。如果无奈保障更新后节点地位不变,则会删除该节点,再将含有新 score 值的节点插入跳跃表

后面还是一样,找到所有层中的 update 节点,而后 serverAssert() 断言假如 x 以后是匹配 curscore 和 ele 的。之后进行判断,看新的 score 值是否在放在 x 地位并使跳跃表依然放弃有序性。如果能够,则间接更新 x 的 score 值,而后返回。

如果不能够,则须要从跳跃表删掉 x,而后再以新的 score 值与原来的 ele 创立一个新节点并插入跳跃表中。

/* Update the score of an element inside the sorted set skiplist.
 * Note that the element must exist and must match 'score'.
 * This function does not update the score in the hash table side, the
 * caller should take care of it.
 *
 * Note that this function attempts to just update the node, in case after
 * the score update, the node would be exactly at the same position.
 * Otherwise the skiplist is modified by removing and re-adding a new
 * element, which is more costly.
 *
 * The function returns the updated element skiplist node pointer. */
zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    /* We need to seek to element to update to start: this is useful anyway,
     * we'll have to update or remove it. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {while (x->level[i].forward &&
                (x->level[i].forward->score < curscore ||
                    (x->level[i].forward->score == curscore &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {x = x->level[i].forward;
        }
        update[i] = x;
    }

    /* Jump to our element: note that this function assumes that the
     * element with the matching score exists. */
    x = x->level[0].forward;
    serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);

    /* If the node, after the score update, would be still exactly
     * at the same position, we can just update the score without
     * actually removing and re-inserting the element in the skiplist. */
    if ((x->backward == NULL || x->backward->score < newscore) &&
        (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
    {
        x->score = newscore;
        return x;
    }

    /* No way to reuse the old node: we need to remove and insert a new
     * one at a different place. */
    zslDeleteNode(zsl, x, update);
    zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
    /* We reused the old node x->ele SDS string, free the node now
     * since zslInsert created a new one. */
    x->ele = NULL;
    zslFreeNode(x);
    return newnode;
}

至于 zskiplist 的“查”笔者没有钻研,有趣味的读者能够自行钻研。

正文完
 0