【Linux】生产者消费者模型:基于阻塞队列,使用互斥锁和条件变量维护互斥与同步关系

发布于:2024-09-17 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

一、什么是生产者消费者模型

二、为什么要引入生产者消费者模型?

三、详解生产者消费者模型 ​编辑

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者和消费者之间为什么会存在同步关系?

为什么生产者与消费者之间需要一个交易场所?

四、基于阻塞队列的生产者消费者模型

生产者消费者模型的使用场景示例:

五、理解生产者消费者模型代码


一、什么是生产者消费者模型

生产者-消费者模型是一种常见的并发设计模式,用于处理在生产和消费任务之间的协调。这个模型主要用来解决在多线程或多进程环境中,生产者和消费者之间的同步和数据共享问题。

在这个模型中:

  • 生产者:负责生成数据或任务,并将其放入一个共享的缓冲区(也称为队列)中。
  • 消费者:从共享缓冲区中取出数据或任务并进行处理。

缓冲区的作用是实现生产者和消费者之间的解耦,使得生产者和消费者可以在不同的速度下独立工作。

二、为什么要引入生产者消费者模型?

在没有这种模型的情况下,生产者和消费者可能会遇到以下问题:

  • 资源浪费:如果生产者生产速度过快,而消费者消费速度跟不上,可能会导致生产出的产品(通常是数据或任务)被丢弃,造成资源浪费。
  • 资源不足:如果消费者消费速度过快,而生产者生产速度跟不上,消费者可能会因为等待新资源而处于空闲状态,造成资源不足。
  • 竞态条件:在没有适当的同步机制的情况下,多个生产者或消费者同时访问共享资源(如队列)可能会导致数据不一致或状态错误。
  •  死锁:如果生产者和消费者在等待对方释放资源时都阻塞了,可能会导致死锁,即系统无法继续前进。
  • 饥饿:某些消费者可能因为其他消费者不断地获取资源而长时间得不到服务,这种现象称为饥饿。

生产者-消费者模型通过引入缓冲区(如队列)和同步机制(如信号量、互斥锁)来解决这些问题。模型通常包含以下几个关键组件:

  • 生产者:负责生成数据或任务。
  • 消费者:负责处理数据或任务。
  • 缓冲区:存储生产者生产的数据,供消费者使用。
  • 同步机制:确保生产者和消费者之间的协调,避免竞态条件和死锁。

通过这些组件,生产者-消费者模型可以有效地管理资源,确保生产者不会在缓冲区满时生产,消费者不会在缓冲区空时消费,从而实现生产者和消费者之间的平衡和同步。

三、详解生产者消费者模型 

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(如阻塞队列、环形队列等)

我们在用代码编写生产者消费者模型的时,本质就是对这三个特点进行维护。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者将生产的数据放入容器中,而消费者又会从容器中取出数据,这就造成了在同一时刻中,容器中的数据可能被多个执行流访问。而该容器其实就是临界资源,对于临界资源,我们必须保护对临界资源操作的原子性,否则容易造成数据错乱。

因此我们必须保证生产者和消费者访问临界资源的操作是串行的。每次访问,我们只允许一个执行流进入临界区。如此,我们就保证了临界资源的安全。

所以,在访问临界区资源之前,无论是生产者还是消费者,必须先去竞争保护临界资源的那把互斥锁。即生产者和消费者会进行同一把锁的竞争!

生产者和消费者之间为什么会存在同步关系?

如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

当缓冲区为满时,生产者需要在自己的条件变量下阻塞等待,直到有消费者进行消费,缓冲区有空间剩余时,才会继续与消费者竞争临界资源的管理权。

当缓冲区为空时,消费者需要在自己的条件变量下阻塞等待,直到有生产者进行生产,缓冲区有数据时,才会继续与生产者竞争临界资源的管理权。

【注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。】

为什么生产者与消费者之间需要一个交易场所?

