并发设计模式实战系列(19):监视器(Monitor)

发布于:2025-05-10 ⋅ 阅读:(19) ⋅ 点赞:(0)

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十九章监视器(Monitor),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 监视器三要素模型

2. 线程调度机制

二、生活化类比:银行柜台服务系统

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键配置说明

四、横向对比表格

1. 线程同步机制对比

2. 条件变量实现对比

五、高级优化技巧

1. 锁分段优化

2. 条件变量优化

3. 监控指标扩展

4. 自适应锁优化

5. 无锁化改造方案

六、异常处理与健壮性设计

1. 死锁检测与恢复

2. 线程泄漏防护

七、分布式环境扩展

1. 跨JVM的Monitor实现

2. 多节点协同方案

八、现代Java特性整合

1. 虚拟线程适配

2. Project Loom纤程支持

九、监控指标体系建设

1. Prometheus监控集成

2. 关键监控看板

十、经典场景最佳实践

1. 数据库连接池实现

2. 生产者-消费者增强版


一、核心原理深度拆解

1. 监视器三要素模型

┌──────────────────────┐
│      Monitor Object   │
│  ┌─────────────────┐  │
│  │  Shared Data     │  │
│  └────────┬─────────┘  │
│           │            │
│  ┌────────▼─────────┐  │
│  │  Sync Methods    │  │
│  │ (Entry Queue)    │  │
│  └────────┬─────────┘  │
│           │            │
│  ┌────────▼─────────┐  │
│  │  Wait Conditions │  │
│  │ (Condition Queue)│  │
│  └─────────────────┘  │
└──────────────────────┘
  • 共享数据:被保护的临界资源(如计数器、连接池)
  • 同步方法:互斥访问入口(Java的synchronized方法/块)
  • 条件变量:线程协作机制(Object.wait()/notify()

2. 线程调度机制

  • Entry Set:竞争锁的线程队列(JVM管理)
  • Wait Set:调用wait()的线程等待区
  • 优先级控制:非公平锁(默认)vs 公平锁(按入队顺序)

二、生活化类比:银行柜台服务系统

监视器组件

银行类比

运行机制

共享数据

柜台现金

所有柜员共享同一保险箱

同步方法

柜台窗口

每次仅允许一个柜员操作现金

条件变量

客户等待区

现金不足时柜员进入等待状态

Entry Set

排队叫号机

客户按顺序获取服务资格

Wait Set

VIP休息室

特殊需求客户暂时离开主队列

  • 异常处理:柜员突发离职(线程中断)→ 系统自动唤醒下个柜员

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MonitorPatternDemo {

    // 共享资源:有限容量队列
    private final LinkedList<String> messageQueue = new LinkedList<>();
    private final int MAX_CAPACITY = 10;

    // 显式锁(比synchronized更灵活)
    private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    // 监视器方法:生产消息
    public void produce(String message) throws InterruptedException {
        lock.lock();
        try {
            while (messageQueue.size() == MAX_CAPACITY) {
                System.out.println("[Producer] 队列已满,等待消费...");
                notFull.await(); // 释放锁并进入等待
            }

            messageQueue.addLast(message);
            System.out.println("[Producer] 添加消息: " + message + " | 队列大小: " + messageQueue.size());

            notEmpty.signal(); // 唤醒等待的消费者
        } finally {
            lock.unlock();
        }
    }

    // 监视器方法:消费消息
    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (messageQueue.isEmpty()) {
                System.out.println("[Consumer] 队列为空,等待生产...");
                notEmpty.await(); // 释放锁并进入等待
            }

            String message = messageQueue.removeFirst();
            System.out.println("[Consumer] 处理消息: " + message + " | 剩余: " + messageQueue.size());

            notFull.signal(); // 唤醒等待的生产者
            return message;
        } finally {
            lock.unlock();
        }
    }

    // 监控线程
    public void startMonitorThread() {
        new Thread(() -> {
            while (true) {
                try {
                    lock.lock();
                    try {
                        System.out.println("[Monitor] === 当前状态 ===");
                        System.out.println("队列大小: " + messageQueue.size());
                        System.out.println("等待生产者: " + lock.getWaitQueueLength(notFull));
                        System.out.println("等待消费者: " + lock.getWaitQueueLength(notEmpty));
                        System.out.println("=== === === === ===");
                    } finally {
                        lock.unlock();
                    }
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Monitor-Thread").start();
    }

    public static void main(String[] args) {
        MonitorPatternDemo monitor = new MonitorPatternDemo();
        monitor.startMonitorThread();

        // 模拟生产者
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        monitor.produce("Msg-" + System.currentTimeMillis());
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i).start();
        }

        // 模拟消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        monitor.consume();
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i).start();
        }
    }
}

