JAVA集合篇--深入理解ConcurrentHashMap图解版

发布于:2025-06-23 ⋅ 阅读:(11) ⋅ 点赞:(0)

一、前言

                        在Java并发编程中,线程安全的Map实现一直是一个重要话题。虽然我们可以使用Collections.synchronizedMap()或者HashTable来获得线程安全的Map,但它们的性能在高并发场景下往往不尽人意。ConcurrentHashMap作为Java并发包中的重要组件,以其优雅的设计和出色的性能成为了并发编程的首选。


二、为什么需要ConcurrentHashMap

1.1 传统方案的问题

在ConcurrentHashMap出现之前,我们主要有以下几种线程安全的Map实现:

HashTable

// HashTable的put方法
public synchronized V put(K key, V value) {
    // 整个方法都被synchronized修饰
    // 所有操作都是串行的
}

Collections.synchronizedMap()

// SynchronizedMap的实现
public V put(K key, V value) {
    synchronized (mutex) {  // mutex通常是this
        return m.put(key, value);
    }
}

这两种方案的共同问题是:

  • 粗粒度锁:整个Map只有一把锁,所有操作都需要竞争这把锁
  • 读写冲突:读操作也需要获取锁,无法并发进行
  • 性能瓶颈:在高并发场景下,锁竞争激烈,性能急剧下降

1.2 性能对比示例

public class MapPerformanceTest {
    private static final int THREAD_COUNT = 16;
    private static final int OPERATION_COUNT = 100_000;
    
    public static void main(String[] args) throws InterruptedException {
        // 测试HashTable
        Map<String, Integer> hashTable = new Hashtable<>();
        testConcurrentAccess("HashTable", hashTable);
        
        // 测试ConcurrentHashMap
        Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        testConcurrentAccess("ConcurrentHashMap", concurrentMap);
    }
    
    private static void testConcurrentAccess(String mapType, Map<String, Integer> map) 
            throws InterruptedException {
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < OPERATION_COUNT; j++) {
                        map.put("key" + j, j);
                        map.get("key" + j);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            }).start();
        }
        
        startLatch.countDown();
        endLatch.await();
        
        long endTime = System.currentTimeMillis();
        System.out.println(mapType + " 耗时: " + (endTime - startTime) + "ms");
    }
}

典型测试结果

  • HashTable: 2500ms+
  • ConcurrentHashMap: 800ms-

三、ConcurrentHashMap的演进历史

2.1 JDK 1.5-1.7:分段锁时代

在JDK 1.7及之前,ConcurrentHashMap采用**分段锁(Segment)**的设计:

// JDK 1.7 ConcurrentHashMap结构
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table;  // 该段的哈希表
    transient int count;                        // 该段的元素数量
    transient int modCount;                     // 修改次数
    transient int threshold;                    // 扩容阈值
    final float loadFactor;                     // 负载因子
}

// HashEntry结构
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;  // 链表指针
}

分段锁原理

  1. 将整个Map分成多个Segment(默认16个)
  2. 每个Segment相当于一个小的HashMap,拥有独立的锁
  3. 不同Segment之间的操作可以并发进行
  4. 只有操作同一个Segment时才需要竞争锁

 

// JDK 1.7 put操作
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 根据hash值确定segment
    int j = (hash >>> segmentShift) & segmentMask;
    
    if ((s = (Segment<K,V>)UNSAFE.getObject
         (segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j);
    
    return s.put(key, hash, value, false);
}

2.2 JDK 1.8+:CAS + synchronized的革新

JDK 1.8对ConcurrentHashMap进行了重大改革:

主要变化

  1. 取消分段锁:改为对每个桶(数组元素)进行锁定
  2. 引入红黑树:链表长度超过8时转换为红黑树
  3. CAS操作:大量使用CAS操作减少锁的使用
  4. 锁粒度更细:锁的粒度从Segment级别降低到桶级别
// JDK 1.8+ 主要数据结构
transient volatile Node<K,V>[] table;      // 主数组
private transient volatile Node<K,V>[] nextTable;  // 扩容时的新数组
private transient volatile int sizeCtl;    // 控制标识符

// Node结构
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}

// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;
    boolean red;
}

 

四、扩容的区别