生产者与消费者之间需要一个交易场所,通常称为缓冲区(Buffer),是因为这个缓冲区在多线程环境中起到了至关重要的作用。以下是缓冲区的几个关键作用:

  1. 协调生产和消费:缓冲区作为生产者和消费者之间的中介,允许生产者在缓冲区有空间时生产数据,消费者在缓冲区有数据时消费数据。这种协调机制避免了生产者和消费者之间的直接竞争,确保了生产和消费的有序进行。

  2. 解耦生产者和消费者:缓冲区使得生产者和消费者不必同时运行。生产者可以在没有消费者等待的情况下生产数据,消费者也可以在没有生产者生产的情况下消费数据。这种解耦提高了系统的灵活性和效率。

  3. 防止数据丢失:在没有缓冲区的情况下,如果生产者生产速度过快,消费者可能无法及时消费,导致数据丢失。缓冲区提供了一个存储空间,可以暂时保存生产者生产的数据,直到消费者准备好消费。

  4. 提高并行性:缓冲区允许生产者和消费者以不同的速度独立工作,提高了系统的并行性和吞吐量。生产者可以快速生产数据,而消费者可以根据自己的速度消费数据,两者互不干扰。

  5. 避免死锁和饥饿:通过适当的同步机制(如信号量和条件变量),缓冲区可以避免死锁和饥饿问题。生产者在缓冲区满时等待,消费者在缓冲区空时等待,这些等待条件可以通过同步机制得到管理,确保所有线程都能在适当的时候获得资源。

  6. 实现负载均衡:缓冲区可以平滑生产者和消费者之间的负载波动。在生产者负载高时,缓冲区可以存储额外的数据,而在消费者负载高时,缓冲区可以提供足够的数据供消费。

  7. 简化线程管理:缓冲区提供了一个明确的接口,使得线程管理更加简单。生产者和消费者只需要关注缓冲区的状态,而不需要直接管理其他线程。

总之,缓冲区作为生产者与消费者之间的交易场所,是实现高效、稳定和可扩展的多线程系统的关键组件。它通过协调生产和消费、解耦生产者和消费者、防止数据丢失、提高并行性、避免死锁和饥饿以及实现负载均衡等方式,提高了系统的整体性能和可靠性。

四、基于阻塞队列的生产者消费者模型

什么是阻塞队列?它的作用是什么?

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

生产者将数据放入队列,消费者从队列中取出数据。

  • 当队列为空时,消费者会阻塞等待,直到有数据可用;
  • 当队列满时,生产者会阻塞等待,直到有空间可用。 

通过阻塞操作,阻塞队列可以有效地管理资源,避免过度生产或消费,从而提高并发性能。

#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include "Thread.hpp"

static const int MAX_CAPACITY = 6;

template <typename T>
class BlockQueue
{
private:
    std::queue<T> _block_queue;//阻塞队列,临界资源
    int _max_capacity;//队列最大容量
    pthread_mutex_t _mutex;//生产者和消费者之间需要满足互斥关系。因为当生产者在生产时,消费者不能去消费,否则容易造成临界资源错乱。反之亦然
    pthread_cond_t _producer_cond;//当生产资源达到最大容量的时候,生产者需要在生产者的条件变量下等待,当有空余空间时,在进行生产,由消费者告知生产者来生产
    pthread_cond_t _consumer_cond;//当生产资源被消费完之后,消费者需要在消费者条件变量下等待,直到新的资源被生产,由生产者唤醒消费者来进行消费
    //在生产资源未到最大容量和生产资源未空时,消费者和生产者形成互斥关系,竞争临界资源的管理权
public:
    //构造
    BlockQueue(int capacity = MAX_CAPACITY)
    :_max_capacity(capacity)
    {
        pthread_mutex_init(&_mutex, nullptr);//初始化锁
        pthread_cond_init(&_producer_cond, nullptr);//初始化条件变量
        pthread_cond_init(&_consumer_cond, nullptr);
    }

    void Push(const T& in)
    {
        //生产者和消费者竞争同一把锁
        pthread_mutex_lock(&_mutex);

        while(IsFull())//为什么用while,不用if? 防止当有多个生产者时,使用broadcast会造成多个线程被唤醒,而此时多个线程已经经过了if判断。
        //使用if只能判断一次,无法避免时间差所带来的条件改变,因此需要循环检查。
        {
            //生产者在生产者的条件变量处等待,等待时释放锁。被唤醒时重新参与锁的竞争
            pthread_cond_wait(&_producer_cond, &_mutex);
        }
        //走到这里一定不为满
        _block_queue.push(in);
        pthread_mutex_unlock(&_mutex);

        //走到这个地方说明此时阻塞队列中一定有数据,可以唤醒消费者线程
        pthread_cond_signal(&_consumer_cond);
    }

    void Pop(T* out)    //输出型参数,将数据带出来
    {
        //生产者和消费者竞争同一把锁
        pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            //消费者在消费者的条件变量处等待,等待时释放锁,被唤醒时重新参与锁的竞争。竞争到锁之后才能返回
            pthread_cond_wait(&_consumer_cond, &_mutex);
        }
        //走到这里一定不为空
        *out = _block_queue.front();
        _block_queue.pop();
        pthread_mutex_unlock(&_mutex);

