redis源码分析——6、跳表skiplist的实现
[toc]
概要
数组和链表是两种最基本的数据结构,其余任何一个复杂数据结构都是有数组或者链表组合而来的。这两者有着互补的特性——数组查找快,插入慢;链表插入快,查找慢。仿佛插入和插入成了两个对立的特性,一直以来,出现了各种结构来均很插入和查找,最典型的就是红黑树了。
一、skiplist原理
链表查找慢的根本原因是因为链表中每个结点的地址不是连续的,我们没有办法准确定位到第k个结点。按照通用的解法,无法定位的时候可以想办法添加索引,但是这个索引该怎么加?
如果我们给每个结点都加一个索引,用来记录结点的地址,那么最后结果就是这样。
对于这个结构——其实是个指针数组,我们可以很方便的实现二分查找算法,但是插入的时候还是O(N)的复杂度,那么能否再简化呢? 我们知道,在顺序查找的时候需要逐个对比,所以复杂度是O(N),那么假如我们一次跳过2个结点,那么复杂度就是O(N/2)了,当然由于N无穷大,所以复杂度还是O(N)。类似地,当一次跳过3个结点的时候复杂度是O(N/3),而假设当一次跳过N/2的时候,这就进化成了二分查找。此外,与数组的随机访问不同,链表只能顺序访问,也就是只能前进,不能后退(不考虑双向链表),所以说在查找的时候只能朝前走。
既然无法确定每次需要走多少步,那么我们跳过随机,以此建立索引,如下图:
跟进一步,我们建立多层链表
- 单向链表3->6->7->9->12->17->19->21->25->26
- 随机选取一些结点,形成一条新链表,6->9->17->21->26
- 再次选取一些结点,形成一条新链表,9->21
- 再次选取一些结点,形成一条新链表,21
每一条链表就是一个level,以此,可以定义一个指针数组来存放每个level中每个结点的next结点。
二、redis中skiplist的实现
1. 定义
typedef struct zskiplistNode {
sds ele; // 结点value
double score; //几点分数
struct zskiplistNode *backward; // 后退指针,可以反向查找
struct zskiplistLevel {
struct zskiplistNode *forward; // 前进指针
unsigned long span; // 到下一结点的距离
} level[]; /* 柔型数组,定义了层级指针和距离下个结点的跨度 */
} zskiplistNode;
- ele: 结点value
- score: 结点的打分,以此排序,如果分数相同,则按ele的字典序
- backward: 后退指针,可以反向查找
- level: 结点的层级信息,包含了该层的前进指针和到下一结点的距离
为什么会定义backward和span两个变量?backward
和span
并不是skiplist实现的必须变量,是redis定制化的。
backward: 通过backward
可以实现逆序输出,比如zrevrange
命令时可以直接按照backward
遍历,再无需逆序翻转。
span: span
变量的意义是该层级到下一节点的距离,其实这个变量的真实意义是算出该结点的排名——比如rank
命令,只需要累加变量路径上所有结点的span
值就可以了。那么为什么不直接用rank
来记录绝对排名呢?如果用rank
记录绝排名,那么在插入和删除的时候该结点后面的所有结点的rank
值都要修改,严重影响效率。而如果用span
,则只需要修改结点前后相邻两个结点的span
。
typedef struct zskiplist {
struct zskiplistNode *header, *tail; /* skiplist的头结点和尾指针 */
unsigned long length; /* 结点数 */
int level; /*最大层级 */
} zskiplist;
-
header: skiplist的头结点,不保存数据信息,只是为了让插入简单
-
tail: skiplist的尾指针,指向最后一个结点,如果skiplist尾空,则
tail
指向NULL -
length: skiplist的结点数
-
level: skiplist的最大层数
2. 创建skiplist
新建一个空的skiplist,level为1,且给header分配空间,为的是插入结点方便。
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1; // 层数为1
zsl->length = 0; // 没有有效结点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // 创建header结点,但并算有效结点
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
3. 查找结点
根据结点分数和元素,查找结点排名。skiplist的查找分两个方向:向下和向前,当不能前进时则向下。
查找“7”
- 自定向下查找,从L2层开始,header[2]->forward->value 大于 7,所以该层不通,转向下一层。
- 从L1层开始查找,header[1]->forward->value小于7,所以继续前进。
- 在L1层,结点6的forward->value=11,大于7,所以L1层终止前进。
- 在L0层,结点6的forward->vlaue=11,也大于7,所以L0层终止前进。
- 已到达最底层L0层,还没找到,则说明不存在。
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) { /* 向下沉 */
/*该层结点不为空,且score都小于查找score,或者是在score相同的情况下ele小于查找ele*/
while (x->level[i].forward && /* 向前走 */
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span; // 排名就是所以已跳过的span总和
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
4. 删除结点
在删除结点的时候,与其相关的前向结点的span
也要一起修改。
删除“20”
- 标记查找20时每一层的下降结点指针(结点11)。
- 标记每一层下降结点的rank值,也就是所有经过结点的
span
累加和。 - 剔除结点20(链表的删除算法,结点11的forward指向27)。
- 修改后驱结点(27)的backward指针值(指向11)。
- 修改结点11的span值(减1)。
/* Internal function used by zslDelete, zslDeleteRangeByScore and
* zslDeleteRangeByRank. */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) { /* update[]和x在同层级 */
update[i]->level[i].span += x->level[i].span - 1; /* 前向结点的span-1,因为少了一个结点 */
update[i]->level[i].forward = x->level[i].forward; /* 修改前向结点的forward指针,执行删除结点的forw */
} else { /* update和x不在同层,也就是说update的level比x的level要高,此时直接更新update的span值 */
update[i]->level[i].span -= 1;
}
}
/* 修改L0的backward指针 */
if (x->level[0].forward) { /* 被删除结点不是尾结点,则后向结点的前向指针应该是删除结点的前向结点 */
x->level[0].forward->backward = x->backward;
} else { /* 如果被删除的尾结点,则更新ziplist的tail指针,指向前向结点 */
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) /* 如果被删是level最大的结点,则ziplist的level需要降低 */
zsl->level--;
zsl->length--; /* 长度减1 */
}
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
/* update记录了每一层被删除结点的后一结点 */
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0))) /* 在这一层上一直往前走,直到出现score大于等于要删除的结点 */
{
x = x->level[i].forward;
}
update[i] = x; /* 记录该层删除结点p的前向结点 */
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) { /* 第0层找到了要删除的结点 */
zslDeleteNode(zsl, x, update); /* 更新前向结点的span值 */
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
5. 插入结点
skiplist的插入结点相对比较复杂,主要是因为除了插入结点外还需要修改与之相关结点的span值。
插入“30”
- 标记查找30时所有层的下降结点指针(结点11, 27)。
- 标记查找30时所有层下降结点的rank值(结点11, 27)。
- 在结点27后插入结点30。
- 修改后继结点的backward值。
- 修改所有下降层结点(11, 17)的span值。
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
/*
update: update保存了每一层插入位置的前向结点地址,这些结点的span值都需要改动
rank: rank保存看每一层插入位置前向结点的排名,是为span准备的
*/
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; /* 层级降低,则该层的rank起始值就是上层rank在终止值 */
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
/* 循环直到 当前结点x的下一结点的score大于等于插入的score,也就是新节点是插到x后面的 */
{
rank[i] += x->level[i].span; /* 在前进过程中该层的排名rank累加 */
x = x->level[i].forward;
}
update[i] = x; /* 新节点插入到x结点后面 */
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel(); /* 随机生成新节点的层级高度 */
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
/*
如果新节点的level比其他结点的level都要高,也就是说新节点是最高的,则新加一层,
那么新节点的前向结点就是skiplist的头结点,
而此时这一层的span应该是从头到尾的,也就是整个链表的长度
*/
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
/*
这里是链表的插入算法,比如a->b->c,要在b后面插入d
则应该是 d->c && b->d,结果就是a->b->d->c
*/
x->level[i].forward = update[i]->level[i].forward; /* d->c */
update[i]->level[i].forward = x; /* b->d */
/* update span covered by update[i] as x is inserted here */
/*
接下来就要更新span值了
*/
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) { /* 如果新节点的level不是最高,那么需要更新那些比它高的结点的span值 */
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0]; /* 后退指针 */
if (x->level[0].forward)
x->level[0].forward->backward = x; /*下一结点的后退指针 */
else
zsl->tail = x;
zsl->length++;
return x;
}
上一篇: 用C++写了个协程库,欢迎star
下一篇: redis源码分析——9、AOF持久化