概述
跳跃表是zset
(有序集合)的基础数据结构。跳跃表可以高效地保持元素有序,并且实现相比平衡树简单、直观。Redis的跳跃表是基于William Pugh在《Skip lists: a probabilistic alternative to balanced trees》中描述的算法实现的。做了以下几点改动:
- 允许重复分数。
- 比较不仅会涉及键,还可能涉及节点数据(键相等时)。
- 有一个后退指针,所以是一个双向链表。便于实现
ZREVRANGE
等命令。
实现结构
实现在redis.h
和t_zset.c
两个文件中。
跳跃表的基本结构如下(来自William Pugh的论文):
- 表头:维护跳跃表各层的指针。
- 中间节点:维护节点数据和各层的前进后退指针。
- 层:保存指向该层下一个节点的指针和与下个节点的间隔(span)。为了提高查找效率,程序总是先从高层开始访问,然后随着范围的缩小慢慢降低层次。
- 表尾:全部为NULL,表示跳跃表各层的末尾。
跳跃表的一个节点用结构zskiplistNode
表示:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
typedef struct zskiplistNode {
// redis中通用对象,用来保存节点数据
robj *obj;
// 分数
double score;
// 后退指针,只有第0层有效
struct zskiplistNode *backward;
// 各层的前进指针及与下一个节点的间隔
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplist;
|
跳跃表用结构zskiplist
表示:
1
2
3
4
5
6
7
8
|
typedef struct zskiplist {
// 跳跃表的头部和尾部
struct zskiplistNode *header, *tail;
// 跳跃表的长度
unsigned long length;
// 层高
int level;
} zskiplist;
|
3.跳跃表操作
3.1.创建跳跃表
调用zslCreate
创建一个空的跳跃表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// 创建一个层高为level的跳跃表节点,以score为分数,obj为数据
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 分配空间
zsl = zmalloc(sizeof(*zsl));
// 层高初始化为1,插入数据时可能会增加
zsl->level = 1;
// 长度为0
zsl->length = 0;
// 分配头部节点,层高ZSKIPLIST_MAXLEVEL(当前值为32,对于2^32个元素足够了)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 初始化头部各层的指针和间隔
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
|
3.2.销毁跳跃表
跳跃表结构使用完后,需要调用zslFree
来释放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
void zslFreeNode(zskiplistNode *node) {
// obj是redis对象,使用引用计数管理的
decrRefCount(node->obj);
zfree(node);
}
void zslFree(zskiplist *zsl) {
// 跟踪第0层(最下面一层)的forward指针可以遍历所有元素
zskiplistNode *node = zsl->header->level[0].forward, *next;
// 释放头部节点
zfree(zsl->header);
while(node) {
next = node->level[0].forward;
// 释放当前节点
zslFreeNode(node);
node = next;
}
// 释放zskiplist结构
zfree(zsl);
}
|
3.3.插入节点
向跳跃表中插入节点时,以Redis对象obj
和与之关联的分数score
调用zslInsert
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
// update记录的是各层新节点插入位置的前一个节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
// 分数不能是NaN
redisAssert(!isnan(score));
x = zsl->header;
// 从最高层向下处理,每次遇到使score和obj介于其中的x和x->forward,则新节点必定在x和x->forward之间
// 到下一层更精准的定位插入位置。下一层继续从位置x前进查找,因为由上一层得知新节点必然在x之后。
// 记录节点x在跳跃表中的rank。
for (i = zsl->level-1; i >= 0; i--) {
// x节点的rank由上一层计算得到
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// score相同时,比较两个obj
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 计算rank
rank[i] += x->level[i].span;
// 移向下一个继续判断
x = x->level[i].forward;
}
// 新节点要插入x与x->level[i]->forward之间,x->level[i]需要做相应调整(span,forward等字段)
update[i] = x;
}
// 因为允许重复score,所以这里并没有做重复判断。应该由调用者测试新插入的对象是否已经存在。
// 随机新节点层高
level = zslRandomLevel();
if (level > zsl->level) {
// 新节点比当前层高大
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
// 没有节点,span是整个跳跃表的长度
update[i]->level[i].span = zsl->length;
}
}
// 创建跳跃表节点
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
// 更新span
// (rank[0] - rank[i])是第i层中新节点与前一个节点的距离(严格说应该是第0层新节点的前一个节点与第i层新节点的前一个节点间距离)
// 新节点将update[i]后面的间隔分成两部分。
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// +1算上新节点
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 未更新的层需要增加新节点之前的节点span,因为后面多了一个新节点
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 更新后退节点
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
|
随机层高算法:
1
2
3
4
5
6
7
|
int zslRandomLevel(void) {
int level = 1;
// level每次循环有1/4概率增加层高
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
|
3.4.范围
3.4.1.数值范围
zslValueGteMin
判断value
是否大于等于(由minex
决定是否取等于)范围spec
中的最小值:
1
2
3
|
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
|
zslValueGteMax
判断value
是否小于等于(由maxex
决定是否取等于)范围spec
中的最大值:
1
2
3
|
static int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
|
zslIsInRange
判断跳跃表中是否有节点处于范围range
中。这里有个前提,所有节点的score
都相等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
// 范围大小为0
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
x = zsl->tail;
// 跳跃表中最大值不大于范围range的最小值
if (x == NULL || !zslValueGteMin(x->score,range))
return 0;
// 跳跃表中最小值不小于范围range的最大值
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score,range))
return 0;
return 1;
}
|
zslFirstInRange
返回跳跃表中处于范围range
中的第一个节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
// 跳跃表中没有节点处于范围range内
if (!zslIsInRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 找到最后一个小于range最小值的节点
while (x->level[0].forward &&
!zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
// x为小于range最小值的最大节点,x->level[0].forward为大于range最小值的最小节点
x = x->level[0].forward;
redisAssert(x != NULL);
if (!zslValueLteMax(x->score,range)) return NULL;
return x;
}
|
zslLastInRange
返回跳跃表中处于范围range
中的最后一个节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
// 跳跃表中没有节点处于范围range内
if (!zslIsInRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 找到小于等于range最大值的最后一个节点
while (x->level[0].forward &&
!zslValueLteMax(x->level[i].forward->score,range))
x = x->level[i].forward;
}
redisAssert(x != NULL);
if (!zslValueGteMin(x->score,range)) return NULL;
return x;
}
|
3.4.2.字典序范围
zslLexValueGteMin
判断value
是否大于等于(由minex
决定是否取等于)范围spec
的最小值。通过字典序来比较。
1
2
3
4
5
|
static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
return spec->minex ?
(compareStringObjectsForLexRange(value,spec->min) > 0) :
(compareStringObjectsForLexRange(value,spec->min) >= 0);
}
|
这里使用compareStringObjectsForLexRange
来比较字典序:
1
2
3
4
5
6
7
8
|
int compareStringObjectsForLexRange(robj *a, robj *b) {
if (a == b) return 0;
// minstring和maxstring是Redis预定义的用于表示最小和最大字符串的
if (a == shared.minstring || b == shared.maxstring) return -1;
if (a == shared.maxstring || b == shared.minstring) return 1;
return compareStringObjects(a,b);
}
|
zslLexValueLteMax
判断value
是否小于等于(由maxex
决定是否取等于)范围spec
的最大值。通过字典序来比较。
1
2
3
4
5
|
static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
return spec->maxex ?
(compareStringObjectsForLexRange(value,spec->max) < 0) :
(compareStringObjectsForLexRange(value,spec->max) <= 0);
}
|
zslIsInLexRange
判断跳跃表中是否有节点的数据obj
处于范围range
中。这里有个前提,所有节点的score
都相等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
// 范围为空
if (compareStirngObjectsForLexRange(range->min,range->max) > 1 ||
(compareStringObjects(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
return 0;
x = zsl->tail;
// 跳跃表最大值小于范围range的最小值
if (x == NULL || !zslLexValueGteMin(x->obj,range))
return 0;
x = zsl->header->level[0].forward;
// 跳跃表最小值大于范围range的最大值
if (x == NULL || !zslLexValueLteMax(x->obj,range))
return 0;
return 1;
}
|
zslFirstInLexRange
返回跳跃表处于范围range
中的第一个节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *spec) {
zskiplistNode *x;
int i;
// 跳跃表没有处于范围range中的节点
if (!zslIsInLexRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 找到最后一个小于range最小值的节点
while (x->level[i].forward &&
!zslLexValueGteMin(x->level[i].forward->obj,range))
x = x->level[i].forward;
}
// x为最后一个小于range最小值的节点,x->level[0].forward为第一个大于range最小值的节点
x = x->level[0].forward;
redisAssert(x != NULL);
// x是第一个大于range最小值的节点,必须满足小于range的最大值
if (!zslLexValueLteMax(x->obj,range)) return NULL;
return x;
}
|
zslLastInLexRange
返回跳跃表处于范围range
中的最后一个节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
int i;
// 跳跃表没有处于范围range中的节点
if (!zslIsInLexRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 找到小于等于range最大值的最后一个节点
while (x->level[i].forward &&
zslLexValueLteMax(x->level[i].forward->obj,range))
x = x->level[i].forward;
}
redisAssert(x != NULL);
// x是小于range最大值的最后一个节点,必须满足大于range最小值这个限制
if (!zslLexValueGteMin(x->obj,range)) return NULL;
return x;
}
|
3.5.删除节点
Redis中实现了跳跃表的多种删除方式。下面依次介绍:
3.5.1.删除指定score和obj的节点
这是一种最基本的删除方式,由分数score
和对象obj
指定需要删除的节点。调用zslDelete
完成这个任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
int zslDelete(zskiplist *zsl, double score, robj *obj) {
// 各层需要修改的节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
// 与zslInsert操作一致
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
// 可能有多个节点拥有相同score,所以这里需要比较对象
x = x->level[0].forward;
if (x && x->score == score && equalStringObjects(x->obj,obj)) {
// 从跳跃表中删除该节点
zslDeleteNode(zsl, x, update);
// 释放节点
zslFreeNode(x);
return 1;
}
// 没找到
return 0;
}
|
zslDelete
调用通用的节点删除函数zslDeleteNode
,zslDeleteNode
被其他几个删除函数使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
// 待删除的节点在该层存在,调整forward指针和span
// 两个间隔合并为一个,同时减少了一个节点
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
// 该层没有这个节点,只需要减少span
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
// 更新前进节点的后退指针
x->level[0].forward->backward = x->backward;
} else {
// x是跳跃表尾部
zsl->tail = x->backward;
}
// 如果某一层只有该节点,该节点删除后已经没有节点了,直接移除这一层。但是跳跃表至少有一层。
while (zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) {
zsl->level--;
}
zsl->length--;
}
|
3.5.2.删除一个score范围内的节点
删除分数介于min
和max
之间的所有节点。函数zslDeleteRangeByScore
完成这个功能。范围由zrangespec
指定:
1
2
3
4
5
6
|
typedef struct {
// 最小最大值
double min, max;
// 是否包括最小值/最大值
int minex, maxex;
} zrangespec;
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// 删除的节点也需要从dict中删除
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (range->minex ?
x->level[i].forward->score <= range->min :
x->level[i].forward->score < range->min))
x = x->level[i].forward;
update[i] = x;
}
// x是最后一个 < 或 <= min的节点了。
x = x->level[0].forward;
// 依次删除每个处于范围内的节点。因为是依次向后删除,所以update都是适用的。
while (x &&
(range->maxex ? x->score < range->max : x->score <= range->max))
{
zskiplistNode *next = x->level[0].forward;
// 使用通用节点删除函数zslDeleteNode
zslDeleteNode(zsl,x,update);
// 从dict中删除这个对象
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}
|
3.5.2.删除一个字典序范围内的节点
函数zslDeleteRangeByLex
删除以字典序表示的范围内所有节点。这个方法有个前提条件,跳跃表中的所有score都是相同的。在概述中提到过,如果score
相同,会比较obj
。
字典序范围用结构zlexrangespec
表示:
1
2
3
4
5
6
|
typedef struct {
// 最小最大值
robj *min, *max;
// 是否包括最小/大值
int minex, maxex;
} zlexrangespec;
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
// 除了使用zslLexValueGteMin比较,其他与函数zslDeleteRangeByScore基本相同
// zslLexValueGteMin判断obj是否大于等于zlexrangespec的最小值
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
!zslLexValueGteMin(x->level[i].forward->obj,range))
x = x->level[i].forward;
update[i] = x;
}
// x是最后一个 < 或 <= min的节点了
x = x->level[0].forward;
while (x && zslLexValueLteMax(x->obj,range)) {
zskiplistNode *next = x->level[0].forward;
// 使用通用节点删除函数zslDeleteNode
zslDeleteNode(zsl,x,update);
// 从dict中删除这个对象
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}
|
3.5.4.删除一个rank范围内的节点
zslDeleteRangeByRank
可以删除一个rank范围内的所有节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long traversed = 0, removed = 0;
int i = 0;
x = zsl->header;
// 排行直接通过span判断即可
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) < start) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
traversed++;
// x->level[0].forward是rank为start的节点(因为上面比较是 <start)
x = x->level[0].forward;
// 删除节点之后span会改变,使用累计的traversed与end判断
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
// 使用通用节点删除函数zslDeleteNode
zslDeleteNode(zsl,x,update);
// 从dict中删除这个对象
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}
|
3.6.查询
3.6.1.查找score和obj的rank
查找跳跃表中由score
和obj
指定的节点的rank。rank从1开始,没有该节点时返回0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) < 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
// x可能等于表头
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}
|
3.6.2.查找某个rank的节点
函数zslGetElementByRank
查找某个rank的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i = 0;
x = zsl->header;
// rank可以使用span计算
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
|
4.总结
跳跃表实现简单,插入、查找、删除等操作性能都不错。Redis基于William Pugh的算法按照自己的需求做了一些扩展,如允许相同分数、增加后退指针等。