原子类
JDK提供的原子类,即Atomic*
类有很多,大体可做如下分类:
形式 | 类别 | 举例 |
---|---|---|
Atomic* | 基本类型原子类 | AtomicInteger、AtomicLong、AtomicBoolean |
Atomic*Array | 数组类型原子类 | AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray |
Atomic*Reference | 引用类型原子类 | AtomicReference、AtomicStampedReference、AtomicMarkableReference |
Atomic*FieldUpdater | 升级类型原子类 | AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater |
*Adder | 自增器 | LongAdder、DoubleAdder |
*Accumulator | 累加器 | LongAccumulator、DoubleAccumulator |
引入原因
JDK为何要引入如此多的原子类API。
因为Java中的运算操作,如自增(++)或自减(–)都不是原子性操作,若没有进行额外的同步操作,在多线程环境下就是线程不安全。给变量增加volatile关键词也不管用。
volatile的作用是:
- 可见性:对volatile变量所有的写操作都能立即反应到其他线程中;
- 有序性:禁止指令的重排序优化。
并不能保证自增或自减操作的原子性。
AtomicLong
JDK1.5引入的AtomicLong,作用是对长整形对象进行原子操作。类似地,AtomicInteger用于对整形对象进行原子操作;AtomicBoolean用于对Boolean对象进行原子操作。这几个API的方法非常相似。
示例
直接给出代码:
public class AtomicLongExample {
private static final AtomicLong counter = new AtomicLong();
public static void increment() {
counter.incrementAndGet();
}
public static long get() {
return counter.get();
}
}
可知:实现起来非常简单。
原理
看看AtomicLong.incrementAndGet()
源码:
public class AtomicLong extends Number implements Serializable {
private static final Unsafe U = Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicLong.class, "value");
public final long incrementAndGet() {
return U.getAndAddLong(this, VALUE, 1L) + 1L;
}
}
继续看Unsafe类的getAndAddLong方法:
@IntrinsicCandidate
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!weakCompareAndSetLong(o, offset, v, v + delta));
return v;
}
@IntrinsicCandidate
public native long getLongVolatile(Object o, long offset);
// weakCompareAndSetLong方法最后调用
@IntrinsicCandidate
public final native boolean compareAndSetLong(Object o, long offset, long expected, long x);
最后是调用native方法。通过Unsafe的CAS指令实现线程安全地、原子性自增操作。
问题
这里提前给出结论,这也是JDK1.8引入LongAdder类的原因:
在多线程竞争不激烈的情况下,这样做是合适的。但是如果线程竞争激烈,会造成大量线程在原地打转、不停尝试去修改值,但再次发现值已被修改,于是继续自旋。浪费大量的CPU资源。
AtomicLong持有的成员变量value是volatile关键字修饰的,线程修改临界资源后,需要刷新到其他线程,也是要费一番功夫的。
严重情况下,volatile可能导致总线风暴问题。
AtomicReference
AtomicReference是JDK1.5开始提供的一种用于原子操作对象引用的类,位于java.util.concurrent.atomic
包。提供一种非阻塞的线程安全方式来更新和读取对象引用,使用的是底层的原子操作来保证操作的原子性。
适用场景
- 非阻塞算法:在多线程环境下需要实现高效、低延迟的非阻塞算法时,AtomicReference是一个很好的选择。它通过CAS操作避免传统锁机制的开销,适用于对性能要求高的场景;
- 引用类型的原子更新:当需要对引用类型(如对象)的更新操作保持原子性时,可使用AtomicReference。例如,在单例模式中,通过CAS操作来确保单例实例的唯一性;
- 构建高效的并发数据结构:可以用于构建各种并发数据结构,如无锁队列、无锁栈、无锁链表等;
- 实现不可变对象的原子替换:对于不可变对象,可使用AtomicReference实现原子替换操作,保证在多线程环境下的安全性。例如,引用类型的配置更新,确保配置更新的原子性和可见性。
实例
使用AtomicReference实现简单CAS操作:
@Slf4j
public class AtomicReferenceExample {
public static void main(String[] args) {
AtomicReference<String> atomicString = new AtomicReference<>("initialValue");
log.info("Current value: " + atomicString.get());
// 使用compareAndSet原子更新值
boolean success = atomicString.compareAndSet("initialValue", "newValue");
log.info("Updated value: " + atomicString.get());
// 直接设置新值
atomicString.set("anotherValue");
log.info("Directly set value: " + atomicString.get());
}
}
可知:和AtomicLong非常相似。
原理
- CAS操作(Compare-And-Set):基于硬件支持的CAS原子操作实现。这种操作包括三个步骤:比较当前值是否等于预期值,如果相等则将其更新为新值,如果不相等则不进行任何操作。CAS是一种无锁(lock-free)算法,可避免传统锁机制带来的性能问题和死锁风险。
- 内存可见性:使用volatile关键字确保引用变量的可见性。所有对AtomicReference的写操作都会立即对所有线程可见,读取操作也总是能获取最新的值。
AtomicReference提供的一系列方法:
get()
:获取当前的引用值;set(V newValue)
:设置当前引用值为新值;lazySet(V newValue)
:惰性设置为新值;compareAndSet(V expect, V update)
:如果当前引用值等于预期值,则将其更新为新值;getAndSet(V newValue)
:获取当前引用值并设置为新值;weakCompareAndSet(V expect, V update)
:类似于compareAndSet
,但在一些平台上可能有更高的性能;weakCompareAndSetPlain(V expectedValue, V newValue)
:getAndUpdate(UnaryOperator<V> updateFunc)
:updateAndGet(UnaryOperator<V> updateFunc)
:getAndAccumulate(V x, BinaryOperator<V> accumulatorFunc)
:accumulateAndGet(V x, BinaryOperator<V> accumulatorFunc)
:
JDK9又新增一系列方法:
- getPlain
- setPlain
- getOpaque
- setOpaque
- getAcquire
- setRelease
- compareAndExchange
- compareAndExchangeAcquire
- compareAndExchangeRelease
- weakCompareAndSetVolatile
- weakCompareAndSetAcquire
- weakCompareAndSetRelease
总结
AtomicReference是一种高效、线程安全的工具,用于在并发环境中进行原子引用操作。它依赖于CAS操作来实现无锁的原子性更新,适用于构建高性能的并发数据结构和非阻塞算法。在实际应用中,可根据具体需求选择使用AtomicReference来替代传统的锁机制,以提高并发程序的性能和可靠性。
AtomicIntegerFieldUpdater
AtomicXXXFieldUpdater是JDK提供的一种用于更新对象中某些字段值的工具,提供一种低开销、高性能的方式来对对象的字段进行原子操作。适合某些特定场景,尤其是在需要对对象的整型字段实现线程安全更新时。包括:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。下面以AtomicIntegerFieldUpdater为例讲解。
适用场景:
- 高性能计数器或标志位更新:需要频繁更新对象中的某个计数器或标志字段,但又不想将整个对象进行锁定;
- 基于 POJO 对象的非静态字段更新:如果字段不能声明为 AtomicInteger(如 POJO 中的普通整型字段),AtomicIntegerFieldUpdater 提供了原子化更新的方法;
- 内存优化场景:相比直接使用AtomicInteger,通过AtomicIntegerFieldUpdater不需要为每个整型字段额外创建一个AtomicInteger对象;
- 避免锁开销:替代传统的synchronized或ReentrantLock,减少线程间竞争和锁的性能开销。适合高并发环境下的小粒度操作;
- 共享对象字段的线程安全管理:在缓存、计数器池等场景中,共享对象的某个字段需要被多个线程安全地读写。
AtomicIntegerFieldUpdater的核心优势在于提供对对象字段的原子操作,节省内存,减少锁开销。适用于高性能的计数或标志更新场景,但由于功能较为有限,更多的复杂线程同步需求可能需要其他工具(如Lock或ConcurrentHashMap)。
示例
假设需要对某个对象的int字段count进行线程安全的递增操作:
public class Example {
// 字段必须是volatile、非静态、int基础类型
private volatile int count;
// 创建 AtomicIntegerFieldUpdater
private static final AtomicIntegerFieldUpdater<Example> updater = AtomicIntegerFieldUpdater.newUpdater(Example.class, "count");
public int increment() {
return updater.incrementAndGet(this); // 原子递增
}
public int get() {
return updater.get(this); // 获取值
}
public static void main(String[] args) {
Example example = new Example();
log.info(example.increment()); // 输出:1
}
}
原理
AtomicIntegerFieldUpdater是一个抽象泛型类,构造函数是protected,不能直接构造其对象,必须通过它提供的一个静态方法来创建:
// 受保护的不执行任何操作的构造函数,供子类使用
protected AtomicIntegerFieldUpdater() {
}
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>(tclass, fieldName, Reflection.getCallerClass());
}
newUpdater()
静态方法传入的是要修改的类和对应的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象(即AtomicIntegerFieldUpdaterImpl)。所以,这个对象表示的是类的某个成员,而不是对象的成员变量。
然后可使用其一系列方法:
- compareAndSet
- weakCompareAndSet
- set
- lazySet
- get
- getAndSet
- getAndIncrement
- getAndDecrement
- getAndAdd
- incrementAndGet
- decrementAndGet
- addAndGet
- getAndUpdate
- updateAndGet
- getAndAccumulate
- accumulateAndGet
以getAndAdd方法为例,其源码如下:
public int getAndAdd(T obj, int delta) {
int prev, next;
do {
prev = get(obj);
next = prev + delta;
} while (!compareAndSet(obj, prev, next));
return prev;
}
get方法如下:
public final int get(T obj) {
accessCheck(obj);
return U.getIntVolatile(obj, offset);
}
compareAndSet方法如下:
public final boolean compareAndSet(T obj, int expect, int update) {
accessCheck(obj);
return U.compareAndSetInt(obj, offset, expect, update);
}
accecssCheck方法检查该obj是不是cclass类型,如果不是,则拒绝修改并抛出异常。
U是Unsafe,使用其CAS方法,参考Unsafe入门讲解。
AtomicIntegerFieldUpdaterImpl是一个静态私有内部实现类Impl,其构造函数为:
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass, final String fieldName, final Class<?> caller) {
final Field field;
final int modifiers;
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) && ((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");
// 受保护字段成员的访问仅限于访问类或其子类之一的接收者,并且访问类又必须是受保护成员定义类的子类(或包兄弟)
// 如果更新程序引用当前包之外的声明类的受保护字段,则接收者参数将缩小到访问类的类型
this.cclass = (Modifier.isProtected(modifiers) && tclass.isAssignableFrom(caller) && !isSamePackage(tclass, caller)) ? caller : tclass;
this.tclass = tclass;
this.offset = U.objectFieldOffset(field);
}
可知:要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是包装类)。
对比
工具 | 特点 | 适用场景 |
---|---|---|
AtomicIntegerFieldUpdater | 低内存开销,对对象的字段进行原子更新 | 对对象的单一字段进行高性能原子操作 |
AtomicInteger | 独立对象的原子更新,使用简单,但需要额外的内存空间 | 独立计数器或标志位 |
synchronized/Lock | 适合复杂的临界区保护 | 复杂的线程同步逻辑 |
LongAdder | 提供高并发场景下的高效累加器,减少线程争用 | 高并发场景的全局计数器 |
LongAdder
JDK1.8中新加入一个原子类LongAdder,该类可保证Long类型操作的原子性。相对于AtomicLong,LongAdder有着更高的性能(减少乐观锁的重试次数)和更好的表现,可完全替代AtomicLong来进行原子操作。
当大量线程同时访问AtomicLong时,会因为大量线程执行CAS操作失败而进行空旋转,导致CPU资源消耗过多,执行效率不高。
示例
直接给出代码:
public class LongAdderExample {
private static final LongAdder counter = new LongAdder();
public static void increment() {
counter.increment();
}
public static long get() {
return counter.sum();
}
}
可见:实现起来非常简单。
原理
基于CAS分段锁的思想实现的。线程去读写一个LongAdder类型的变量时的流程:
LongAdder基于Unsafe提供的CAS操作+valitale去实现的。在LongAdder的父类Striped64中维护着base变量和cells数组,当多个线程操作一个变量时,先会在这个base变量上进行CAS操作,当它发现线程增多时,就会使用cells数组。比如当base将要更新时发现线程增多(调用casBase方法更新base值失败),则会自动使用cells数组,每一个线程对应于一个cell,在每一个线程中对该cells进行CAS操作,这样就可以将单一value的更新压力分担到多个value中去,降低单个value的热度,同时也减少大量线程的空转,提高并发效率,分散并发压力。这种分段锁需要额外维护一个内存空间cells,在高并发场景下,这点成本几乎可忽略。
利用空间来换时间,将热点value分散成一个Cell列表来承接并发的CAS,以此来提升性能。
Benchmark
一个简单不太严谨的Benchmark性能测试:
@Slf4j
public class AtomicLongTester {
private static AtomicLong numA = new AtomicLong(0);
private static LongAdder numB = new LongAdder();
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i < 10001; i*=10) {
test(false, i);
test(true, i);
}
}
public static void test(boolean isLongAdder, int threadCount) throws InterruptedException {
long starTime = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 100000; i++) {
if (isLongAdder) {
numB.add(1);
} else {
numA.addAndGet(1);
}
}
latch.countDown();
}
}).start();
}
// 等待所有运算结束
latch.await();
if (isLongAdder) {
log.info("Thread Count=" + threadCount + ", LongAdder cost ms=" + (System.currentTimeMillis() - starTime) + ", Result=" + numB.sum());
} else {
log.info("Thread Count=" + threadCount + ", AtomicLong cost ms=" + (System.currentTimeMillis() - starTime) + ", Result=" + numA.get());
}
numA = new AtomicLong(0);
numB = new LongAdder();
}
}
某次运行结果:
Thread Count=1, AtomicLong cost ms=9, Result=100000
Thread Count=1, LongAdder cost ms=13, Result=100000
Thread Count=10, AtomicLong cost ms=14, Result=1000000
Thread Count=10, LongAdder cost ms=41, Result=1000000
Thread Count=100, AtomicLong cost ms=111, Result=10000000
Thread Count=100, LongAdder cost ms=45, Result=10000000
Thread Count=1000, AtomicLong cost ms=1456, Result=100000000
Thread Count=1000, LongAdder cost ms=379, Result=100000000
Thread Count=10000, AtomicLong cost ms=17452, Result=1000000000
Thread Count=10000, LongAdder cost ms=3545, Result=1000000000
可见:当线程竞争率比较低时,AtomicLong效率优于LongAdder的,但当线程竞争率增大时,可看出LongAdder的性能远远高于AtomicLong。
对比
对比项 | AtomicLong | LongAdder |
---|---|---|
原理 | Unsafe类CAS指令 | 分段累加;使用内部的多个Cell(类似于分段桶),每个线程可独立更新不同Cell,减少竞争 |
空间开销 | 只需一个long变量,内存占用小 | 内部包含多个Cell,每个Cell是一个独立变量,占用更多内存 |
低并发 | 更新只需要操作单一变量,执行路径更短;性能较优且实现简单 | 需初始化多个Cell,即使只有一个线程使用,也需要额外的内存和初始化开销 |
高并发 | 多线程更新同一变量时,频繁CAS重试会导致性能瓶颈 | 减少线程争用,吞吐量更高 |
精度要求高 | 更适合 | 当需要获取总值时,会将所有Cell的值汇总;短时间内的结果可能不完全一致 |
LongAccumulator
有待学习。