【Linux】多线程 -> 线程同步与基于BlockingQueue的生产者消费者模型

发布于:2025-02-22 ⋅ 阅读:(15) ⋅ 点赞:(0)

线程同步

条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如:一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

同步概念

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。

竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解。

条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict

attr);

参数:

cond:要初始化的条件变量

attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond);

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:

cond:要在这个条件变量上等待

mutex:互斥量,后面详细解释

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒一批线程。

int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个线程。

示例代码:

makefile:

testCond:testCond.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f testCond

testCond.cc:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>

int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *start_routine(void *args)
{
    std::string name = static_cast<const char *>(args);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);
        // 判断暂时省略
        std::cout << name << "->" << tickets << std::endl;
        tickets--;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    // 通过条件变量控制线程的执行
    pthread_t t[4];
    for (int i = 0; i < 4; i++)
    {
        char *name = new char[64];
        snprintf(name, 64, "thread %d", i + 1);
        pthread_create(t + i, nullptr, start_routine, (void *)name);
    }
    while (true)
    {
        sleep(1);
        //pthread_cond_broadcast(&cond); // 唤醒一批线程
        pthread_cond_signal(&cond);//唤醒一个线程
        std::cout << "main thread wakeup one thread... " << std::endl;
    }
    for (const auto &i : t)
    {
        pthread_join(i, nullptr);
    }

    return 0;
}

pthread_cond_signal:唤醒一个线程。      pthread_cond_broadcast:唤醒一批线程。

这些线程会持续等待一个条件变量的信号。主线程每隔 1 秒就会发送一个条件变量信号,唤醒其中一个等待的线程。被唤醒的线程会输出当前剩余的票数并将票数减 1。

可以看到,由于条件变量的存在,输出结果变得有顺序性。

  • 为什么 pthread_cond_wait 需要互斥量?

1. 保证条件检查和等待操作的原子性

在多线程环境中,线程需要先检查某个条件是否满足,如果不满足则进入等待状态。这个检查条件和进入等待的操作必须是原子的,否则可能会出现竞态条件

例如,在生产者 - 消费者模型中,消费者线程需要检查缓冲区是否为空,如果为空则等待。假设没有互斥量保护,可能会出现以下情况:

  • 消费者线程检查到缓冲区为空,准备进入等待状态。

  • 在消费者线程真正进入等待状态之前,生产者线程往缓冲区中添加了数据,并发出了条件变量的通知。

  • 消费者线程此时才进入等待状态,由于之前通知已经发出,消费者线程可能会一直等待下去,导致程序出现错误。

使用互斥量可以保证条件检查和进入等待状态这两个操作的原子性。当线程调用pthread_cond_wait时,它会先释放互斥量,然后进入等待状态;当被唤醒时,又会重新获取互斥量。这样就避免了上述竞态条件的发生。

2. 保护共享资源和条件变量

条件变量通常与共享资源相关联,线程在检查条件和修改共享资源时需要保证线程安全。互斥量可以用来保护这些共享资源,确保同一时间只有一个线程能够访问和修改它们

在调用pthread_cond_wait之前,线程需要先获取互斥量,这样可以保证在检查条件和进入等待状态时,其他线程不会同时修改共享资源和条件变量。当线程被唤醒后,再次获取互斥量,又可以保证在处理共享资源时的线程安全

生产者消费者模型

  • 为何要使用生产者消费者模型?

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

“321”原则(便于记忆)

  • 3种关系:生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥、同步)。
  • 2种角色:生产者线程和消费者线程。
  • 1种交易场所:一段特定结构的缓冲区。

优点

  1. 解耦:生产线程和消费线程解耦。
  2. 支持忙闲不均:生产和消费的一段时间的忙闲不均。
  3. 提高效率:支持并发。

基于BlockingQueue的生产消费模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

下面我们以单生产者,单消费者为例:

makefile:

MainCp:MainCp.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f MainCp

BlockQueue.hpp:

#include <iostream>
#include <queue>
#include <pthread.h>

