JUC常用线程辅助类详解

发布于:2025-08-18 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、CountDownLatch

1.1 概述

倒计时门闩, 当执行任务的线程数量到达为0的时候,触发.

  • 可以理解为: 一个教室有多个人,直到所有人走光之后, 班长锁门, 这里需要等待所有人都走完.

一种同步辅助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CountDownLatch使用给定的计数进行初始化。 由于调用了countDown方法, await方法会阻塞直到当前计数达到零,然后释放所有等待线程,并且任何后续的await调用都会立即返回。 这是一种一次性现象——计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。

CountDownLatch是一种通用的同步工具,可用于多种目的。 计数为 1 的CountDownLatch用作简单的开/关锁存器或门:调用await所有线程在门处等待,直到它被调用countDown的线程countDown 。 初始化为N的CountDownLatch可用于使一个线程等待,直到N 个线程完成某个操作,或者某个操作已完成 N 次.
在这里插入图片描述

1.2 示例代码

package cn.tcmeta.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * CountDownLatchDemo
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        // 等待的线程数量为5
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            new Thread(() ->{
                try {
                    System.out.println(Thread.currentThread().getName() + " --> \t" +  " 正在执行!");
                    TimeUnit.MILLISECONDS.sleep(5000);
                    System.out.println(Thread.currentThread().getName() + " --> \t" +  "执行成功!" );
                    // 当任务执行完成之后,要进行减1操作
                    countDownLatch.countDown();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }, "线程:  " + i).start();
        }
        
        try {
            // 当计数为0时,停止阻塞.相当于打开门闩.执行后续的逻辑
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
        
        System.out.println(Thread.currentThread().getName() + " --> \t" +  " 所有任务执行完成!");
    }
}

程序执行结果:
在这里插入图片描述

1.3 使用场景

1.3.1 并行任务启动后等待所有任务完成

场景:主线程需要启动多个子线程并行处理任务(如批量处理数据),并在所有子线程完成后进行结果汇总

CountDownLatch doneLatch = new CountDownLatch(TASK_COUNT);

for (int i = 0; i < TASK_COUNT; i++) {
    new Thread(() -> {
        try {
            // 执行子任务(如调用外部API、计算等)
        } finally {
            doneLatch.countDown(); // 任务完成时计数器减1
        }
    }).start();
}

doneLatch.await(); // 主线程阻塞,直到所有子线程完成
System.out.println("所有任务处理完毕,开始汇总结果");

1.3.2 服务启动时依赖资源初始化

场景:微服务启动时需要加载多个独立资源(如数据库连接、缓存预热、配置文件加载),主线程需等待所有资源初始化完成后才启动服务。

CountDownLatch initLatch = new CountDownLatch(3); // 3个依赖资源

// 线程1:初始化数据库
new Thread(() -> { initDatabase(); initLatch.countDown(); }).start();

// 线程2:预热缓存
new Thread(() -> { warmUpCache(); initLatch.countDown(); }).start();

// 线程3:加载配置文件
new Thread(() -> { loadConfig(); initLatch.countDown(); }).start();

initLatch.await(); // 等待所有资源初始化
startServer(); // 启动服务

1.3.3 并发性能测试

模拟高并发请求,需要所有请求线程在同一时刻发起操作(如压力测试接口)。

CountDownLatch startLatch = new CountDownLatch(1); // 发令枪
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT); // 结束标识

for (int i = 0; i < THREAD_COUNT; i++) {
    new Thread(() -> {
        try {
            startLatch.await(); // 所有线程在此等待
            callTestAPI();      // 同时发起请求
        } finally {
            endLatch.countDown();
        }
    }).start();
}

Thread.sleep(500); // 确保所有线程就绪
startLatch.countDown(); // 统一放行
endLatch.await();      // 等待所有请求完成
generateReport();       // 生成测试报告

1.3.4 多阶段任务协作

游戏服务器中,玩家需全部准备就绪后游戏才开始,且需等待所有玩家完成第一关卡后再进入下一关。

// 等待所有玩家准备
CountDownLatch readyLatch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player -> 
    player.prepare(() -> readyLatch.countDown())
);
readyLatch.await();
startGame();

// 等待所有玩家完成第一关
CountDownLatch level1Latch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player -> 
    player.finishLevel1(() -> level1Latch.countDown())
);
level1Latch.await();
startLevel2();

1.3.5 分布式任务分片同步

[!tip]

分布式计算中,主节点将任务分发给多个工作节点,需等待所有节点返回结果后再合并。

