在Java中,生产者-消费者模型可以通过多种方式实现。以下是常见的几种实现方法及其代码示例:
1. **使用 `wait()` 和 `notify()`(基础同步机制)
通过 `synchronized` 块和 `Object` 的等待/唤醒机制实现。
public class WaitNotifyExample {
private final Queue<Integer> queue = new LinkedList<>();
private final int MAX_SIZE = 10;
public void produce() throws InterruptedException {
while (true) {
synchronized (queue) {
while (queue.size() == MAX_SIZE) {
queue.wait(); // 队列满时等待
}
int value = new Random().nextInt(100);
queue.add(value);
System.out.println("生产: " + value);
queue.notifyAll(); // 唤醒消费者
}
Thread.sleep(500);
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
queue.wait(); // 队列空时等待
}
int value = queue.poll();
System.out.println("消费: " + value);
queue.notifyAll(); // 唤醒生产者
}
Thread.sleep(1000);
}
}
}
2. 使用 `BlockingQueue`(线程安全队列)
直接利用 `BlockingQueue` 的阻塞特性简化代码。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public void produce() throws InterruptedException {
while (true) {
int value = new Random().nextInt(100);
queue.put(value); // 队列满时自动阻塞
System.out.println("生产: " + value);
Thread.sleep(500);
}
}
public void consume() throws InterruptedException {
while (true) {
int value = queue.take(); // 队列空时自动阻塞
System.out.println("消费: " + value);
Thread.sleep(1000);
}
}
}
3. 使用 `Lock` 和 `Condition`(更灵活的锁)
通过显式锁和条件变量实现细粒度控制。
import java.util.concurrent.locks.*;
import java.util.Queue;
import java.util.LinkedList;
public class LockConditionExample {
private final Queue<Integer> queue = new LinkedList<>();
private final int MAX_SIZE = 10;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.size() == MAX_SIZE) {
notFull.await(); // 等待队列不满
}
int value = new Random().nextInt(100);
queue.add(value);
System.out.println("生产: " + value);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
Thread.sleep(500);
}
}
public void consume() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列不空
}
int value = queue.poll();
System.out.println("消费: " + value);
notFull.signal(); // 唤醒生产者
} finally {
lock.unlock();
}
Thread.sleep(1000);
}
}
}
4. 使用 `Semaphore`(信号量控制资源)
通过信号量管理可用资源数量。
import java.util.concurrent.Semaphore;
import java.util.Queue;
import java.util.LinkedList;
public class SemaphoreExample {
private final Queue<Integer> queue = new LinkedList<>();
private final int MAX_SIZE = 10;
private final Semaphore semProducer = new Semaphore(MAX_SIZE);
private final Semaphore semConsumer = new Semaphore(0);
private final Object lock = new Object();
public void produce() throws InterruptedException {
while (true) {
semProducer.acquire(); // 获取生产许可
synchronized (lock) {
int value = new Random().nextInt(100);
queue.add(value);
System.out.println("生产: " + value);
}
semConsumer.release(); // 释放消费许可
Thread.sleep(500);
}
}
public void consume() throws InterruptedException {
while (true) {
semConsumer.acquire(); // 获取消费许可
synchronized (lock) {
int value = queue.poll();
System.out.println("消费: " + value);
}
semProducer.release(); // 释放生产许可
Thread.sleep(1000);
}
}
}
总结
以上四种是Java中实现生产者-消费者的主流方式:
1. `wait()/notify()`:适合基础场景,需手动处理同步。
2. `BlockingQueue`:代码最简洁,推荐优先使用。
3. `Lock` + `Condition`:提供更灵活的锁控制。
4. `Semaphore`:通过资源计数管理同步,需注意线程安全。
根据具体需求(如性能、复杂度、可扩展性)选择合适的方式。