Redis系列-跳表源码分析
摘要:跳表是一种具有 O ( l o g n ) O(logn) O(logn)级别的查询时间复杂度的数据结构,redis的有序集合在数据比较多或者单个数据比较大的时候底层使用的数据结构就是跳表。跳表相比于其他查询结构如红黑树、B树、B+树等等,具有易于实现的特点。LeetCode上没有手写红黑树或者B树的题目,但是有题目要求手写跳表,可见其相对易于实现。本文将以redis里的跳表源码来分析跳表这种数据结构,所使用的redis版本为6.0.19。
关键词:Redis,源码分析,跳表,数据结构,leetcode
源码分析
结构体
首先跳表结构体定义在server.h
文件中,代码如下:
// 跳表节点定义
typedef struct zskiplistNode {
sds ele; // 节点名称
double score; // 节点分数
struct zskiplistNode *backward; // 后向指针,跳表的最低层是一个双向链表
struct zskiplistLevel {
struct zskiplistNode *forward; // 前向指针
unsigned long span;
} level[]; // 这里把不同level的节点聚合在一起,减少内存
} zskiplistNode;
// 跳表结构体定义
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl; // 指向跳表的指针
} zset;
接下来从高层的zset的角度来看一下跳表的创建、查询和插入等操作。
跳表创建和销毁
创建一个跳表的代码如下:
// 创建一个跳表节点
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
// 按照level数量分配内存
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
/* Create a new skiplist. */
// 创建一个跳表
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl)); // 分配空间
zsl->level = 1;
zsl->length = 0;
// 这里相当于是创建一个伪头部,其高度为32
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
这里使用了一个伪头部便于后续的操作,减少一些条件的判断。再来看下跳表的销毁,销毁链表是遍历最底层的链表,因为所有的节点都至少出现在最低层。
void zslFreeNode(zskiplistNode *node) {
sdsfree(node->ele);
zfree(node);
}
/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) { // 遍历最底层的双向链表
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
下图为redis 一个跳表的示意图。
zslInsert 跳表插入
// 返回一个随机的level,表示要插入的节点多少level高
int zslRandomLevel(void) {
int level = 1;
// ZSKIPLiST_P等于0.25,相当于每一个节点有四分之一的概率往上一层
// 这里的实现相当于是random() < ZSKIPLIST_P, 这里只是利用位运算来提高浮点数比较的效率
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
// 最高只能是32层
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
// 往跳表中插入一个节点,需要由调用方保证待插入的元素不存在于跳表中
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score)); // 参数校验
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) { // 从最高层次开始
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 在一个层次的链表上遍历的时候,要找到一个位置,链表是有序的,优先按照分数排序,如果分数一样,则按照名称的字典序排序
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span; // rank[i]记录每一层经过了多少元素找到的位置,后续用于更新跨度span
x = x->level[i].forward;
}
update[i] = x; // update保存的是每一层的更新节点,即待插入节点如果在该层的上一个节点
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
// 到这里已经找到待插入节点在最底层要插入的位置了
level = zslRandomLevel();
if (level > zsl->level) { // 高出的部分
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length; // 因为只有一个节点,所以跨度是当前跳表里所有元素的数量
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
// 接下里就是要更新各层的前向指针以及后向指针了
for (i = 0; i < level; i++) {
// 将当前节点插入到update[i]节点后面
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) { // 高度不够,未设计到的层次的对应节点的跨度也要加一
update[i]->level[i].span++;
}
// 后向指针,相当于双向链表中的prev,不会指向伪头部
x->backward = (update[0] == zsl->header) ? NULL : update[0];
// 使后一个节点的后向指针指向自己
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x; // 插入的节点在末尾,更新tail
zsl->length++;
return x;
}
总结一下一个节点插入到跳表的过程:
- 找到待插入的位置。从最高层次的伪头部开始,如果前向指针不为空且前向指针指向的节点的分数小于待插入节点的分数(或者等于,但是名称要小于),则在当前层次中往后遍历。如果不满足,则下落到下一层次,继续进行以上过程,直到最底层,在这一过程中记录每一层次下落前的节点,新插入节点插入在这些节点的后面
- 创建节点,并且根据概率决定该节点的层次数,大概是该节点具有n层次的概率为 1 / 4 n 1 / 4^n 1/4n
- 然后根据这个层次将该节点插入到第一步收集的节点的后面
- 更新最底层的后向指针,因为某些操作的需要,最底层维护成一个双向链表
以下图为例,在之前所说的那个跳表中,如果要插入一个score为7的节点,那么节点的遍历过程如下图的蓝色箭头,而蓝色圆圈所标识的位置就是新节点要插入的位置。
zslDelete 跳表删除
// 具体完成一个节点的删除,且更新所在层次的链表
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) { // 说明待删除节点在这一个层次
update[i]->level[i].span += x->level[i].span - 1; // 更新跨度
update[i]->level[i].forward = x->level[i].forward; // 更新前一个节点的前向指针
} else {
update[i]->level[i].span -= 1; // 待删除节点没有到这一个层次,只需要将跨度减一即可
}
}
// 更新最底层的双向链表
if (x->level[0].forward) { // 不是最后一个节点
x->level[0].forward->backward = x->backward;
} else { // 删除了最后一个节点,需要更新tail
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) // 最高层次更新
zsl->level--;
zsl->length--; // 跳表中的元素数量更新
}
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
// 删除节点,返回1表示删除成功,返回0表示跳表中没有这个节点
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
// 还是和插入节点一样的查找过程
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
// 此时的x及往后的部分节点可能是待删除的节点
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
// 根据调用方传入的二级指针,如果为空,说明调用方不关心这个节点,直接释放掉,否则要让这个二级指针去执行被删除的节点
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
具体的过程和插入一个节点的过程类似,也是先最高层次开始往下,每一层链表查找。最后落在最底层的某个节点,判断该节点的下一个节点的score和ele是否和待删除节点的信息一致,如果一致,删除该节点,并且根据查找过程中维护的下落前的各个层次的节点来更新各个层次的单向链表,最后更新最底层的双向链表。如果不一致,则说明跳表中没有待删除节点。
zslUpdateScore 更新节点
zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
/* We need to seek to element to update to start: this is useful anyway,
* we'll have to update or remove it. */
// 一样的查找过程,在这一过程中记录每一层下落前的节点,方便后续修改
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < curscore ||
(x->level[i].forward->score == curscore &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* Jump to our element: note that this function assumes that the
* element with the matching score exists. */
x = x->level[0].forward; // 保证该节点的分数和名称都和参数一致
serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
/* If the node, after the score update, would be still exactly
* at the same position, we can just update the score without
* actually removing and re-inserting the element in the skiplist. */
// 通过最底层的双向链表判断,更新后的分数是否还能留在原位置,直接更新,然后返回
if ((x->backward == NULL || x->backward->score < newscore) &&
(x->level[0].forward == NULL || x->level[0].forward->score > newscore))
{
x->score = newscore;
return x;
}
/* No way to reuse the old node: we need to remove and insert a new
* one at a different place. */
// 否则只能删除该节点,然后重新插入一个新节点
zslDeleteNode(zsl, x, update);
zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
/* We reused the old node x->ele SDS string, free the node now
* since zslInsert created a new one. */
// 复用用来保存节点名称的sds,所以将节点指向的ele置为null,然后清理节点
x->ele = NULL;
zslFreeNode(x);
return newnode;
}
更新一个节点的分数大致过程如下:
- 找到待插入的位置。从最高层次的伪头部开始,如果前向指针不为空且前向指针指向的节点的分数小于待插入节点的分数(或者等于,但是名称要小于),则在当前层次中往后遍历。如果不满足,则下落到下一层次,继续进行以上过程,直到最底层,在这一过程中记录每一层次下落前的节点,新插入节点插入在这些节点的后面
- 取当前节点的前向指针指向的节点作为待更新节点,检查该节点的名称和分数是否和传入的名称、旧分数一致,如果不一致,直接返回,否则进行更新
- 在最底层,检查后向指针指向的节点是否为空或者节点的分数小于新分数,以及前向指针指向的节点是否为空或者该节点的分数大于新分数,如果满足,说明新分数的节点的位置不需要调整,直接返回
- 否则,需要删除旧节点,然后重新插入一个新节点
跳表实现
实现代码如下:
import (
"math/rand"
"time"
)
const (
MAX_HEIGHT=32
Rand_LEVEL_P = 0.25
)
type SkiplistNodeLevel struct {
level int
next *SkiplistNode // 指向下一个节点
}
type SkiplistNode struct {
val int
prev *SkiplistNode // 指向前一个节点
level int
levels []*SkiplistNodeLevel
}
func NewSkiplistNode(val int, level int) *SkiplistNode {
node := &SkiplistNode{val: val, levels: make([]*SkiplistNodeLevel, level), level: level}
for i := 0; i < level; i++ {
node.levels[i] = &SkiplistNodeLevel{level: i}
}
return node
}
func NewSkiplistNodeNoVal(level int) *SkiplistNode {
node := &SkiplistNode{levels: make([]*SkiplistNodeLevel, level), level: level}
for i := 0; i < level; i++ {
node.levels[i] = &SkiplistNodeLevel{level: i}
}
return node
}
type Skiplist struct {
level int // 除伪头部外最高层次
length int // 元素数量
head, tail *SkiplistNode
}
func NewSkiplist() *Skiplist {
sl := &Skiplist{level: 1, head: NewSkiplistNodeNoVal(MAX_HEIGHT)}
return sl
}
func (s *Skiplist) Search(target int) bool {
var cur *SkiplistNode = s.head
for i := s.level - 1; i >= 0; i-- {
for cur.levels[i].next != nil && cur.levels[i].next.val < target {
cur = cur.levels[i].next
}
}
if cur.levels[0].next != nil && cur.levels[0].next.val == target {
return true
}
return false
}
func (s *Skiplist) randomLevel() int {
rand.Seed(time.Now().UnixNano())
level := 1
for (level < MAX_HEIGHT && 0x7fff * rand.Float64() < 0x7fff * Rand_LEVEL_P) {
level++
}
return min(level, MAX_HEIGHT)
}
func (s *Skiplist) Insert(val int) {
var cur *SkiplistNode = s.head
prevNodes := make([]*SkiplistNode, MAX_HEIGHT)
for i := s.level - 1; i >= 0; i-- {
for cur.levels[i].next != nil && cur.levels[i].next.val < val {
cur = cur.levels[i].next
}
prevNodes[i] = cur
}
level := s.randomLevel()
node := NewSkiplistNode(val, level)
for i := min(level - 1, s.level - 1); i >= 0; i-- {
node.levels[i].next = prevNodes[i].levels[i].next
prevNodes[i].levels[i].next = node
}
if level > s.level {
for i := s.level; i < level; i++ {
s.head.levels[i].next = node
}
s.level = level
}
node.prev = prevNodes[0]
if node.levels[0].next != nil {
node.levels[0].next.prev = node
} else {
s.tail = node
}
}
func (s *Skiplist) Delete(val int) bool {
var cur *SkiplistNode = s.head
prevNodes := make([]*SkiplistNode, MAX_HEIGHT)
for i := s.level - 1; i >= 0; i-- {
for cur.levels[i].next != nil && cur.levels[i].next.val < val {
cur = cur.levels[i].next
}
prevNodes[i] = cur
}
if cur.levels[0].next == nil || cur.levels[0].next.val != val {
return false
}
deletingNode := cur.levels[0].next
for i := deletingNode.level - 1; i >= 0; i-- {
prevNodes[i].levels[i].next = deletingNode.levels[i].next
}
if deletingNode.levels[0].next != nil {
deletingNode.levels[0].next.prev = prevNodes[0]
} else {
s.tail = prevNodes[0]
}
return true
}
部分函数调整后(leetCode函数名笔者不认可),可以通过LeetCode 1206的所有测试用例,运行截图如下:
扩展
工业界其他使用跳表的场景还有,基于LSM-Tree的数据库比如LevelDB和RocksDB等等,都有使用跳表。