List<WorkerNode> nodes = getWorkerNodes();
CountDownLatch resultLatch = new CountDownLatch(nodes.size());

for (WorkerNode node : nodes) {
    node.executeTask(dataShard, () -> {
        // 返回分片处理结果
        resultLatch.countDown();
    });
}

resultLatch.await();
mergeResults(); // 合并所有分片结果

1.4 关键注意事项

  • 计数器不可重置:CountDownLatch的计数器归零后无法重置,如需重复使用,改用CyclicBarrier。
  • 异常处理:确保countDown()在finally块中调用,避免线程异常导致主线程永久阻塞。
  • 超时控制:使用await(long timeout, TimeUnit unit)防止死锁。
  • 资源释放:等待线程被中断时,需妥善处理资源(如关闭连接)。

二、CyclicBarrier

2.1 概述

一种同步辅助工具,「它允许一组线程全部等待彼此到达公共屏障点」「CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用.」

「CyclicBarrier支持可选的Runnable命令,该命令在每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。」

与CountDownLatch类似,但是它不是计数相减, 而是计数相加操作;功能类似;

  • 执行一个线程, 加一个数,当执行的线程数达到指定的线程数的时候,触发一个操作;
  • 集齐七个龙珠,可以召唤神龙, 或者理解成: 同学都到了, 咱们再上课

2.2 基本示例代码

package cn.tcmeta.thread;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * @author laoren
 *  CyclicBarrier类似于CountDownLatch, 不同的是这个是做加法,相当于一个屏障.
 *          可以理解为: 集齐七颗龙珠,就可以召唤神龙了. 它的计数是相加,达到某个值之后, 触发后续操作;
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        /**
         * 构造方法说明:
         *      参数说明:
         *              parities: 在屏障到达之前必须调用 await 的线程数
         *              barrierAction: 到达屏障点触发的动作.
         *     public CyclicBarrier(int parties, Runnable barrierAction) {
         *         if (parties <= 0) throw new IllegalArgumentException();
         *         this.parties = parties;
         *         this.count = parties;
         *         this.barrierCommand = barrierAction;
         *     }
         */
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("已经集齐了七颗龙珠,神龙现世!"));

        for (int i = 0; i < 7; i++) {
            int count = i;
            new Thread(() ->{
                System.out.println(Thread.currentThread().getName() + " --> \t" +  " 已经集齐了 " + count + " 龙珠了!");
                try {
                    TimeUnit.MILLISECONDS.sleep(2000);
                    // 未到达屏障点
                    cyclicBarrier.await();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }, "线程 - " + i).start();
        }
    }
}

在这里插入图片描述

2.3 使用场景

2.3.1 并行计算分阶段处理

分布式计算中,每个线程处理数据分片,需等待所有线程完成当前阶段才能进入下一阶段(如机器学习模型的迭代训练)

class DataProcessor {
    final int THREAD_COUNT = 4;
    final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> 
        System.out.println("当前阶段完成,开始下一阶段")
    );

    void process() {
        ExecutorService exec = Executors.newFixedThreadPool(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            exec.submit(() -> {
                for (int phase = 1; phase <= 3; phase++) {  // 3个处理阶段
                    processPhase(phase);  // 当前阶段计算
                    barrier.await();     // 等待其他线程
                }
            });
        }
        exec.shutdown();
    }

    void processPhase(int phase) {
        // 模拟分片计算
        System.out.println(Thread.currentThread().getName() 
            + " 完成阶段" + phase);
    }
}

在这里插入图片描述

2.3.2 多玩家游戏同步

游戏关卡中,所有玩家必须完成当前关卡才能同时进入下一关

package cn.tcmeta.thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class GameServer {
    final int PLAYERS = 3;
    final CyclicBarrier levelBarrier = new CyclicBarrier(PLAYERS, () -> 
        System.out.println("所有玩家通关!开始新关卡")
    );

    void startGame() {
        for (int i = 1; i <= PLAYERS; i++) {
            new Thread(() -> {
                while (true) {
                    // completeLevel();  // 玩家完成当前关卡
                    try {
                        levelBarrier.await();  // 等待其他玩家
                    } catch (InterruptedException | BrokenBarrierException e) {
                        throw new RuntimeException(e);
                    }
                }
            }, "玩家"+i).start();
        }
    }
}

2.3.3 性能测试压力生成

需要精确控制所有压测线程在同一毫秒发起请求

package cn.tcmeta.thread;

import java.util.concurrent.CyclicBarrier;

class StressTester {
    final int THREADS = 100;
    final CyclicBarrier startBarrier = new CyclicBarrier(THREADS);

