JUC并发编程(二)Monitor/自旋/轻量级/锁膨胀/wait/notify/等待通知机制/锁消除

发布于:2025-06-05 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

一 基础

1 概念

2 卖票问题

3 转账问题

二 锁机制与优化策略

 0 Monitor

1 轻量级锁

2 锁膨胀

3 自旋

4 偏向锁

5 锁消除

6 wait /notify

7 sleep与wait的对比

8 保护性暂停模式

9 join原理

10 一个使用等待通知机制的例子

1 收信人

2 邮递员

3 信箱类

4 保护对象


一 基础

1 概念

临界区

一段代码块内如果存在对共享资源的多线程读写操作,撑这段代码区为临界区。

竞态条件

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

为了避免临界区的竞态条件发生,有多种手段可以达到目的

  • 阻塞式的解决方案:synchronized,Lock
  • 非阻塞式的解决方案:原子变量

2 卖票问题

代码实现:

首先代码实现了一个窗口类实现对卖票的相关业务逻辑,其次在主方法当中定义多个线程实现对票的购买,实现了sell方法的安全,random随机数的安全,集合的安全,同时利用.join方法等待所有线程执行结束。

package day01.mysafe;

import java.util.ArrayList;
import java.util.List;

import java.util.Vector;
import java.util.concurrent.ThreadLocalRandom;

public class example1 {
    static int randomAmount() {
        return ThreadLocalRandom.current().nextInt(1, 6);
    }

    public static void main(String[] args) throws InterruptedException {
        //模拟卖票
        List<Integer> arr = new Vector<>();
        List<Thread> threads = new ArrayList<>();
        TicketWindow ticketWindow = new TicketWindow(1000);
        for (int i = 0; i < 2200; i++) {
            Thread t = new Thread(() -> {
                int num = ticketWindow.sell(randomAmount());
                arr.add(num);
            });
            threads.add(t);
            t.start();
        }
        //等待所有线程执行完
        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("剩余:" + ticketWindow.getAmount());
        System.out.println("卖出:" + arr.stream().mapToInt(x -> x == null ? 0 : x).sum());

    }
}

/**
 * 窗口类
 */
class TicketWindow {
    private int amount;

    public TicketWindow(int number) {
        this.amount = number;
    }

    public int getAmount() {
        return amount;
    }

    /**
     * 卖票
     */
    public synchronized int sell(int amount) {
        if (this.amount >= amount) {
            this.amount -= amount;
            return amount;
        } else {
            return 0;
        }
    }
}

3 转账问题

加实例锁

锁的是当前对象,每个对象都有独立的锁,只影响一个实例的并发操作,多个实例可以并发进行。会出现死锁问题,当线程1 获取 a的锁,将a锁住需要修改b但是需要b的锁,此时需要等待b的锁,但是同时线程2获取b的锁,将b锁住需要修改a但是需要a的锁,两个线程相互等待,持续僵持导致死锁。

import java.util.concurrent.ThreadLocalRandom;

public class example2 {
    static int random() {
        return ThreadLocalRandom.current().nextInt(1, 100);
    }

    public static void main(String[] args) throws InterruptedException {
        Amount a = new Amount(1000);
        Amount b = new Amount(1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                a.transfer(b, random());
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                b.transfer(a, random());
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.printf("余额为%d\n", b.getMoney() + a.getMoney());


    }

}

class Amount {
    private int money;

    public Amount(int money) {
        this.money = money;
    }

    public int getMoney() {
        return money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    //转账 (向a账户转账money元)
    public synchronized void transfer(Amount a, int money) {
        if (this.money >= money) {
            this.money -= money;
            a.money += money;
        }
    }

}

加类锁

import java.util.concurrent.ThreadLocalRandom;

public class example2 {
    static int random() {
        return ThreadLocalRandom.current().nextInt(1, 100);
    }

    public static void main(String[] args) throws InterruptedException {
        Amount a = new Amount(1000);
        Amount b = new Amount(1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                a.transfer(b, random());
            }
        },  "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                b.transfer(a, random());
            }
        },  "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.printf("余额为%d\n", b.getMoney()+a.getMoney());


    }

}