const int gmaxcap = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int &maxcap = gmaxcap) : _maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    void push(const T &in) // 输入型参数,const &;输出型参数 *;输入输出型参数 &;
    {
        pthread_mutex_lock(&_mutex);
        // 1.判断
        while (is_full())
        // if(is_full())//细节2:充当判断的语法必须是while,不能是if,因为在被唤醒时,有可能存在异常或伪唤醒。
        {
            // 细节1:pthread_cond_wait是在临界区啊。
            // pthread_cond_wait的第二个参数,必须是我们正在使用的互斥锁。
            // a.pthread_cond_wait:该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起。
            // b.pthread_cond_wait: 该函数在被唤醒返回的时候,会自动的重新获取你传入的锁。
            pthread_cond_wait(&_pcond, &_mutex); // 因为生产条件不满足,无法生产,生产者进行等待。
        }
        // 2.走到这里,一定是没有满的。
        _q.push(in);
        // 3.一定能保证阻塞队列里有数据。
        // 细节3:pthread_cond_signal:可以放在临界区内部,也可以放在外部。
        pthread_cond_signal(&_ccond); // 唤醒消费者消费。这里可以有一定的策略。
        pthread_mutex_unlock(&_mutex);
        // pthread_cond_siganl(&_ccond);
    }
    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        // 1.判断
        while (is_empty())
        // if(is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex); // 因为消费条件不满足,无法消费,消费者进行等待。
        }
        // 2.走到这里,一定是不为空的。
        *out = _q.front();
        _q.pop();
        // 3.一定能保证阻塞队列里至少有一个空位置。
        pthread_cond_signal(&_pcond); // 唤醒生产者生产。这里可以有一定的策略。
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size() == _maxcap;
    }

private:
    std::queue<T> _q;
    int _maxcap; // 队列中元素的上限
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond; // 生产者对应的条件变量
    pthread_cond_t _ccond; // 消费者对应的条件变量
};

 MainCp.cc:

#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

void *consumer(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);
    while (true)
    {
        // 生产活动
        int data;
        bq->pop(&data);
        std::cout << "消费数据:" << data << std::endl;
    }
    return nullptr;
}
void *productor(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);
    while (true)
    {
        // 生产活动
        int data = rand() % 10 + 1; // 这里我们先用一个随机数构建一个数据。
        bq->push(data);
        std::cout << "生产数据:" << data << std::endl;
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<int> *bq = new BlockQueue<int>();
    pthread_t c, p;
    // 生产消费要看到同一份资源,也就是阻塞队列
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;

    return 0;
}

如果不加控制,生产消费就会疯狂的执行,没有顺序。

  • 你怎么证明它是一个阻塞队列呢?

让生产者每隔一秒生产一个,消费者一直消费。那么最终的预期结果就是生产一个,消费一个;生产一个,消费一个。

void *consumer(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);
    while (true)
    {
        // 生产活动
        int data;
        bq->pop(&data);
        std::cout << "消费数据:" << data << std::endl;
    }
    return nullptr;
}
void *productor(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);
    while (true)
    {
        // 生产活动
        int data = rand() % 10 + 1; // 这里我们先用一个随机数构建一个数据。
        bq->push(data);
        std::cout << "生产数据:" << data << std::endl;
        sleep(1);
    }
    return nullptr;
}

让消费者每隔一秒消费一个,生产者一直生产。那么最终的预期结果就是消费一个,生产一个;消费一个,生产一个。

void *consumer(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);
    while (true)
    {
        // 生产活动
        int data;
        bq->pop(&data);
        std::cout << "消费数据:" << data << std::endl;
        sleep(1);
    }
    return nullptr;
}
void *productor(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);
    while (true)
    {
        // 生产活动
        int data = rand() % 10 + 1; // 这里我们先用一个随机数构建一个数据。
        bq->push(data);
        std::cout << "生产数据:" << data << std::endl;
    }
    return nullptr;
}

这就是基于阻塞队列的生产消费模型。

上面我们阻塞队列里放的就是一个整形数据,我们可以再完善一下。我们是可以直接在阻塞队列中放任务的。让生产者给消费者派发任务。

makefile:

MainCp:MainCp.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f MainCp

BlockQueue.hpp:

#include <iostream>
#include <queue>
#include <pthread.h>

const int gmaxcap = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int &maxcap = gmaxcap) : _maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    void push(const T &in) // 输入型参数,const &;输出型参数 *;输入输出型参数 &;
    {
        pthread_mutex_lock(&_mutex);
        // 1.判断
        while (is_full())
        // if(is_full())//细节2:充当判断的语法必须是while,不能是if,因为在被唤醒时,有可能存在异常或伪唤醒。eg:一个生产者十个消费者,broadcast唤醒。
        {
            // 细节1:pthread_cond_wait是在临界区啊。
            // pthread_cond_wait的第二个参数,必须是我们正在使用的互斥锁。
            // a.pthread_cond_wait:该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起。
            // b.pthread_cond_wait: 该函数在被唤醒返回的时候,会自动的重新获取你传入的锁。
            pthread_cond_wait(&_pcond, &_mutex); // 因为生产条件不满足,无法生产,生产者进行等待。
        }
        // 2.走到这里,一定是没有满的。
        _q.push(in);
        // 3.一定能保证阻塞队列里有数据。
        // 细节3:pthread_cond_signal:可以放在临界区内部,也可以放在外部。
        pthread_cond_signal(&_ccond); // 唤醒消费者消费。这里可以有一定的策略。
        pthread_mutex_unlock(&_mutex);
        // pthread_cond_siganl(&_ccond);
    }
    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        // 1.判断
        while (is_empty())
        // if(is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex); // 因为消费条件不满足,无法消费,消费者进行等待。
        }
        // 2.走到这里,一定是不为空的。
        *out = _q.front();
        _q.pop();
        // 3.一定能保证阻塞队列里至少有一个空位置。
        pthread_cond_signal(&_pcond); // 唤醒生产者生产。这里可以有一定的策略。
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size() == _maxcap;
    }

