HashMap的线程安全性 vs ConcurrentHashMap

发布于:2025-07-27 ⋅ 阅读:(20) ⋅ 点赞:(0)
HashMap的线程安全性

众所周知HashMap是线程不安全的,主要体现在以下几个方面。

在分析之前,为了和之前的文章中的线程安全性观点做个呼应,我们首先回忆一下前面分析过的导致线程安全问题的主要原因:

  1. 要访问的对象在内存中是共享的,比如实例属性是存储在堆内存的、是共享的内存区域。
  2. 对该内存对象的操作不具备原子性。

具备以上两点就可能会导致线程安全问题。而HashMap的数据就是存储在实例属性中的,在这个基础上,我们分析一下HashMap线程不安全的主要原因。

HashMap的put方法

HashMap的put方法涉及到多个步骤,肯定不是原子操作,在执行这些步骤的过程中也并没有对操作对象 — HashMap中存储的数据进行线程安全性的保护,所以,put方法一定会导致线程安全问题。

put方法源码比较长(调用了final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) 方法),这里就不贴出了。主要逻辑就是访问HashMap存储数据的属性table,根据put对象的hash值定位hash桶后把值放入桶内。这个过程中没有进行任何并发保护,所以如果有两个线程A和B同时put同一个值,A线程的值可能会被B线程覆盖。另外一种情形就是,A线程和B线程put的key值并不相同、但是hash值一样,正常情况下两个key值会被放置到同一个hash桶内、以列表或者红黑树的方式分别存储,但是并发情况下如果正好是空桶、A线程的数据可能会被B线程覆盖,导致A线程put之后get不到数据的情况。

resize方法

HashMap的多种put方式都有可能会触发resize,resize操作的过程中会创建新桶、数据移入新桶、丢弃旧桶。resize的这一系列操作相比put操作来说更耗时,因此也最有可能发生线程安全问题。

case1:线程A执行put操作触发resize扩容,线程B通过get方法获取数据。线程A创建新桶、已经切换table为新桶、但是数据尚未完全迁移到新桶的情况下,线程B的get方法获取不到本来应该能获取到的数据。

case2:线程A的put操作触发resize扩容、线程B同样也在put,这种情况就会更加复杂,线程B可能触发也可能不触发resize(因为putVal是在写入旧桶数据之后再根据size和threshold判断是否需要扩容,这个时候threshold可能被线程A修改了就不再需要扩容,也可能没被线程A修改这个时候就需要扩容),不管线程B是否需要扩容都会导致要么数据被丢失、要么数据被覆盖、要么在极端情况下可能会导致链表处理过程混乱甚至死循环。

HashMap适合的场景

虽然HashMap在线程安全方面表现这么糟糕,但是在很多场景下我们还是避免不了要用HashMap,否则,HashMap在Java世界中就不会这么著名、被这么广泛使用了,或者说他就没有什么存在的意义了。

前面我们对HashMap在线程安全方面表现的分析,有利于帮我们总结出来HashMap的适用场景,或者说HashMap最佳编程实践。

首先,你想想你需要HashMap缓存数据的场景,有多少是提前就能大概知道容量的?如果你知道容量,那你在new一个HashMap对象的时候就初始化好容量,既能避免他扩容时带来的开销,又能避免扩容时可能导致的线程安全问题。

其次,仔细想想你的场景会不会多个线程操作HashMap,并且,写入数据是否会和获取数据并发?或者,是否会存在多个并发的写入线程?存在的话果断启用HashMap。

再者,你的场景是否能支持一次性缓存好数据、后续应用只要get数据就可以?是的话就放心用HashMap,系统初始化的时候就把数据一次性缓存好。

最后,如果判断你的场景可能会导致HashMap的线程安全问题,劝你果断弃用HashMap,不要用sychronized或者ReentranceLock之类的机制解决HashMap的线程安全问题,不是解决不了而是没有必要,否则还用ConcurrentHashMap干啥?ConcurrentHashMap那么牛逼放在那儿不用白白浪费岂不可惜?

ConcurrentHashMap

和HashMap相对比,为了确保线程安全,ConcurrentHashMap主要做了以下(非常牛逼的)改造。

数据结构

我们知道HashMap存储数据的结构是Node:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        V value;
        Node<K,V> next;

包含key值的hash、key值、value,以及指向下一节点的next。next节点为hash值相同的不同对象生成列表。列表长度达到8以后,数据结构由列表变成红黑树,就采用Node的扩展类TreeNode存储数据。

ConcurrentHashMap的Node节点:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

和HashMap相比,val 和 next变成了volatile 修饰,volatile 的作用我们前面做过分析了,在线程安全方面的主要作用是确保不同线程之间的内存可见性和禁止指令重排。

ForwardingNode节点

ForwardingNode是ConcurrentHashMap为确保线程安全引入的重要数据结构,ForwardingNode的hash被标注为 MOVED:

	static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

除了父类的基础数据之外,引入了一个新的属性nextTable,指向扩容迁移过程中的新表:

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