    void test() throws Exception {
        for (int i = 0; i < THREADS; i++) {
            new Thread(() -> {
                awaitBarrier();    // 等待发令枪
                sendRequest();     // 发送API请求
            }).start();
        }
    }

    private void sendRequest() {
        System.out.println("Sending request...");
    }

    void awaitBarrier() {
        try {
            startBarrier.await();  // 所有线程在此同步
        } catch (Exception e) { /*...*/ }
    }

    public static void main(String[] args) throws Exception {
        new StressTester().test();
    }
}

在这里插入图片描述

2.3.4 遗传算法迭代

并行遗传算法中,每代种群需完成评估后才能进行交叉变异

class GeneticAlgorithm {
    final int POPULATION_SIZE = 50;
    final CyclicBarrier generationBarrier = new CyclicBarrier(POPULATION_SIZE);

    void evolve() {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < POPULATION_SIZE; i++) {
            pool.execute(() -> {
                for (int gen = 0; gen < 1000; gen++) {
                    evaluateFitness();  // 评估个体适应度
                    generationBarrier.await();
                    crossover();        // 等待后执行交叉操作
                }
            });
        }
        pool.shutdown();
    }
}

2.3.5 金融交易对账系统

银行每日需等待所有分行上传数据后执行全局对账

class ReconciliationSystem {
    final int BRANCHES = 20;
    final CyclicBarrier dailyBarrier = new CyclicBarrier(BRANCHES, this::runReconciliation);

    void startDailyJob() {
        for (String branch : getBranches()) {
            new Thread(() -> {
                uploadData(branch);    // 分行上传数据
                dailyBarrier.await();  // 等待其他分行
            }).start();
        }
    }

    void runReconciliation() {
        System.out.println("开始全局对账..."); 
    }
}

2.4 CyclicBarrier核心特性

在这里插入图片描述

2.5 最佳实践

最佳实践:

  • 「在循环体内使用 CyclicBarrier 处理分阶段任务」
  • 「栅栏动作中避免长时间阻塞(会延迟线程释放)」
  • 「配合 ExecutorService 管理线程生命周期」
  • 「使用 isBroken() 检测屏障状态并处理异常」

三、Semaphore

3.1 概述

3.1.1 核心概念

信号量, 信号灯; 计数信号量。 从概念上讲,信号量维护一组许可; 可以做限流使用;

  • 可以控制同时运行的线程数, 「线程可以有好多个线程,但是它可以控制同时执行的线程个数」
  • 类似于抢车位的概念或者是厕所抢坑的例子; - 当前有7辆车, 「七个操作线程」,车位有3个「信号量是3」

3.1.2 构造方法

// 可用的初始许可证数量
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 数量, 是否为公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

3.1.3 重点方法

// 1. acquire(), 获取操作. 当一个线程调用`acquire`操作时, 它要么通过成功获取信号量「信号量加1操作」, 要么一直等待下去;下到有线程释放信息号或者超时. 
// 2. release(), 释放操作,实际上会将信号量的值加1操作,然后唤醒等待的线程;

3.2 示例代码

3.2.1 共享资源互斥

package cn.tcmeta.thread;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @author laoren
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 信号量,初始化为1, 表示只有一个线程可以操作.
        Semaphore semaphore = new Semaphore(1);

        new Thread(() ->{
            // 获取锁操作
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " --> \t" +  " 拿到了锁哦...");
                // 模拟一下业务操作
                try {
                    TimeUnit.MILLISECONDS.sleep(3000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName() + " --> \t" +  " 完成了业务哦..");

            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }finally {
                // 释放锁
                semaphore.release();
            }
        }, "王麻子").start();

        new Thread(() ->{
            try {
                semaphore.acquire();

                System.out.println(Thread.currentThread().getName() + " --> \t" +  " 拿到了锁哦...");
                // 模拟一下业务操作
                try {
                    TimeUnit.MILLISECONDS.sleep(3000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName() + " --> \t" +  " 完成了业务哦..");
            }catch(Exception e){
                e.printStackTrace();
            }finally {
                semaphore.release();
            }
        }, "肥七").start();
    }
}

在这里插入图片描述

3.2.2 并发线程数控制

场景:

  • 目前有三个坑位「信号量数量为3」
  • 有10个人抢这三个坑住「线程数量」
