阻塞队列BlockingQueue是如何唤醒等待的线程的呢?

发布于:2024-08-02 ⋅ 阅读:(49) ⋅ 点赞:(0)

在 Java 中,ConcurrentLinkedQueueArrayBlockingQueueLinkedBlockingQueue 等并发队列通常用于生产者-消费者模式。这些队列使用锁和条件(Condition)来实现线程间的通信。

以下是基于 ArrayBlockingQueue 或 LinkedBlockingQueue 的生产者-消费者模式中,消费者如何知道队列中有元素可用的基本原理:

  1. 锁(Lock): 队列内部维护了一个锁,通常是 ReentrantLock。这个锁用于同步对队列的访问,确保在任何时刻只有一个线程可以修改队列。

  2. 条件(Condition): 锁关联了两个条件,通常称为 notEmpty 和 notFullnotEmpty 条件用于通知消费者队列非空,而 notFull 条件用于通知生产者队列未满。

  3. 消费者等待: 当队列为空时,消费者线程会调用条件 notEmpty 的 await() 方法。这将导致消费者线程释放锁并等待,直到另一个线程(生产者)在队列中插入一个元素并调用 signal() 或 signalAll() 方法来唤醒等待的消费者线程。

  4. 生产者通知: 当生产者向队列中添加一个元素时,它会调用 notEmpty 条件的 signal() 方法来唤醒一个(或所有)等待的消费者线程。

以下是一个简化的例子:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notEmpty = lock.newCondition();
    final Condition notFull = lock.newCondition();
    final Object[] items = new Object[100]; // 假设缓冲区大小为100
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) // 如果队列已满,则等待
                notFull.await();
            items[putptr] = x; // 在这里插入元素
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 通知消费者队列非空
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) // 如果队列为空,则等待
                notEmpty.await();
            Object x = items[takeptr]; // 在这里取出元素
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 通知生产者队列未满
            return x;
        } finally {
            lock.unlock();
        }
    }
}

在这个例子中,消费者在 notEmpty 条件上等待,而生产者在 notFull 条件上等待。当队列状态改变时(例如,生产者添加了一个元素或消费者取出一个元素),相应的条件会被信号唤醒,这样等待的线程就可以重新获取锁并继续执行。