ForwardingNode节点是一个特殊节点,是ConcurrentHashMap在扩容过程中的一个过渡节点。ConcurrentHashMap扩容是逐个桶进行的,当某一个桶内的所有节点都完成扩容(创建新桶-迁移旧桶数据到新桶)后,旧桶会被置为ForwardingNode节点,该桶只有一个ForwardingNode节点数据,ForwardingNode节点不保存数据,只是一个占位标记,表明当前桶的数据已经被迁移到新表,nextTable指向新表,get方法在获取数据的过程中如果碰到ForwardingNode则会通过nextTable从新表获取数据。

get方法

ConcurrentHashMap的get方法在不加锁的情况下就可以实现线程安全:

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法能保证线程安全的主要原因有两个:

一个是tabAt方法,用Usafe的getObjectVolatile,能够确保tab数组中节点在读取过程中的可见性,即使有其他线程并发修改数据也能获取到修改后的数据,保证数据一致性。

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

另外一个就是e.find方法,eh<0表示这个节点可能是ForwardingNode节点,如果是ForwardingNode节点的话,就会调用ForwardingNode的find方法获取数据:

Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }

会从我们上面提到的naxtTable,也就是新表中获取数据,而且遍历nextTable的过程中可能还会碰到ForwardingNode(if (e instanceof ForwardingNode)),应对的是在一个线程扩容尚未完成的情况下,另外一个线程又针对新表再次扩容,这种情况应该非常少见,但是,ConcurrentHashMap支持这种极端情况下的线程安全性。

put方法

put方法调用putVal,ConcurrentHashMap会自旋、CAS、局部锁(锁桶内首节点)等方式并用,保证put方法的线程安全性:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //不允许key值null,也不允许value null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
           //获取table后,自旋的方式完成写入
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
               //如果tab为空,首次写入数据,线程安全的方式进行初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
               //否则,tab不空并且,定位到了空桶中,CAS方式无锁线程安全地写入数据
               //这里用到了CAS compare and swap,无锁但是线程安全
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                //否则,当前正在扩容,调用helpTransfer方法协助扩容快速完成,等待下次自旋再插入
                tab = helpTransfer(tab, f);
            else {
                //否则,定位到的桶处于正常状态,桶不空,hash冲突,需要把数据写入桶内
                V oldVal = null;
                synchronized (f) {
                    //锁当前桶的首节点
                    if (tabAt(tab, i) == f) {
                       //双检,再次确认数据未被并发修改
                        if (fh >= 0) {
                            //普通节点
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                //从首节点开始遍历
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                     //找到已存在的数据,可以put了
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                //pred=当前节点
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    //已经到链表尾部了,还没找到,说明当前要put的需要是一个新的节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    //new一个新节点,链接到桶内链表的尾部
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            //否则,是红黑树,调用红黑树算法完成put
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //已经完成插入了
                    if (binCount >= TREEIFY_THRESHOLD)
                        //需要树化
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        //只是替换了原来的节点,没有新加入节点,结构没有变化,可以直接返回
                        return oldVal;
                    break;
                }
            }
        }
        // 检查计数,可能会触发扩容
        addCount(1L, binCount);
        return null;
    }

节点加入之后会调用addCount增加计数,并且有可能会触发扩容。addCount和扩容方法是ConcurrentHashMap中技术含量最高的方法,值得程序员深入学习。

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

需要先了解几个变量:
baseCount: 是ConcurrentHashMap的基础计数器,在没有多线程竞争的情况下,计数会直接累加到baseCount上。
counterCells: 是ConcurrentHashMap的分片计数器,多线程竞争的情况下,put方法完成之后计数没办法累加到baseCount上的情况下,会根据当前线程落在分片计数器数组的槽位进行计数,缩短多线程环境下的计数竞争。

addCount方法主要完成两个逻辑,一个是计数,一个是扩容。

计数部分也就是addCount方法源码中第一个大if的部分,首先尝试CAS直接累加到baseCount上,如果成功的话,计数工作完成。

否则如果不成功的话,使用ThreadLocalRandom.getProbe() & m计算出当前线程在分片计数器counterCells中的槽位后,尝试直接将计数通过CAS方式更新到槽位中,如果成功,计数工作完成。

否则,竞争太激烈了,调用fullAddCount方法,扩容也一并交给了fullAddCount方法,所以,fullAddCount调用之后就直接return了。

如果计数成功完成的话,判断check<=1的情况下不需要进行扩容,直接return。否则,调用s=sumCount()方法计算容量:baseCount和各分片计数器的和就是ConcurrentHashMap的容量,这个容量是为了下面做是否需要扩容的判断的。

sizeCtl:是ConcurrentHashMap用来控制扩容的一个变量,设计很复杂,-1 表示正在初始化,-(1+n)表示有n个线程正在扩容中。否则,>0 表示下次需要扩容的容量(size + threshold)。

所以如果put之后发现容量大于sizeCtl,就触发扩容。

扩容逻辑也是ConcurrentHashMap的核心,放在下次,和fullAddCount方法一起分析。

many Thanks!


网站公告

今日签到

点亮在社区的每一天
去签到