package cn.tcmeta.thread;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo2 {
    public static final int PERMITS = 3;
    public static final int PERSON_COUNT = 10;
    public static void main(String[] args) {
        // 定义信号量
        Semaphore semaphore = new Semaphore(PERMITS);

        for (int i = 0; i < PERSON_COUNT; i++) {
            new Thread(() ->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " --> \t" +  " 抢到了坑位了!");
                    try {
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);
                        System.out.println(Thread.currentThread().getName() + " --> \t" +  " 啊啊啊,,舒服...释放坑位 ~~~");
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }catch(Exception e){
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            }, "线程: " + i).start();
        }
    }
}

在这里插入图片描述

3.3 使用场景

  • Semaphore(信号量)是Java中用于控制并发线程访问资源数量的同步工具。

  • 它维护了一组许可证(permits),线程在访问资源前需要获取许可证,访问后释放许可证.

3.3.1 数据库连接池管理

限制同时使用的数据库连接数量,防止资源耗尽

public class ConnectionPool {
    private final Semaphore semaphore;
    private final List<Connection> connections = new ArrayList<>();

    public ConnectionPool(int poolSize) {
        semaphore = new Semaphore(poolSize, true); // 公平模式
        for (int i = 0; i < poolSize; i++) {
            connections.add(createConnection());
        }
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire(); // 获取许可证
        return getAvailableConnection();
    }

    public void releaseConnection(Connection conn) {
        returnConnection(conn);
        semaphore.release(); // 释放许可证
    }

    private synchronized Connection getAvailableConnection() {
        return connections.remove(0);
    }

    private synchronized void returnConnection(Connection conn) {
        connections.add(conn);
    }
    
    private Connection createConnection() {
        // 创建数据库连接
        return mock(Connection.class);
    }
}

在这里插入图片描述

3.3.2 限流器

控制API接口每秒最大请求数量

public class RateLimiter {
    private final Semaphore semaphore;
    private final int maxPermits;
    private final ScheduledExecutorService scheduler;

    public RateLimiter(int permitsPerSecond) {
        this.maxPermits = permitsPerSecond;
        semaphore = new Semaphore(permitsPerSecond);
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            int available = semaphore.availablePermits();
            if (available < maxPermits) {
                semaphore.release(maxPermits - available);
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
}

3.3.3 停车场管理系统

模拟停车场车位管理

public class CarPark {
    private final Semaphore parkingSlots;
    private final int totalSlots;
    private final Set<String> parkedCars = new HashSet<>();

    public CarPark(int totalSlots) {
        this.totalSlots = totalSlots;
        this.parkingSlots = new Semaphore(totalSlots, true); // 公平模式
    }

    public boolean parkCar(String carId) {
        if (parkingSlots.tryAcquire()) {
            synchronized (this) {
                parkedCars.add(carId);
            }
            System.out.println(carId + " parked. Available slots: " + parkingSlots.availablePermits());
            return true;
        }
        System.out.println(carId + " failed to park. No available slots.");
        return false;
    }

    public void leaveCar(String carId) {
        synchronized (this) {
            if (parkedCars.remove(carId)) {
                parkingSlots.release();
                System.out.println(carId + " left. Available slots: " + parkingSlots.availablePermits());
            }
        }
    }
}

3.3.4 打印机池管理

多线程共享有限打印机资源

public class PrinterPool {
    private final Semaphore semaphore;
    private final List<Printer> printers = new ArrayList<>();
    
    public PrinterPool(int printerCount) {
        semaphore = new Semaphore(printerCount);
        for (int i = 0; i < printerCount; i++) {
            printers.add(new Printer("Printer-" + (i+1)));
        }
    }
    
    public void printDocument(String document) throws InterruptedException {
        semaphore.acquire();
        Printer printer = null;
        try {
            printer = getAvailablePrinter();
            printer.print(document);
        } finally {
            if (printer != null) {
                returnPrinter(printer);
            }
            semaphore.release();
        }
    }
    
    private synchronized Printer getAvailablePrinter() {
        return printers.remove(0);
    }
    
    private synchronized void returnPrinter(Printer printer) {
        printers.add(printer);
    }
}

在这里插入图片描述

3.3.5服务调用限流

限制对第三方服务的并发调用数量

public class ExternalServiceInvoker {
    private final Semaphore semaphore;
    private final int maxConcurrentCalls;
    
    public ExternalServiceInvoker(int maxConcurrentCalls) {
        this.maxConcurrentCalls = maxConcurrentCalls;
        this.semaphore = new Semaphore(maxConcurrentCalls);
    }
    
    public String invokeService(String request) {
        if (!semaphore.tryAcquire()) {
            throw new ServiceOverloadException("Too many concurrent requests");
        }
        
        try {
            return callExternalService(request);
        } finally {
            semaphore.release();
        }
    }
    
    private String callExternalService(String request) {
        // 调用外部服务
        return "Response for: " + request;
    }
}

3.3.6 生产者-消费者模型(有界缓冲区)

使用Semaphore实现生产者-消费者模式

public class BoundedBuffer<E> {
    private final Semaphore availableItems;
    private final Semaphore availableSpaces;
    private final Queue<E> buffer = new LinkedList<>();
    private final int capacity;
    
    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
        this.availableItems = new Semaphore(0);
        this.availableSpaces = new Semaphore(capacity);
    }
    
    public void put(E item) throws InterruptedException {
        availableSpaces.acquire(); // 等待空槽
        synchronized (this) {
            buffer.add(item);
        }
        availableItems.release(); // 增加可用项目
    }
    
    public E take() throws InterruptedException {
        availableItems.acquire(); // 等待可用项目
        E item;
        synchronized (this) {
            item = buffer.poll();
        }
        availableSpaces.release(); // 增加空槽
        return item;
    }
}

在这里插入图片描述

3.4 Semaphore关键特性总结

在这里插入图片描述

3.5 使用建议