class Amount {
    private int money;

    public Amount(int money) {
        this.money = money;
    }

    public int getMoney() {
        return money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    //转账 (向a账户转账money元)
    public void transfer(Amount a, int money) {
        synchronized (Amount.class){
            if (this.money >= money) {
                this.money -= money;
                a.money += money;
            }
        }
    }

}

二 锁机制与优化策略

 0 Monitor 

Monitor被翻译为监视器或管程。Monitor 是 JVM 实现 synchronized 的核心机制,通过 EntryList、WaitSet 和 Owner 管理线程对锁的访问。

当线程首次通过 synchronized 竞争 obj 的锁时,JVM 会在底层为其关联一个 Monitor,如果Owner没有对应的线程,则会成功获取线程锁,否则进入EntryList阻塞排队. (同一对象使用synchroized)

下面介绍线程持有锁并执行 wait/sleep 的运作状态(sleep可以在没有锁的状态运行,无锁就只释放CPU,有锁释放CPU,但锁不释放)

  • 锁的争抢(进入EntryList)→ 持有(成为Owner)→ <wait>  主动让出,将锁释放(进入WaitSet)→ <notify> 唤醒后重新竞争->(进入EntryList)。

  • 锁的争抢(进入EntryList)-->持有Owner -> <sleep> 主动让出CPU时间片,不释放锁,变为TIMED_WAITING状态同时维持Owner身份->时间结束后自动恢复运行,无需重新进入EntryList竞争。

Monitor 由以下核心组件构成:

  • Owner(持有者):当前持有锁的线程。
  • EntryList(入口队列):等待获取锁的线程队列。
  • WaitSet(等待队列):调用 wait() 方法后释放锁的线程队列。

1 轻量级锁

轻量级锁:如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(也就没有竞争),那么就可以使用轻量级锁来优化。

轻量级锁对使用者是透明的,语法依旧是synchroized,不需要人工干预。

  •  低竞争时用轻量级锁:当多线程竞争较小时(如交替执行同步代码),JVM 会优先使用轻量级锁(基于 CAS 操作),避免直接使用重量级锁(Monitor)的性能开销

  •  竞争加剧时升级:如果轻量级锁的 CAS 操作失败(其他线程同时竞争),JVM 会自动将其升级为重量级锁(通过操作系统互斥量实现阻塞)。

2 锁膨胀

锁膨胀是 JVM 在并发压力增大时,将轻量级锁升级为重量级锁的过程,以牺牲部分性能换取线程安全。

触发条件:

  • 轻量级锁竞争失败:当多个线程同时竞争轻量级锁(CAS 操作失败),JVM 会将锁升级为重量级锁。

  • 调用 wait()/notify():这些方法需要重量级锁(Monitor)的支持,会强制触发膨胀。

  • HashCode 冲突:若对象已计算哈希码,无法再使用偏向锁或轻量级锁,直接膨胀。

3 自旋

概念:自旋是“不停尝试”的锁获取策略

当首个线程获取轻量级锁后,第二个尝试访问的线程不会立即阻塞或促使锁升级,而是先进入自旋状态,等待原先的线程释放锁。若在自旋期间锁被释放,则该线程可直接获得锁,避免进入阻塞状态及触发锁升级至重量级锁,从而提高效率并减少资源消耗。这种机制有效降低了因锁升级带来的性能损耗,确保了在并发环境下的高效运行。

4 偏向锁

偏向锁是Java虚拟机(JVM)中一种针对同步操作的优化技术,主要用于减少无竞争情况下的同步开销。它是JVM锁升级机制的第一阶段(无锁→偏向锁→轻量级锁→重量级锁)。

在JDK15及以后版本,由于现代硬件性能提升和其他优化技术的出现,偏向锁默认被禁用,因为其带来的收益已经不明显,而撤销开销在某些场景下可能成为负担。

偏向锁与轻量级锁之间的对比

偏向锁 轻量级锁
针对无竞争场景(同一线程多次获取锁) 针对低竞争场景(多个线程交替执行,无并发冲突)
消除整个同步过程的开销 避免操作系统互斥量(Mutex)的开销

偏向锁的核心机制

