0 常用并发同步工具类的真实应用场景
JDK 提供了比
synchronized
更加高级的各种同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以实现更加丰富的多线程操作;
1 ReentrantLock(可重入的占用锁)
1.1 简介
ReentrantLock
是可重入的独占锁;- “可重入”是指同一线程能多次获取同一把锁,不会自己阻塞自己;
- “独占”是说同一时间,最多只有一个线程能成功拿到锁,其他线程得等待;
- 和
synchronized
作用类似,都是解决多线程并发访问共享资源时的线程安全问题;
相比
synchronized
,ReentrantLock
多了这些灵活特性:可中断:获取锁的过程中,线程能响应中断(比如其他地方调用了
interrupt()
),不用死等锁释放,更灵活控制执行流程;可设置超时时间:调用
tryLock(long timeout, TimeUnit unit)
时,线程在指定时间内没拿到锁,就会放弃尝试,避免无限阻塞;可设置为公平锁:默认
ReentrantLock
是 “非公平锁”(新线程和等待队列里的线程抢锁,可能插队),但它支持通过构造方法ReentrantLock(true)
设为“公平锁”,严格按线程等待顺序分配锁,减少线程饥饿(某些线程一直拿不到锁);
与
synchronized
一样,都支持可重入:synchronized
靠wait/notify
实现线程通信,只能关联一个等待队列;ReentrantLock
可通过newCondition()
创建多个Condition
,精准控制不同线程的等待 / 唤醒,比如生产者 - 消费者模型里,能区分 “生产条件”“消费条件” 分别处理;
应用场景:多线程抢共享资源时,需要独占访问保证数据安全,比如卖票系统(如下两图)、银行账户转账;
- 线程 A、B 抢锁:线程 A、B 同时尝试获取锁,假设线程 A 先拿到(锁的独占性,同一时间只有 A 能持有),此时 A 可以操作共享资源(比如修改车票库存 ),B 因为没抢到,进入 “等待” 状态;
- 线程 A 释放锁:A 操作完共享资源后,会释放锁;接着 B 再次尝试获取锁,这次就能成功拿到,然后 B 开始操作共享资源(修改车票库存)。
1.2 常用API
ReentrantLock 实现了Lock接口规范,常见API如下:
方法 方法声明 功能说明 lock
void lock()
获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回 lockInterruptibly
void lockInterruptibly() throws InterruptedException
可中断的获取锁,和 lock()
方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程tryLock
boolean tryLock()
尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到锁返回 true
,否则返回false
tryLock
(带超时)boolean tryLock(long time, TimeUnit unit) throws InterruptedException
超时获取锁,当前线程在以下三种情况下会返回:当前线程在超时时间内获取了锁;当前线程在超时时间内被中断;超时时间结束,返回 false
unlock
void unlock()
释放锁 newCondition
Condition newCondition()
获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的 await()
方法,而调用后,当前线程将释放锁基本用法:
private final Lock lock = new ReentrantLock(); public void foo() { // 获取锁 lock.lock(); try { // 程序执行逻辑 } finally { // finally语句块可以确保lock被正确释放 lock.unlock(); } } // 尝试获取锁,最多等待 100 毫秒 if (lock.tryLock(100, TimeUnit.MILLISECONDS)) { try { // 成功获取到锁,执行需要同步的代码块 // ... 执行一些操作 ... } finally { // 释放锁 lock.unlock(); } } else { // 超时后仍未获取到锁,执行备选逻辑 // ... 执行一些不需要同步的操作 ... }
在使用时要注意以下 4 个问题:
- 默认情况下 ReentrantLock 为非公平锁而非公平锁;
- 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
- 加锁操作一定要放在
try
代码之前,这样可以避免未加锁成功又释放锁的异常; - 释放锁一定要放在
finally
中,否则会导致线程阻塞;
工作原理:
当有线程调用
lock
方法时,会用 CAS(Compare-And-Swap,比较并交换) 操作,尝试把 AQS(AbstractQueuedSynchronizer,抽象队列同步器,Java 并发包的核心基础组件 )内部的state
变量从0
改成1
;state=0
表示“锁没人用”,CAS 成功 → 线程拿到锁,开始执行临界区代码(操作共享资源);state=1
表示“锁被占用”,CAS 失败 → 线程抢锁失败,进入阻塞队列(CLH 队列,按 FIFO 排队 ) 等待;
抢锁失败的线程,会被包装成节点(Node),加入队列尾部(tail),队列头部是
head
节点(代表 “即将拿到锁的线程”);- 队列里的线程,都在等锁释放,避免线程忙等(一直重试抢锁,浪费 CPU 资源);
- 队列是 FIFO(先进先出) 顺序,理论上保证线程公平性,但实际还受“公平锁 / 非公平锁”策略影响;
当持有锁的线程执行完
unlock()
,会把state
改回0
(释放锁),然后唤醒队列里的线程。这时分两种策略:公平锁(
ReentrantLock(true)
):严格按队列顺序唤醒:释放锁后,优先唤醒head
节点的下一个节点(head.next
),让队列里“等最久”的线程拿到锁;- 优点:绝对公平,避免线程 饥饿(某些线程一直抢不到锁);
- 缺点:频繁唤醒 / 切换线程,性能略低(线程上下文切换有开销);
- 优点:绝对公平,避免线程 饥饿(某些线程一直抢不到锁);
非公平锁(默认策略,
ReentrantLock()
):释放锁后,不严格按队列顺序,允许新线程和队列里被唤醒的线程重新用 CAS 抢锁:新线程(没进队列的)可能直接 CAS 抢锁成功(插队),不用进队列等;
队列里的线程也会被唤醒,参与竞争;
优点:减少线程切换,吞吐量更高(适合竞争不激烈的场景);
缺点:可能让队列里的线程等更久,存在小概率线程饥饿;
1.3 使用
1.3.1 独占锁
模拟抢票场景。8张票,10个人抢,如果不加锁,会出现什么问题?
/** * 模拟抢票场景 */ public class ReentrantLockDemo { // 创建 ReentrantLock 实例,默认使用非公平锁策略 private final ReentrantLock lock = new ReentrantLock();//默认非公平 // 共享资源:总票数,会有多个线程同时操作这个变量 private static int tickets = 8; /** * 购买车票的方法 * 核心逻辑:通过加锁保证同一时间只有一个线程能执行购票操作 */ public void buyTicket() { // 1. 获取锁:调用 lock() 方法,当前线程会尝试获取锁 // 如果锁未被占用,则当前线程获得锁并继续执行 // 如果锁已被占用,则当前线程会进入阻塞队列等待 lock.lock(); // 获取锁 // 2. try-finally 结构保证锁一定会被释放 // 即使代码执行过程中发生异常,finally 块也会执行解锁操作 try { // 3. 临界区:操作共享资源(tickets 变量) if (tickets > 0) { // 检查是否还有剩余车票 try { // 休眠 10ms,放大并发问题的可能性 // 如果不加锁,这里会出现多个线程同时进入判断并扣减票数的情况 Thread.sleep(10); // 模拟出并发效果 } catch (InterruptedException e) { e.printStackTrace(); } // 打印购票信息,并将票数减 1(原子操作) System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票"); } else { // 票已售罄时的提示 System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败"); } } finally { // 4. 释放锁:无论是否发生异常,都必须释放锁 // 否则会导致其他线程永远无法获取锁,造成死锁 lock.unlock(); // 释放锁 } } public static void main(String[] args) { // 创建抢票系统实例(共享同一个锁和票数变量) ReentrantLockDemo ticketSystem = new ReentrantLockDemo(); // 创建 10 个线程模拟 10 个用户抢票(总票数只有 8 张) for (int i = 1; i <= 10; i++) { Thread thread = new Thread(() -> { // 每个线程执行抢票操作 ticketSystem.buyTicket(); // 抢票 }, "线程" + i); // 给线程命名,方便观察输出 // 启动线程,线程进入就绪状态,等待 CPU 调度 thread.start(); } try { // 主线程休眠 3000ms,等待所有抢票线程执行完毕 // 避免主线程提前打印剩余票数 Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } // 打印最终剩余票数,验证是否正确(应该为 0) System.out.println("剩余票数:" + tickets); } }
不加锁:出现超卖问题
加锁:正常,两人抢票失败
1.3.2 公平锁和非公平锁
ReentrantLock 支持公平锁和非公平锁两种模式:
- 公平锁:线程在获取锁时,按照线程等待的先后顺序获取锁;
- 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock 默认是非公平锁;
ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁 ReentrantLock lock = new ReentrantLock(true); //公平锁
比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁,如下图:
1.3.3 可重入锁
可重入锁又名递归锁,是指在同一线程里,只要锁对象相同,内层方法(或代码块)能直接复用已获取的锁,不用重新竞争。比如线程执行
方法A
时拿到锁,方法A
里调用方法B
(也需要同一把锁),线程进方法B
时不用等自己释放锁,直接继续用;Java 中
ReentrantLock
和synchronized
都是可重入锁synchronized
:隐式的(JVM 自动管加锁 / 释放)、可重入的内置锁,只要是同一线程、同一对象锁,内层同步代码直接重入;ReentrantLock
:显式的(手动lock()
加锁、unlock()
释放)可重入锁,功能更灵活(支持公平 / 非公平、可中断、超时获取等),但得手动配对加锁释放,否则容易死锁;
可重入锁的一个优点是可一定程度避免死锁:
- 要是锁不可重入,同一线程内层方法需要锁时,会因为自己占着锁没放,导致自己等自己(阻塞),最后死锁。可重入锁允许同一线程重复拿锁,从设计上就避免了这种自己堵死自己的情况;
- 注意:只是一定程度避免,要是代码逻辑乱(比如忘记释放锁、不同锁交叉嵌套不当),还是可能死锁,只是解决了同一线程重入锁这类场景的死锁风险;
应用场景:
- 递归操作:递归函数里加锁,每次递归调用都是内层“方法”,可重入锁让线程不用反复竞争锁,比如计算阶乘时用
ReentrantLock
保护共享变量,递归调用时直接重入; - 调用同一类其他方法:类里多个
synchronized
方法,线程调完一个调另一个,因为是同一对象锁,直接重入,不用额外处理; - 锁嵌套:多层代码块都需要同一把锁,外层加锁后,内层嵌套的加锁逻辑直接复用,不用释放外层锁再重新加;
- 递归操作:递归函数里加锁,每次递归调用都是内层“方法”,可重入锁让线程不用反复竞争锁,比如计算阶乘时用
例:
class Counter { // 创建 ReentrantLock 对象,作为可重入锁的实例 // ReentrantLock 是显式锁,支持可重入、可中断、公平/非公平等特性 private final ReentrantLock lock = new ReentrantLock(); // 递归调用方法,演示可重入锁的核心场景 public void recursiveCall(int num) { // 1. 获取锁:同一线程再次调用时,可直接重入,不会阻塞自己 // 可重入的关键体现:锁对象识别当前持有线程,允许重复获取 lock.lock(); try { // 递归终止条件:num 减到 0 时停止 if (num == 0) { return; } // 打印当前递归层级,证明方法执行(锁已成功获取) System.out.println("执行递归,num = " + num); // 2. 递归调用自身:再次进入方法时,会再次执行 lock.lock() // 由于是【同一线程】操作【同一把锁】,可直接重入,不会阻塞 recursiveCall(num - 1); } finally { // 3. 释放锁:递归调用多少次,就要释放多少次 // 保证锁的获取与释放次数严格匹配,避免死锁 lock.unlock(); } } // 主方法:测试可重入锁的递归场景 public static void main(String[] args) throws InterruptedException { // 创建 Counter 实例,所有递归调用共享同一把锁 Counter counter = new Counter(); // 启动递归测试:从 num=10 开始调用 // 预期行为:线程安全执行递归,不会因锁重入导致阻塞 counter.recursiveCall(10); } }
1.3.4 Condition 详解
Condition
是 Java 并发包里的线程协调工具,依赖Lock
(如ReentrantLock
)使用,比Object
的wait/notify
更灵活,解决线程间按条件等待 / 唤醒的问题。可以把它理解成:给Lock
搭配“专属等待队列”,让线程能按需等待条件、精准唤醒,而不是像wait/notify
只能用 Object 的单一队列;核心优势(对比
Object.wait/notify
)多条件分离:
Object
里,一个对象只有 1 个等待队列(所有wait()
的线程都挤在一起);Condition
让一个Lock
可以有多个等待队列(比如锁lock
可以创建condition1
、condition2
,不同条件的线程进不同队列),唤醒时能精准选队列,避免唤醒无关线程;
更灵活的等待控制:
- 支持超时等待(
await(long time, TimeUnit unit)
),避免线程无限阻塞; - 唤醒时可选单个唤醒(
signal()
)或全部唤醒(signalAll()
),比notify()
(随机唤醒一个)、notifyAll()
(唤醒全部)更精准;
- 支持超时等待(
核心方法解析
返回值类型 方法 作用说明 void
await()
让当前线程进入等待,直到被 signal()
/signalAll()
唤醒、被中断,或意外唤醒(如假唤醒) 等待前释放当前持有的Lock
,唤醒后重新竞争获取锁,再继续执行boolean
await(long time, TimeUnit unit)
限时等待:等待 time
时间后,若没被唤醒就自动返回false
;被唤醒则返回true
同样会先释放锁,超时 / 唤醒后重新抢锁。void
signal()
唤醒 此 Condition
等待队列中一个线程(选一个唤醒,类似notify()
但更可控) 唤醒后,线程不会直接执行,要重新竞争锁void
signalAll()
唤醒 此 Condition
等待队列中所有线程(类似notifyAll()
) 线程被唤醒后,重新竞争锁,抢到锁的继续执行比如生产者 - 消费者模型中,想区分“队列满了让生产者等”和“队列空了让消费者等”:
用
ReentrantLock
加锁,然后创建两个Condition
:notFull
(生产者等)、notEmpty
(消费者等);生产者发现队列满了 → 调用
notFull.await()
等待;消费者取走数据后 → 调用notFull.signal()
唤醒生产者;消费者发现队列空了 → 调用
notEmpty.await()
等待;生产者放入数据后 → 调用notEmpty.signal()
唤醒消费者;这样就能 精准控制不同条件的线程等待 / 唤醒,比
wait/notify
更清晰。
1.3.5 结合 Condition 实现生产者消费者模式
案例:基于ReentrantLock
和Condition
实现一个简单队列
public class ReentrantLockDemo3 {
public static void main(String[] args) {
// 1. 创建容量为 5 的队列,作为生产者和消费者共享的资源
Queue queue = new Queue(5);
// 2. 启动生产者线程:传入队列,线程执行 Producer 的 run 方法
new Thread(new Producer(queue)).start();
// 3. 启动消费者线程:传入队列,线程执行 Customer 的 run 方法
new Thread(new Customer(queue)).start();
}
}
/**
* 队列封装类:
* 用 ReentrantLock + Condition 实现线程安全的生产者-消费者队列
* 核心逻辑:
* - 队列满时,生产者通过 notFull.await() 等待;
* - 队列空时,消费者通过 notEmpty.await() 等待;
* - 生产/消费后,用 signal() 唤醒对应等待线程
*/
class Queue {
private Object[] items; // 存储队列元素的数组
int size = 0; // 当前队列中元素数量
int takeIndex = 0; // 消费者取元素的索引
int putIndex = 0; // 生产者放元素的索引
private ReentrantLock lock; // 控制并发的锁
public Condition notEmpty; // 消费者等待条件:队列空时阻塞,生产后唤醒
public Condition notFull; // 生产者等待条件:队列满时阻塞,消费后唤醒
// 初始化队列,指定容量
public Queue(int capacity) {
this.items = new Object[capacity];
lock = new ReentrantLock();
// 为同一把锁创建两个 Condition,分别控制“空”和“满”的等待
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 生产者放入元素的方法
* 必须在 lock 保护下调用,保证线程安全
*/
public void put(Object value) throws Exception {
// 加锁:同一时间只有一个线程能操作队列
lock.lock();
try {
// 队列满了(size == 数组长度),让生产者等待
// 用 while 而非 if:防止“假唤醒”后直接执行(需再次检查条件)
while (size == items.length)
notFull.await(); // 释放锁,进入等待队列,直到被唤醒
// 队列有空位,放入元素
items[putIndex] = value;
// 索引循环:如果放到数组末尾,重置为 0
if (++putIndex == items.length)
putIndex = 0;
size++; // 元素数量+1
// 生产完成,唤醒等待的消费者(队列非空了)
notEmpty.signal();
} finally {
// 测试用:打印生产日志(实际可删除或放业务逻辑里)
System.out.println("producer生产:" + value);
// 必须释放锁:无论是否异常,保证锁能被其他线程获取
lock.unlock();
}
}
/**
* 消费者取出元素的方法
* 必须在 lock 保护下调用,保证线程安全
*/
public Object take() throws Exception {
// 加锁:同一时间只有一个线程能操作队列
lock.lock();
try {
// 队列空了(size == 0),让消费者等待
// 用 while 而非 if:防止“假唤醒”后直接执行(需再次检查条件)
while (size == 0)
notEmpty.await(); // 释放锁,进入等待队列,直到被唤醒
// 取出元素
Object value = items[takeIndex];
items[takeIndex] = null; // 清空位置,避免内存泄漏
// 索引循环:如果取到数组末尾,重置为 0
if (++takeIndex == items.length)
takeIndex = 0;
size--; // 元素数量-1
// 消费完成,唤醒等待的生产者(队列非满了)
notFull.signal();
return value; // 返回取出的元素
} finally {
// 释放锁:无论是否异常,保证锁能被其他线程获取
lock.unlock();
}
}
}
/**
* 生产者线程:
* 每隔 1 秒生产一个随机数(0~999),放入队列
*/
class Producer implements Runnable {
private Queue queue; // 共享的队列
public Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) { // 无限循环生产
Thread.sleep(1000); // 每隔 1 秒生产一次
// 生产随机数,放入队列
queue.put(new Random().nextInt(1000));
}
} catch (Exception e) {
e.printStackTrace(); // 捕获并打印异常
}
}
}
/**
* 消费者线程:
* 每隔 2 秒从队列取出一个元素,打印消费日志
*/
class Customer implements Runnable {
private Queue queue; // 共享的队列
public Customer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) { // 无限循环消费
Thread.sleep(2000); // 每隔 2 秒消费一次
// 取出元素并打印消费日志
System.out.println("consumer消费:" + queue.take());
}
} catch (Exception e) {
e.printStackTrace(); // 捕获并打印异常
}
}
}
Condition
的作用:notEmpty
:消费者专属等待条件。队列空时,消费者调用notEmpty.await()
释放锁并阻塞;生产者放入元素后,用notEmpty.signal()
唤醒。notFull
:生产者专属等待条件。队列满时,生产者调用notFull.await()
释放锁并阻塞;消费者取出元素后,用notFull.signal()
唤醒;- 注意:
await()
和signal()
都必须被lock.lock()
和lock.unlock()
包裹,即都在 lock 保护范围内;
为什么用
while
而非if
检查条件?防止**“假唤醒”**:线程可能在未被signal()
的情况下醒来(如系统调度)。用while
会重新检查条件,确保队列状态符合预期后再继续执行;锁的配对使用:
lock.lock()
和lock.unlock()
必须成对出现,且unlock()
放在finally
中,保证无论是否发生异常,锁都会释放,避免死锁;生产者-消费者的节奏:生产者 1 秒生产一次,消费者 2 秒消费一次 → 队列会逐渐被填满(生产者更快),但通过
Condition
协调,不会出现“队列满了还生产”或“队列空了还消费”的情况。
1.4 应用场景总结
ReentrantLock
最基本的作用是多线程环境下,让共享资源只能被一个线程独占访问,保证操作共享资源时数据不会乱(比如多个线程同时改同一个变量,用锁让它们排队改);应用场景总结:
解决多线程竞争资源的问题
场景描述:多个线程抢同一资源(比如同时写同一个数据库、操作同一个文件、修改同一个内存变量),需要保证同一时间只有一个线程能改,避免数据冲突;
例子:多个线程同时往数据库同一表插入/修改数据,用
ReentrantLock
加锁,让线程排队执行写操作,保证数据最终是正确的,不会因为并发写入导致数据混乱(比如库存扣减、订单状态修改);
实现多线程任务的顺序执行
场景描述:希望线程 A 执行完某段逻辑后,线程 B 再执行;或者多个线程严格按特定顺序跑任务;
例子:比如线程 1 先初始化配置,线程 2 再加载数据,线程 3 最后处理业务。用
ReentrantLock
配合Condition
(条件变量 ),线程 2 等线程 1 释放锁并发信号后再执行,线程 3 等线程 2 发信号后执行,实现顺序控制;
实现多线程等待/通知机制
场景描述:线程 A 完成某个关键步骤后,需要主动通知线程 B、C 可以继续执行了;或者线程需要等待某个条件满足后再干活(类似生产者 - 消费者模型);
例子:生产者线程生产完数据,通过
ReentrantLock
的Condition
发信号,唤醒等待的消费者线程来处理数据;反之,消费者处理完,也能发信号让生产者继续生产。这比Object
的wait/notify
更灵活,能精准控制哪些线程被唤醒。
2 Semaphore(信号量)
2.1 简介
Semaphore
是多线程同步工具,核心解决控制同时访问共享资源的线程数量,让有限的资源(比如数据库连接、文件句柄)在同一时间被合理数量的线程使用,避免因资源耗尽导致系统崩溃;工作原理:使用
Semaphore
的过程实际上是多个线程获取访问共享资源许可证的过程Semaphore
内部维护一个计数器,可以把它理解成剩余许可证数量。比如设置Semaphore(3)
,就代表最多允许 3 个线程同时用资源,相当于发 3 张“访问许可证”;线程获取许可证(
acquire()
):- 线程要访问共享资源时,必须先调用
acquire()
拿许可证; - 如果计数器 > 0(还有许可证):线程拿到许可证,计数器减 1,然后继续执行(访问资源);
- 如果计数器 == 0(没许可证了):线程会被阻塞,直到有其他线程释放许可证;
- 线程要访问共享资源时,必须先调用
线程释放许可证(
release()
):线程用完资源后,调用release()
归还许可证,计数器加 1,这样阻塞的线程里就有机会拿到新的许可证,继续执行;
Semaphore
专门用来限制资源的并发访问数量,典型场景比如:数据库连接池:假设连接池只有 10 个连接,用
Semaphore(10)
控制,避免几百个线程同时抢连接,把数据库压垮;文件访问:如果一个文件同一时间只能被 3 个线程读写,用
Semaphore(3)
限制,防止文件被疯狂读写导致错误;网络请求:控制同时发起的 HTTP 请求数量,避免把服务器或本地网络打满,保证系统稳定。
2.2 常用API
2.2.1 构造器
构造器是用来创建
Semaphore
对象的,主要决定两件事:- 允许同时访问资源的 最大线程数(许可证数量);
- 线程获取许可证时,是用 公平策略 还是 非公平策略;
Semaphore
有两个构造器:public Semaphore(int permits) { sync = new NonfairSync(permits); }
permits
:设置最大并发数(比如传3
,就代表最多允许 3 个线程同时拿许可证);NonfairSync
:默认用非公平策略。意思是,当许可证释放时,新线程和等待队列里的线程一起抢许可证,新线程可能“插队”,不用严格排队;等价写法:
new Semaphore(3)
等价于new Semaphore(3, false)
,因为默认是非公平的;
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
permits
:同上,设置最大并发数;fair
:布尔值,决定用公平还是非公平策略:fair = true
:用公平策略(FairSync
),线程严格按“等待队列顺序”拿许可证,先等的线程一定先拿到,不会被插队;fair = false
:用非公平策略(NonfairSync
),新线程和等待线程一起抢,可能插队。
2.2.2 acquire
方法
acquire
是Semaphore
用于获取许可证的核心方法,特点是获取不到许可证时,线程会一直阻塞等待,直到拿到许可证或者被中断。它有两种重载形式,适配不同的许可证获取需求;void acquire() throws InterruptedException
尝试获取 1 个许可证。如果当前有可用许可证,直接获取(许可证计数减 1)并返回;如果没有可用许可证,当前线程会进入阻塞状态,直到:
- 其他线程释放许可证(使当前线程有可用许可证),此时当前线程会竞争获取许可证;
- 当前线程被其他线程中断(触发
InterruptedException
);
类比:类似去银行办事,只有 1 个窗口(许可证 = 1),当前窗口有人(没许可证),你就只能排队等着,直到窗口空出来(有人办完业务,释放许可证);
void acquire(int permits) throws InterruptedException
功能:尝试获取指定数量(
permits
)的许可证。如果Semaphore
中剩余许可证数量 ≥permits
,直接获取(许可证计数减permits
)并返回;否则,线程进入阻塞,直到:- 其他线程释放许可证,使剩余许可证 ≥
permits
,当前线程竞争获取; - 当前线程被中断(触发
InterruptedException
);
- 其他线程释放许可证,使剩余许可证 ≥
注意:使用该方法时,要确保最终能释放对应数量的许可证,否则容易导致其他线程长期无法获取足够许可证而阻塞,引发“资源饥饿”问题;
类比:如果银行办理大额业务需要占 2 个窗口(
permits = 2
),只有当至少空出 2 个窗口时,你才能开始办理,否则就得等;
例:主线程先占许可证,子线程等待,主线程释放后子线程再获取
// 1. 创建 Semaphore:许可证数量=1,公平策略(true 表示严格按线程等待顺序分配许可证) final Semaphore semaphore = new Semaphore(1, true); // 2. 主线程直接抢许可证:因为初始许可证是 1,主线程能直接拿到(许可证计数变为 0) semaphore.acquire(); // 3. 创建子线程,尝试获取许可证 Thread t = new Thread(() -> { try { // 子线程执行到这,尝试获取许可证:但主线程已占用(许可证计数 0),所以子线程进入阻塞等待 System.out.println("子线程等待获取permit"); semaphore.acquire(); // 4. 子线程被唤醒(主线程释放许可证后),执行到这,打印获取成功 System.out.println("子线程获取到permit"); } catch (InterruptedException e) { // 子线程等待中被中断时,会走到这 e.printStackTrace(); } finally { // 5. 子线程执行完,释放许可证(许可证计数 +1 ) semaphore.release(); } }); t.start(); // 启动子线程 try { // 6. 主线程休眠 5 秒:模拟做其他事情,期间子线程一直阻塞等待许可证 TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } // 7. 主线程释放许可证(许可证计数从 0 变为 1 ),子线程此时会被唤醒,竞争获取许可证 System.out.println("主线程释放permit"); semaphore.release();
- 初始化:
Semaphore
许可证数量1
,公平策略; - 主线程抢许可证:
semaphore.acquire()
执行后,许可证计数0
,主线程持有许可证; - 子线程启动:执行
semaphore.acquire()
时,因许可证0
,子线程进入阻塞,打印子线程等待获取permit
; - 主线程休眠:5 秒内,子线程一直阻塞;
- 主线程释放许可证:
semaphore.release()
执行,许可证计数1
;因为是公平策略,阻塞的子线程被唤醒,竞争拿到许可证,执行后续逻辑,打印子线程获取到permit
; - 子线程释放许可证:子线程执行
semaphore.release()
,许可证计数0
(子线程获取时减 1,释放时加 1,整体回到初始逻辑)。
- 初始化:
2.2.3 tryAcquire
方法
tryAcquire
是Semaphore
用于尝试获取许可证的方法,特点是:非阻塞优先:如果拿不到许可证,不会一直阻塞,而是直接返回
false
(表示没拿到);灵活控制:支持获取 1 个许可证、获取指定数量许可证、带超时等待等场景,比
acquire
更灵活;
tryAcquire
方法有三种重载形式boolean tryAcquire()
:尝试获取 1 个许可证。如果当前有可用许可证,直接获取(许可证计数 -1),返回true
;否则,直接返回false
(不阻塞);boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
:尝试获取 1 个许可证,但增加超时等待机制。如果一开始没许可证,线程会阻塞最多timeout
时间:- 期间有许可证释放,线程拿到许可证,返回
true
; - 超时后还没许可证,返回
false
; - 等待中被中断,抛出
InterruptedException
;
- 期间有许可证释放,线程拿到许可证,返回
boolean tryAcquire(int permits)
:尝试获取 指定数量(permits
)的许可证。如果Semaphore
剩余许可证 ≥permits
,直接获取(计数 -permits
),返回true
;否则,返回false
(不阻塞);- 注意:要确保最终释放对应数量的许可证,否则会导致其他线程无法获取足够许可证;
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
:结合指定数量和超时等待,尝试获取permits
个许可证,最多等timeout
时间,逻辑和上面类似;
例:
// 1. 创建 Semaphore:1 个许可证,公平策略 final Semaphore semaphore = new Semaphore(1, true); // 2. 启动线程 t,尝试获取许可证 new Thread(() -> { // 2.1 尝试获取 1 个许可证:返回 true/false boolean gotPermit = semaphore.tryAcquire(); // 2.2 如果拿到许可证 if (gotPermit) { try { System.out.println(Thread.currentThread() + " 拿到许可证"); TimeUnit.SECONDS.sleep(5); // 模拟占用 5 秒 } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放许可证(必须!) } } }).start(); // 3. 主线程休眠 1 秒:确保线程 t 启动并拿到许可证 TimeUnit.SECONDS.sleep(1); // 4. 主线程尝试“带超时的获取”:最多等 3 秒 if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println("主线程拿到许可证"); } else { System.out.println("主线程 3 秒内没拿到许可证,失败"); }
- 线程 t 启动:
tryAcquire()
拿到许可证(gotPermit = true
),打印日志,休眠 5 秒; - 主线程休眠 1 秒:等线程 t 启动并占用许可证;
- 主线程尝试获取:调用
tryAcquire(3, ...)
,但线程 t 会占用许可证 5 秒,所以主线程等 3 秒后超时,进入else
,打印主线程 3 秒内没拿到许可证,失败
;
- 线程 t 启动:
例:
// 1. 创建 Semaphore:5 个许可证,公平策略 final Semaphore semaphore = new Semaphore(5, true); // 2. 尝试获取 5 个许可证:成功(因为初始有 5 个) assert semaphore.tryAcquire(5) : "获取 5 个许可证失败"; // 3. 此时许可证已耗尽(5 - 5 = 0 ),尝试获取 1 个许可证:失败 assert !semaphore.tryAcquire() : "获取 1 个许可证失败";
tryAcquire(5)
一次性拿 5 个许可证,成功后Semaphore
剩余 0 个;后续
tryAcquire()
(默认拿 1 个)返回false
,因为没许可证了。
2.2.4 正确使用release
Semaphore
的release
是用来 归还许可证 的,让其他线程有机会获取。它有两种形式:release()
:归还 1 个许可证,内部计数器 +1;release(int permits)
:归还permits
个许可证,内部计数器 +permits
;关键点:许可证数量有限,必须谁拿的谁还,否则会导致计数器混乱,破坏
Semaphore
控制并发的逻辑;
错误用法示例(用
finally
无脑释放)// 1. 创建 1 个许可证的 Semaphore(公平策略) final Semaphore semaphore = new Semaphore(1, true); // 2. 线程 t1:拿到许可证后,霸占 1 小时(休眠) Thread t1 = new Thread(() -> { try { semaphore.acquire(); // 拿许可证 System.out.println("t1 拿到许可证"); TimeUnit.HOURS.sleep(1); // 霸占 1 小时 } catch (InterruptedException e) { System.out.println("t1 被中断"); } finally { semaphore.release(); // 不管怎样,finally 里释放 } }); t1.start(); TimeUnit.SECONDS.sleep(1); // 等 t1 启动 // 3. 线程 t2:尝试拿许可证,但若被中断,会在 finally 里错误释放 Thread t2 = new Thread(() -> { try { semaphore.acquire(); // 尝试拿许可证(但 t1 没释放,所以会阻塞) System.out.println("t2 拿到许可证"); } catch (InterruptedException e) { System.out.println("t2 被中断"); } finally { semaphore.release(); // 问题:t2 没拿到许可证,却释放了! } }); t2.start(); TimeUnit.SECONDS.sleep(2); // 4. 主线程逻辑:中断 t2,然后自己拿许可证 t2.interrupt(); // 中断 t2(此时 t2 还在阻塞等许可证) semaphore.acquire(); // 主线程尝试拿许可证 System.out.println("主线程拿到许可证");
t2
根本没拿到许可证(因为t1
霸占着),但由于release
写在finally
里,t2
被中断时,会错误地归还 1 个许可证(相当于无中生有多了 1 个许可证);原本
Semaphore
只有 1 个许可证,被t1
占用后,计数器是 0。但t2
错误释放后,计数器变成 1,导致主线程能直接拿到许可证(而预期中t1
要 1 小时后才释放,主线程不该拿到);
修改后的
t2
逻辑:Thread t2 = new Thread(() -> { boolean acquired = false; // 标记是否成功拿到许可证 try { semaphore.acquire(); // 尝试拿许可证 acquired = true; // 拿到了,标记为 true System.out.println("t2 拿到许可证"); } catch (InterruptedException e) { System.out.println("t2 被中断"); } finally { // 只有成功拿到许可证(acquired=true),才释放 if (acquired) { semaphore.release(); } } });
- 用
acquired
标记是否真的拿到许可证,只有拿到许可证的线程,才在finally
里释放,避免没拿到却释放的问题;
- 用
Semaphore
的设计里,不强制检查释放许可证的线程是否真的拿过,而是靠开发者自己保证。官方文档说明:“没有要求释放许可证的线程必须是通过acquire
拿到许可证的,正确用法由开发者通过编程规范保证。”
2.3 使用
2.3.1 Semaphore
实现商品服务接口限流
Semaphore
可以用于实现限流功能,即限制某个操作或资源在一定时间内的访问次数;代码:限制同一时间,最多有
N
个线程能访问接口(比如下面代码中的N=2
),超过的请求要么排队,要么直接拒绝,保证服务稳定;@Slf4j public class SemaphoreDemo { /** * 同一时刻最多只允许有两个并发 * 即许可证数量=2 → 同一时间最多允许 2 个线程访问 */ private static Semaphore semaphore = new Semaphore(2); // 创建线程池,最多 10 个线程(模拟大量请求) private static Executor executor = Executors.newFixedThreadPool(10); public static void main(String[] args) { // 循环 10 次,模拟 10 个请求 for(int i = 0; i < 10; i++){ // 提交任务到线程池,执行 getProductInfo() 或 getProductInfo2() executor.execute(() -> getProductInfo()); } } // 阻塞式限流 public static String getProductInfo() { // 1. 尝试获取许可证:拿不到就阻塞,直到有许可证 try { semaphore.acquire(); log.info("请求服务"); // 拿到许可证,执行逻辑 Thread.sleep(2000); // 模拟接口执行耗时(2 秒) } catch (InterruptedException e) { throw new RuntimeException(e); }finally { // 2. 释放许可证:不管是否异常,必须释放,让其他线程能用 semaphore.release(); } return "返回商品详情信息"; } // 非阻塞式限流 public static String getProductInfo2() { // 1. 尝试获取许可证:拿不到直接返回 false,不阻塞 if(!semaphore.tryAcquire()){ log.error("请求被流控了"); // 没拿到许可证,直接拒绝 return "请求被流控了"; } try { log.info("请求服务"); // 拿到许可证,执行逻辑 Thread.sleep(2000); // 模拟接口执行耗时(2 秒) } catch (InterruptedException e) { throw new RuntimeException(e); }finally { // 2. 释放许可证:必须释放 semaphore.release(); } return "返回商品详情信息"; } }
假设运行
getProductInfo()
:前 2 个线程能拿到许可证,执行
log.info("请求服务")
,然后 sleep 2 秒。第 3~10 个线程调用
acquire()
时,因为许可证被占满,会阻塞等待。2 秒后,前 2 个线程
release()
归还许可证,阻塞的线程开始竞争,每次放 2 个执行,直到所有请求处理完。
如果运行
getProductInfo2()
:前 2 个线程拿到许可证,执行逻辑;第 3 个线程tryAcquire()
返回false
,直接走限流逻辑(log.error
)。
2.3.2 Semaphore
限制同时在线的用户数量
模拟一个登录系统,最多限制给定数量的人员同时在线,如果所能申请的许可证不足,那么将告诉用户无法登录,请稍后重试;
主类
SemaphoreDemo7
(模拟多用户登录):public class SemaphoreDemo7 { public static void main(String[] args) { // 最多允许 10 个用户同时在线 final int MAX_PERMIT_LOGIN_ACCOUNT = 10; LoginService loginService = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT); // 启动 20 个线程(模拟 20 个用户登录) IntStream.range(0, 20).forEach(i -> new Thread(() -> { // 执行登录 boolean login = loginService.login(); if (!login) { // 登录失败(超过并发数) System.out.println(Thread.currentThread() + " 因超过最大在线数被拒绝"); return; } try { simulateWork(); // 模拟登录后的业务操作 } finally { loginService.logout(); // 退出时释放许可证 } }, "User-" + i).start() ); } // 模拟登录后的业务操作(随机休眠,模拟用户在线时长) private static void simulateWork() { try { TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { /* 忽略中断 */ } } }
创建
LoginService
,传入最大在线数10
;启动 20 个线程(用户),调用
loginService.login()
尝试登录;登录成功 → 执行
simulateWork()
(模拟用户在线操作,随机休眠 )→ 退出时logout()
释放许可证;登录失败 → 直接提示并返回;
LoginService
类(控制登录逻辑)private static class LoginService { private final Semaphore semaphore; public LoginService(int maxPermitLoginAccount) { // 创建 Semaphore:许可证数量=maxPermitLoginAccount,公平策略(true) this.semaphore = new Semaphore(maxPermitLoginAccount, true); } public boolean login() { // 尝试获取许可证:非阻塞,拿不到直接返回 false boolean success = semaphore.tryAcquire(); if (success) { System.out.println(Thread.currentThread() + " 登录成功"); } return success; } public void logout() { // 释放许可证:登录成功的用户退出时,归还许可证 semaphore.release(); System.out.println(Thread.currentThread() + " 退出成功"); } }
Semaphore
许可证数量 = 最大在线用户数(10
),保证同一时间最多 10 个线程(用户)能拿到许可证;login()
用tryAcquire()
尝试拿许可证:- 拿到 → 返回
true
(登录成功); - 拿不到 → 返回
false
(登录失败,超过并发);
- 拿到 → 返回
logout()
用release()
释放许可证,让其他用户可登录;
运行效果:
登录成功:前 10 个线程(用户)能拿到许可证,打印
登录成功
,执行simulateWork()
随机休眠;登录失败:后 10 个线程调用
tryAcquire()
时,许可证已耗尽,返回false
,打印因超过最大在线数被拒绝
;用户退出:休眠结束后,线程执行
logout()
释放许可证,Semaphore
计数器 +1,后续新线程(或之前阻塞的线程)有机会拿到许可证登录;
如果把
login
里的tryAcquire()
换成acquire()
(阻塞式获取 ):public boolean login() { try { // 阻塞式获取:拿不到就一直等,直到有许可证 semaphore.acquire(); System.out.println(Thread.currentThread() + " 登录成功"); return true; } catch (InterruptedException e) { // 被中断时返回登录失败 return false; } }
- 效果:超过并发数的用户不会直接失败,而是阻塞等待,直到有用户退出释放许可证,再继续登录。
2.4 应用场景总结
Semaphore
(信号量)是高并发工具,核心能力是控制同时访问共享资源的线程数量,让有限的资源(比如连接池、文件句柄)在高并发下被合理使用,避免系统被压垮;应用场景总结
限流(流量控制):系统的某个资源(比如接口、数据库连接)能承受的并发量有限,需要限制同时访问的线程数,防止资源被打满导致系统崩溃;
- 接口限流:比如商品详情接口,最多允许 100 个线程同时访问,用
Semaphore(100)
控制,超过的请求排队或拒绝; - 数据库连接限流:数据库连接池有 20 个连接,用
Semaphore(20)
控制,避免几千个线程同时抢连接,把数据库压垮; - 侧重于控制并发访问量,保护资源不被压垮,比如接口、网关层的流量控制;
- 接口限流:比如商品详情接口,最多允许 100 个线程同时访问,用
资源池(维护有限资源):系统有一组有限资源(比如数据库连接、文件句柄、网络端口),需要让线程按需借用、用完归还,保证资源被合理复用;
- 数据库连接池:初始化
Semaphore(连接数)
,线程需要连接时acquire()
拿许可证(同时从池里取连接),用完后release()
释放许可证(同时把连接还回池); - 文件访问池:如果有 5 个文件句柄,用
Semaphore(5)
控制,线程访问文件时拿许可证,访问完归还,保证同一时间最多 5 个线程操作文件; - 侧重于管理有限资源的借用/归还,保证资源复用,比如连接池、句柄池的资源调度;
- 数据库连接池:初始化
但本质都是用
Semaphore
的许可证数量,限制同时使用资源的线程数。
3 CountDownLatch(闭锁)
3.1 简介
CountDownLatch
是多线程同步工具,解决的问题是:让一个或多个线程等待其他多个任务全部完成后,再继续执行。比如:- 主线程要等 10 个子线程都跑完初始化任务,才开始处理业务;
- 或者多个线程要等某个“总开关”任务完成,再一起执行;
工作流程:
初始化计数器:
CountDownLatch latch = new CountDownLatch(N);
,这里的N
是需要等待的任务数量(比如有 3 个子线程要执行,N=3
);等待线程:
latch.await();
,调用await()
的线程(比如主线程)会阻塞等待,直到N
减到 0;任务线程计数减 1:每个子任务线程执行完自己的逻辑后,调用
latch.countDown();
,让计数器N-1
;计数器归 0,等待线程唤醒:当所有子任务线程都调用
countDown()
,N
变成 0 ,之前阻塞的线程(await()
的线程)会被唤醒,继续执行;
TA
:等待线程、T1/T2/T3
:任务线程cnt = 3
:对应CountDownLatch latch = new CountDownLatch(3);
,表示需要等待 3 个任务完成;过程:
线程
TA
调用await()
:TA
执行到latch.await()
时,会检查计数器cnt
:此时cnt=3≠0
,所以TA
进入阻塞状态(awaiting...
),暂停执行;任务线程
T1
完成:T1
执行latch.countDown()
→ 计数器cnt
从3→2
。此时cnt≠0
,TA
仍阻塞;任务线程
T2
完成:T2
执行latch.countDown()
→ 计数器cnt
从2→1
。此时cnt≠0
,TA
仍阻塞;任务线程
T3
完成:T3
执行latch.countDown()
→ 计数器cnt
从1→0
;当
cnt=0
时,CountDownLatch
会唤醒所有等待的线程(这里是TA
):TA
从阻塞状态恢复(resumed
),继续执行后续逻辑;
关键特性:
一次性:计数器
N
减到 0 后,就不能再重置或复用,只能用一次;多线程等待:可以有多个线程调用
await()
,一起等待N
归 0 后被唤醒;任务结束的宽泛性:子任务结束包括正常跑完或者抛异常终止,只要调用
countDown()
,就会让计数器减 1;
典型场景:
- 并行任务汇总:比如计算一个大数组的和,拆成 10 个子数组并行计算,主线程等 10 个子线程都算完,再汇总结果;
- 系统启动初始化:系统启动时,需要初始化 5 个服务(比如缓存、数据库连接、配置加载),主线程等 5 个服务都初始化完,再对外提供服务;
- 测试多线程并发:测试时,让 100 个线程等信号,信号发出(
countDown()
)后一起执行,模拟高并发场景。
3.2 常用API
3.2.1 构造器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch
构造时,count
必须 ≥0,否则抛IllegalArgumentException
;count
减到 0 后,无法重置,CountDownLatch
只能用一次。
3.2.2 常用方法
总览:
// 1. await():调用的线程会阻塞,直到 count 减到 0 public void await() throws InterruptedException {}; // 2. await(long timeout, TimeUnit unit):阻塞等待,但最多等 timeout 时间; // 若超时后 count 仍≠0,不再等待,返回 false public boolean await(long timeout, TimeUnit unit) throws InterruptedException {}; // 3. countDown():让 count 减 1,直到 count=0 时唤醒所有等待的线程 public void countDown() {};
await()
:调用线程进入阻塞状态,直到countDownLatch
的count
减到 0;- 如果等待中被其他线程中断,会抛出
InterruptedException
; count=0
时,调用await()
会立即返回,不阻塞;
- 如果等待中被其他线程中断,会抛出
await(long timeout, TimeUnit unit)
:阻塞等待,但增加了超时退出机制;- 返回值为
true
→ 等待中count
减到 0(正常唤醒); - 返回值为
false
→ 超时后count
仍≠0(放弃等待);
- 返回值为
countDown()
:让count
减 1。当count
从1→0
时,会唤醒所有等待的线程(await()
的线程);count
已经是 0 时,调用countDown()
会被忽略(count
最小为 0);- 只有
count
从1→0
时,才会触发唤醒;count
从3→2
这类变化,不会唤醒线程;
例:
// 1. 初始化:count=2 → 需要 2 次 countDown() 才会唤醒 await() 的线程 CountDownLatch latch = new CountDownLatch(2); // 2. 第一次 countDown() → count=2→1 latch.countDown(); // 3. 第二次 countDown() → count=1→0 → 唤醒所有 await() 的线程 latch.countDown(); // 4. 第三次 countDown() → count 已经是 0,调用被忽略 latch.countDown(); // 5. count=0,调用 await() 直接返回,不阻塞 latch.await();
3.3 使用
3.3.1 多任务完成后合并汇总
开发中常见多个任务并行执行,必须等所有任务完成后,再统一处理结果 的需求:
比如“数据详情页”需要同时调用 5 个接口(并行),等所有接口返回数据后,再合并结果展示;
或者“多个数据操作完成后,统一做校验(check)”;
代码:
public class CountDownLatchDemo2 { public static void main(String[] args) throws Exception { // 1. 初始化 CountDownLatch:需要等待 5 个任务完成 CountDownLatch countDownLatch = new CountDownLatch(5); // 2. 启动 5 个线程(模拟 5 个并行任务) for (int i = 0; i < 5; i++) { final int index = i; new Thread(() -> { try { // 模拟任务执行耗时(1~3 秒随机) Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000)); System.out.println("任务 " + index + " 执行完成"); // 3. 任务完成,计数器减 1(countDownLatch.countDown()) countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 4. 主线程阻塞等待:直到 5 个任务都完成(count=0) countDownLatch.await(); // 5. 所有任务完成后,主线程执行汇总逻辑 System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); } }
初始化
CountDownLatch
:new CountDownLatch(5)
→ 表示需要等待 5 个任务完成(计数器初始值5
);启动并行任务:循环创建 5 个线程,模拟 5 个并行任务。每个线程:
Thread.sleep(...)
:模拟任务执行耗时(随机 1~3 秒);countDownLatch.countDown()
:任务完成后,计数器减 1(5→4→3→2→1→0
);
主线程等待:
countDownLatch.await()
→ 主线程阻塞,直到计数器减到0
(所有任务完成);汇总结果:计数器归
0
后,主线程被唤醒,执行System.out.println(...)
做结果汇总;
运行效果
5 个任务线程会随机顺序完成(因为
sleep
时间随机),比如:任务 2 执行完成 任务 0 执行完成 任务 4 执行完成 任务 1 执行完成 任务 3 执行完成
主线程必须等所有任务打印完,才会输出:
主线程:在所有任务运行完成后,进行结果汇总
核心价值
并行效率:5 个任务并行执行,不用等待前一个任务完成再执行下一个,节省时间;
同步控制:主线程通过
CountDownLatch
精准等待所有任务完成,保证汇总逻辑在所有数据就绪后执行。
3.3.2 电商场景中的应用——等待所有子任务结束
需求:根据商品品类 ID,获取 10 个商品,并行计算每个商品的最终价格(需调用 ERP、CRM 等系统,计算复杂、耗时),最后汇总所有价格返回;
ERP系统:ERP是企业资源计划系统,它整合企业内部各部门的核心业务流程,如财务、采购、生产、销售和人力资源等,以实现数据共享和资源优化;
CRM系统:CRM是客户关系管理系统,它专注于管理公司与当前及潜在客户的交互和业务往来,旨在改善客户服务、提升销售效率并维护客户关系;
串行问题:如果一个一个计算(串行),总耗时 = 获取商品时间 + 10×单个商品计算时间,商品越多越慢;
并行优化:用多线程并行计算商品价格,总耗时 = 获取商品时间 + 最长单个商品计算时间,效率更高;
代码:工具方法 & 数据类
// 根据品类 ID 获取商品 ID 列表(模拟返回 1~10 号商品) private static int[] getProductsByCategoryId() { return IntStream.rangeClosed(1, 10).toArray(); } // 商品价格数据类:存储商品 ID 和计算后的价格 private static class ProductPrice { private final int prodID; // 商品 ID private double price; // 计算后的价格 // 构造方法、get/set、toString 略... }
主逻辑:并行计算商品价格
public static void main(String[] args) throws InterruptedException { // 1. 获取商品 ID 列表(1~10) final int[] products = getProductsByCategoryId(); // 2. 转换为 ProductPrice 列表(初始价格未计算) List<ProductPrice> list = Arrays.stream(products) .mapToObj(ProductPrice::new) // 每个商品 ID 对应一个 ProductPrice .collect(Collectors.toList()); // 3. 初始化 CountDownLatch:需要等待 10 个商品计算完成(products.length=10) final CountDownLatch latch = new CountDownLatch(products.length); // 4. 为每个商品启动线程,并行计算价格 list.forEach(pp -> { new Thread(() -> { try { System.out.println(pp.getProdID() + " -> 开始计算商品价格."); // 模拟耗时操作(调用外部系统):随机休眠 0~9 秒 TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 计算商品价格(模拟业务逻辑:奇偶商品不同折扣) if (pp.getProdID() % 2 == 0) { pp.setPrice(pp.getProdID() * 0.90); // 偶数商品 9 折 } else { pp.setPrice(pp.getProdID() * 0.71); // 奇数商品 7.1 折 } System.out.println(pp.getProdID() + " -> 价格计算完成."); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 6. 任务完成,计数器减 1 latch.countDown(); } }).start(); // 启动线程 }); // 7. 主线程阻塞等待:直到 10 个商品都计算完成(latch.await()) latch.await(); // 8. 所有商品计算完成,汇总结果 System.out.println("所有价格计算完成."); list.forEach(System.out::println); }
准备商品数据:调用
getProductsByCategoryId()
获取 1~10 号商品 ID,转成ProductPrice
列表(初始价格未计算);初始化
CountDownLatch
:new CountDownLatch(10)
→ 表示需要等待10 个商品的计算任务完成;并行计算价格:为每个商品启动线程:
- 模拟耗时操作(
TimeUnit.SECONDS.sleep(...)
); - 根据商品 ID 奇偶,设置不同折扣价格(模拟业务逻辑);
- 任务完成后,
latch.countDown()
让计数器减 1;
- 模拟耗时操作(
主线程等待 & 汇总结果:
latch.await()
阻塞主线程,直到 10 个任务都完成(计数器归 0),最后打印所有商品的计算结果。
3.4 应用场景总结
并行任务同步:多个任务并行执行(比如 5 个线程同时下载文件),必须等所有任务都完成后,再执行下一步(比如合并文件);
- 用
CountDownLatch
让主线程等待所有并行任务完成,保证后续操作在所有任务就绪后执行;
- 用
多任务汇总:需要统计多个线程的执行结果(比如 10 个线程分别计算一部分数据,最后汇总总和);
- 主线程等所有线程计算完,再统一汇总结果,避免部分数据未计算就开始汇总的问题;
资源初始化:系统启动时,需要初始化多个资源(比如缓存、数据库连接、配置加载),必须等所有资源初始化完成,再对外提供服务;
- 主线程等待所有资源初始化任务完成,保证系统启动后资源可用。
3.5 不足
CountDownLatch
是一次性工具:构造时设置的计数器(比如
new CountDownLatch(5)
),一旦减到0
,就无法重置或复用;如果业务需要“重复等待多个任务完成”,
CountDownLatch
无法满足,必须重新创建新的实例。
4 CyclicBarrier(回环栅栏/循环屏障)
4.1 简介
CyclicBarrier
是多线程同步工具,解决的问题是:让一组线程互相等待,直到所有线程都到达同一个“屏障点”,然后再一起继续执行;关键特点:可循环使用(屏障可以重置,重复让线程等待、一起执行);
工作流程:
初始化屏障:
CyclicBarrier barrier = new CyclicBarrier(N);
,N
是“需要等待的线程数量”(比如 5 个线程要一起执行后续逻辑);线程到达屏障点:每个线程执行到
barrier.await();
时,会阻塞等待,直到有N
个线程都调用了await()
;所有线程到达,一起执行:当第
N
个线程调用await()
后,所有阻塞的线程会被同时唤醒,继续执行后续逻辑;循环使用:唤醒后,屏障可以重置(通过
reset()
方法 ),再次让新的一组线程等待、一起执行;
适合把一个大任务拆成多个子任务并行执行,等所有子任务完成后,再统一做下一步的场景,且需要重复执行该流程。典型场景有:
并行计算 + 合并结果:比如计算一个大数组的和,拆成 10 个子数组并行计算,等所有子数组算完,再合并总和。计算完一次后,还能再拆新的数组,重复使用屏障。
多阶段任务:系统升级时,先让 5 个节点并行执行数据迁移,全部完成后,再一起执行验证数据,验证完还能继续下一阶段(比如启动服务),屏障可循环用;
与
CountDownLatch
的核心区别特性 CyclicBarrier
CountDownLatch
是否可循环 可循环(屏障可重置,重复用) 一次性(计数器到 0 后无法重置) 等待的目标 等待“一组线程互相到达屏障点” 等待“其他线程完成任务(计数减到 0)” 典型场景 多阶段并行任务(可重复) 单次多任务同步(不可重复)
4.2 常用API
4.2.1 构造器
有两个构造器:
public CyclicBarrier(int parties)
parties
:需要等待的线程总数。比如传4
,表示必须有 4 个线程都调用await()
,屏障才会放行;作用:初始化一个基础的循环屏障,所有线程到达后一起执行后续逻辑;
public CyclicBarrier(int parties, Runnable barrierAction)
parties
:同上,需要等待的线程总数;barrierAction
:一个Runnable
任务。当所有线程到达屏障后,会优先执行这个任务,再让所有线程继续执行;作用:适合线程到达屏障后,需要先统一处理一些逻辑(比如汇总数据、初始化资源)的场景;
工作原理:以
CyclicBarrier barrier = new CyclicBarrier(4, new Runnable(){...});
为例初始化:
parties=4
→ 需要 4 个线程到达屏障;barrierAction
→ 所有线程到达后执行的任务;
线程到达屏障:每个线程执行
barrier.await();
时:- 计数器(
count
)减 1(初始4
→ 线程 1 调用后变成3
→ 线程 2 调用后变成2
→ 线程 3 调用后变成1
→ 线程 4 调用后变成0
); - 前 3 个线程调用
await()
后,会阻塞等待;
- 计数器(
屏障放行(所有线程到达):第 4 个线程调用
await()
后,count=0
,执行barrierAction
,然后唤醒所有阻塞的线程,一起继续执行后续逻辑;循环复用:屏障放行后,
count
会重置为初始值(4),可以再次让新的一组线程(4 个)等待、触发屏障。
4.2.2 常用方法
总览:
// 1. await():线程调用后阻塞,直到所有线程都调用 await(),屏障放行 public int await() throws InterruptedException, BrokenBarrierException {} // 2. await(long timeout, TimeUnit unit):带超时的 await(),超时后屏障视为“被破坏” public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {} // 3. reset():重置屏障,让计数器回到初始值,可重复使用 public void reset() {}
await()
:线程调用await()
后,会阻塞等待,直到有parties
个线程都调用await()
(parties
是构造器传入的线程数);InterruptedException
:等待中的线程被中断;BrokenBarrierException
:屏障被破坏(比如其他线程await()
时被中断、超时 ),导致当前线程无法继续等待;返回值:返回当前线程在到达屏障的线程组中的索引(比如 4 个线程到达,第一个调用
await()
的线程返回3
,最后一个返回0
,索引从0
开始逆序);
await(long timeout, TimeUnit unit)
:和await()
类似,但增加超时机制。如果在timeout
时间内,凑不齐parties
个线程调用await()
,则触发超时,屏障被标记为破坏。防止线程因其他线程异常,无限期阻塞;- 除了
InterruptedException
和BrokenBarrierException
,还可能抛出TimeoutException
(超时);
- 除了
reset()
:重置CyclicBarrier
,让计数器回到初始值(parties
),屏障状态恢复到未使用;- 注意:重置时,若有线程正在
await()
,会触发BrokenBarrierException
(因为屏障被强制重置,这些线程的等待被打断)。
- 注意:重置时,若有线程正在
4.3 使用
4.3.1 等待所有子任务结束
需求:根据品类 ID 获取 10 个商品,并行计算每个商品的最终价格(模拟调用外部系统,耗时随机),等所有商品价格计算完成后,汇总结果返回;
工具方法 & 数据类
// 根据品类 ID 获取商品 ID 列表(1~10 号商品) private static int[] getProductsByCategoryId() { return IntStream.rangeClosed(1, 10).toArray(); } // 商品价格数据类:存储商品 ID 和计算后的价格 private static class ProductPrice { private final int prodID; // 商品 ID private double price; // 计算后的价格 // 构造方法、get/set、toString 略... }
主逻辑:用
CyclicBarrier
同步多线程public static void main(String[] args) throws InterruptedException { // 1. 获取商品 ID 列表,转换为 ProductPrice 列表 final int[] products = getProductsByCategoryId(); List<ProductPrice> list = Arrays.stream(products) .mapToObj(ProductPrice::new) .collect(Collectors.toList()); // 2. 初始化 CyclicBarrier:需要等待 list.size()(10)个线程到达屏障 final CyclicBarrier barrier = new CyclicBarrier(list.size()); // 3. 存储线程的列表(用于后续 join 等待) final List<Thread> threadList = new ArrayList<>(); // 4. 为每个商品启动线程,并行计算价格 list.forEach(pp -> { Thread thread = new Thread(() -> { try { System.out.println(pp.getProdID() + " 开始计算商品价格."); // 模拟耗时操作(调用外部系统):随机休眠 0~9 秒 TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 计算商品价格(奇偶商品不同折扣) if (pp.getProdID() % 2 == 0) { pp.setPrice(pp.getProdID() * 0.90); // 偶数商品 9 折 } else { pp.setPrice(pp.getProdID() * 0.71); // 奇数商品 7.1 折 } System.out.println(pp.getProdID() + " -> 价格计算完成."); // 6. 等待其他线程:调用 await(),直到所有线程都到达屏障 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); threadList.add(thread); // 记录线程 thread.start(); // 启动线程 }); // 7. 等待所有线程执行完成(通过 join 保证主线程等所有子线程跑完) threadList.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); // 8. 所有商品价格计算完成,汇总结果 System.out.println("所有价格计算完成."); list.forEach(System.out::println); }
准备商品数据:调用
getProductsByCategoryId()
获取 1~10 号商品 ID,转成ProductPrice
列表(初始价格未计算);初始化
CyclicBarrier
:new CyclicBarrier(list.size())
→ 表示需要等待 10 个线程 到达屏障(每个商品对应一个线程);并行计算价格:为每个商品启动线程:
- 模拟耗时操作(
TimeUnit.SECONDS.sleep(...)
); - 根据商品 ID 奇偶,设置不同折扣价格;
- 调用
barrier.await()
:线程到达屏障,阻塞等待其他线程;
- 模拟耗时操作(
屏障放行:当第 10 个线程调用
await()
后,所有阻塞的线程会被同时唤醒,继续执行后续逻辑;主线程等待 & 汇总结果:通过
threadList.forEach(t -> t.join())
让主线程等待所有子线程执行完成,最后打印所有商品的计算结果。
4.3.2 CyclicBarrier的循环特性——模拟跟团旅游
需求:跟团旅游
第一阶段(上车屏障):
- 导游要求:所有游客上车后,大巴才出发(对应
CyclicBarrier
的第一次await()
); - 类比:10 个游客 + 1 个导游(主线程)= 11 个线程,凑齐后屏障放行;
- 导游要求:所有游客上车后,大巴才出发(对应
第二阶段(下车屏障):
- 导游要求:所有游客下车后,大巴才去下一个景点(对应
CyclicBarrier
的第二次await()
); - 类比:同一组线程(游客 + 导游)再次凑齐,屏障放行,实现“循环复用”;
- 导游要求:所有游客下车后,大巴才去下一个景点(对应
游客线程逻辑(
Tourist
类)private static class Tourist implements Runnable { private final int touristID; // 游客编号 private final CyclicBarrier barrier; // 循环屏障 public Tourist(int touristID, CyclicBarrier barrier) { this.touristID = touristID; this.barrier = barrier; } @Override public void run() { // 1. 模拟上车(第一阶段:上车同步) System.out.printf("游客:%d 乘坐旅游大巴\n", touristID); spendSeveralSeconds(); // 模拟上车耗时 waitAndPrint("游客:%d 上车,等别人上车.\n"); // 调用 await(),等待凑齐 11 个线程 // 2. 模拟下车(第二阶段:下车同步) System.out.printf("游客:%d 到达目的地\n", touristID); spendSeveralSeconds(); // 模拟下车耗时 waitAndPrint("游客:%d 下车,等别人下车.\n"); // 再次调用 await(),等待凑齐 11 个线程 } // 调用 barrier.await(),并打印日志 private void waitAndPrint(String message) { System.out.printf(message, touristID); try { barrier.await(); // 线程到达屏障,阻塞等待 } catch (InterruptedException | BrokenBarrierException e) { // 忽略异常 } } // 模拟随机耗时(上车/下车的时间) private void spendSeveralSeconds() { try { TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { // 忽略异常 } } }
主线程逻辑(导游视角)
public static void main(String[] args) throws BrokenBarrierException, InterruptedException { // parties=11 → 需要 11 个线程到达屏障(10 个游客线程 + 1 个主线程) final CyclicBarrier barrier = new CyclicBarrier(11); // 启动 10 个游客线程 for (int i = 0; i < 10; i++) { new Thread(new Tourist(i, barrier)).start(); } // 4. 主线程(导游)参与第一阶段屏障:等待所有游客上车 barrier.await(); System.out.println("导游:所有的游客都上了车."); // 5. 主线程(导游)参与第二阶段屏障:等待所有游客下车 barrier.await(); System.out.println("导游:所有的游客都下车了."); }
上车同步
10 个游客线程 + 1 个主线程(导游),共 11 个线程;
每个游客线程执行到
waitAndPrint("游客:%d 上车,等别人上车.\n")
→ 调用barrier.await()
,阻塞等待;当 11 个线程都调用
await()
后,屏障放行:打印所有“游客上车等待”的日志;
主线程继续执行,打印
导游:所有的游客都上了车.
;
下车同步
同一组 11 个线程(10 个游客 + 1 个主线程 ),再次执行到
waitAndPrint("游客:%d 下车,等别人下车.\n")
→ 调用barrier.await()
,阻塞等待;当 11 个线程都调用
await()
后,屏障放行:打印所有“游客下车等待”的日志;
主线程继续执行,打印
导游:所有的游客都下车了.
;
CyclicBarrier
的循环特性可重复触发:同一
CyclicBarrier
实例,可通过多次await()
实现“多阶段同步”(上车→下车);线程组复用:同一组线程(游客 + 导游 )参与多个阶段的屏障,无需重新创建实例。
4.4 应用场景总结
CyclicBarrier
是多线程同步工具,核心解决让一组线程互相等待,全部到达同一屏障点后,再一起继续执行的问题,且可循环使用(屏障可重置,重复同步多阶段任务);应用场景:
多线程任务拆分与合并:一个复杂任务(比如计算大数据集的总和)拆成多个子任务(比如 10 个线程各算一部分),必须等所有子任务完成后,再合并结果;
多线程数据处理同步:多个线程并行处理不同的数据分片(比如处理 5 个文件),必须等所有线程处理完自己的数据,再统一汇总、校验或持久化。
4.5 CyclicBarrier VS CountDownLatch
可复用性
CountDownLatch
:一次性工具。构造时设置的计数器(比如new CountDownLatch(5)
),一旦减到0
,无法重置或复用;CyclicBarrier
:可循环复用。计数器(parties
)可以通过reset()
重置,重复让新的线程组等待、触发屏障;
等待目标
CountDownLatch
:await()
的线程等待其他线程调用countDown()
把计数器减到0
(主线程等子线程完成);CyclicBarrier
:await()
的线程等待其他线程也到达屏障点(调用await()
)(线程组互相等待);
计数器特性
CountDownLatch
:计数器只能递减(从N→0
),且无法重置;CyclicBarrier
:计数器可以重置(通过reset()
回到初始值parties
),支持多轮同步。
5 Exchange(数据交换机)
5.1 简介
Exchanger
专门解决两个线程需要互相交换数据的场景,让两个线程在“交换点”(调用exchange
方法时)同步,安全交换数据;工作流程
- 线程 1 调用
exchange(object1)
:线程 1 会阻塞等待,直到线程 2 也调用exchange
方法; - 线程 2 调用
exchange(object2)
:此时两个线程都到达“交换点”,Exchanger
会将object1
传递给线程 2,将object2
传递给线程 1; - 交换后继续执行:线程 1 拿到
object2
,线程 2 拿到object1
,继续执行后续逻辑;
- 线程 1 调用
5.2 常用API
public V exchange(V x) throws InterruptedException
功能:
- 当前线程携带数据
x
,阻塞等待另一个线程到达交换点; - 对方线程到达后,交换数据:当前线程接收对方数据,返回给调用方;
- 当前线程携带数据
异常:等待中若线程被中断,抛出
InterruptedException
;
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
- 同上,但增加超时机制。如果在
timeout
时间内,对方线程未到达交换点,抛出TimeoutException
; - 适用场景:防止线程因对方异常,无限期阻塞。
- 同上,但增加超时机制。如果在
5.3 使用
5.3.1 模拟交易场景
模拟买卖双方交易:
卖家带“商品”(
goods = "电脑"
),买家带“钱”(money = "$4000"
);双方必须都到达“交易点”(调用
exchanger.exchange(...)
),才能交换数据(一手交钱,一手交货);
代码:
public class ExchangerDemo { private static Exchanger exchanger = new Exchanger(); static String goods = "电脑"; static String money = "$4000"; public static void main(String[] args) throws InterruptedException { System.out.println("准备交易,一手交钱一手交货..."); // 卖家线程:携带 goods,等待买家 new Thread(() -> { try { System.out.println("卖家到了,已经准备好货:" + goods); // 交换数据:卖家发送 goods,接收 money String receivedMoney = (String) exchanger.exchange(goods); System.out.println("卖家收到钱:" + receivedMoney); } catch (Exception e) { /* 忽略异常 */ } }).start(); // 主线程休眠 3 秒,模拟买家延迟到达 Thread.sleep(3000); // 买家线程:携带 money,等待卖家 new Thread(() -> { try { System.out.println("买家到了,已经准备好钱:" + money); // 交换数据:买家发送 money,接收 goods String receivedGoods = (String) exchanger.exchange(money); System.out.println("买家收到货:" + receivedGoods); } catch (Exception e) { /* 忽略异常 */ } }).start(); } }
同步交换:卖家先调用
exchange(goods)
会阻塞,直到买家调用exchange(money)
,双方交换数据;数据流向:卖家发送
goods
→ 接收money
;买家发送money
→ 接收goods
。
5.3.2 模拟对账场景
模拟数据对账:
- 线程 1 生成数据
A
,线程 2 生成数据B
; - 双方交换数据后,线程 2 校验
A
和B
是否一致;
- 线程 1 生成数据
代码:
public class ExchangerDemo2 { private static final Exchanger<String> exchanger = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { // 线程 1:发送数据 A threadPool.execute(() -> { try { String A = "12379871924sfkhfksdhfks"; exchanger.exchange(A); // 发送 A,等待线程 2 } catch (InterruptedException e) { /* 忽略 */ } }); // 线程 2:发送数据 B,接收数据 A,校验一致性 threadPool.execute(() -> { try { String B = "32423423jkmjkfsbfj"; String A = exchanger.exchange(B); // 发送 B,接收 A System.out.println("A和B数据是否一致:" + A.equals(B)); System.out.println("A= " + A); System.out.println("B= " + B); } catch (InterruptedException e) { /* 忽略 */ } }); threadPool.shutdown(); } }
数据校验:线程 2 接收线程 1 的数据
A
后,对比自己的B
,判断是否一致;线程池简化:用线程池管理两个线程,避免手动创建
Thread
;
5.3.3 模拟队列中交换数据
模拟生产者 - 消费者模式,但通过
Exchanger
动态交换“满队列”和“空队列”:生产者往
emptyQueue
放数据,满了就和消费者交换队列(拿空队列继续生产);消费者从
fullQueue
取数据,空了就和生产者交换队列(拿满队列继续消费);
代码:
public class ExchangerDemo3 { // 满队列(消费者初始用)、空队列(生产者初始用) private static ArrayBlockingQueue<String> fullQueue = new ArrayBlockingQueue<>(5); private static ArrayBlockingQueue<String> emptyQueue = new ArrayBlockingQueue<>(5); private static Exchanger<ArrayBlockingQueue<String>> exchanger = new Exchanger<>(); public static void main(String[] args) { new Thread(new Producer()).start(); // 启动生产者 new Thread(new Consumer()).start(); // 启动消费者 } // 生产者:往队列放数据,满了就交换队列 static class Producer implements Runnable { @Override public void run() { ArrayBlockingQueue<String> current = emptyQueue; try { while (current != null) { String str = UUID.randomUUID().toString(); try { current.add(str); // 往队列放数据 System.out.println("producer:生产了一个序列:" + str + ">>>>>加入到交换区"); Thread.sleep(2000); } catch (IllegalStateException e) { // 队列满了,交换队列(拿空队列) System.out.println("producer:队列已满,换一个空的"); current = exchanger.exchange(current); } } } catch (Exception e) { /* 忽略 */ } } } // 消费者:从队列取数据,空了就交换队列 static class Consumer implements Runnable { @Override public void run() { ArrayBlockingQueue<String> current = fullQueue; try { while (current != null) { if (!current.isEmpty()) { String str = current.poll(); // 从队列取数据 System.out.println("consumer:消耗一个序列:" + str); Thread.sleep(1000); } else { // 队列空了,交换队列(拿满队列) System.out.println("consumer:队列空了,换个满的"); current = exchanger.exchange(current); System.out.println("consumer:换满的成功~~~~~~~~~~~~~~~~~~~~~~"); } } } catch (Exception e) { /* 忽略 */ } } } }
动态队列交换:
- 生产者队列满 → 用
exchanger.exchange(current)
交换出空队列,继续生产; - 消费者队列空 → 用
exchanger.exchange(current)
交换出满队列,继续消费;
- 生产者队列满 → 用
解耦生产和消费:通过交换队列,避免生产者/消费者因队列满/空阻塞,灵活控制数据流转。
5.4 应用场景总结
- 数据交换:两个线程需要安全交换数据(如交易场景的“钱 - 货”交换);
- 保证“交换原子性”,避免数据不一致;
- 数据采集:采集线程(生产者)和处理线程(消费者)交换数据(如日志采集→日志处理);
- 解耦数据生产和消费,通过交换数据缓冲,提升系统吞吐量。
6 Phaser(阶段协同器)
6.1 简介
Phaser
用于协调多个线程的多阶段执行,支持:动态调整参与线程的数量(可增、可减);
分阶段同步(线程完成当前阶段,再一起进入下一阶段);
比
CyclicBarrier
更灵活(支持动态线程数、多阶段),比CountDownLatch
更强大(可循环、可动态调整);
核心特性
多阶段同步:线程可以分多个阶段执行(如
phase-0 → phase-1 → phase-2
),每个阶段都需要线程同步后再继续;动态线程管理:
可通过
register()
动态增加参与线程;可通过
arriveAndDeregister()
动态减少参与线程;
灵活的阶段控制:每个阶段完成后,可自定义逻辑(重写
onAdvance
方法),决定是否继续下一阶段;
工作流程
- 阶段 0(
phase-0
):- 多个线程执行“阶段 0”的任务;
- 线程调用
arriveAndAwaitAdvance()
表示“阶段 0 完成”,等待其他线程也完成“阶段 0”;
- 进入阶段 1(
phase-1
):- 所有线程都完成“阶段 0”后,一起进入“阶段 1”;
- 重复“执行任务 → 同步等待”的流程;
- 多阶段循环:支持多个阶段(
phase-0 → phase-1 → phase-2 → ...
),直到手动终止或所有线程退出;
- 阶段 0(
6.2 常用 API
构造方法
构造方法 作用 Phaser()
初始化一个“参与任务数为 0”的 Phaser
,后续用register()
动态添加线程Phaser(int parties)
指定初始参与线程数(类似 CyclicBarrier
的parties
)Phaser(Phaser parent)
作为子阶段协同器,依附于父 Phaser
,适合复杂多阶段场景Phaser(Phaser parent, int parties)
结合父 Phaser
和初始线程数,更灵活的初始化增减参与线程
方法 作用 int register()
动态增加一个参与线程,返回当前阶段号 int bulkRegister(int parties)
动态增加多个参与线程(批量注册),返回当前阶段号 int arriveAndDeregister()
线程完成任务后,退出参与(减少一个线程),返回当前阶段号 到达、等待方法
方法 作用 int arrive()
标记“当前线程完成阶段任务”,但不等待其他线程,继续执行 int arriveAndAwaitAdvance()
标记“当前线程完成阶段任务”,等待其他线程也完成,再进入下一阶段 int awaitAdvance(int phase)
等待进入指定阶段(需当前阶段匹配) int awaitAdvanceInterruptibly(int phase)
同上,但等待中可被中断 int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
带超时的等待,超时后抛出异常 阶段自定义逻辑
protected boolean onAdvance(int phase, int registeredParties)
作用:每个阶段完成后,自动调用此方法,决定是否继续下一阶段;
返回值:
true
:阶段结束,Phaser
不再继续(可用于终止多阶段流程);false
:继续下一阶段。
6.3 使用
需求:模拟了公司团建的多阶段活动,团建分4个阶段,参与人数动态变化:
阶段0:所有人到公司集合 → 出发去公园
阶段1:所有人到公园门口 → 出发去餐厅
阶段2:部分人到餐厅(有人提前离开,有人新增加入)→ 开始用餐
阶段3:用餐结束 → 活动终止
参与人数不固定(有人早退、有人中途加入),每个阶段必须等人齐了再继续
代码:
public class PhaserDemo { public static void main(String[] args) { final Phaser phaser = new Phaser() { // 每个阶段完成后自动调用下面的 onAdvance 方法,打印阶段总结,并判断是否终止(只剩主线程时终止) @Override protected boolean onAdvance(int phase, int registeredParties) { // registeredParties 是当前注册的线程数(包括主线程),减去 1 得到实际员工数 // 主线程:作为协调者,全程参与并动态添加中途加入者 int staffs = registeredParties - 1; // 每个阶段完成后的提示信息 switch (phase) { case 0: System.out.println("大家都到公司了,出发去公园,人数:" + staffs); break; case 1: System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs); break; case 2: System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs); break; } // 终止条件:只剩主线程(registeredParties == 1) return registeredParties == 1; } }; // 注册主线程————让主线程全程参与 phaser.register(); final StaffTask staffTask = new StaffTask(); // 全程参与者:3 人(参与所有 4 个阶段) for (int i = 0; i < 3; i++) { // 添加任务数 phaser.register(); new Thread(() -> { try { staffTask.step1Task(); //到达后等待其他任务到达 phaser.arriveAndAwaitAdvance(); staffTask.step2Task(); phaser.arriveAndAwaitAdvance(); staffTask.step3Task(); phaser.arriveAndAwaitAdvance(); staffTask.step4Task(); // 完成了,注销离开 phaser.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 早退者:2 人(只参与前 2 个阶段,到公园后离开) for (int i = 0; i < 2; i++) { phaser.register(); new Thread(() -> { try { staffTask.step1Task(); phaser.arriveAndAwaitAdvance(); staffTask.step2Task(); System.out.println("员工【" + Thread.currentThread().getName() + "】回家了"); // 完成了,注销离开 phaser.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 中途加入者:4 人(从阶段 2 开始参与,直接到餐厅聚餐) while (!phaser.isTerminated()) { int phase = phaser.arriveAndAwaitAdvance(); if (phase == 2) { for (int i = 0; i < 4; i++) { phaser.register(); new Thread(() -> { try { staffTask.step3Task(); phaser.arriveAndAwaitAdvance(); staffTask.step4Task(); // 完成了,注销离开 phaser.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } } } static final Random random = new Random(); // 封装了 4 个阶段的具体动作(从家出发→到公司→去公园→去餐厅→用餐),每个阶段用 Thread.sleep 模拟耗时 static class StaffTask { public void step1Task() throws InterruptedException { // 第一阶段:来公司集合 String staff = "员工【" + Thread.currentThread().getName() + "】"; System.out.println(staff + "从家出发了……"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "到达公司"); } public void step2Task() throws InterruptedException { // 第二阶段:出发去公园 String staff = "员工【" + Thread.currentThread().getName() + "】"; System.out.println(staff + "出发去公园玩"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "到达公园门口集合"); } public void step3Task() throws InterruptedException { // 第三阶段:去餐厅 String staff = "员工【" + Thread.currentThread().getName() + "】"; System.out.println(staff + "出发去餐厅"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "到达餐厅"); } public void step4Task() throws InterruptedException { // 第四阶段:就餐 String staff = "员工【" + Thread.currentThread().getName() + "】"; System.out.println(staff + "开始用餐"); Thread.sleep(random.nextInt(5000)); System.out.println(staff + "用餐结束,回家"); } } }
阶段0:到公司集合
- 主线程 + 3个全程者 + 2个早退者 → 共6个线程,调用
step1Task()
(从家到公司) - 完成后调用
phaser.arriveAndAwaitAdvance()
→ 等待所有人到公司 - 阶段结束:
onAdvance
触发,打印“出发去公园,人数:5”(6-1=5)
- 主线程 + 3个全程者 + 2个早退者 → 共6个线程,调用
阶段1:到公园门口
- 所有线程调用
step2Task()
(从公司到公园),完成后调用phaser.arriveAndAwaitAdvance()
→ 等待所有人到公园 - 2个早退者完成后调用
phaser.arriveAndDeregister()
→ 退出(注册线程数变为6-2=4) - 阶段结束:
onAdvance
触发,打印“出发去餐厅,人数:3”(4-1=3,只剩3个全程者+主线程)
- 所有线程调用
阶段2:到餐厅集合
- 主线程动态添加:检测到阶段2时,新增4个中途加入者 → 注册线程数变为4+4=8
- 3个全程者调用
step3Task()
(从公园到餐厅) - 4个新加入者直接调用
step3Task()
(到餐厅)
- 3个全程者调用
- 所有人调用
phaser.arriveAndAwaitAdvance()
→ 等待到餐厅 - 阶段结束:
onAdvance
触发,打印“开始用餐,人数:7”(8-1=7,3+4=7个员工+主线程)
- 主线程动态添加:检测到阶段2时,新增4个中途加入者 → 注册线程数变为4+4=8
阶段3:用餐结束
- 所有7人调用
step4Task()
(用餐),完成后调用phaser.arriveAndDeregister()
→ 所有人退出(注册线程数逐渐减少至1,只剩主线程) - 终止条件:
onAdvance
检测到registeredParties == 1
→ 返回true
,Phaser
终止
- 所有7人调用
Phaser
核心特性体现多阶段同步:通过
arriveAndAwaitAdvance()
实现每个阶段的等待,确保人齐后再进入下一阶段;动态线程管理:
phaser.register()
:新增参与者(如中途加入的4人);phaser.arriveAndDeregister()
:参与者退出(如早退者和用餐结束的人);
阶段自定义逻辑:
onAdvance
方法实现每个阶段的总结,并控制流程终止条件;灵活的协同:相比
CyclicBarrier
(固定线程数),Phaser
能应对“有人早退、有人中途加入”的动态场景。
6.4 应用场景总结
多线程任务分配:把一个复杂任务拆成多个子任务,分配给不同线程并行执行,且需要协调子任务的进度(比如所有子任务完成后,再合并结果);
用
Phaser
分阶段管理:- 阶段 0:子任务分配,线程开始执行
- 阶段 1:所有子任务完成,合并结果
支持动态调整线程数(比如某个子任务需要更多线程,用
register()
新增);
多级任务流程:任务需要分多个层级/阶段执行,必须等当前级所有任务完成,才能触发下一级任务(比如“数据采集→数据清洗→数据汇总→结果输出”);
每个层级对应
Phaser
的一个阶段(phase-0
采集→phase-1
清洗→phase-2
汇总);通过
arriveAndAwaitAdvance()
确保“当前级完成后,再进入下一级”,流程更清晰;
模拟并行计算:模拟分布式并行计算(比如科学计算、大数据处理 ),需要协调多个线程的“计算阶段”(比如矩阵计算分块执行,所有分块完成后再合并);
- 用
Phaser
同步“分块计算阶段”和“合并阶段”,确保:- 所有分块计算完成(阶段 0 同步);
- 合并结果后,再进入下一阶段(阶段 1 同步);
- 用
阶段性任务:任务天然是阶段性的,每个阶段需要所有线程同步后再继续(比如“团队项目”:需求评审→开发→测试→上线,每个阶段必须全员完成);
每个阶段对应
Phaser
的phase
,通过arriveAndAwaitAdvance()
实现“阶段同步”;支持动态调整参与线程(比如测试阶段需要新增测试人员,用
register()
加入);
上面所有场景都需要多阶段同步 + 动态线程协作,每个阶段必须等所有线程完成,再进入下一阶段。
Phaser
优势:比
CyclicBarrier
更灵活:支持动态增减线程、多阶段自定义逻辑(onAdvance
);比
CountDownLatch
更强大:可循环分阶段,而非一次性同步。