  • 资源保护:当需要保护有限资源时使用Semaphore
  • 公平性考虑:在资源竞争激烈时使用公平模式防止线程饥饿
  • 异常处理:确保在finally块中释放许可证,避免许可证泄漏
  • 许可证数量:合理设置许可证数量,过多失去限制意义,过少影响性能
  • 替代方案:对于简单计数场景,考虑使用CountDownLatch或CyclicBarrier

四、LockSupport

用于创建锁和其他同步类的基本线程阻塞基元。

4.1 重要方法

4.1.1 使用线程进入阻塞状态

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    U.park(false, 0L);
    setBlocker(t, null);
}

4.1.2 唤醒某个线程

public static void unpark(Thread thread) {
    if (thread != null)
        U.unpark(thread);
}

4.2 示例代码

需求:

  • 开启一条线程, 循环输出10个数;
  • 当打印到第五个数时,线程阻塞住.
  • 10s之后再继续打印其值;
package cn.tcmeta.thread;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * @author laoren
 */
public class LockSupportDemo {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " --> \t" + " 当前值是: " + i);

                    if (i == 5) {
                        System.out.println(Thread.currentThread().getName() + " --> \t" +  " 被阻塞了..等一会继续干活....");
                        LockSupport.park(); // 阻塞当前线程
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "print-info-thread: ");

        t1.start(); // 启动打印的线程
        try {
            // 10秒钟睡眼
            TimeUnit.SECONDS.sleep(10);
            LockSupport.unpark(t1); // 唤醒t1线程,继续干活
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

4.3 使用场景

LockSupport 是 Java 并发包中一个强大的线程阻塞工具,提供了比传统 wait/notify 更灵活、更底层的线程控制能力。

4.3.1 构建高级同步工具(如 AQS)

在实现自定义锁或同步器时,使用 LockSupport 作为底层阻塞机制

public class SimpleReentrantLock {
    private volatile Thread owner = null;
    private int lockCount = 0;
    private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();

    public void lock() {
        Thread current = Thread.currentThread();
        if (owner == current) {
            lockCount++;
            return;
        }
        
        waiters.add(current);
        while (owner != null || !waiters.peek().equals(current) ||
               !compareAndSetOwner(null, current)) {
            LockSupport.park(this); // 阻塞当前线程
        }
        waiters.remove();
        lockCount = 1;
    }

    public void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalMonitorStateException();
        }
        if (--lockCount == 0) {
            owner = null;
            // 唤醒队首等待线程
            Thread next = waiters.peek();
            if (next != null) {
                LockSupport.unpark(next);
            }
        }
    }
    
    private boolean compareAndSetOwner(Thread expect, Thread update) {
        // 原子更新owner
        return UNSAFE.compareAndSwapObject(this, ownerOffset, expect, update);
    }
    
    // Unsafe 相关初始化代码省略...
}

在这里插入图片描述

4.3.2 中断敏感的阻塞操作

实现可响应中断的等待机制,避免 Thread.sleep() 的局限性

public class InterruptibleTask {
    private volatile boolean completed = false;
    
    public void doTask() {
        Thread worker = new Thread(() -> {
            // 模拟长时间任务
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed = true;
            LockSupport.unpark(Thread.currentThread()); // 通知完成
        });
        worker.start();
        
        // 等待任务完成,可响应中断
        while (!completed) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                System.out.println("Task waiting interrupted!");
                worker.interrupt();
                break;
            }
        }
    }
}

