您是小站的第 27964 位访客,欢迎~

您现在的位置是: 网站首页 > 程序设计  > redis 

redis源码分析——6、跳表skiplist的实现

2021年9月10日 00:52 197人围观

简介skiplist是很有用的一种数据结构,在面试中也常见,效率上基本和红黑树等价,而编码实现又比红黑树简单很多

[toc]

概要

数组和链表是两种最基本的数据结构,其余任何一个复杂数据结构都是有数组或者链表组合而来的。这两者有着互补的特性——数组查找快,插入慢;链表插入快,查找慢。仿佛插入和插入成了两个对立的特性,一直以来,出现了各种结构来均很插入和查找,最典型的就是红黑树了。

一、skiplist原理

链表查找慢的根本原因是因为链表中每个结点的地址不是连续的,我们没有办法准确定位到第k个结点。按照通用的解法,无法定位的时候可以想办法添加索引,但是这个索引该怎么加?

如果我们给每个结点都加一个索引,用来记录结点的地址,那么最后结果就是这样。

对于这个结构——其实是个指针数组,我们可以很方便的实现二分查找算法,但是插入的时候还是O(N)的复杂度,那么能否再简化呢? 我们知道,在顺序查找的时候需要逐个对比,所以复杂度是O(N),那么假如我们一次跳过2个结点,那么复杂度就是O(N/2)了,当然由于N无穷大,所以复杂度还是O(N)。类似地,当一次跳过3个结点的时候复杂度是O(N/3),而假设当一次跳过N/2的时候,这就进化成了二分查找。此外,与数组的随机访问不同,链表只能顺序访问,也就是只能前进,不能后退(不考虑双向链表),所以说在查找的时候只能朝前走。

既然无法确定每次需要走多少步,那么我们跳过随机,以此建立索引,如下图:

跟进一步,我们建立多层链表

  1. 单向链表3->6->7->9->12->17->19->21->25->26
  2. 随机选取一些结点,形成一条新链表,6->9->17->21->26
  3. 再次选取一些结点,形成一条新链表,9->21
  4. 再次选取一些结点,形成一条新链表,21

每一条链表就是一个level,以此,可以定义一个指针数组来存放每个level中每个结点的next结点。

二、redis中skiplist的实现

1. 定义

typedef struct zskiplistNode { 
    sds ele; // 结点value 
    double score; //几点分数 
    struct zskiplistNode *backward; // 后退指针,可以反向查找 
    struct zskiplistLevel { 
        struct zskiplistNode *forward; // 前进指针 
        unsigned long span; // 到下一结点的距离 
    } level[]; /* 柔型数组,定义了层级指针和距离下个结点的跨度 */ 
} zskiplistNode; 
  • ele: 结点value
  • score: 结点的打分,以此排序,如果分数相同,则按ele的字典序
  • backward: 后退指针,可以反向查找
  • level: 结点的层级信息,包含了该层的前进指针和到下一结点的距离

为什么会定义backwardspan两个变量?backwardspan并不是skiplist实现的必须变量,是redis定制化的。

backward: 通过backward可以实现逆序输出,比如zrevrange命令时可以直接按照backward遍历,再无需逆序翻转。

span: span变量的意义是该层级到下一节点的距离,其实这个变量的真实意义是算出该结点的排名——比如rank命令,只需要累加变量路径上所有结点的span值就可以了。那么为什么不直接用rank来记录绝对排名呢?如果用rank记录绝排名,那么在插入和删除的时候该结点后面的所有结点的rank值都要修改,严重影响效率。而如果用span,则只需要修改结点前后相邻两个结点的span

typedef struct zskiplist { 
    struct zskiplistNode *header, *tail; /* skiplist的头结点和尾指针 */ 
    unsigned long length; /* 结点数 */ 
    int level; /*最大层级 */ 
} zskiplist; 
  • header: skiplist的头结点,不保存数据信息,只是为了让插入简单

  • tail: skiplist的尾指针,指向最后一个结点,如果skiplist尾空,则tail指向NULL

  • length: skiplist的结点数

  • level: skiplist的最大层数

2. 创建skiplist

新建一个空的skiplist,level为1,且给header分配空间,为的是插入结点方便。

