ConcurrentHashMap实现原理
目录
主要特点:
- 使用Node数组作为桶数组,每个桶可能是一个链表或者红黑树。
- 通过CAS和synchronized实现线程安全,每个桶的头节点作为锁,减小锁的粒度。
- 扩容时支持多线程协同工作,分片迁移数据。
- volatile变量保证内存可见性,get操作无需加锁。
- 使用计数器(如baseCount和CounterCell)来高效统计元素数量。
核心源码解读
(1)数据结构: 采用数组+链表/红黑树
- 链表长度 ≥8 且数组长度 ≥64 时,链表转为红黑树(
TREEIFY_THRESHOLD
) - 红黑树节点数 ≤6 时退化为链表(
UNTREEIFY_THRESHOLD
)
提前看看几个使用较多的方法:
// 计算 Node hash 哈希值:将原始哈希码 key.hashCode() 的高低位混合,减少哈希冲突
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS; // 高位参与运算,HASH_BITS 屏蔽负数标记
}
// 哈希值定位数组索引,使用 tabAt 方法保证内存可见性
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// 通过 Unsafe.getObjectVolatile 直接读取内存中的最新值,无需加锁
return (Node<K,V>) U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// 通过 CAS(比较并交换)操作原子性地更新哈希表中指定位置的节点。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
特殊Node hash值,hash 小于0表示扩容节点/红黑树
// (1) 扩容节点(ForwardingNode):若遇到 ForwardingNode(哈希值 MOVED = -1)
static final int MOVED = -1; // hash for forwarding nodes
// (2) 红黑树(TreeBin):若节点为 TreeBin(哈希值 TREEBIN = -2)
static final int TREEBIN = -2; // hash for roots of trees
sizeCtl
状态管理:(状态控制变量,用于管理哈希表的初始化、扩容状态及扩容触发阈值)
sizeCtl >= 0
:表示扩容阈值(元素总数>= sizeCtl
时,触发扩容);sizeCtl = -1
:表示哈希表正在初始化;sizeCtl < -1
:表示正在扩容,存储扩容标识(resizeStamp),低16位存储当前参与扩容的线程数
/**
* The number of bits used for generation stamp in sizeCtl.
**/
private static int RESIZE_STAMP_BITS = 16;
/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 高16位存储标识
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
// 低16位存储协助线程数,扩容时示例:
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
(2)初始化
延迟初始化:首次调用 put
时通过 initTable()
初始化数组,利用 sizeCtl
变量(volatile)控制状态:负数表示正在初始化或扩容,正数为扩容阈值。
// 默认构造器,do nothing
public ConcurrentHashMap() {
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();
...
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl小于0,等待其他线程完成初始化
if ((sc = sizeCtl) < 0)
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS 竞争初始化权
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建 Node 数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // 更新阈值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
(3)并发扩容
通过 transfer()
方法迁移数据,多线程协作处理不同桶区间。扩容期间查询操作通过 ForwardingNode
find() 转发到新数组进行查询。
主要扩容场景:
(1)addCount()更新元素总数时,发现元素总数超过扩容阈值sizeCtl;
(2)树化前,单个哈希桶的链表长度 >= 8,但数组长度 < 64, 优先扩容;
(3)插入元素时遇到 ForwardingNode
,协助扩容、数据迁移**;**
…
// putVal 会触发扩容 / 协助扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
...
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
...
}
else if ((fh = f.hash) == MOVED)
// MOVED 表示当前正在扩容,当前线程协助迁移数据并扩容
//(3)插入元素时遇到 ForwardingNode,协助扩容、数据迁移;
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
... put逻辑 ...
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// (2)树化前,单个哈希桶的链表长度 >= 8,但数组长度 < 64, 优先扩容;
treeifyBin(tab, i);
...
}
}
}
// 增/减数量
// (1)addCount()更新元素总数时,发现元素总数超过扩容阈值;
addCount(1L, binCount);
return null;
}
/**
(1) addCount()更新元素总数时,发现元素总数超过扩容阈值,开始扩容;
@param check : 是否需要检查扩容(通常插入操作check>=0,删除操作可能为-1)
**/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
... 计数逻辑 ...
}
// 检查并触发扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && //元素总数超过阈值sizeCtl,在上次扩容时确定 2n*0.75
(tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) { // 未达最大容量
// 生成扩容标识
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
// sizeCtl小于0,已有其他线程在扩容
if (sc < 0) {
// 检查扩容是否已完成或协助线程数已达上限(避免过度竞争)
if (sc == rs + MAX_RESIZERS || // 协助线程数是否超限
sc == rs + 1 || // 扩容已完成(线程数归零)
(nt = nextTable) == null || transferIndex <= 0)
break;
// 尝试通过 CAS 增加协助线程数(sizeCtl +1)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 当前无扩容,尝试发起新扩容,SIZECTL 低十六位存储扩容线程数,初始设置为2(= 扩容线程数1 + 1),后续扩容完成则是0+1=1,对应上面 sc == rs + 1 判断扩容是否已完成
else if (U.compareAndSwapInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null);
...
}
}
}
/**
(2)树化前,单个哈希桶的链表长度 >= 8,但数组长度 < 64, 优先扩容;
**/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 单个哈希桶的链表长度 >= 8,但数组长度 < 64, 优先扩容;
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
... 树化 ...
}
}
}
// 尝试扩容
private final void tryPresize(int size) {
// 计算目标容量(确保是2的幂)
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 哈希表未初始化时先进行初始化
if (tab == null || (n = tab.length) == 0) {
...
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break; // 目标容量 <= 当前阈值 或 已达最大容量,直接退出
else if (tab == table) { // 确认当前 table 未被其他线程替换
int rs = resizeStamp(n); // 生成扩容唯一标识(与容量相关)
if (sc < 0) { // sizeCtl小于0 已有其他线程在扩容
Node<K,V>[] nt; // 新表
// 校验扩容是否可参与:
// 1. 扩容标识是否匹配,防止不同容量扩容冲突
// 2. 协助线程数是否超限(sc == rs + 1)
// 3. nextTable 是否还在
// 4. 是否还有待迁移的桶(transferIndex > 0)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || // 扩容标识是否匹配,防止不同容量扩容冲突
sc == rs + 1 || // 扩容已完成(线程数归零)
sc == rs + MAX_RESIZERS || // 协助线程数是否超限
(nt = nextTable) == null || // nextTable 是否还在
transferIndex <= 0) // 是否还有待迁移的桶(transferIndex > 0)
break;
// CAS操作 增加协助线程数,成功后参与迁移
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 当前无扩容,尝试发起新扩容,SIZECTL 低十六位存储扩容线程数,初始设置为2(= 扩容线程数1 + 1),后续扩容完成则是0+1=1,对应上面 sc == rs + 1 判断扩容是否已完成
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 启动迁移,nextTable 由 transfer 初始化
transfer(tab, null);
}
}
}
/**
插入元素时遇到 ForwardingNode,协助扩容、数据迁移;
**/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 检查到:当前表非空,且当前节点为 ForwardingNode(当前为迁移标记节点),且 ForwardingNode 中提取新表 nextTable 不为空,开始协助扩容迁移
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 传入tab长度,生成唯一标识当前扩容阶段的戳记,避免不同扩容操作混淆(用于区分不同扩容阶段)
int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;
// 新/旧数组未被替换,扩容中(sizeCtl 为负数)
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 检查扩容是否已完成或协助线程数已达上限(避免过度竞争)
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
transferIndex <= 0)
break;
// 尝试通过 CAS 增加协助线程数(sizeCtl +1)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab); // 调用数据迁移方法
break;
}
}
return nextTab;
}
// 若不符合上述if判断,表示没有扩容或者扩容已完成,返回当前 table 即可
return table;
}
核心扩容方法,
// 核心扩容逻辑:扩容、迁移数据
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// static final int NCPU = Runtime.getRuntime().availableProcessors();
// 计算每个线程处理的桶区间大小 stride(最小为 MIN_TRANSFER_STRIDE=16)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化新数组(仅由第一个发起扩容的线程执行)
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 扩容为原来两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 迁移起始位置(从最后一个桶开始)
transferIndex = n;
}
// 新数组大小
int nextn = nextTab.length;
// 创建 ForwardingNode 扩容
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // 标记是否继续分配任务
boolean finishing = false; // 标记是否迁移完成
// 开始分配任务并迁移数据
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// ------------------------ 不同线程竞争桶区间分配任务 ------------------------
while (advance) {
int nextIndex, nextBound;
// 当前区间未处理完 或 迁移已完成, 退出循环
if (--i >= bound || finishing)
advance = false;
// transferIndex <= 0 无剩余任务,后续退出迁移
else if ((nextIndex = transferIndex) <= 0) { // nextIndex 从最后一个桶开始(上述赋值:transferIndex = n)
// 标记i=-1
i = -1;
advance = false;
}
// CAS竞争任务区间(transferIndex从nextIndex更新为nextBound)
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
// 每个线程处理的桶区间大小 stride,往前推进
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 竞争区间 [bound, i] 处理这个区间的扩容迁移任务
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// ------------------------ 迁移完成检查 ------------------------
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 最终完成, 更新全局变量
if (finishing) {
nextTable = null; // 设置 nextTable 为空
table = nextTab; // 替换为新数组
sizeCtl = (n << 1) - (n >>> 1); // 新阈值(2n * 0.75)
return;
}
// CAS减少协助线程数
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 若自己是最后一个线程,触发最终检查
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 设置 finishing为true
finishing = advance = true;
i = n; // recheck before commit
}
}
// 对于空桶,标记为已迁移
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 跳过已迁移节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// ------------------------ 迁移 ------------------------
synchronized (f) {// 锁定当前节点,避免并发修改
if (tabAt(tab, i) == f) {// 二次校验防止并发修改
Node<K,V> ln, hn;
// fh >= 0 处理链表节点
if (fh >= 0) {
// 通过 runBit 和 lastRun 快速分割链表,避免逐个节点重新散列。
// 这里为什么是 fh & n? 详见下述解释
int runBit = fh & n; //计算散列位(0或n),判断低位ln还是扩容后的高位hn
Node<K,V> lastRun = f;
// 遍历链表,找到最后一段连续相同散列位的节点,主要目的是直接复用 lastRun 之后的节点,减少新建节点开销
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// lastRun 是低位元素
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// lastRun 是高位元素
else {
hn = lastRun;
ln = null;
}
// 迁移 lastRun 之前的节点,到 扩容后的高位/原低位
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 低位迁移
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // 原位i
setTabAt(nextTab, i + n, hn); // 偏移i+n
setTabAt(tab, i, fwd); // 标记旧桶为已迁移
advance = true;
}
// 处理树节点(逻辑类似,需考虑树化或链表化)
else if (f instanceof TreeBin) {
...
}
}
}
}
}
}
- runBit 为什么是通过
int runBit = fh & n
来作为切割依据?
容量是2的幂时,计算key的桶位置是用位操作,即通过 hash & (n-1)
确定的,
假设原容量是n=16,二进制是10000,n-1=1111。此时hash & 1111得到的是0到15的位置。
扩容后的容量是32,二进制是100000,n-1=11111。新的位置是hash & 11111,也就是0到31。
原来的位置是 hash & 1111,而新的位置可能是原来的位置或者原来的位置+16(例如,如果hash的第5位是1的话,如10010,则这个必然在高位)。所以新的位置其实是原位置或者原位置加n。这时候,只需要判断hash的某一位是否为1,就能确定节点应该放在原位还是高位。具体来说,这个位就是n对应的二进制位。
比如,n=16时,二进制是10000,所以检查hash的第5位是否为1,如果是,则新位置是原位置+16,否则保持原位。
ForwardingNode 内部类表示迁移节点,可通过 nextTable访问新数组,
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable; // 指向新表的引用
// 需要指定扩容后的新数组 nextTable
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
// 迁移节点查找,访问新数组 nextTable
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
(4)put 操作流程
主要流程:
(1)通过 spread 通过原始hash code 计算哈希 hash
,让高位16位也参与计算确保哈希均匀分布;
(2)根据Hash查找对应的桶位置 (n - 1) & hash
, 若没有冲突直接插入新节点new Node<K,V>(hash, key, value, null)
;
(3-1)若当前桶位置发生Hash冲突,且fh >= 0表示为链表,遍历链表插入/更新;
(3-2)若当前桶位置发生Hash冲突,且Node为红黑树,调用红黑树插入方法;
(4)判断链表长度是否大于 TREEIFY_THRESHOLD = 8,执行扩容或者链表树化;
(5)统计元素总数并且检查是否超过阈值需要扩容;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// (1) 计算哈希, 确保哈希均匀分布
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
...
// (2) 定位桶位:(n - 1) & hash 确定数组索引
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS 插入:若桶为空,通过 casTabAt 原子插入新节点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
...
else {
V oldVal = null;
// (3) 同步锁处理冲突:
// 若桶非空,使用 synchronized 锁定头节点,遍历链表/红黑树插入或更新值
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // (3-1) 链表处理
binCount = 1;
// 遍历链表插入/更新
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 查找到了key,直接设置返回
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 哈希冲突 - 继续遍历链表查找
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // (3-2) 调用红黑树插入方法
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 树化
treeifyBin(tab, i);
if (oldVal != null)
// put 返回旧值
return oldVal;
break;
}
}
}
// 增/减总数统计
addCount(1L, binCount);
return null;
}
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
// 尝试无竞争更新 baseCount
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 若 counterCells 已初始化或 CAS更新 baseCount 失败, 都说明已存在竞争
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || // CounterCell数组未初始化
(a = as[ThreadLocalRandom.getProbe() & m]) == null || // 当前线程的Cell未分配(ThreadLocalRandom.getProbe():获取当前线程的哈希码,用于在 CounterCell 数组中选择一个槽位,减少不同线程竞争同一Cell的概率)
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// CAS更新Cell值失败
// 初始化Cell、扩容Cell数组等
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
... 检查并扩容逻辑 ...
s = sumCount();
...
}
}
baseCount
是基础的计数器变量,但在高并发下频繁 CAS 更新会导致性能问题(可能导致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 频繁失败),因此引入 CounterCell
数组分散线程竞争。
例如,在jdk8的时候是有引入一个类Striped64,其中LongAdder和DoubleAdder就是对这个类的实现。这两个方法都是为解决高并发场景而生的。(是AtomicLong的加强版,AtomicLong在高并发场景性能会比LongAdder差。但是LongAdder的空间复杂度会高点)
ConcurrentHashMap 高并发下更新元素计数 的核心方法 fullAddCount 借鉴了 LongAdder 的分段计数思想,避免所有线程竞争同一变量,分散到不同 CounterCell
槽位,减少 CAS 冲突。主要流程:
(1)未初始化counterCells数组,cas加锁初始化数组并插入新的 CounterCell(x)
;
(2-1)已初始化,若线程probe对应槽位上为空,cas加锁插入新的 CounterCell(x)
**;
(2-2)已初始化,若线程probe对应槽位上不为空,cas加锁更新 CounterCell(x)
计数;
(3)若(2-2)更新失败表示存在冲突 collide=true
,翻倍扩容数组,最大容量为 NCPU
(与CPU核心数对齐);
(4)兜底策略 - 当 CounterCell 初始化或扩容失败时,回退到无锁更新 baseCount;
// 设计借鉴了 LongAdder 的分段计数思想,通过分散竞争来优化性能
private final void fullAddCount(long x, boolean wasUncontended) {
int h; // 当前线程的 probe(哈希值),用于定位 CounterCell 数组的槽位,减少竞争
// probe未初始化,强制初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // 标记槽位是否冲突(是否需要扩容)
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// counterCells数组已初始化
if ((as = counterCells) != null && (n = as.length) > 0) {
// (1) CounterCell数组对应位置槽位为空,尝试创建新Cell(初始值为x)并插入数组
if ((a = as[(n - 1) & h]) == null) { // 当前槽位为空
if (cellsBusy == 0) { // cellsBusy == 0 无其他线程在修改数组
// 创建CounterCell
CounterCell r = new CounterCell(x);
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // CAS加锁 CELLSBUSY
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; // 插入新CounterCell
created = true;
}
} finally {
cellsBusy = 0; // 释放锁 CELLSBUSY
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) // CAS更新Cell值+x
break;
else if (counterCells != as || n >= NCPU) // 数组已扩容或达到数量上限
collide = false; // 无需继续扩容
else if (!collide)
collide = true; // 上面cas更新 CELLVALUE 失败,标记冲突,下次循环可能触发扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // CAS加锁扩容
try {
if (counterCells == as) {// 数组未被其他线程修改
CounterCell[] rs = new CounterCell[n << 1];// 容量翻倍
for (int i = 0; i < n; ++i) // 逐个复制旧元素
rs[i] = as[i];
counterCells = rs;// 更新新的CounterCell数组
}
} finally {
cellsBusy = 0;// 释放锁 CELLSBUSY
}
collide = false;// 重置冲突标志
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h); // 更新线程哈希值 probe,减少后续冲突
}
// counterCells数组未初始化,初始化CounterCell并插入
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {// cas加锁
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];// 初始容量为2
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 兜底策略:当 CounterCell 初始化或扩容失败时,回退到无锁更新 baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
(5)计数 size
将全局计数(baseCount
)和分片计数(CounterCell
数组)结合。
/**
返回 int 类型,最大值为 Integer.MAX_VALUE
**/
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
ConcurrentHashMap-only methods
支持返回 long 类型,推荐优先使用
**/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
/**
* 遍历 CounterCell 数组,累加所有单元格的值到 baseCount,得到当前总元素数 s
**/
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
(6)get 操作
- 无锁读取:依赖
volatile
修饰的Node.val
和Node.next
保证可见性 - 扩容兼容性:若遇到
ForwardingNode
,通过其find()
方法在新数组中查找数据
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 头节点匹配直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 无锁读取
return e.val;
}
// 处理特殊节点(如红黑树或 ForwardingNode)
else if (eh < 0)
// 调用红黑树或扩容节点的查找逻辑
return (p = e.find(h, key)) != null ? p.val : null;
// 头节点不匹配,链表读取
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))// 命中链表中的某个节点
return e.val;
}
}
return null;
}
(7)remove 操作
主要流程:
(1)通过 spread 通过原始hash code 计算哈希 hash
,让高位16位也参与计算确保哈希均匀分布;
(2)根据Hash查找对应的桶位置 (n - 1) & hash
, 若没有返回 null;
(3)若当前桶位置正在扩容,协助迁移数据helpTransfer
后重试;
(4)加锁处理链表/树,匹配键值,删除节点;
(5)更新元素总数-1;
public V remove(Object key) {
return replaceNode(key, null, null);
}
/**
* Implementation for the four public remove/replace methods:
* Replaces node value with v, conditional upon match of cv if
* non-null. If resulting value is null, delete.
* cv: 非 null 时,只有旧值匹配 cv 时才操作
*/
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 若表未初始化或桶为空,直接退出
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
// 若桶正在扩容(MOVED状态),协助扩容后重试
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
// 加锁处理链表或树
synchronized (f) {
if (tabAt(tab, i) == f) {
// 链表处理
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
// 匹配键值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
// cv若非空判断cv是否匹配
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;// 替换值
else if (pred != null)
pred.next = e.next;// 删除中间节点
else
setTabAt(tab, i, e.next);// 删除头节点
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
// 树处理逻辑
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
// 操作处理完成后,
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1); // 若是删除,更新元素计数-1
return oldVal;
}
break;
}
}
}
return null;
}
(8)遍历操作
以遍历键值对 entrySet
举例,keySet()
与 values()
同理,
public Set<Map.Entry<K,V>> entrySet() {
EntrySetView<K,V> es;
return (es = entrySet) != null ? es : (entrySet = new EntrySetView<K,V>(this));
}
EntrySetView
的迭代器(EntryIterator
)是弱一致性的,允许遍历过程中其他线程并发修改数据,不会抛出 ConcurrentModificationException
。
弱一致性体现:Traverser
在初始化或恢复状态时,保存的是遍历开始时的哈希表引用(tab
)。即使其他线程触发了扩容(生成新表更新了table),迭代器仍可能继续遍历旧表的部分数据,直到遇到 ForwardingNode
才会切换到新表。这意味着遍历结果可能是新旧表的混合视图。
static final class EntrySetView<K,V> extends CollectionView<K,V,Map.Entry<K,V>>
implements Set<Map.Entry<K,V>>, java.io.Serializable {
private static final long serialVersionUID = 2249069246763182397L;
// 初始化会保存ConcurrentHashMap的引用
EntrySetView(ConcurrentHashMap<K,V> map) { super(map); }
...
/**
* @return an iterator over the entries of the backing map
*/
public Iterator<Map.Entry<K,V>> iterator() {
ConcurrentHashMap<K,V> m = map;
Node<K,V>[] t;
int f = (t = m.table) == null ? 0 : t.length;
return new EntryIterator<K,V>(t, f, 0, f, m);
}
...
}
迭代器实现:
// entry迭代器,继承自 EntryIterator -> BaseIterator -> Traverser
static final class EntryIterator<K,V> extends BaseIterator<K,V>
implements Iterator<Map.Entry<K,V>> {
EntryIterator(Node<K,V>[] tab, int index, int size, int limit,
ConcurrentHashMap<K,V> map) {
super(tab, index, size, limit, map);
}
/**
next 实现
**/
public final Map.Entry<K,V> next() {
Node<K,V> p;
if ((p = next) == null)
throw new NoSuchElementException();
K k = p.key;
V v = p.val;
lastReturned = p;
advance();
return new MapEntry<K,V>(k, v, map);
}
}
// 进入父类 Traverser, Traverser 是用于 并发安全遍历哈希表 的核心内部类,实现了推进数组遍历的方法 advance
static class Traverser<K,V> {
Node<K,V>[] tab; // 当前遍历的哈希表(可能在遍历过程中切换到扩容后的新表)
Node<K,V> next; // 当前遍历到的节点,用于链式推进。
TableStack<K,V> stack, spare; // stack 保存遍历状态的栈(用于处理 ForwardingNode 跳转到新表的情况)
int index; // 当前遍历的桶索引(动态调整,可能跨新旧表)
int baseIndex; // 初始表的起始索引
int baseLimit; // 初始表的结束索引
final int baseSize; // 初始表的容量
Traverser(Node<K,V>[] tab, int size, int index, int limit) {
this.tab = tab;
this.baseSize = size;
this.baseIndex = this.index = index;
this.baseLimit = limit;
this.next = null;
}
/**
* 推进遍历
(1)若当前节点是链表或树节点,直接移动到下一个节点。
(2)若遇到 ForwardingNode,切换到新表并保存旧表状态到栈。
(3)恢复状态:当新表遍历到边界时,弹出栈顶状态,恢复旧表的遍历。
(4)跨段遍历:通过 index += baseSize 实现逻辑分片遍历,避免遗漏旧表数据。
*/
final Node<K,V> advance() {
Node<K,V> e;
if ((e = next) != null)
e = e.next; // 移动到链表/树的下一个节点
for (;;) {
Node<K,V>[] t; int i, n; // must use locals in checks
if (e != null)
return next = e; // 更新next为当前节点,并返回
// 遍历完成或表为空,返回nul
if (baseIndex >= baseLimit || (t = tab) == null ||
(n = t.length) <= (i = index) || i < 0)
return next = null;
// hash小于0,处理特殊节点(ForwardingNode / TreeBin)
if ((e = tabAt(t, i)) != null && e.hash < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;// tab 切换到新表
e = null;
pushState(t, i, n); // 保存当前表的状态到栈
continue;// 重新循环处理新表
}
else if (e instanceof TreeBin)
e = ((TreeBin<K,V>)e).first;// 获取树的第一个节点
else
e = null;
}
if (stack != null)
recoverState(n); // 弹出栈顶状态,恢复旧表遍历
else if ((index = i + baseSize) >= n)
index = ++baseIndex; // visit upper slots if present
}
}
/**
* 遇到 ForwardingNode 时,将当前表的遍历状态(表引用、索引、容量)压入栈,以便后续恢复
*/
private void pushState(Node<K,V>[] t, int i, int n) {
TableStack<K,V> s = spare; // reuse if possible
if (s != null)
spare = s.next;
else
s = new TableStack<K,V>();
s.tab = t;// 旧数组
s.length = n;// 旧数组长度
s.index = i;// 当前遍历的索引
s.next = stack;
stack = s;// 更新栈顶
}
/**
* 当新表遍历完成后,从栈中弹出旧表状态,恢复索引和表引用,继续遍历旧表的剩余部分
*
* @param n length of current table
*/
private void recoverState(int n) {
TableStack<K,V> s; int len;
while ((s = stack) != null && (index += (len = s.length)) >= n) {
n = len;
index = s.index;
tab = s.tab;
s.tab = null;
TableStack<K,V> next = s.next;
s.next = spare; // save for reuse
stack = next;
spare = s;
}
if (s == null && (index += baseSize) >= n)
index = ++baseIndex;
}
}
基于源码总结一些使用注意事项
(1)ConcurrentHashMap 明确禁止 null
键和值,使用时会直接抛出 NullPointerException
;
(2)size()
:返回 int
,可能溢出(当键值对超过 Integer.MAX_VALUE
时),推荐优先使用mappingCount()
。注意计数均为近似值,高并发情况下不保证绝对精确;
(3)根据场景调整好初始容量和负载因子,避免频繁扩容(触发 transfer
方法重组数据、扩容期间会产生更多CPU时间片占用以及内存占用);
(4)ConcurrentHashMap 是弱一致性的迭代器**,** 迭代器不保证实时反映所有并发修改,并发修改也不会抛出异常,因此最终迭代的结果可能包含了新旧数据两部分。