JDK源码

发布于:2025-08-05 ⋅ 阅读:(14) ⋅ 点赞:(0)

java.util.concurrent

以下是atomic包下的

AtomicInteger

Unsafe类:提供的方法可以直接访问内存、线程。
属性:Unsafe、int value
通过Unsafe方法中的CAS循环,保证int类型值的原子操作

int var5;
do {
    var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;

AtomicInteger

属性:Unsafe、int value 0、1代表true、false
保证boolean类型值的原子操作

AtomicIntegerArray

属性:Unsafe、int[] array
保证int[]数组中的每一个索引元素的原子操作

AtomicMarkableReference

保证对一个对象和一个boolean类型的标识符的原子操作。

A initialRef = new A();
boolean initialMark = false;
AtomicMarkableReference<A> amr = new AtomicMarkableReference<>(initialRef, initialMark);
boolean[] markHolder = new boolean[1];
A expectedReference = amr.get(markHolder); // amr中的对象引用
boolean expectedMark = markHolder[0]; // amr中的标识符
A newReference = new A();
boolean newMark = true;
System.out.println(amr.compareAndSet(expectedReference, newReference, expectedMark, newMark));

AtomicReference

实现对一个对象引用的原子操作。

AtomicReference<A> ar = new AtomicReference<>();
A origin = ar.get();
// origin = new A();
A newA = new A();
System.out.println(ar.compareAndSet(origin, newA));

AtomicStampedReference

实现对对象引用和int类型的版本戳的原子操作,
解决ABA问题(A修改为B,再修改回A,另一个线程的CAS误认为A没有修改,导致CAS成功)

A initialRef = new A();
int initialStamp = 0;
AtomicStampedReference<A> asr = new AtomicStampedReference<>(initialRef, initialStamp);
int[] stampHolder = new int[1];
A expectedReference = asr.get(stampHolder);
int expectedStamp = stampHolder[0];
A newReference = new A();
int newStamp = expectedStamp + 1;
System.out.println(asr.compareAndSet(expectedReference, newReference, expectedStamp, newStamp));

LongAdder

以空间换时间,内部有多个单元格数组,add时,可能在数组的不同位置进行CAS,避免了CAS的冲突,提高的CAS的成功率。
相较于AtomicInteger,性能高、但内存开销大。

ExecutorService threadPool = Executors.newFixedThreadPool(5);
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
 threadPool.execute(() -> {
     for (int j = 0; j < 100000; j++) {
         la.increment();
     }
     countDownLatch.countDown();
 });
}
countDownLatch.await();
System.out.println(la.sum());
System.out.println(la.intValue());

一下是locks包下的

ReentrantLock

可重入式独占锁。

static int num = 0;
static ReentrantLock rl = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    CountDownLatch countDownLatch = new CountDownLatch(2);
    threadPool.execute(() -> {
        rl.lock();
        try {
            TimeUnit.SECONDS.sleep(3); // 保证第二个线程的tryLock失败
            num++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rl.unlock();
        }
        countDownLatch.countDown();
        ;
    });
    threadPool.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            if (rl.tryLock(1, TimeUnit.SECONDS)) {
                try {
                    num++;
                } finally {
                    rl.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch.countDown();
    });
    countDownLatch.await();
    System.out.println(num);
}

原理分析

ReentrantLock含有属性Sync,Sync继承AQS(AbstractQueuedSynchronizer)类,
其中AQS中含有属性:unsafe、state、exclusiveOwnerThread,头节点node:head、尾节点node:tail,
其中state=0锁未被使用,state=1代表被使用且未被重入,state=2代表被某个线程使用且重入一次,exclusiveOwnerThread表示持有当前锁的线程。node节点含有属性:prev、next、thread对象,首次初始化时,head=tail=new Node()

reentrantLock.lock();

1、尝试获取锁
尝试通过unsafe.CAS将state从0替换成1(其中公平锁与非公平锁的区别是,非公平锁直接尝试通过CAS将state将0替换成1,而公平锁会判断工作队列中是否有其他等待节点,如果有,就不尝试CAS,如果没有,才会尝试CAS),
如果成功,更新exclusiveOwnerThread=当前线程,逻辑结束,再判断exclusiveOwnerThread是否等于当前线程,如果等于,state++,逻辑结束。

2、将当前线程封装的node加入到等待队列
创建Node,属性thread=当前线程。进行CAS循环,不断尝试将创建的node赋值给tail,且node.prev=原tail,原tail.next=node

3、通过for循环不断尝试获取锁
A.循环逻辑1:判断prev.waitStatus的值,如果等于-1(signal),逻辑结束;如果等于0,通过CAS尝试将waitStatus从0变为-1,如果大于0,清理node的所有waitStatus>0的prev节点,再继续走下一个循环逻辑1。(前驱节点的waitStatus=-1,表示前驱节点释放锁时,会唤醒当前节点(当前线程))
B.循环逻辑2:循环逻辑1中如果成功将前驱节点的waitStatus改为-1,此时通过unsafe.park()将当前线程置为阻塞状态,再通过Thread.interrupted()获取当前线程的中断标志(AQS中,即使线程被中断,也会继续参与锁竞争),如果中断标志为true,循环结束后会调用Thread.currentThread().interrupt();继续修改当前线程的中断标志为true
C.循环逻辑3:判断prev是否是头节点head,如果是,调用第一步的逻辑尝试获取锁,获取失败就继续走下一个循环,如果获取成功,设置头节点head=当前节点
TODO:finally里的逻辑