2. 关键配置说明

// 锁类型选择
new ReentrantLock(true); // 公平锁(按入队顺序获取锁)
new ReentrantLock();     // 非公平锁(默认,吞吐量更高)

// 条件变量分离
private final Condition notFull = lock.newCondition();  // 队列未满条件
private final Condition notEmpty = lock.newCondition(); // 队列非空条件

// 监控接口
lock.getWaitQueueLength(condition); // 获取等待特定条件的线程数

四、横向对比表格

1. 线程同步机制对比

机制

互斥能力

条件等待

可中断

公平性

适用场景

synchronized

单一条件

不可

非公平

简单同步场景

ReentrantLock

多条件

可中断

可配置

复杂同步逻辑

Semaphore

可中断

可配置

资源池控制

ReadWriteLock

多条件

可中断

可配置

读多写少场景

2. 条件变量实现对比

实现方式

通知精度

批量唤醒

超时支持

使用复杂度

Object.wait()

全部

Condition.await()

指定条件

BlockingQueue

内置

自动


五、高级优化技巧

1. 锁分段优化

// 降低锁粒度(如ConcurrentHashMap的分段锁思想)
private final ReentrantLock[] segmentLocks = new ReentrantLock[16];
{
    for (int i = 0; i < segmentLocks.length; i++) {
        segmentLocks[i] = new ReentrantLock();
    }
}

public void put(String key, String value) {
    int segment = Math.abs(key.hashCode() % segmentLocks.length);
    segmentLocks[segment].lock();
    try {
        // 操作对应分段的共享数据
    } finally {
        segmentLocks[segment].unlock();
    }
}

2. 条件变量优化

// 使用带超时的等待(避免死锁)
if (!notEmpty.await(5, TimeUnit.SECONDS)) {
    throw new TimeoutException("等待消息超时");
}

// 使用signalAll()谨慎(可能引起"惊群效应")
notEmpty.signal(); // 优先使用精准通知

3. 监控指标扩展

// 添加JMX监控(示例)
public class MonitorMetrics implements MonitorMetricsMBean {
    private final ReentrantLock lock;

    public int getWaitThreadCount() {
        return lock.getQueueLength(); // 获取等待锁的线程数
    }

    public int getActiveThreadCount() {
        return lock.getHoldCount(); // 获取锁重入次数
    }
}

// 注册MBean
ManagementFactory.getPlatformMBeanServer().registerMBean(
    new MonitorMetrics(lock), 
    new ObjectName("com.example:type=MonitorMetrics")
);

4. 自适应锁优化

// 根据竞争情况动态切换锁类型
public class AdaptiveLock {
    private volatile boolean highContention = false;
    private final ReentrantLock fairLock = new ReentrantLock(true);
    private final ReentrantLock unfairLock = new ReentrantLock();

    public void lock() {
        if (highContention) {
            fairLock.lock(); // 高竞争时用公平锁
        } else {
            unfairLock.lock(); // 默认非公平锁
        }
    }

    // 监控线程竞争情况
    public void monitor() {
        new Thread(() -> {
            while (true) {
                int waiters = fairLock.getQueueLength() + unfairLock.getQueueLength();
                highContention = waiters > 5; // 阈值可配置
                try { Thread.sleep(1000); } catch (InterruptedException e) { break; }
            }
        }).start();
    }
}

5. 无锁化改造方案

// 对读多写少场景使用原子变量
private final AtomicReference<Map<String, String>> cache = 
    new AtomicReference<>(new ConcurrentHashMap<>());

public void updateCache(String key, String value) {
    while (true) {
        Map<String, String> oldMap = cache.get();
        Map<String, String> newMap = new ConcurrentHashMap<>(oldMap);
        newMap.put(key, value);
        if (cache.compareAndSet(oldMap, newMap)) break;
    }
}

六、异常处理与健壮性设计

1. 死锁检测与恢复