  • 首次获取锁
    通过一次 CAS操作 将线程ID写入对象头的Mark Word,之后该线程进入同步块无需任何原子操作。

  • 无竞争时
    执行同步代码就像无锁一样(仅检查线程ID是否匹配)。

  • 遇到竞争
    触发偏向锁撤销(需暂停线程),升级为轻量级锁。

轻量级锁的核心机制

  • 加锁过程

    1. 在栈帧中创建锁记录(Lock Record)

    2. 用CAS将对象头的Mark Word复制到锁记录中

    3. 再用CAS将对象头替换为指向锁记录的指针(成功则获取锁)

  • 解锁过程
    用CAS将Mark Word还原回对象头(若失败说明存在竞争,升级为重量级锁)。

 关键差异

  • 偏向锁:全程只需1次CAS(首次获取时)

  • 轻量级锁:每次进出同步块都需要CAS(加锁/解锁各1次)

5 锁消除

锁消除是JVM中一项重要的编译器优化技术,它通过移除不必要的同步操作来提升程序性能。这项技术主要解决"无实际竞争情况下的无效同步"问题。

锁消除基于逃逸分析(Escape Analysis) 技术:

  1. JVM在运行时分析对象的作用域

  2. 判断对象是否会"逃逸"出当前线程(即被其他线程访问)

  3. 如果确认对象不会逃逸(线程私有),则消除该对象的所有同步操作

public String concatStrings(String s1, String s2) {
    // StringBuilder是方法内部的局部变量
    StringBuilder sb = new StringBuilder();
    sb.append(s1);  // 内部有synchronized块
    sb.append(s2);  // 内部有synchronized块
    return sb.toString();
}
  1. StringBuilder实例sb是方法局部变量

  2. 逃逸分析确认sb不会逃逸出当前线程(不会被其他线程访问)

  3. JIT编译器会消除所有synchronized同步操作

6 wait /notify

首先涉及三个组件,Owner,EntryList,WaitSet。

组件 存储线程状态 触发条件 是否持有锁 位置转移方向
Owner RUNNABLE 成功获取锁 → WaitSet (wait()时)
EntryList BLOCKED 竞争锁失败 ← WaitSet (notify()后)
WaitSet WAITING 主动调用wait() → EntryList (被唤醒后)

一个线程进入时首先会尝试获取Owner权,也就是获取锁,但是同一时刻只能有一个线程持有锁,获取成功可以直接执行临界代码区,获取失败的线程待在EntryList当中,处于Blocked阻塞状态,在持有锁阶段可以使用wait方法,会使当前锁释放,并进入WaitSet当中,处于Waiting等待状态,其必须使用notify/notifyAll唤醒才可进入EntryList当中,从而再次得到竞争Owner的权力。

代码示意

代码开启两个线程,对同一个对象实例加锁,线程1进入锁后,执行wait进入WaitSet进入阻塞等待,线程1将锁释放,此时线程2获取到锁,将线程1唤醒,线程1将后续代码执行结束。

  • 在线程2当中睡眠3s一定程度上确保其在线程1之后执行(一定程度上避免出现永久阻塞等待的状态),线程1阻塞等待,线程2唤醒。
  • 线程1被唤醒之后会将线程当中剩余的代码执行结束,然后进入EntryList中。
  • wait可加参数,相当于设置一个超时时间,在这个期间中等待,超时自动释放。
  • 在类文件当中加入一个Boolean标志位可以防止虚假唤醒的出现,虚假唤醒指的是在没有明确使用notify/notifyAll对线程进行唤醒的条件下而被唤醒或者唤醒的并不是想要的。(借助while循环持续判断)
package day01.mysynchronized;

public class Example4 {
    static final Object obj = new Object();
    static boolean isSignaled = false; // 新增标志位