/* Create a new skiplist. */ 
zskiplist *zslCreate(void) { 
    int j; 
    zskiplist *zsl; 

    zsl = zmalloc(sizeof(*zsl)); 
    zsl->level = 1; // 层数为1 
    zsl->length = 0; // 没有有效结点 
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // 创建header结点,但并算有效结点 
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { 
        zsl->header->level[j].forward = NULL; 
        zsl->header->level[j].span = 0; 
    } 
    zsl->header->backward = NULL; 
    zsl->tail = NULL; 
    return zsl; 
} 

3. 查找结点

根据结点分数和元素,查找结点排名。skiplist的查找分两个方向:向下向前,当不能前进时则向下。

查找“7”

  1. 自定向下查找,从L2层开始,header[2]->forward->value 大于 7,所以该层不通,转向下一层。
  2. 从L1层开始查找,header[1]->forward->value小于7,所以继续前进。
  3. 在L1层,结点6的forward->value=11,大于7,所以L1层终止前进。
  4. 在L0层,结点6的forward->vlaue=11,也大于7,所以L0层终止前进。
  5. 已到达最底层L0层,还没找到,则说明不存在。
/* Find the rank for an element by both score and key. 
 * Returns 0 when the element cannot be found, rank otherwise. 
 * Note that the rank is 1-based due to the span of zsl->header to the 
 * first element. */ 
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { 
    zskiplistNode *x; 
    unsigned long rank = 0; 
    int i; 

    x = zsl->header; 
    for (i = zsl->level-1; i >= 0; i--) { /* 向下沉 */ 
        /*该层结点不为空,且score都小于查找score,或者是在score相同的情况下ele小于查找ele*/ 
        while (x->level[i].forward &&  /* 向前走 */ 
            (x->level[i].forward->score < score || 
                (x->level[i].forward->score == score && 
                sdscmp(x->level[i].forward->ele,ele) <= 0))) { 
            rank += x->level[i].span; // 排名就是所以已跳过的span总和 
            x = x->level[i].forward;  
        } 

        /* x might be equal to zsl->header, so test if obj is non-NULL */ 
        if (x->ele && sdscmp(x->ele,ele) == 0) { 
            return rank; 
        } 
    } 
    return 0; 
} 

4. 删除结点

在删除结点的时候,与其相关的前向结点的span也要一起修改。

删除“20”

  1. 标记查找20时每一层的下降结点指针(结点11)。
  2. 标记每一层下降结点的rank值,也就是所有经过结点的span累加和。
  3. 剔除结点20(链表的删除算法,结点11的forward指向27)。
  4. 修改后驱结点(27)的backward指针值(指向11)。
  5. 修改结点11的span值(减1)。
/* Internal function used by zslDelete, zslDeleteRangeByScore and 
 * zslDeleteRangeByRank. */ 
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { 
    int i; 
    for (i = 0; i < zsl->level; i++) { 
        if (update[i]->level[i].forward == x) { /* update[]和x在同层级 */ 
            update[i]->level[i].span += x->level[i].span - 1; /* 前向结点的span-1,因为少了一个结点 */ 
            update[i]->level[i].forward = x->level[i].forward; /* 修改前向结点的forward指针,执行删除结点的forw */ 
        } else { /* update和x不在同层,也就是说update的level比x的level要高,此时直接更新update的span值 */ 
            update[i]->level[i].span -= 1; 
        } 
    } 
    /* 修改L0的backward指针 */ 
    if (x->level[0].forward) { /* 被删除结点不是尾结点,则后向结点的前向指针应该是删除结点的前向结点 */ 
        x->level[0].forward->backward = x->backward; 
    } else { /* 如果被删除的尾结点,则更新ziplist的tail指针,指向前向结点 */ 
        zsl->tail = x->backward; 
    } 
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) /* 如果被删是level最大的结点,则ziplist的level需要降低 */ 
        zsl->level--; 
    zsl->length--; /* 长度减1 */ 
} 

