golang/java实现跳表的数据结构
1、跳表数据结构说明
最近在写一款中间件,类似于redis的中间件,我准备用golang语言来编写,目前已经实现了redis的大部分数据结构,比如string、hash、set、list,而开始实现zset的时候发现要使用跳表,在网上实现的跳表数据结构感觉都不太合适,所以就自己来实现了一个,我也是瞅着空闲的时间来实现了一把,我也不知道性能如何,还没来得及测试,欢迎大家在此基础上进行修正和讨论;
跳表(Skip List)是一种概率性的动态数据结构,通常用于实现有序集合(如集合、映射等)的查找、插入和删除操作。它能够在平均情况下提供对数时间复杂度(O(log n)),比起传统的链表和二叉搜索树,它在实现上更简单。
1.1、跳表的基本结构
跳表由多个层级的链表构成,每一层都是对下一层链表的“跳跃”,因此得名“跳表”。最底层是一个有序的链表,其上层链表则是从下层链表中按一定规则选出的元素组成。跳表的结构通常由以下几个层级构成:
第0层(底层):这是最基础的一层,包含了所有的元素,形成一个有序链表。
第1层及以上:这些层是通过概率性的方式从第0层元素中选择一部分元素构成的。在每一层,节点的出现概率通常是固定的(例如每个节点有50%的概率出现在上一层)。
1.2、跳表的操作
查找(Search):
从最上层的链表开始查找,按照节点的值进行比较,如果当前节点的值比目标值小,就跳到下一个节点,否则下降到下一层继续查找。
这种方式可以减少查找的时间复杂度,期望时间为 O(log n)。
插入(Insert):
首先进行查找操作,找到插入位置。
然后根据一定的概率决定是否在上层创建新节点,最终插入节点到每一层对应的链表中。插入操作也有期望时间 O(log n)。
删除(Delete):
类似于查找,找到待删除的节点后,将其从每一层的链表中移除,时间复杂度为 O(log n)。
1.3、跳表的优点
简单易实现:跳表比起平衡二叉树(如AVL树、红黑树等)实现起来更加简单,尤其在处理动态变化的数据时。
动态平衡:跳表是基于概率的动态数据结构,不需要像平衡树那样进行显式的重平衡操作。
较好的空间利用:相比于红黑树的每个节点存储颜色、父子指针,跳表的每个节点只需要存储向后和向下的指针。
1.4、跳表的缺点
空间开销:由于需要维护多层链表,跳表的空间复杂度是 O(n),其中 n 是元素的数量。对于每个元素,可能需要在不同层级上维护多个指针。
不适用于非常小的数据集:对于非常小的数据集,跳表可能并不如线性结构(如链表)高效。
跳表在很多场景中都能提供较好的性能,特别是在需要频繁插入和删除操作的有序集合中,常用于数据库、缓存系统、分布式系统等场景。
1.5、我的实现和设计
我这边设计是按照我跳表的思想进行设计的,首先有一个垂直指针指向下一层,每一层有一个水平的单向指针指向了下一个元素,每个节点是作为一个Entry来构成,Entry里面包含了两个元素,一个是element,一个score,我这边只想做redis的zset的数据结构,所以就没考虑其他数据类型,当然是可以抽出一个通用的跳表结构,我优点忙,就难得整了,设计大概如下:
从图中可以清楚的指导,节点SkipListNode里面的有一个right指针和down指针分别对应水平指针和垂直指针,用来和水平节点和上下节点建立关系的;每一层的head节点本身不存在实际的数据,只作为一个头结点存在,我是根据自身的一些常见设计的,可能毕竟简单,但是够用足以,各位看官如果觉得哪里不合理的或者有更好的方式请在评论区讨论;
2、java的实现
本来我最早是用golang实现的,但是golang写起来感觉还是没java那么舒服,所以就先用java实现一下。
2.1、SkipListNode.java
class SkipListNode {
String element; //元素的名称
double score; //元素的分数
SkipListNode right;//水平指针
SkipListNode down;//垂直指针
public SkipListNode(String element, double score) {
this.element = element;
this.score = score;
this.right = null;
this.down = null;
}
}
2.2、SkipList.java
class SkipList {
private static final int MAX_LEVEL = 8; // 最大层数
private int currentMaxLevel = 1; // 当前最大层数
private SkipListNode head;
private int levels;
private Map<String, SkipListNode> ref;
private int size;
public SkipList() {
this.head = new SkipListNode(null, Double.NEGATIVE_INFINITY); // 头节点,起始无穷小
this.levels = 1;
this.ref = new HashMap<>();
size = 0;
}
// 随机生成层数
private int randomLevel() {
int level = 1;
int maxAllowedLevel = Math.min(currentMaxLevel + 1, MAX_LEVEL);
while (level < maxAllowedLevel && Math.random() < 0.5) {
level++;
}
return level;
}
// 插入元素
public void insert(String element, double score) {
// Step 1: 查找插入位置,并记录路径
List<SkipListNode> path = new ArrayList<>();
SkipListNode current = head;
// 查找插入位置并记录路径
while (current != null) {
while (current.right != null && current.right.score < score) {
current = current.right;
}
path.add(current); // 记录路径
current = current.down;
}
// Step 2: 生成新节点的层数
int newLevel = randomLevel();
currentMaxLevel = Math.max(currentMaxLevel, newLevel); // 更新当前最大层数
// Step 3: 创建新节点,并逐层插入
SkipListNode downNode = null; // 用来连接下层的节点
SkipListNode newNode = new SkipListNode(element, score); // 只创建一次新节点
// Step 4: 按层插入节点
for (int i = 0; i < newLevel; i++) {
// 如果路径中没有足够的层,则扩展头节点
if (i >= path.size()) {
SkipListNode newHead = new SkipListNode(null, Double.NEGATIVE_INFINITY); // 创建新头节点
newHead.down = head; // 将原头节点挂到新头节点下层
head = newHead; // 更新头节点
levels++;
path.add(0, newHead); // 将新头节点添加到路径中
}
// 当前层的前一个节点
SkipListNode prev = path.get(path.size() - 1 - i);
// 插入当前层的新节点
newNode.right = prev.right; // 将当前节点的 right 指向原来节点的 right
prev.right = newNode; // 将前一个节点的 right 指向新节点
newNode.down = downNode; // 将新节点的 down 指向下层节点
downNode = newNode; // 更新下层节点为当前新节点
// 如果新层还需要节点,继续创建新节点实例
if (i < newLevel - 1) {
newNode = new SkipListNode(element, score); // 在此处复用对象
}
}
ref.put(element, newNode);
size++;
}
// 查找单个元素
public SkipListNode find(String element) {
SkipListNode current = head;
while (current != null) {
// 在右边链表中查找匹配的节点
while (current.right != null && current.right.score <= Double.POSITIVE_INFINITY) {
if (current.right.element != null && current.right.element.equals(element)) {
return current.right;
}
current = current.right;
}
// 如果当前层找不到,向下层移动
current = current.down;
}
return null; // 未找到时返回 null
}
public List<String> rangeSearchIndex(int startIndex, int endIndex) {
List<String> result = new ArrayList<>();
SkipListNode current = head;
// 找到最底层的起始节点
while (current.down != null) {
current = current.down;
}
current = current.right; // 跳过头节点
// 遍历跳表,找到索引范围内的元素
int currentIndex = 0;
while (current != null && currentIndex <= endIndex) {
if (currentIndex >= startIndex) {
result.add(current.element);
}
current = current.right;
currentIndex++;
}
return result;
}
// 查找范围内的元素
public List<String> rangeSearch(double startScore, double endScore) {
List<String> result = new ArrayList<>();
SkipListNode current = head;
// 找到范围的起点
while (current != null) {
while (current.right != null) {
if (current.right.score < startScore) {
current = current.right;
} else {
//找到0 level
if (current.down != null) {
while (current.down != null) {
current = current.down;
}
}
while (current.right != null) {
if (current.right.score >= startScore && current.right.score <= endScore) {
result.add(current.right.element);
}
current = current.right;
}
}
}
current = current.down;
}
return result;
}
// 查找排名,通过层索引到 0 level的位置,然后从 0 level开始搜索
public int rank(String element) {
SkipListNode current = head;
int rank = 0; // 记录排名
// 首先找到目标节点
SkipListNode targetNode = ref.get(element);
if (targetNode == null) {
return -1; // 如果元素不存在,返回 -1 表示未找到
}
double targetScore = targetNode.score;
// 遍历跳表,计算排名
boolean found = false;
end:
while (current != null) {
// 向右移动,累加小于目标分值的节点数量
SkipListNode rightNode = current.right;
while (rightNode != null && rightNode.score <= targetScore) {
if (rightNode.score == targetScore) {
//找到targetScore的0 level的位置
found = true;
if (rightNode.down == null) {
//如果rightNode.down为空,代表是0 level
current = rightNode;
} else {
//找到0 level的最大位置
while (rightNode.down != null) {
rightNode = rightNode.down;
}
current = rightNode;
}
break end;
}
rightNode = rightNode.right;
}
// 向下层移动
current = current.down;
}
if (!found) {
return -1;
}
SkipListNode find0Level = head;
// 找到最底层的起始节点
while (find0Level.down != null) {
find0Level = find0Level.down;
}
find0Level = find0Level.right; // 跳过头节点
while (find0Level != null && find0Level != current) {
rank++;
find0Level = find0Level.right;
}
// 最后加上当前层中等于目标分值的节点
return rank + 1; // 排名从 1 开始
}
// 打印跳表(调试用)
public void print() {
SkipListNode current = head;
while (current != null) {
SkipListNode node = current.right;
while (node != null) {
System.out.print("(" + node.element + ", " + node.score + ") ");
node = node.right;
}
System.out.println();
current = current.down;
}
}
public boolean delete(String element) {
SkipListNode current = head;
boolean found = false;
SkipListNode skipListNode = ref.get(element);
if (skipListNode == null) {
return false;
}
// 遍历每一层,找到并删除目标节点
while (current != null) {
while (current.right != null ) {
if(current.right.element.equals(element)){
found = true;
current.right = current.right.right; // 跳过目标节点
break;
}
current = current.right;
}
// 向下层移动
current = current.down;
}
if (found){
size--;
}
// 如果目标节点存在但已被删除,返回 true,否则返回 false
return found;
}
java的实现比较简单,只是大概得实现了一下,没有太多废话,具体大家可以看代码,注释还是挺清楚的
3、golang的实现
golang的实现就稍微详细一些,因为我用于zset的实现结构里面在,但是也可能存在一些问题,我还没来得及的详细的测试,目前还在开发中
3.1、Entry
// Entry 每个节点存入的数据Entry
type Entry struct {
Element string //存入的元素
Score float64 //元素对应的分值
}
3.2、Node
// Node 节点
type Node struct {
Entry *Entry //节点Entry
Right *Node //水平指针
Down *Node //垂直指针
}
3.3、SkipList
// SkipList 跳表结构
type SkipList struct {
CurrentMaxLevel uint8 //当前最大的层级
Head *Node //头节点
Levels uint8 //层数
Ref map[string]*float64 //引用关系
Size uint16 //存入的元素个数
NodeCount uint16
}
3.4、初始化创建的一些函数
// 初始化节点
func createNode(element string, score float64) *Node {
return &Node{
Entry: &Entry{
Element: element,
Score: score,
},
Right: nil,
Down: nil,
}
}
//创建一个空的SkipList
func NewSkipList() *SkipList {
return &SkipList{
CurrentMaxLevel: 1,
Levels: 1,
Ref: make(map[string]*float64),
Head: createNode("", utils.MinFloatValue()), // 头节点,起始无穷小
Size: 0,
}
}
createNode:主要用来创建节点使用
NewSkipList:创建一个新的跳表
3.5、绑定SkipList的方法
func (sl *SkipList) Insert(element string, score float64) {
sl.Push(&Entry{
Element: element,
Score: score,
})
}
func (sl *SkipList) Push(entry *Entry) {
// Step 1: 查找插入位置,并记录路径
path := make([]*Node, 0)
current := sl.Head
score := entry.Score
element := entry.Element
sl.Remove(element)
// 查找插入位置并记录路径
for i := 0; current != nil; i++ {
for current.Right != nil && current.Right.Entry.Score < score {
current = current.Right
}
path = append(path, current) // 记录路径
current = current.Down
}
// Step 2: 生成新节点的层数
newLevel := sl.randomLevel()
if newLevel > sl.CurrentMaxLevel {
sl.CurrentMaxLevel = newLevel
}
// Step 3: 创建新节点,并逐层插入
var downNode *Node // 用来连接下层的节点
newNode := createNode(element, score) // 只创建一次新节点
// Step 4: 按层插入节点
for i := 0; i < int(newLevel); i++ {
// 如果路径中没有足够的层,则扩展头节点
if i >= len(path) {
newHead := createNode("", utils.MinFloatValue()) // 创建新头节点
newHead.Down = sl.Head // 将原头节点挂到新头节点下层
sl.Head = newHead // 更新头节点
sl.Levels++
newPath := make([]*Node, len(path)+1)
newPath[0] = newHead
copy(newPath[1:], path)
path = newPath // 将新头节点添加到路径中
}
sl.NodeCount++
// 当前层的前一个节点
prev := path[len(path)-1-i]
// 插入当前层的新节点
newNode.Right = prev.Right // 将当前节点的 right 指向原来节点的 right
prev.Right = newNode // 将前一个节点的 right 指向新节点
newNode.Down = downNode // 将新节点的 down 指向下层节点
downNode = newNode // 更新下层节点为当前新节点
// 如果新层还需要节点,继续创建新节点实例
if i < int(newLevel-1) {
newNode = createNode(element, score) // 在此处复用对象
}
}
sl.Ref[element] = &score
sl.Size++
}
func (sl *SkipList) Find(element string) *Entry {
if score, ok := sl.Ref[element]; ok {
return &Entry{element, *score}
}
return nil
}
// 在 正态分布(又叫高斯分布)的 randomLevel 中,mean 和 stddev 是决定生成的层数分布的两个关键参数:
// mean(均值):正态分布的均值决定了层数的集中趋势,即跳表生成的节点层数的“中心”位置。一般来说,mean 越大,跳表的层数就越高,生成的高层节点的概率也越大。
// 在跳表的上下文中,mean 通常代表了跳表的 预期层数。例如,如果 mean = 3,则大多数节点的层数会集中在 3 层附近。
// stddev(标准差):标准差决定了层数的 分散程度。标准差越小,层数会越集中在均值附近;标准差越大,层数的变化范围越广,生成高层节点的概率会更大,也会导致跳表更高。
// 小标准差意味着跳表的层数相对稳定,大多数节点的层数接近于 mean。
// 大标准差意味着层数的变化较大,可能会有一些节点生成非常高层数的情况。
// todo 这个随机生成层数的算法没有经过数据的验证,后面测试中如果发现分布不均匀的时候再来调整
func (sl *SkipList) randomLevel() uint8 {
var mean float64
var stddev float64
if sl.NodeCount <= 1000 {
//当节点数小于1000的时候,我们设置均值为3,标准差为0.5
mean = 3.0
stddev = 0.5
} else {
// 根据节点数动态设置 mean。使用 log2(nodeCount) 或者比例
mean = math.Log2(float64(sl.NodeCount))
if mean > float64(maxLevel) {
mean = float64(maxLevel) // 层数不会超过最大值
}
// 当跳表当前层数较低时,可以增加 mean,帮助生成更多的高层节点
if sl.CurrentMaxLevel < maxLevel {
mean = (mean + float64(sl.CurrentMaxLevel)) / 2
}
// 根据当前层数设置 stddev
// 如果当前层数较小,增加 stddev,允许更大的层数波动
stddev = mean / 2
if sl.CurrentMaxLevel < 3 {
stddev = mean // 增加波动性
} else {
stddev = mean / 2 // 当层数多时,减少波动
}
}
// 使用正态分布生成一个随机数
level := uint8(mean + stddev*rand.NormFloat64())
//随机生成的层数不能大于了当前最大层上一层,比如当前最大的是3,生成的level最大只能是4,不能是大于4
if level > sl.CurrentMaxLevel {
level = sl.CurrentMaxLevel + 1
}
if level > maxLevel {
level = maxLevel
}
return level
}
// Rank 查找排名,通过层索引到 0 level的位置,然后从 0 level开始搜索
func (sl *SkipList) Rank(element string) int {
current := sl.Head
rank := 0 // 记录排名
// 首先找到目标节点
targetScore := sl.Ref[element]
if targetScore == nil {
return -1 // 如果元素不存在,返回 -1 表示未找到
}
// 遍历跳表,计算排名
found := false
end:
for current != nil {
// 向右移动,累加小于目标分值的节点数量
rightNode := current.Right
for rightNode != nil && rightNode.Entry.Score <= *targetScore {
if rightNode.Entry.Score == *targetScore {
//找到targetScore的0 level的位置
found = true
if rightNode.Down == nil {
//如果rightNode.down为空,代表是0 level
current = rightNode
} else {
//找到0 level的最大位置
for rightNode.Down != nil {
rightNode = rightNode.Down
}
current = rightNode
}
break end
}
rightNode = rightNode.Right
}
// 向下层移动
current = current.Down
}
if !found {
return -1
}
find0Level := sl.Head
// 找到最底层的起始节点
for find0Level.Down != nil {
find0Level = find0Level.Down
}
find0Level = find0Level.Right // 跳过头节点
for find0Level != nil && find0Level != current {
rank++
find0Level = find0Level.Right
}
// 最后加上当前层中等于目标分值的节点
return rank + 1 // 排名从 1 开始
}
func (sl *SkipList) FindForScore(element string) float64 {
if score, exists := sl.Ref[element]; exists {
return *score
}
return utils.MinFloatValue()
}
func (sl *SkipList) RangeCount(startScore, endScore float64) (count uint16) {
current := sl.Head
for current != nil {
for current.Right != nil {
if current.Right.Entry.Score < startScore {
current = current.Right
} else {
//找到0 level
if current.Down != nil {
for current.Down != nil {
current = current.Down
}
}
for current.Right != nil {
if current.Right.Entry.Score >= startScore && current.Right.Entry.Score <= endScore {
count++
}
current = current.Right
}
}
}
current = current.Down
}
return
}
func (sl *SkipList) Range(startScore, endScore float64, offset, count int, reverse bool) (ret []*context.Pair[float64, string]) {
current := sl.Head
limit := count > 0
if limit {
ret = make([]*context.Pair[float64, string], count)
}
// 找到范围的起点
randIndex := 0
offsetIndex := 0
endFind:
for current != nil {
for current.Right != nil {
if current.Right.Entry.Score < startScore {
current = current.Right
} else {
//找到0 level
if current.Down != nil {
for current.Down != nil {
current = current.Down
}
}
for current.Right != nil {
if current.Right.Entry.Score >= startScore && current.Right.Entry.Score <= endScore {
if offsetIndex >= offset {
if limit && !reverse {
ret[randIndex] = &context.Pair[float64, string]{Key: current.Right.Entry.Element, Value: current.Right.Entry.Score}
randIndex++
offsetIndex++
if randIndex >= count {
break endFind
}
} else {
ret = append(ret, &context.Pair[float64, string]{Key: current.Right.Entry.Element, Value: current.Right.Entry.Score})
}
} else {
offsetIndex++
}
}
current = current.Right
}
}
}
current = current.Down
}
return
}
func (sl *SkipList) RangeIndex(startIndex, endIndex int) (ret []context.Pair[float64, string]) {
// 处理负索引,-1代表最后一个元素,-2代表倒数第二个元素
ret = make([]context.Pair[float64, string], 0)
if endIndex < 0 {
endIndex = int(sl.Size) + endIndex // 将负数索引转换为正数
}
if startIndex < 0 {
startIndex = int(sl.Size) + startIndex // 将负数索引转换为正数
}
current := sl.Head
// 找到最底层的起始节点
for current.Down != nil {
current = current.Down
}
current = current.Right // 跳过头节点
// 遍历跳表,找到索引范围内的元素
currentIndex := 0
for current != nil && currentIndex <= endIndex {
if currentIndex >= startIndex {
ret = append(ret, context.Pair[float64, string]{Key: current.Entry.Element, Value: current.Entry.Score})
}
current = current.Right
currentIndex++
}
return
}
func (sl *SkipList) Remove(element string) bool {
current := sl.Head
found := false
score := sl.Ref[element]
if score == nil {
return false
}
// 遍历每一层,找到并删除目标节点
for current != nil {
//每一层都是从头开始搜索
rightNode := current
currLayerFound := false
for rightNode.Right != nil && rightNode.Right.Entry.Score <= *score {
if rightNode.Right.Entry.Element == element {
rightNode.Right = rightNode.Right.Right // 跳过目标节点
found = true
//如果搜索到了,那么修改下down的指针
current = rightNode.Down
currLayerFound = true
sl.NodeCount--
break
}
rightNode = rightNode.Right
}
// 向下层移动
if !currLayerFound {
current = current.Down
}
}
if found {
sl.Size--
delete(sl.Ref, element)
return true
}
// 如果目标节点存在但已被删除,返回 true,否则返回 false
return false
}