        //走到这个地方说明阻塞队列中一定有剩余空间,可以唤醒生产者
        pthread_cond_signal(&_producer_cond);
    }

    bool IsFull()
    {
        return _block_queue.size() == _max_capacity;
    }

    bool IsEmpty()
    {
        return _block_queue.empty();
    }

    //析构
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_producer_cond);
        pthread_cond_destroy(&_consumer_cond);
    }
};

需要注意的是,条件变量的等待必须要放在while循环中进行条件判断。

  • 在多线程环境中,多个线程可能同时操作共享资源。即使一个线程被唤醒,也可能因为其他线程的操作使得条件不再满足。因此,在条件变量的等待过程中,必须持续检查条件是否真的符合期望,确保线程在安全的条件下继续执行。
  • 在pthread_cond_wait函数的等待队列中可能存在多个等待线程。如果使用 if 进行条件判断,仅能在该执行流执行管理资源的代码前进行判断。假如此时阻塞队列中恰好生产了一个数据,而条件变量的等待队列中恰好有多个线程被其他线程使用pthread_cond_broadcast函数同时唤醒,无论

为什么不使用 if 判断呢?

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,此时在pthread_cond_wait函数的等待队列中可能存在多个等待线程,因此就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

生产者消费者模型的使用场景示例:

生产者承担的工作:不断地将创建的任务放入阻塞队列中。

消费者承担的工作:不断地从阻塞队列中提取任务,并进行执行。

阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。

#include "Blocking_Queue.hpp"
#include <functional>
#include <unistd.h>
#include <ctime>
using task_t = std::function<void()>;

void download()
{
    std::cout << "DownLooad ......" << std::endl;
}

void *Productor(void *block_queue)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(block_queue);
    while (true)
    {
        std::cout << "Producing ......" << std::endl;
        bq->Push(download);
        sleep(1);
    }
    return (void*)0;
}

void *Consumer(void *block_queue)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(block_queue);
    while (true)
    {
        sleep(1);
        std::cout << "Consuming ......" << std::endl;
        task_t out;
        bq->Pop(&out);
        out();
    }
    return (void*)0;
}

int main()
{
    BlockQueue<task_t> block_queue;
    pthread_t productor_tid, consumer_tid;
    pthread_create(&productor_tid, nullptr, Productor, static_cast<void *>(&block_queue));
    pthread_create(&consumer_tid, nullptr, Consumer, static_cast<void *>(&block_queue));
    pthread_join(productor_tid, nullptr);
    pthread_join(consumer_tid, nullptr);
    return 0;
}

由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。

需要注意的是:由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。这点可根据需求自行修改。

五、理解生产者消费者模型代码

看完上述内容后,同学们可能会有这种疑惑:生产者消费者模型不是为了实现高并发而设计的吗?为什么在生产者创建任务时和消费者提取任务时是串行的,而不是并行的?

在回答这个问题之前,我们需要了解一下什么是并发,什么又是并行。

并发(Concurrency):

定义:并发指的是系统能够处理多个任务的能力,这些任务可能是同时进行的,也可能是交替进行的。在并发的场景下,任务可以在同一时间段内交替执行,但不一定是同时的并发更关注的是任务的切换和管理。

并行(Parallelism)

定义:并行指的是系统能够真正地同时执行多个任务。在并行的场景下,多个任务在同一时间段内在不同的处理器或计算核心上同时执行。并行更多地关注的是计算任务的真正同时处理。

在生产者消费者模型中,生产者和消费者对阻塞队列的操作仅仅是向阻塞队列中添加和提取数据,而阻塞队列是共享资源,我们必须对其进行线程安全的保护,让各个执行流串行地去执行临界区代码,保证对临界资源操作的原子性。

添加与提取任务恰恰是生产者消费者模型中执行比较快速的操作。我们可以假设此时有多个生产者线程,此时只能有一个生产者线程能够进入临界区中。那么,在同一时刻,其他的生产者线程一定都在等待进入临界区中呢?答案是否定的!

我们需要知道,今天我们只是使用该模型执行了简单的任务。当遇见复杂任务时生产者需要时间来构建任务!反之,消费者也需要时间来执行任务! 在某个生产者线程向阻塞队列中添加任务时,其他生产者线程可能在等待,也可能在进行任务的生产。消费者线程亦然!

由此我们可以看出,上述代码实现的生产者消费者模型并不是低效的。相反,它有效地管理共享资源的访问、合理安排线程的任务,并确保系统在处理复杂任务时仍能保持高效和稳定。同时解决了生产者与消费者可能出现的忙闲不均的问题,实现了负载均衡。