private:
    std::queue<T> _q;
    int _maxcap; // 队列中元素的上限
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond; // 生产者对应的条件变量
    pthread_cond_t _ccond; // 消费者对应的条件变量
};

Task.hpp:

#pragma once

#include <iostream>
#include <cstdio>
#include <functional>

class Task
{
    using func_t = std::function<int(int, int, char)>;
    // typedef std::function<int(int,int,char)>func_t;
public:
    Task()
    {
    }
    Task(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func)
    {
    }
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

MainCp.cc:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
        {
            if (y == 0)
            {
                std::cerr << "div zero error!" << std::endl;
                result = -1;
            }
            else
            {
                result = x / y;
            }
        }
        break;
    case '%':
        {
            if (y == 0)
            {
                std::cerr << "mod zero error!" << std::endl;
                result = -1;
            }
            else
            {
                result = x % y;
            }
        }
        break;
    }
    return result;
}

void *consumer(void *bq_)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);
    while (true)
    {
        // 消费活动
        Task t;
        bq->pop(&t);
        std::cout << "消费任务:" << t() << std::endl;
        //sleep(1);
    }
    return nullptr;
}
void *productor(void *bq_)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);
    while (true)
    {
        // 生产活动
        int x = rand() % 100 + 1; // 这里我们先用一个随机数构建一个数据。
        int y = rand() % 10;
        int operCode = rand() % oper.size();
        Task t(x, y, oper[operCode], mymath);
        bq->push(t);

        std::cout << "生产任务:" << t.toTaskString() << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c, p;
    // 生产消费要看到同一份资源,也就是阻塞队列
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;

    return 0;
}

让生产者sleep1秒,看到的结果就是生产一个任务,消费一个任务。

让消费者sleep1秒,看到的结果就是消费一个任务,生产一个任务。

这样,我们就完成了一个线程给另一个线程派发任务:生产者给消费者派发任务。

  • 上面是单生产者,单消费者,如果我们直接改成多个生产者多个消费者可以吗?

MainCp.cc:

//
//...
//...
int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c, c1, p, p1, p2;
    // 生产消费要看到同一份资源,也就是阻塞队列
    pthread_create(&p, nullptr, productor, bq);
    pthread_create(&p1, nullptr, productor, bq);
    pthread_create(&p2, nullptr, productor, bq);

    pthread_create(&c1, nullptr, consumer, bq);
    pthread_create(&c, nullptr, consumer, bq);

    pthread_join(c, nullptr);
    pthread_join(c1, nullptr);
    
    pthread_join(p, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    delete bq;

    return 0;
}

可以看到是可以的。无论外部的线程再多,真正进入到阻塞队列里生产或消费的线程永远只有一个。

生产者要向blockqueue里放任务,消费者要向blockqueue里取任务。由于有锁的存在,这个(生产过程和消费过程)过程是串行的,也就是blockqueue里任何时刻只有一个执行流。那么:

  • 那么生产者消费者模型高效在哪里呢?创建多线程生产和消费的意义是什么呢?

1、对于生产者而言,它获取数据构建任务,是需要花时间的。

  • 如果这个任务的构建非常耗时,这个线程(生产者)在构建任务的同时,其他线程(生产者)可以并发的继续构建任务。

2、对于消费者而言,它拿到任务以后,是需要花时间处理这个任务的!

  • 如果这个任务的处理非常耗时,这个线程(消费者)在处理任务的同时,其他线程(消费者)可以并发的继续从阻塞队列里拿任务处理。

所以,高效并不体现在生产者把任务放进阻塞队列里高效,或者消费者从阻塞队列里拿任务高效。而是,体现在多个线程可以同时并发的构建或处理任务。

对于单生产单消费,它的并发性体现在,消费者从阻塞队列里拿任务和生产者构建任务,或者生产者往阻塞队列里放任务和消费者处理任务的过程是并发的。

总结:生产消费模型高效体现在,可以在生产前,和消费之后,让线程并行执行。

创建多线程生产和消费的意义:

多个线程可以并发生产,并发消费。

以上就是线程同步和基于阻塞队列的生产者消费者模型。