reentrantLock.unLock();

判断exclusiveOwnerThread是否等于当前线程,如果不等于直接报错。
state–,如果最新的state=0,设置exclusiveOwnerThread=null,且走下面逻辑:
如果head.waitStatus<0,即-1,CAS尝试从-1替换为0,

Condition

实现等待通知机制。

public class MyTest {

    static ReentrantLock reentrantLock = new ReentrantLock();
    static Condition condition = reentrantLock.newCondition();
    static int num = 1;

    public static void main(String[] args) throws InterruptedException {
        // 两个线程交替打印
        two();
    }

    private static void two() throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 1; i < 100; i++) {
                reentrantLock.lock();
                try {
                    while (num % 2 == 0) {
                        condition.await(); // 释放锁并等待
                    }
                    System.out.println("线程1===>" + i);
                    condition.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 1; i < 100; i++) {
                reentrantLock.lock();
                try {
                    while (num % 1 == 0) {
                        condition.await(); // 释放锁并等待 
                    }
                    System.out.println("线程2===>" + i);
                    condition.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("END");
    }
}

ReentrantReadWriteLock

用于优化读多写少场景下的性能。
读写锁不能被同时拥有。
在没有线程拥有写锁的前提下,多个线程可以同时拥有读锁。
在没有线程拥有读锁的前提下,只能有一个线程拥有写锁。
存在问题:拥有读锁不断被使用,导致写锁可能长时间无法获取。

public class MyTest {

    static int num = 0;
    static ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.ReadLock readLock = rrwl.readLock();
    static ReentrantReadWriteLock.WriteLock writeLock = rrwl.writeLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        threadPool.execute(() -> {
            readLock.lock();
            try {
                TimeUnit.SECONDS.sleep(4);
                System.out.println("线程1==>" + num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
            }
            countDownLatch.countDown();
        });
        threadPool.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                if (readLock.tryLock(1, TimeUnit.SECONDS)) {
                    try {
                        System.out.println("线程2==>" + num);
                    } finally {
                        readLock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        });
        threadPool.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                if (writeLock.tryLock(1, TimeUnit.SECONDS)) {
                    try {
                        num++;
                    } finally {
                        writeLock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        });
        countDownLatch.await();
        System.out.println(num);
    }
}

StampedLock

读写锁,与ReentrantReadWriteLock的区别是提供一种新的读锁:乐观读锁。
获取乐观读锁的操作并不会阻塞其他线程获取读锁,但是我们要校验读锁的有效性,即中间是否有其他线程获取了写锁,如果有,再尝试获取读锁。

public class MyTest {

    static int num = 0;
    static StampedLock sl = new StampedLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        threadPool.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long stamp = sl.tryOptimisticRead();
            if (!sl.validate(stamp)) {
                stamp = sl.readLock();
                try {
                    System.out.println(stamp + "悲观线程1===>" + num);

                } finally {
                    sl.unlockRead(stamp);
                }
            } else {
                System.out.println(stamp + "线程1===>" + num);
            }
            countDownLatch.countDown();
            /*long stamp = sl.readLock();
            try {
                TimeUnit.SECONDS.sleep(4);
                System.out.println(stamp + "线程1===>" + num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                sl.unlockRead(stamp);
            }
            countDownLatch.countDown();*/
        });
        threadPool.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long stamp = sl.tryOptimisticRead();
            if (!sl.validate(stamp)) {
                stamp = sl.readLock();
                try {
                    System.out.println(stamp + "悲观线程2===>" + num);

                } finally {
                    sl.unlockRead(stamp);
                }
            } else {
                System.out.println(stamp + "线程2===>" + num);
            }
            countDownLatch.countDown();
        });
        threadPool.execute(() -> {
            /*try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            long stamp = sl.writeLock();
            try {
                TimeUnit.SECONDS.sleep(2);
                num++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                sl.unlockWrite(stamp);
            }
            countDownLatch.countDown();
        });
        countDownLatch.await();
        System.out.println(num);
    }
}

一下是juc包下的

ArrayBlockingQueue

基于数组实现的有界阻塞队列。

CompletableFuture

四种创建方式:

ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> {});
CompletableFuture.runAsync(() -> {}, threadPool);
CompletableFuture.supplyAsync(() -> null);
CompletableFuture.supplyAsync(() -> null, threadPool);

1、阻塞主线程,获取结果

ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return new Random().nextInt(10);
}, threadPool);
Integer join = completableFuture.join(); // 阻塞当前线程

2、任务串行执行
不能先创建CompletableFuture对象,在调用completableFuture.handle() 这样会导致join()先于handle()执行。
方式一:获取上一步的返回结果和异常信息

ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("第一步返回结果:" + 1);
    return 1;
}, threadPool).handle((result, exception) -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (exception == null) {
        System.out.println("上一步结果:" + result);
    } else {
        System.out.println("上一步异常:");
        exception.printStackTrace();
    }
    return 2;
});

方式二:thenRun()方法异步执行,不影响join()返回结果,所以先执行了join()

ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(3);
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
   System.out.println("第一步返回结果:" + 1);
   return 1;
}, threadPool);
completableFuture.thenRun(() -> {
   try {
       TimeUnit.SECONDS.sleep(2);
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
   System.out.println("第二步以触发");
});
System.out.println(completableFuture.join());
}

3、任务间的交互

public static void main(String[] args) throws InterruptedException {
   ExecutorService threadPool = Executors.newFixedThreadPool(5);
   CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
       try {
           TimeUnit.SECONDS.sleep(1);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println("返回1");
       return 1;
   }, threadPool);
   CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
       try {
           TimeUnit.SECONDS.sleep(3);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println("返回3");
       return 3;
   }, threadPool);
   System.out.println(f1.thenCombine(f2, (r1, r2) -> {
       System.out.println("r1=" + r1);
       System.out.println("r2=" + r2);
       return r1 + r2;
   }).join());
}

ConcurrentHashMap

多线程环境下可以保证线程安全的map容器。

ConcurrentLinkedDeque

TODO:JRB

ConcurrentLinkedQueue

TODO:JRB

ConcurrentSkipListMap

TODO:JRB

ConcurrentSkipListSet

TODO:JRB

CopyOnWriteArrayList

TODO:JRB

CopyOnWriteArraySet

TODO:JRB

CountDownLatch

通过await()阻塞当前线程执行,直到指定次数的countDown()执行完

CyclicBarrier

cyclicBarrier.await()时,直有该cyclicBarrier的所有的await()都触发时,才会往下执行。
可以用多次。

public static void main(String[] args) throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        System.out.println("3个await()执行完就触发");
    });
    threadPool.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("线程一完成1阶段");
            cyclicBarrier.await();
            TimeUnit.SECONDS.sleep(2);
            System.out.println("线程一完成2阶段");
            cyclicBarrier.await();
            TimeUnit.SECONDS.sleep(3);
            System.out.println("线程一完成3阶段");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    threadPool.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("线程二完成1阶段");
            cyclicBarrier.await();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("线程二完成2阶段");
            cyclicBarrier.await();
            TimeUnit.SECONDS.sleep(2);
            System.out.println("线程二完成3阶段");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    threadPool.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("线程三完成1阶段");
            cyclicBarrier.await();
            TimeUnit.SECONDS.sleep(2);
            System.out.println("线程三完成2阶段");
            cyclicBarrier.await();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("线程三完成3阶段");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
}

输出结果:
线程一完成1阶段
线程二完成1阶段
线程三完成1阶段
3个await()执行完就触发
线程二完成2阶段
线程一完成2阶段
线程三完成2阶段
3个await()执行完就触发
线程三完成3阶段
线程二完成3阶段
线程一完成3阶段

DelayQueue

TODO:JRB

Exchanger

用于两个线程之间交换数据。

public static void main(String[] args) throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    Exchanger<Integer> exchanger = new Exchanger<>();
    threadPool.execute(() -> {
        int result = 0;
        for (int i = 0; i < 5; i++) {
            result += i;
        }
        Integer sum = Integer.valueOf(result);
        try {
            TimeUnit.SECONDS.sleep(3);
            Integer exchange = exchanger.exchange(sum);
            System.out.println("线程一收到结果:" + exchange);
            System.out.println("总和=" + sum + exchange);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    threadPool.execute(() -> {
        int result = 0;
        for (int i = 0; i < 5; i++) {
            result += i;
        }
        Integer sum = Integer.valueOf(result);
        try {
            TimeUnit.SECONDS.sleep(1);
            // 阻塞,知道另一个exchanger.exchange()执行
            Integer exchange = exchanger.exchange(sum);
            // 非阻塞,超时会报TimeoutException
            //Integer exchange = exchanger.exchange(sum, 1, TimeUnit.SECONDS);
            System.out.println("线程二收到结果:" + exchange);
            System.out.println("总和=" + sum + exchange);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

ExecutorCompletionService

TODO:JRB

Executors

TODO:JRB

ForkJoinPool

将一个复杂的大任务递归拆分为多个小任务(fork),直到子任务足够小可以直接处理;然后合并所有子任务的结果(join),得到最终结果。例如:计算 1~1000000 的和,可拆分为 1100000、100001200000 等子任务,分别计算后汇总。

Thread

new Thread().interrupt();
作用:设置目前线程的中断标志为true,此时若目标线程处于阻塞状态(如 sleep()/wait()),会立即抛出 InterruptedException并​​清除中断标志。

new Thread().isInterrupted();
作用:仅查询目前线程的中断标志

Thread.interrupted();
作用:查询当前线程的中断标志,并重置(重置为false)


网站公告

今日签到

点亮在社区的每一天
去签到