    public static void main(String[] args) {
        System.out.println("线程1开始执行");
        Thread t1 = new Thread(() -> {
            try {
                synchronized (obj) {
                    System.out.println("线程1处于等待状态....");
                    // 循环检查标志位,防止虚假唤醒
                    while (!isSignaled) {
                        obj.wait();
                    }
                    System.out.println("线程1执行结束");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println("线程2开始执行,睡眠3秒...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            synchronized (obj) {
                System.out.println("线程2对线程1进行唤醒");
                isSignaled = true; // 设置标志位为true
                obj.notify();
                System.out.println("线程2执行结束,线程1被唤醒");
            }
        }, "t2");

        t2.start();
        t1.start();
    }
}

运行展示:

7 sleep与wait的对比

1 对于参数含义的不同

sleep(0)是一种主动让出时间片的过程,而wait(0) /wait() 是指长时间等待

2 调用位置的要求

sleep可以在任意位置调用,而wait必须在同步代码块当中调用。

3 唤醒机制

sleep:interrupt或者超时唤醒

wait:其他线程使用notify/notifyAll或者超时唤醒

4 线程改变的状态不同

线程持有锁并执行sleep,当前线程并不会释放当前持有的锁,而是携带锁休眠一段时间,持续处于Owner状态,休眠结束会继续执行代码逻辑。

线程持有并锁执行wait时,当前线程会释放当前持有的锁,并从持有管程Monitor转移到WaitSet等待队列当中,其他线程可以获取锁的持有权,可借助notify/notifyAll将锁唤醒,从WaitSet等待队列进入EntryList锁竞争队列当中。

8 保护性暂停模式

Guarded Suspension 保护性暂停是一种节点的多线程设计模式,用于在条件不满足时暂停线程执行,直到条件满足后在继续执行。(线程不满足等待条件,手动实现主动等待)

代码实现:

package day01.mysynchronized;

import java.util.ArrayList;

public class example6 {
    public static void main(String[] args) {
        GuardObject<ArrayList<Integer>> guardObject = new GuardObject<>();
        // 线程1等待线程2的结果
        new Thread(() -> {
            //  等待结果
            System.out.println("t1线程开始执行... 等待结果");
            ArrayList<Integer> result = guardObject.get();
            result.forEach(System.out::println);
        }, "t1").start();
        new Thread(() -> {
            System.out.println("t2线程开始执行...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ArrayList<Integer> objects = new ArrayList<>();
            objects.add(1);
            objects.add(2);
            guardObject.complete(objects);
        }, "t2").start();

    }
}

class GuardObject<T> {
    private T response;

    public T get() {
        synchronized (this) {
            while (response == null) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    public void complete(T response) {
        synchronized (this) {
            this.response = response;
            this.notifyAll();
        }
    }
}

存在的问题

  1. 缺少完成状态标志

    • 当前只检查 response == null,无法区分"结果未设置"和"结果就是null"的情况

    • 如果线程2设置 complete(null),线程1将永远等待

  2. 中断处理不当

    • 当线程在 wait() 时被中断,只打印了异常堆栈

    • 没有恢复线程的中断状态,导致上层代码无法感知中断

    • 可能造成线程无法正常退出

  3. 缺少超时控制

    • get() 方法会无限期等待,如果生产者线程失败,消费者线程将永久阻塞

    • 可能导致资源泄漏和系统不稳定

  4. 没有防止多次设置结果的机制

    • 如果多个线程调用 complete(),结果可能被覆盖

    • 导致数据不一致或结果丢失

  5. 异常处理不足

    • 在 get() 方法中捕获异常后没有进一步处理

    • 可能导致线程继续执行无效操作

  6. 泛型使用不规范

    • 创建 GuardObject<ArrayList<Integer>> 限制了使用场景

    • 应该使用更通用的 List<Integer> 接口

更完善的:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Example7 {
    public static void main(String[] args) {
        GuardObject<List<Integer>> guardObject = new GuardObject<>();

        // 线程1等待线程2的结果
        Thread t1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "开始执行... 等待结果");

            try {
                // 设置5秒超时
                List<Integer> result = guardObject.get(5000);
                System.out.println(Thread.currentThread().getName() + "收到结果:");
                result.forEach(System.out::println);
            } catch (TimeoutException e) {
                System.out.println(Thread.currentThread().getName() + "等待结果超时");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + "被中断");
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "开始执行...");
            try {
                // 模拟耗时操作
                Thread.sleep(3000);

                // 创建结果
                List<Integer> objects = new ArrayList<>();
                objects.add(1);
                objects.add(2);

                // 设置结果
                guardObject.complete(objects);
                System.out.println(Thread.currentThread().getName() + "已发送结果");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + "被中断");
            }
        }, "t2");

        t1.start();
        t2.start();

        // 确保主线程等待子线程完成
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("所有线程执行完成");
    }
}

/**
 * 增强版保护对象类
 * @param <T> 结果类型
 */
class GuardObject<T> {
    private T response;
    private boolean done = false; // 完成状态标志