// 使用ThreadMXBean检测死锁
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
long[] threadIds = bean.findDeadlockedThreads();
if (threadIds != null) {
    ThreadInfo[] infos = bean.getThreadInfo(threadIds);
    for (ThreadInfo info : infos) {
        System.err.println("死锁线程: " + info.getThreadName());
        // 强制中断受害线程(生产环境需谨慎)
        Thread thread = findThreadById(info.getThreadId());
        if (thread != null) thread.interrupt();
    }
}

2. 线程泄漏防护

// 封装安全的线程池
public class SafeExecutor extends ThreadPoolExecutor {
    private final ConcurrentMap<Worker, Boolean> workers = new ConcurrentHashMap<>();

    protected void beforeExecute(Thread t, Runnable r) {
        workers.put((Worker) t, true);
    }

    protected void afterExecute(Runnable r, Throwable t) {
        workers.remove(Thread.currentThread());
    }

    public List<Thread> getStuckThreads(long timeoutMs) {
        return workers.keySet().stream()
            .filter(w -> w.getActiveTime() > timeoutMs)
            .collect(Collectors.toList());
    }
}

七、分布式环境扩展

1. 跨JVM的Monitor实现

// 基于Redis的分布式锁
public class DistributedMonitor {
    private final Jedis jedis;
    private final String lockKey;

    public boolean tryLock(long timeoutMs) {
        String result = jedis.set(lockKey, "locked", 
            "NX", "PX", timeoutMs);
        return "OK".equals(result);
    }

    public void unlock() {
        jedis.del(lockKey);
    }

    // 使用Redisson的看门狗机制实现续期
    public void startWatchdog() {
        new Thread(() -> {
            while (locked) {
                jedis.expire(lockKey, 30);
                try { Thread.sleep(10000); } 
                catch (InterruptedException e) { break; }
            }
        }).start();
    }
}

2. 多节点协同方案

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Node 1    │    │   Node 2    │    │   Node 3    │
│ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │
│ │ Monitor │───ZK─▶│ Monitor │───ZK─▶│ Monitor │ │
│ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │
└─────────────┘    └─────────────┘    └─────────────┘
  • ZooKeeper协调:通过临时节点实现Leader选举
  • 状态同步:使用Watcher机制通知条件变更

八、现代Java特性整合

1. 虚拟线程适配

// JDK21+ 虚拟线程优化
ExecutorService vThreadPool = Executors.newVirtualThreadPerTaskExecutor();

public void virtualThreadMonitor() {
    try (var executor = vThreadPool) {
        executor.submit(() -> {
            synchronized(this) { // 兼容传统synchronized
                while (conditionNotMet()) {
                    wait(); // 虚拟线程挂起时不占用OS线程
                }
                // 处理共享数据
            }
        });
    }
}

2. Project Loom纤程支持

// 使用Fiber替代线程(实验性)
new Fiber<Void>(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    synchronized(lock) { // 百万级纤程共享Monitor
        // 业务逻辑
    }
}).start();

九、监控指标体系建设

1. Prometheus监控集成

// 暴露锁竞争指标
Gauge contentionGauge = Gauge.build()
    .name("monitor_lock_contention")
    .help("Current lock waiters count")
    .register();

public void recordMetrics() {
    new ScheduledThreadPoolExecutor(1)
        .scheduleAtFixedRate(() -> {
            contentionGauge.set(lock.getQueueLength());
        }, 0, 5, TimeUnit.SECONDS);
}

2. 关键监控看板

指标名称

计算方式

健康阈值

锁等待时间

历史平均等待时间

< 50ms

条件变量等待数

notEmpty.getWaitQueueLength

< CPU核心数×2

死锁检测次数

ThreadMXBean统计

= 0

线程活跃度

活跃线程数/最大线程数

60%~80%


十、经典场景最佳实践

1. 数据库连接池实现

public class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();
    private final int maxSize;
    private final Object monitor = new Object();

    public Connection borrow() throws InterruptedException {
        synchronized (monitor) {
            while (pool.isEmpty()) {
                monitor.wait();
            }
            return pool.removeFirst();
        }
    }

    public void release(Connection conn) {
        synchronized (monitor) {
            pool.addLast(conn);
            monitor.notify();
        }
    }
}

2. 生产者-消费者增强版

// 支持优先级和批量处理
public class EnhancedBlockingQueue {
    private final PriorityBlockingQueue<Item> queue;
    private final Semaphore available;

    public void putBatch(List<Item> items) {
        queue.addAll(items);
        available.release(items.size());
    }

    public Item take() throws InterruptedException {
        available.acquire();
        return queue.poll();
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到