4.1、JDK1.7中的扩容

  • 基于SegmentConcurrentHashMap 是由多个 Segment 组成的,每个 Segment 中包含一个 HashMap。当某个 HashMap 达到扩容阈值时,单独为该 Segment 进行扩容,而不会影响其他的 Segment
  • 扩容过程:每个 Segment 维护自己的负载因子,当 Segment 中的元素超过阈值时,该Segment HashMap 会扩容,整体的 ConcurrentHashMap 并不是一次性全部扩容。

4.2、JDK1.8中的扩容

  • 全局扩容ConcurrentHashMap 取消了 Segment,变成了一个全局数组类似于HashMap),因此当 ConcurrentHashMap 中的任意一个元素超出阈值时,整个 ConcurrentHashMap 都会触发扩容。
  • 基于CAS扩容:在扩容时,ConcurrentHashMap 采用了类似 HashMap 的方式,但通过 CAS操作 确保线程的安全,避免了锁住整个数组。在扩容时多个线程可以协同完成扩容。
  • 渐进式扩容:JDK1.8的 ConcurrentHashMap 引入了渐进式扩容机制,扩容不是一次性将所有数据重新分配,而是多个线程共同参与,逐步迁移旧数据到新数组,降低了扩容时性能的开销

4.3、深入JDK 1.8:多线程协作扩容

// JDK 1.8 扩容核心流程
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    
    // 计算每个线程的工作量
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    
    // 首个线程初始化新表
    if (nextTab == null) {
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    
    // 多线程协作迁移
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        
        // 获取工作范围
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // CAS获取工作范围
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                                         nextBound = (nextIndex > stride ?
                                                      nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        
        // 处理边界和收尾
        if (i < 0 || i >= n || i + n >= nextn) {
            // 扩容完成处理...
        }
        
        // 迁移具体的桶
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true;
        else {
            synchronized (f) {  // 锁定单个桶
                if (tabAt(tab, i) == f) {
                    // 链表/红黑树迁移逻辑
                    // 分为高位和低位两个链表/树
                }
            }
        }
    }
}

// 多线程协作时序图
Thread-1            Thread-2            Thread-3
    |                   |                   |
    | 发起扩容             |                   |
    | 初始化nextTable      |                   |
    | transfer(0-15)      | helpTransfer      |
    |                    | transfer(16-31)   | helpTransfer
    | 迁移bucket-0        |                   | transfer(32-47)
    | 迁移bucket-1        | 迁移bucket-16      |
    | ...                | ...               | 迁移bucket-32
    | 完成分配范围         | 完成分配范围        | ...
    | 检查是否全部完成      | 退出扩容           | 完成分配范围
    | 替换table引用        |                   | 退出扩容
    | 扩容完成            |                   |

优点

  • 多线程并发迁移,效率高
  • 细粒度锁,减少阻塞时间
  • 渐进式迁移,内存使用平滑
  • 支持扩容期间的读操作

缺点

  • 实现复杂度高
  • 需要额外的协调机制
  • 内存开销较大(ForwardingNode等)

五、核心源码分析

3.1 初始化过程

// 构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // 计算最接近size的2的幂次方
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    
    this.sizeCtl = cap;  // 设置初始容量
}

// 懒初始化
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // 其他线程正在初始化,让出CPU
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS成功,获得初始化权限
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);  // 0.75 * n
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

3.2 put操作详解

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    
    int hash = spread(key.hashCode());  // 计算hash值
    int binCount = 0;
    
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        
        // 情况1:表未初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        
        // 情况2:目标桶为空,直接CAS插入
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;  // CAS成功,结束循环
        }
        
        // 情况3:正在扩容,协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        
        // 情况4:桶不为空,需要同步操作
        else {
            V oldVal = null;
            synchronized (f) {  // 锁定桶的头节点
                if (tabAt(tab, i) == f) {  // 双重检查
                    
                    // 4.1 链表结构
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到相同key,更新value
                            if (e.hash == hash &&
                                ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 到达链表尾部,插入新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    
                    // 4.2 红黑树结构
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            
            // 检查是否需要树化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);  // 转换为红黑树
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    
    addCount(1L, binCount);  // 更新计数
    return null;
}

3.3 get操作详解

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        
        // 直接命中头节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        
        // hash值为负数,特殊节点(红黑树或转发节点)
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        
        // 遍历链表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get操作的特点

  • 无锁读取:get操作不需要加锁,依赖volatile保证可见性
  • 快速路径:优先检查头节点,大多数情况下可快速返回
  • 支持并发扩容:即使在扩容过程中也能正确读取数据


网站公告

今日签到

点亮在社区的每一天
去签到