    /**
     * 获取结果(无限等待)
     * @return 结果对象
     * @throws InterruptedException 如果等待时被中断
     */
    public T get() throws InterruptedException, TimeoutException {
        return get(0);
    }

    /**
     * 获取结果(带超时)
     * @param timeout 超时时间(毫秒),0表示无限等待
     * @return 结果对象
     * @throws InterruptedException 如果等待时被中断
     * @throws TimeoutException 如果超过指定的超时时间
     */
    public T get(long timeout) throws InterruptedException, TimeoutException {
        synchronized (this) {
            long start = System.currentTimeMillis();
            long remaining = timeout;

            while (!done) {
                if (timeout > 0 && remaining <= 0) {
                    throw new TimeoutException("等待结果超时");
                }

                if (timeout == 0) {
                    this.wait();
                } else {
                    this.wait(remaining);
                    remaining = timeout - (System.currentTimeMillis() - start);
                }
            }

            return response;
        }
    }

    /**
     * 设置结果
     * @param response 结果对象
     * @throws IllegalStateException 如果结果已被设置
     */
    public void complete(T response) {
        synchronized (this) {
            if (done) {
                throw new IllegalStateException("结果已被设置");
            }

            this.response = response;
            this.done = true;
            this.notifyAll();
        }
    }
}

9 join原理

join() 方法的实现基于 Java 的 等待-通知机制 和 线程状态管理

Thread.join() 的核心原理:

  1. 基于 Java 内置锁(synchronized)

  2. 使用等待-通知机制(wait/notify)

  3. 依赖 JVM 的线程终止通知

  4. 通过循环检查确保正确性

Thread.join() 通过 内置锁 确保线程安全,利用 等待-通知机制 实现阻塞与唤醒,依赖 JVM 的线程终止通知 自动触发唤醒,并通过 循环检查 防止虚假唤醒,最终实现线程间的有序协作。

10 一个使用等待通知机制的例子

信箱类:借助HashMap用于管理收信人id与对应的异步结果对象GuardObject

收信人:借助信箱类的管理,设置对应的id与新建异步结果对象,接着调用对应的get进行等待。

邮递员:传递收件人的id,并设置对应的content,借助信箱类根据收件人id得到对应的GuardObject异步结果对象,调用compete通知(将content传递)。

异步结果对象:两个方法,get/compete一个等待,一个通知,实现多线程的保护性暂停模式。

1 收信人

在收信人当中需要内置变量信箱,在初始化创建时就需要对应调用创建信箱的方法,然后重写run方法,因为我们需要实现的是每创建一个收信人就需要新建一个线程执行业务代码,run方法当中使用get进行等待,若邮递员发送通知之后再将结果接收,接收成功之后还需从信箱当中移除。

/**
 * 模拟收信人
 */
class People extends Thread {
    private final GuardObject<String> guardObject;

    public People() {
        super("People-" + System.currentTimeMillis());
        this.guardObject = MailBox.createGuardObject();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "开始收信...");
        try {
            // 等待信件,带超时
            String response = guardObject.get(5000);
            System.out.println(Thread.currentThread().getName() + "收到信:" + response);
        } catch (TimeoutException e) {
            System.out.println(Thread.currentThread().getName() + "收信超时");
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "收信被中断");
            Thread.currentThread().interrupt(); // 恢复中断状态
        } finally {
            // 确保从MailBox中移除GuardObject
            MailBox.removeGuardObject(guardObject.getId());
        }
    }
}