/* Delete an element with matching score/element from the skiplist. 
 * The function returns 1 if the node was found and deleted, otherwise 
 * 0 is returned. 
 * 
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise 
 * it is not freed (but just unlinked) and *node is set to the node pointer, 
 * so that it is possible for the caller to reuse the node (including the 
 * referenced SDS string at node->ele). */ 
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) { 
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; 
    int i; 
    /* update记录了每一层被删除结点的后一结点 */ 
    x = zsl->header; 
    for (i = zsl->level-1; i >= 0; i--) { 
        while (x->level[i].forward && 
                (x->level[i].forward->score < score || 
                    (x->level[i].forward->score == score && 
                     sdscmp(x->level[i].forward->ele,ele) < 0))) /* 在这一层上一直往前走,直到出现score大于等于要删除的结点 */ 
        { 
            x = x->level[i].forward; 
        } 
        update[i] = x; /* 记录该层删除结点p的前向结点 */ 
    } 
    /* We may have multiple elements with the same score, what we need 
     * is to find the element with both the right score and object. */ 
    x = x->level[0].forward; 
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) { /* 第0层找到了要删除的结点 */ 
        zslDeleteNode(zsl, x, update); /* 更新前向结点的span值 */ 
        if (!node) 
            zslFreeNode(x); 
        else 
            *node = x; 
        return 1; 
    } 
    return 0; /* not found */ 
} 

5. 插入结点

skiplist的插入结点相对比较复杂,主要是因为除了插入结点外还需要修改与之相关结点的span值。

插入“30”

  1. 标记查找30时所有层的下降结点指针(结点11, 27)。
  2. 标记查找30时所有层下降结点的rank值(结点11, 27)。
  3. 在结点27后插入结点30。
  4. 修改后继结点的backward值。
  5. 修改所有下降层结点(11, 17)的span值。
/* Insert a new node in the skiplist. Assumes the element does not already 
 * exist (up to the caller to enforce that). The skiplist takes ownership 
 * of the passed SDS string 'ele'. */ 
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { 
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; 
    unsigned int rank[ZSKIPLIST_MAXLEVEL]; 
    int i, level; 
    /* 
        update: update保存了每一层插入位置的前向结点地址,这些结点的span值都需要改动 
        rank: rank保存看每一层插入位置前向结点的排名,是为span准备的 
    */ 

    serverAssert(!isnan(score)); 
    x = zsl->header; 
    for (i = zsl->level-1; i >= 0; i--) { 
        /* store rank that is crossed to reach the insert position */ 
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; /* 层级降低,则该层的rank起始值就是上层rank在终止值 */ 
        while (x->level[i].forward && 
                (x->level[i].forward->score < score || 
                    (x->level[i].forward->score == score && 
                    sdscmp(x->level[i].forward->ele,ele) < 0))) 
            /* 循环直到 当前结点x的下一结点的score大于等于插入的score,也就是新节点是插到x后面的 */ 
        { 
            rank[i] += x->level[i].span; /* 在前进过程中该层的排名rank累加 */ 
            x = x->level[i].forward; 
        } 
        update[i] = x; /* 新节点插入到x结点后面 */ 
    } 
    /* we assume the element is not already inside, since we allow duplicated 
     * scores, reinserting the same element should never happen since the 
     * caller of zslInsert() should test in the hash table if the element is 
     * already inside or not. */ 
    level = zslRandomLevel(); /* 随机生成新节点的层级高度 */ 
    if (level > zsl->level) { 
        for (i = zsl->level; i < level; i++) { 
            /* 
                如果新节点的level比其他结点的level都要高,也就是说新节点是最高的,则新加一层, 
                那么新节点的前向结点就是skiplist的头结点, 
                而此时这一层的span应该是从头到尾的,也就是整个链表的长度 
            */ 
            rank[i] = 0; 
            update[i] = zsl->header; 
            update[i]->level[i].span = zsl->length; 
        } 
        zsl->level = level; 
    } 
    x = zslCreateNode(level,score,ele); 
    for (i = 0; i < level; i++) { 
         /* 
            这里是链表的插入算法,比如a->b->c,要在b后面插入d 
            则应该是 d->c && b->d,结果就是a->b->d->c 
         */ 
        x->level[i].forward = update[i]->level[i].forward; /* d->c */ 
        update[i]->level[i].forward = x; /* b->d */ 

        /* update span covered by update[i] as x is inserted here */ 
        /*  
            接下来就要更新span值了 
        */ 
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); 
        update[i]->level[i].span = (rank[0] - rank[i]) + 1; 
    } 

    /* increment span for untouched levels */ 
    for (i = level; i < zsl->level; i++) { /* 如果新节点的level不是最高,那么需要更新那些比它高的结点的span值 */ 
        update[i]->level[i].span++; 
    } 

    x->backward = (update[0] == zsl->header) ? NULL : update[0]; /* 后退指针 */ 
    if (x->level[0].forward) 
        x->level[0].forward->backward = x; /*下一结点的后退指针 */ 
    else 
        zsl->tail = x; 
    zsl->length++; 
    return x; 
}