🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第十九章监视器(Monitor),废话不多说直接开始~
目录
一、核心原理深度拆解
1. 监视器三要素模型
┌──────────────────────┐
│ Monitor Object │
│ ┌─────────────────┐ │
│ │ Shared Data │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌────────▼─────────┐ │
│ │ Sync Methods │ │
│ │ (Entry Queue) │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌────────▼─────────┐ │
│ │ Wait Conditions │ │
│ │ (Condition Queue)│ │
│ └─────────────────┘ │
└──────────────────────┘
- 共享数据:被保护的临界资源(如计数器、连接池)
- 同步方法:互斥访问入口(Java的
synchronized
方法/块) - 条件变量:线程协作机制(
Object.wait()
/notify()
)
2. 线程调度机制
- Entry Set:竞争锁的线程队列(JVM管理)
- Wait Set:调用
wait()
的线程等待区 - 优先级控制:非公平锁(默认)vs 公平锁(按入队顺序)
二、生活化类比:银行柜台服务系统
监视器组件 |
银行类比 |
运行机制 |
共享数据 |
柜台现金 |
所有柜员共享同一保险箱 |
同步方法 |
柜台窗口 |
每次仅允许一个柜员操作现金 |
条件变量 |
客户等待区 |
现金不足时柜员进入等待状态 |
Entry Set |
排队叫号机 |
客户按顺序获取服务资格 |
Wait Set |
VIP休息室 |
特殊需求客户暂时离开主队列 |
- 异常处理:柜员突发离职(线程中断)→ 系统自动唤醒下个柜员
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MonitorPatternDemo {
// 共享资源:有限容量队列
private final LinkedList<String> messageQueue = new LinkedList<>();
private final int MAX_CAPACITY = 10;
// 显式锁(比synchronized更灵活)
private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
// 监视器方法:生产消息
public void produce(String message) throws InterruptedException {
lock.lock();
try {
while (messageQueue.size() == MAX_CAPACITY) {
System.out.println("[Producer] 队列已满,等待消费...");
notFull.await(); // 释放锁并进入等待
}
messageQueue.addLast(message);
System.out.println("[Producer] 添加消息: " + message + " | 队列大小: " + messageQueue.size());
notEmpty.signal(); // 唤醒等待的消费者
} finally {
lock.unlock();
}
}
// 监视器方法:消费消息
public String consume() throws InterruptedException {
lock.lock();
try {
while (messageQueue.isEmpty()) {
System.out.println("[Consumer] 队列为空,等待生产...");
notEmpty.await(); // 释放锁并进入等待
}
String message = messageQueue.removeFirst();
System.out.println("[Consumer] 处理消息: " + message + " | 剩余: " + messageQueue.size());
notFull.signal(); // 唤醒等待的生产者
return message;
} finally {
lock.unlock();
}
}
// 监控线程
public void startMonitorThread() {
new Thread(() -> {
while (true) {
try {
lock.lock();
try {
System.out.println("[Monitor] === 当前状态 ===");
System.out.println("队列大小: " + messageQueue.size());
System.out.println("等待生产者: " + lock.getWaitQueueLength(notFull));
System.out.println("等待消费者: " + lock.getWaitQueueLength(notEmpty));
System.out.println("=== === === === ===");
} finally {
lock.unlock();
}
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Monitor-Thread").start();
}
public static void main(String[] args) {
MonitorPatternDemo monitor = new MonitorPatternDemo();
monitor.startMonitorThread();
// 模拟生产者
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
monitor.produce("Msg-" + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-" + i).start();
}
// 模拟消费者
for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
monitor.consume();
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-" + i).start();
}
}
}
2. 关键配置说明
// 锁类型选择
new ReentrantLock(true); // 公平锁(按入队顺序获取锁)
new ReentrantLock(); // 非公平锁(默认,吞吐量更高)
// 条件变量分离
private final Condition notFull = lock.newCondition(); // 队列未满条件
private final Condition notEmpty = lock.newCondition(); // 队列非空条件
// 监控接口
lock.getWaitQueueLength(condition); // 获取等待特定条件的线程数
四、横向对比表格
1. 线程同步机制对比
机制 |
互斥能力 |
条件等待 |
可中断 |
公平性 |
适用场景 |
synchronized |
有 |
单一条件 |
不可 |
非公平 |
简单同步场景 |
ReentrantLock |
有 |
多条件 |
可中断 |
可配置 |
复杂同步逻辑 |
Semaphore |
无 |
无 |
可中断 |
可配置 |
资源池控制 |
ReadWriteLock |
有 |
多条件 |
可中断 |
可配置 |
读多写少场景 |
2. 条件变量实现对比
实现方式 |
通知精度 |
批量唤醒 |
超时支持 |
使用复杂度 |
Object.wait() |
全部 |
是 |
有 |
低 |
Condition.await() |
指定条件 |
否 |
有 |
中 |
BlockingQueue |
内置 |
自动 |
有 |
低 |
五、高级优化技巧
1. 锁分段优化
// 降低锁粒度(如ConcurrentHashMap的分段锁思想)
private final ReentrantLock[] segmentLocks = new ReentrantLock[16];
{
for (int i = 0; i < segmentLocks.length; i++) {
segmentLocks[i] = new ReentrantLock();
}
}
public void put(String key, String value) {
int segment = Math.abs(key.hashCode() % segmentLocks.length);
segmentLocks[segment].lock();
try {
// 操作对应分段的共享数据
} finally {
segmentLocks[segment].unlock();
}
}
2. 条件变量优化
// 使用带超时的等待(避免死锁)
if (!notEmpty.await(5, TimeUnit.SECONDS)) {
throw new TimeoutException("等待消息超时");
}
// 使用signalAll()谨慎(可能引起"惊群效应")
notEmpty.signal(); // 优先使用精准通知
3. 监控指标扩展
// 添加JMX监控(示例)
public class MonitorMetrics implements MonitorMetricsMBean {
private final ReentrantLock lock;
public int getWaitThreadCount() {
return lock.getQueueLength(); // 获取等待锁的线程数
}
public int getActiveThreadCount() {
return lock.getHoldCount(); // 获取锁重入次数
}
}
// 注册MBean
ManagementFactory.getPlatformMBeanServer().registerMBean(
new MonitorMetrics(lock),
new ObjectName("com.example:type=MonitorMetrics")
);
4. 自适应锁优化
// 根据竞争情况动态切换锁类型
public class AdaptiveLock {
private volatile boolean highContention = false;
private final ReentrantLock fairLock = new ReentrantLock(true);
private final ReentrantLock unfairLock = new ReentrantLock();
public void lock() {
if (highContention) {
fairLock.lock(); // 高竞争时用公平锁
} else {
unfairLock.lock(); // 默认非公平锁
}
}
// 监控线程竞争情况
public void monitor() {
new Thread(() -> {
while (true) {
int waiters = fairLock.getQueueLength() + unfairLock.getQueueLength();
highContention = waiters > 5; // 阈值可配置
try { Thread.sleep(1000); } catch (InterruptedException e) { break; }
}
}).start();
}
}
5. 无锁化改造方案
// 对读多写少场景使用原子变量
private final AtomicReference<Map<String, String>> cache =
new AtomicReference<>(new ConcurrentHashMap<>());
public void updateCache(String key, String value) {
while (true) {
Map<String, String> oldMap = cache.get();
Map<String, String> newMap = new ConcurrentHashMap<>(oldMap);
newMap.put(key, value);
if (cache.compareAndSet(oldMap, newMap)) break;
}
}
六、异常处理与健壮性设计
1. 死锁检测与恢复
// 使用ThreadMXBean检测死锁
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
long[] threadIds = bean.findDeadlockedThreads();
if (threadIds != null) {
ThreadInfo[] infos = bean.getThreadInfo(threadIds);
for (ThreadInfo info : infos) {
System.err.println("死锁线程: " + info.getThreadName());
// 强制中断受害线程(生产环境需谨慎)
Thread thread = findThreadById(info.getThreadId());
if (thread != null) thread.interrupt();
}
}
2. 线程泄漏防护
// 封装安全的线程池
public class SafeExecutor extends ThreadPoolExecutor {
private final ConcurrentMap<Worker, Boolean> workers = new ConcurrentHashMap<>();
protected void beforeExecute(Thread t, Runnable r) {
workers.put((Worker) t, true);
}
protected void afterExecute(Runnable r, Throwable t) {
workers.remove(Thread.currentThread());
}
public List<Thread> getStuckThreads(long timeoutMs) {
return workers.keySet().stream()
.filter(w -> w.getActiveTime() > timeoutMs)
.collect(Collectors.toList());
}
}
七、分布式环境扩展
1. 跨JVM的Monitor实现
// 基于Redis的分布式锁
public class DistributedMonitor {
private final Jedis jedis;
private final String lockKey;
public boolean tryLock(long timeoutMs) {
String result = jedis.set(lockKey, "locked",
"NX", "PX", timeoutMs);
return "OK".equals(result);
}
public void unlock() {
jedis.del(lockKey);
}
// 使用Redisson的看门狗机制实现续期
public void startWatchdog() {
new Thread(() -> {
while (locked) {
jedis.expire(lockKey, 30);
try { Thread.sleep(10000); }
catch (InterruptedException e) { break; }
}
}).start();
}
}
2. 多节点协同方案
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Monitor │───ZK─▶│ Monitor │───ZK─▶│ Monitor │ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
└─────────────┘ └─────────────┘ └─────────────┘
- ZooKeeper协调:通过临时节点实现Leader选举
- 状态同步:使用Watcher机制通知条件变更
八、现代Java特性整合
1. 虚拟线程适配
// JDK21+ 虚拟线程优化
ExecutorService vThreadPool = Executors.newVirtualThreadPerTaskExecutor();
public void virtualThreadMonitor() {
try (var executor = vThreadPool) {
executor.submit(() -> {
synchronized(this) { // 兼容传统synchronized
while (conditionNotMet()) {
wait(); // 虚拟线程挂起时不占用OS线程
}
// 处理共享数据
}
});
}
}
2. Project Loom纤程支持
// 使用Fiber替代线程(实验性)
new Fiber<Void>(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
synchronized(lock) { // 百万级纤程共享Monitor
// 业务逻辑
}
}).start();
九、监控指标体系建设
1. Prometheus监控集成
// 暴露锁竞争指标
Gauge contentionGauge = Gauge.build()
.name("monitor_lock_contention")
.help("Current lock waiters count")
.register();
public void recordMetrics() {
new ScheduledThreadPoolExecutor(1)
.scheduleAtFixedRate(() -> {
contentionGauge.set(lock.getQueueLength());
}, 0, 5, TimeUnit.SECONDS);
}
2. 关键监控看板
指标名称 |
计算方式 |
健康阈值 |
锁等待时间 |
历史平均等待时间 |
< 50ms |
条件变量等待数 |
notEmpty.getWaitQueueLength |
< CPU核心数×2 |
死锁检测次数 |
ThreadMXBean统计 |
= 0 |
线程活跃度 |
活跃线程数/最大线程数 |
60%~80% |
十、经典场景最佳实践
1. 数据库连接池实现
public class ConnectionPool {
private final LinkedList<Connection> pool = new LinkedList<>();
private final int maxSize;
private final Object monitor = new Object();
public Connection borrow() throws InterruptedException {
synchronized (monitor) {
while (pool.isEmpty()) {
monitor.wait();
}
return pool.removeFirst();
}
}
public void release(Connection conn) {
synchronized (monitor) {
pool.addLast(conn);
monitor.notify();
}
}
}
2. 生产者-消费者增强版
// 支持优先级和批量处理
public class EnhancedBlockingQueue {
private final PriorityBlockingQueue<Item> queue;
private final Semaphore available;
public void putBatch(List<Item> items) {
queue.addAll(items);
available.release(items.size());
}
public Item take() throws InterruptedException {
available.acquire();
return queue.poll();
}
}