2 邮递员

邮递员需要内置两个成员变量,一个是收信人的id用来获取寻找对应的收信人,同时也是开启新的线程继承Thread类重写run方法,首先获取收信人的对象,调用compete方法通知对应的收信人

/**
 * 邮递员类
 */
class PostMan extends Thread {
    private final int id;
    private final String content;

    public PostMan(int id, String content) {
        super("PostMan-" + id);
        this.id = id;
        this.content = content;
    }

    @Override
    public void run() {
        GuardObject<String> guardObject = MailBox.getGuardObject(id);
        if (guardObject == null) {
            System.out.println(Thread.currentThread().getName() + "错误:收信人不存在");
            return;
        }

        System.out.println(Thread.currentThread().getName() + "开始发送信件...");
        try {
            // 模拟投递延迟
            Thread.sleep(1000 + (int)(Math.random() * 2000));

            // 发送信件
            guardObject.complete(content);
            System.out.println(Thread.currentThread().getName() + "已发送信件");
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "发送被中断");
            Thread.currentThread().interrupt(); // 恢复中断状态
        } catch (IllegalStateException e) {
            System.out.println(Thread.currentThread().getName() + "错误:" + e.getMessage());
        }
    }
}

3 信箱类

信箱当中维护的是一个HashMap集合,id存储收件人信息,id,GuardObject<String>对象

/**
 * 用于管理多个 GuardObject 的信箱类
 */
class MailBox {
    private static final AtomicInteger idGenerator = new AtomicInteger(1);
    private static final Map<Integer, GuardObject<?>> map = new ConcurrentHashMap<>();

    /**
     * 创建并返回一个泛型 GuardObject
     */
    public static <T> GuardObject<T> createGuardObject() {
        int id = idGenerator.getAndIncrement();
        GuardObject<T> guardObject = new GuardObject<>(id);
        map.put(id, guardObject);
        return guardObject;
    }

    /**
     * 获取所有 GuardObject 的 ID
     */
    public static Set<Integer> getGuardObjectIds() {
        return map.keySet();
    }

    /**
     * 根据id获取GuardObject
     */
    @SuppressWarnings("unchecked")
    public static <T> GuardObject<T> getGuardObject(int id) {
        return (GuardObject<T>) map.get(id);
    }

    /**
     * 移除GuardObject
     */
    public static void removeGuardObject(int id) {
        map.remove(id);
    }
}

4 保护对象

这里同时也会根据id维护独立的GuardObject对象,里面实现了get与compete的逻辑代码

/**
 * 增强版保护对象类
 */
class GuardObject<T> {
    private T response;
    private boolean done = false;
    private final int id;

    public GuardObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    /**
     * 获取结果(带超时)
     */
    public T get(long timeout) throws InterruptedException, TimeoutException {
        synchronized (this) {
            long start = System.currentTimeMillis();
            long remaining = timeout;

            while (!done) {
                if (timeout > 0 && remaining <= 0) {
                    throw new TimeoutException("等待结果超时");
                }

                if (timeout == 0) {
                    this.wait();
                } else {
                    this.wait(remaining);
                    remaining = timeout - (System.currentTimeMillis() - start);
                }
            }

            return response;
        }
    }

    /**
     * 设置结果
     */
    public void complete(T response) {
        synchronized (this) {
            if (done) {
                throw new IllegalStateException("结果已被设置");
            }

            this.response = response;
            this.done = true;
            this.notifyAll();
        }
    }

完整代码

package day01.mysynchronized;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class Example7 {
    public static void main(String[] args) throws InterruptedException {
        // 创建多个收信人
        for (int i = 0; i < 3; i++) {
            new People().start();
        }

        // 等待收信人创建GuardObject
        Thread.sleep(1000);

        // 获取所有等待收信的ID
        Set<Integer> ids = MailBox.getGuardObjectIds();

        // 为每个收信人创建邮递员
        int mailCount = 1;
        for (int id : ids) {
            new PostMan(id, "信件内容" + mailCount++).start();
        }
    }
}

/**
 * 模拟收信人
 */
class People extends Thread {
    private final GuardObject<String> guardObject;

