生产者消费者模型

发布于:2025-04-11 ⋅ 阅读:(33) ⋅ 点赞:(0)

目录

一、生产者消费者模型

1. 生产者消费者模型是什么?

2. 为什么使用生产者消费者模型?

3. 生产者消费者模型的特点(321原则)

🌵3种关系

🌵2种角色

🌵1个交易场所

二、基于BlockingQueue的生产者消费者模型

1. 什么是BlockingQueue?

2. C++实现阻塞队列

3. 生产者和消费者的实现

4. 生产者和消费者速度不一致的情况

5. 基于计算任务的生产者消费者模型


一、生产者消费者模型

1. 生产者消费者模型是什么?

        生产者消费者模型是一种经典的多线程设计模式,用于解决生产者和消费者之间的数据交换问题。在这个模型中,生产者负责生成数据,而消费者负责处理数据。两者通过一个共享的缓冲区(通常是阻塞队列)进行通信,而不是直接交互。这种间接通信方式有效地降低了生产者和消费者之间的耦合度。

2. 为什么使用生产者消费者模型?

  • 解耦:生产者和消费者不需要直接通信,它们通过共享缓冲区进行数据交换,降低了模块间的依赖性。

  • 支持并发:生产者和消费者可以同时运行,提高了系统的并发性能。

  • 支持忙闲不均:生产者和消费者的处理速度可以不同,通过缓冲区来平衡两者的处理能力。

3. 生产者消费者模型的特点(321原则)

🌵3种关系

  • 生产者和生产者(互斥关系):多个生产者不能同时向缓冲区写入数据。

  • 消费者和消费者(互斥关系):多个消费者不能同时从缓冲区读取数据。

  • 生产者和消费者(互斥关系、同步关系):生产者和消费者不能同时访问缓冲区,且需要协调访问顺序。

🌵2种角色

  • 生产者:负责生成数据并放入缓冲区。

  • 消费者:负责从缓冲区中取出数据并处理。

🌵1个交易场所

  • 缓冲区:通常是内存中的一段区域,用于存储生产者生成的数据。

二、基于BlockingQueue的生产者消费者模型

1. 什么是BlockingQueue?

        阻塞队列(Blocking Queue)是一种特殊的队列,当队列为空时,从队列获取元素的操作会被阻塞;当队列满时,向队列放入元素的操作会被阻塞。阻塞队列在多线程编程中非常有用,可以简化生产者消费者模型的实现。

2. C++实现阻塞队列

以下是一个简单的阻塞队列实现,使用C++的std::queue和POSIX线程库(pthread)。

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template <class T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _q.size() == _cap;
    }

    bool IsEmpty()
    {
        return _q.empty();
    }

public:
    BlockQueue(int cap = 5) : _cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

    // 生产者调用:向阻塞队列插入数据
    void Push(const T &data)
    {
        pthread_mutex_lock(&_mutex);
        while (IsFull())
        {
            // 队列满,生产者等待
            pthread_cond_wait(&_full, &_mutex);
        }
        _q.push(data);
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_empty); // 唤醒消费者
    }

    // 消费者调用:从阻塞队列获取数据
    void Pop(T &data)
    {
        pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            // 队列空,消费者等待
            pthread_cond_wait(&_empty, &_mutex);
        }
        data = _q.front();
        _q.pop();
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_full); // 唤醒生产者
    }

private:
    std::queue<T> _q;       // 阻塞队列
    int _cap;               // 队列容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _full;   // 队列满时的条件变量
    pthread_cond_t _empty;  // 队列空时的条件变量
};

代码说明:

  1. 互斥锁:保护共享资源(阻塞队列)的访问。

  2. 条件变量_full用于通知生产者队列已满,_empty用于通知消费者队列为空。

  3. Push和Pop方法:分别用于生产者和消费者的操作,确保线程安全。

3. 生产者和消费者的实现

生产者线程

void* Producer(void* arg) {
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
    while (true) {
        sleep(1); // 模拟生产时间
        int data = rand() % 100 + 1; // 生成随机数据
        bq->Push(data); // 将数据放入阻塞队列
        std::cout << "Producer: " << data << std::endl;
    }
    return nullptr;
}

消费者线程

void* Consumer(void* arg) {
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
    while (true) {
        sleep(1); // 模拟消费时间
        int data = 0;
        bq->Pop(data); // 从阻塞队列中取出数据
        std::cout << "Consumer: " << data << std::endl;
    }
    return nullptr;
}

主函数

int main() {
    srand((unsigned int)time(nullptr));
    pthread_t producer, consumer;
    BlockQueue<int>* bq = new BlockQueue<int>;

    // 创建生产者和消费者线程
    pthread_create(&producer, nullptr, Producer, bq);
    pthread_create(&consumer, nullptr, Consumer, bq);

    // 等待线程结束
    pthread_join(producer, nullptr);
    pthread_join(consumer, nullptr);

    delete bq;
    return 0;
}

4. 生产者和消费者速度不一致的情况

生产者快,消费者慢

        如果生产者生成数据的速度比消费者消费数据的速度快,阻塞队列会很快被填满。此时,生产者会被阻塞,直到消费者消费数据后释放空间。

void *Producer(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    // 生产者不断进行生产
    while (true)
    {
        int data = rand() % 100 + 1;
        bq->Push(data); // 生产数据
        std::cout << "Producer: " << data << std::endl;
    }
}
void *Consumer(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    // 消费者不断进行消费
    while (true)
    {
        sleep(1);
        int data = 0;
        bq->Pop(data); // 消费数据
        std::cout << "Consumer: " << data << std::endl;
    }
}

生产者慢,消费者快

        如果消费者消费数据的速度比生产者生成数据的速度快,阻塞队列会很快变空。此时,消费者会被阻塞,直到生产者生成新的数据。

void *Producer(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    // 生产者不断进行生产
    while (true)
    {
        sleep(1);
        int data = rand() % 100 + 1;
        bq->Push(data); // 生产数据
        std::cout << "Producer: " << data << std::endl;
    }
}
void *Consumer(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    // 消费者不断进行消费
    while (true)
    {
        int data = 0;
        bq->Pop(data); // 消费数据
        std::cout << "Consumer: " << data << std::endl;
    }
}

5. 基于计算任务的生产者消费者模型

定义任务类

#pragma once

class Task
{
public:
    Task(int x = 0, int y = 0, char op = '+') : _x(x), _y(y), _op(op) {}

    void Run()
    {
        int result = 0;
        switch (_op)
        {
        case '+':
            result = _x + _y;
            break;
        case '-':
            result = _x - _y;
            break;
        case '*':
            result = _x * _y;
            break;
        case '/':
            if (_y == 0)
            {
                std::cout << "Warning: division by zero!" << std::endl;
                result = -1;
            }
            else
            {
                result = _x / _y;
            }
            break;
        default:
            std::cout << "Error: invalid operation!" << std::endl;
        }
        std::cout << _x << " " << _op << " " << _y << " = " << result << std::endl;
    }

private:
    int _x, _y;
    char _op;
};

生产者和消费者线程

void *Producer(void *arg)
{
    BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;
    const char *ops = "+-*/";
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100 + 1; // 避免除以零
        char op = ops[rand() % 4];
        Task t(x, y, op);
        bq->Push(t);
        std::cout << "Produced task: " << x << " " << op << " " << y << std::endl;
    }
    return nullptr;
}

void *Consumer(void *arg)
{
    BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;
    while (true)
    {
        Task t;
        bq->Pop(t);
        t.Run();
    }
    return nullptr;
}