4.3.3 定时阻塞与精确唤醒

需要精确控制线程阻塞时间的场景(如定时任务调度)

public class PrecisionScheduler {
    private final ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();
    
    public void scheduleAt(Runnable task, long deadlineNanos) {
        Thread worker = new Thread(() -> {
            long now;
            while ((now = System.nanoTime()) < deadlineNanos) {
                long nanosToWait = deadlineNanos - now;
                if (nanosToWait > 0) {
                    LockSupport.parkNanos(nanosToWait);
                }
            }
            task.run();
        });
        worker.start();
    }
    
    // 取消任务(提前唤醒)
    public void cancel(Thread worker) {
        LockSupport.unpark(worker);
    }
}

4.3.4 线程协作与屏障

实现自定义的线程协作机制(替代 CountDownLatch 或 CyclicBarrier)

public class CustomBarrier {
    private final int parties;
    private final AtomicInteger count = new AtomicInteger();
    private final Thread[] waiters;
    
    public CustomBarrier(int parties) {
        this.parties = parties;
        this.waiters = new Thread[parties];
    }
    
    public void await() {
        int index = count.getAndIncrement();
        if (index < parties - 1) {
            waiters[index] = Thread.currentThread();
            LockSupport.park(this); // 阻塞直到所有线程到达
        } else {
            // 最后一个线程唤醒所有等待线程
            for (int i = 0; i < parties - 1; i++) {
                LockSupport.unpark(waiters[i]);
            }
            count.set(0); // 重置屏障
        }
    }
}

在这里插入图片描述

4.3.5 高性能消息队列

实现无锁或低争用的生产者-消费者模型

public class LockSupportQueue<T> {
    private static class Node<T> {
        T item;
        volatile Node<T> next;
    }
    
    private volatile Node<T> head;
    private volatile Node<T> tail;
    private volatile Thread consumer;
    
    public LockSupportQueue() {
        Node<T> dummy = new Node<>();
        head = dummy;
        tail = dummy;
    }
    
    public void put(T item) {
        Node<T> node = new Node<>();
        node.item = item;
        
        Node<T> t = tail;
        tail = node;
        t.next = node;
        
        // 唤醒等待的消费者
        if (consumer != null) {
            LockSupport.unpark(consumer);
            consumer = null;
        }
    }
    
    public T take() throws InterruptedException {
        Node<T> h = head;
        Node<T> first = h.next;
        
        if (first != null) {
            head = first;
            return first.item;
        }
        
        // 队列为空,注册自己为消费者并阻塞
        consumer = Thread.currentThread();
        while (first == null) {
            LockSupport.park();
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            h = head;
            first = h.next;
        }
        
        head = first;
        return first.item;
    }
}

4.3.6 死锁检测与恢复

实现带超时的资源获取,避免死锁

public class DeadlockAvoider {
    private final Lock resourceLock = new ReentrantLock();
    