    public People() {
        super("People-" + System.currentTimeMillis());
        this.guardObject = MailBox.createGuardObject();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "开始收信...");
        try {
            // 等待信件,带超时
            String response = guardObject.get(5000);
            System.out.println(Thread.currentThread().getName() + "收到信:" + response);
        } catch (TimeoutException e) {
            System.out.println(Thread.currentThread().getName() + "收信超时");
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "收信被中断");
            Thread.currentThread().interrupt(); // 恢复中断状态
        } finally {
            // 确保从MailBox中移除GuardObject
            MailBox.removeGuardObject(guardObject.getId());
        }
    }
}

/**
 * 邮递员类
 */
class PostMan extends Thread {
    private final int id;
    private final String content;

    public PostMan(int id, String content) {
        super("PostMan-" + id);
        this.id = id;
        this.content = content;
    }

    @Override
    public void run() {
        GuardObject<String> guardObject = MailBox.getGuardObject(id);
        if (guardObject == null) {
            System.out.println(Thread.currentThread().getName() + "错误:收信人不存在");
            return;
        }

        System.out.println(Thread.currentThread().getName() + "开始发送信件...");
        try {
            // 模拟投递延迟
            Thread.sleep(1000 + (int)(Math.random() * 2000));

            // 发送信件
            guardObject.complete(content);
            System.out.println(Thread.currentThread().getName() + "已发送信件");
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "发送被中断");
            Thread.currentThread().interrupt(); // 恢复中断状态
        } catch (IllegalStateException e) {
            System.out.println(Thread.currentThread().getName() + "错误:" + e.getMessage());
        }
    }
}

/**
 * 用于管理多个 GuardObject 的信箱类
 */
class MailBox {
    private static final AtomicInteger idGenerator = new AtomicInteger(1);
    private static final Map<Integer, GuardObject<?>> map = new ConcurrentHashMap<>();

    /**
     * 创建并返回一个泛型 GuardObject
     */
    public static <T> GuardObject<T> createGuardObject() {
        int id = idGenerator.getAndIncrement();
        GuardObject<T> guardObject = new GuardObject<>(id);
        map.put(id, guardObject);
        return guardObject;
    }

    /**
     * 获取所有 GuardObject 的 ID
     */
    public static Set<Integer> getGuardObjectIds() {
        return map.keySet();
    }

    /**
     * 根据id获取GuardObject
     */
    @SuppressWarnings("unchecked")
    public static <T> GuardObject<T> getGuardObject(int id) {
        return (GuardObject<T>) map.get(id);
    }

    /**
     * 移除GuardObject
     */
    public static void removeGuardObject(int id) {
        map.remove(id);
    }
}

/**
 * 增强版保护对象类
 */
class GuardObject<T> {
    private T response;
    private boolean done = false;
    private final int id;

    public GuardObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    /**
     * 获取结果(带超时)
     */
    public T get(long timeout) throws InterruptedException, TimeoutException {
        synchronized (this) {
            long start = System.currentTimeMillis();
            long remaining = timeout;

            while (!done) {
                if (timeout > 0 && remaining <= 0) {
                    throw new TimeoutException("等待结果超时");
                }

                if (timeout == 0) {
                    this.wait();
                } else {
                    this.wait(remaining);
                    remaining = timeout - (System.currentTimeMillis() - start);
                }
            }

            return response;
        }
    }

    /**
     * 设置结果
     */
    public void complete(T response) {
        synchronized (this) {
            if (done) {
                throw new IllegalStateException("结果已被设置");
            }

            this.response = response;
            this.done = true;
            this.notifyAll();
        }
    }
}

结果展示


网站公告

今日签到

点亮在社区的每一天
去签到