    public boolean tryLockWithTimeout(long timeout, TimeUnit unit) {
        Thread current = Thread.currentThread();
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        
        // 尝试获取锁
        if (resourceLock.tryLock()) {
            return true;
        }
        
        // 设置超时线程
        Thread timeoutThread = new Thread(() -> {
            try {
                Thread.sleep(unit.toMillis(timeout));
                LockSupport.unpark(current); // 超时唤醒
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        timeoutThread.start();
        
        // 阻塞直到获取锁或超时
        while (System.nanoTime() < deadline) {
            if (resourceLock.tryLock()) {
                timeoutThread.interrupt();
                return true;
            }
            LockSupport.parkUntil(deadline);
        }
        return false;
    }
}

4.5 LockSupport核心优势

在这里插入图片描述

4.6 使用注意事项

  • 许可不可累加:多次调用 unpark 不会累积许可(最多只有一个有效许可)
  • 中断处理:调用 park 后线程被中断会立即返回,但不会清除中断状态
  • 虚假唤醒:和 Object.wait() 类似,park 也可能出现虚假唤醒
  • 同步配合:通常需要与其他同步机制(如 volatile 变量)配合使用
  • 避免滥用:在普通业务代码中优先使用高级并发工具类

4.7 LockSupport vs Object.wait/notify

在这里插入图片描述

LockSupport 是构建 Java 并发框架的基石(如 AQS),在开发高性能并发工具时尤其有用。对于日常业务开发,优先考虑使用基于 LockSupport 构建的高级并发工具(如 ReentrantLock, CountDownLatch 等)。

五、Exchanger

5.1 概述

线程可以配对和交换元素对的同步点。 每个线程在进入exchange方法时呈现一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。 Exchanger 可以被视为SynchronousQueue的双向形式。 交换器可能在遗传算法和管道设计等应用中很有用。

5.2 基本示例

package cn.tcmeta.thread;

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    // 初始化Exchanger对象
    public static final Exchanger<String> EXCHANGER = new Exchanger<>();
    public static void main(String[] args) {
        new Thread(() ->{
            String s1 = "它们都老了吗?它们在哪里呀,幸运的是我,曾陪他们开放............";
            System.out.println(Thread.currentThread().getName() + " --> \t" +  "交换之前: " + s1);

            try {
                s1 = EXCHANGER.exchange(s1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(Thread.currentThread().getName() + " --> \t" +  " 交换之后: " + s1);
        }, "那些花: ").start();

        new Thread(() ->{
            String s1 = "明天你是否会想起,你昨天写的日记............";
            System.out.println(Thread.currentThread().getName() + " --> \t" +  "交换之前: " + s1);

            try {
                s1 = EXCHANGER.exchange(s1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(Thread.currentThread().getName() + " --> \t" +  " 交换之后: " + s1);
        }, "同桌的你: ").start();
    }
}

在这里插入图片描述

5.3 使用场景

Exchanger 是 Java 并发包中一个独特的同步工具,用于两个线程之间交换数据。它提供了一个同步点,在这个点上两个线程可以交换彼此的对象。

5.3.1 生产者-消费者缓冲区交换

双缓冲技术中,生产者和消费者交换缓冲区引用,避免数据复制开销

public class DoubleBufferProcessor {
    private final Exchanger<Buffer> exchanger = new Exchanger<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    
    static class Buffer {
        byte[] data = new byte[1024];
        int size;
        
        void fill(InputStream in) throws IOException {
            size = in.read(data);
        }
        
        void process() {
            // 处理缓冲区数据
            System.out.println("Processing " + size + " bytes");
        }
    }
    
    public void start(InputStream input) {
        executor.submit(() -> producerTask(input));
        executor.submit(() -> consumerTask());
    }
    
    private void producerTask(InputStream input) {
        Buffer current = new Buffer();
        try {
            while (true) {
                current.fill(input);  // 填充缓冲区
                current = exchanger.exchange(current);  // 交换缓冲区
            }
        } catch (IOException | InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void consumerTask() {
        Buffer current = new Buffer();
        try {
            while (true) {
                current = exchanger.exchange(current);  // 交换缓冲区
                current.process();  // 处理数据
                current.size = 0;  // 清空缓冲区
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在这里插入图片描述

5.3.2 遗传算法交叉操作

在并行遗传算法中,两个线程交换染色体进行交叉操作

public class GeneticAlgorithm {
    private final Exchanger<Chromosome> exchanger = new Exchanger<>();
    private final int populationSize;
    
    public GeneticAlgorithm(int populationSize) {
        this.populationSize = populationSize;
    }
    
    public void evolve() {
        ExecutorService pool = Executors.newFixedThreadPool(populationSize);
        for (int i = 0; i < populationSize; i += 2) {
            pool.submit(new Individual(i));
            pool.submit(new Individual(i + 1));
        }
    }
    
    class Individual implements Runnable {
        private Chromosome chromosome;
        
        Individual(int id) {
            this.chromosome = new Chromosome(id);
        }
        
        @Override
        public void run() {
            try {
                chromosome.evaluateFitness();
                
                // 与配对的个体交换染色体
                Chromosome partner = exchanger.exchange(chromosome);
                
                // 执行交叉操作
                chromosome = chromosome.crossover(partner);
                
                chromosome.mutate();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    static class Chromosome {
        // 染色体实现
        Chromosome crossover(Chromosome other) { return this; }
        void evaluateFitness() {}
        void mutate() {}
    }
}

5.3.3 网络协议握手交换

实现自定义协议中双方交换密钥或初始化向量

public class SecureChannel {
    private final Exchanger<byte[]> keyExchanger = new Exchanger<>();
    
    public void establishChannel(Socket socket1, Socket socket2) {
        new Thread(() -> partyTask(socket1)).start();
        new Thread(() -> partyTask(socket2)).start();
    }
    
    private void partyTask(Socket socket) {
        try {
            // 生成随机密钥
            byte[] myKey = generateSessionKey();
            
            // 发送公钥给对方
            OutputStream out = socket.getOutputStream();
            out.write(myKey);
            out.flush();
            
            // 接收对方的公钥
            InputStream in = socket.getInputStream();
            byte[] theirKey = new byte[myKey.length];
            in.read(theirKey);
            
            // 交换密钥并计算共享密钥
            byte[] sharedKey = keyExchanger.exchange(combineKeys(myKey, theirKey));
            
            // 使用共享密钥开始通信
            startEncryptedCommunication(socket, sharedKey);
        } catch (IOException | InterruptedException e) {
            handleError(e);
        }
    }
}

5.3.4 游戏玩家物品交易

在线游戏中两个玩家安全交换物品

public class PlayerTradingSystem {
    private final Exchanger<TradeOffer> exchanger = new Exchanger<>();
    
    public void trade(Player player1, Player player2) {
        new Thread(() -> playerTradeTask(player1, player2)).start();
        new Thread(() -> playerTradeTask(player2, player1)).start();
    }
    
    private void playerTradeTask(Player player, Player partner) {
        try {
            // 玩家选择要交易的物品
            TradeOffer myOffer = player.createTradeOffer();
            
            // 显示交易界面
            player.showTradeInterface(partner);
            
            // 等待双方确认
            if (player.confirmTrade()) {
                // 交换交易内容
                TradeOffer theirOffer = exchanger.exchange(myOffer);
                
                // 执行交易
                player.completeTrade(theirOffer);
            } else {
                player.cancelTrade();
            }
        } catch (InterruptedException e) {
            player.cancelTrade();
            Thread.currentThread().interrupt();
        }
    }
    
    static class TradeOffer {
        List<Item> items;
        int gold;
    }
}

在这里插入图片描述

5.3.5 测试工具中的请求-响应交换

在性能测试工具中,一个线程发送请求,另一个线程验证响应

public class RequestResponseValidator {
    private final Exchanger<HttpResponse> exchanger = new Exchanger<>();
    
    public void testEndpoint(String url) {
        new Thread(() -> requesterTask(url)).start();
        new Thread(() -> validatorTask()).start();
    }
    
    private void requesterTask(String url) {
        try {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .build();
            
            HttpResponse<String> response = client.send(request, 
                    HttpResponse.BodyHandlers.ofString());
            
            // 交换响应给验证器
            exchanger.exchange(response);
        } catch (IOException | InterruptedException | URISyntaxException e) {
            handleError(e);
        }
    }
    
    private void validatorTask() {
        try {
            // 获取响应并验证
            HttpResponse<?> response = exchanger.exchange(null);
            validateResponse(response);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

5.3.6 数据流水线阶段交换

在数据处理流水线中,两个相邻阶段交换处理单元

public class DataPipeline {
    private final Exchanger<DataBatch> exchanger = new Exchanger<>();
    
    public void processData(DataSource source) {
        new Thread(() -> stage1(source)).start();
        new Thread(() -> stage2()).start();
    }
    
    private void stage1(DataSource source) {
        DataBatch batch = new DataBatch();
        try {
            while (source.hasMore()) {
                // 提取数据
                batch.extract(source);
                
                // 预处理
                batch.preprocess();
                
                // 交换给下一阶段
                batch = exchanger.exchange(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void stage2() {
        DataBatch batch = new DataBatch();
        try {
            while (true) {
                // 获取处理批次
                batch = exchanger.exchange(batch);
                
                // 分析数据
                batch.analyze();
                
                // 存储结果
                batch.storeResults();
                
                // 清空批次
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

5.4 Exchanger核心特性总结

在这里插入图片描述
在这里插入图片描述

5.5 使用注意事项

  • 仅限两个线程:Exchanger 严格用于两个线程间的交换,多线程行为未定义
  • 死锁风险:如果一个线程未调用 exchange,另一个线程会永久阻塞
  • 对象复用:交换后对象会被对方线程修改,需要适当同步或使用不可变对象
  • 性能考虑:适合中低频交换场景,高频交换可能成为瓶颈
  • 超时设置:生产环境中应始终使用带超时的 exchange 方法
  • 资源清理:确保线程中断时正确处理资源释放

5.6 典型应用场景对比

在这里插入图片描述

Exchanger 是 Java 并发工具包中的一颗"隐藏宝石",特别适合需要精确控制两个线程间数据交换的场景。在正确的场景下使用 Exchanger 可以简化设计并提高性能,但需要注意其严格的线程配对要求。

六、没有了

学习快乐!!!


网站公告

今日签到

点亮在社区的每一天
去签到