LINUX 线程(下)

发布于:2025-05-29 ⋅ 阅读:(62) ⋅ 点赞:(0)

一、模拟实现生产消费者模型

1.1 生产消费者模型示意图

1.2 生产消费者三种关系、两种角色、一个交易场所

三种关系:

①、 生产者跟生产者关系:互斥关系:

生产者不能同时往一个内存空间里存放数据;

、生产者跟消费者关系:互斥关系、同步关系:

生产者在往一个内存空间里放数据时消费者不能同时进行读取,要等生产者存放完之后,且生产者生产了数据要及时告诉消费者来消费,消费者消费完数据要及时告诉生产者来生产;

③、消费者跟消费者关系:互斥关系;

消费者不能同时往一个内存空间获取数据;

两种角色:

生产者、消费者;

一个交易场所:

共享的特定结构的内存空间;

1.3 阻塞等待队列的生产消费者模型

多线程编程中的阻塞等待队列:

其中thread1扮演的是生产者,BlockingQueue扮演的是数据交易场所,thread2扮演的是消费者;

thread1往blockingqueue里生产数据,thread2往BlockingQueue里消费数据;

代码:

特定结构的内存空间(数据交易场所):

//阻塞队列的实现(数据交易场所,特定结构的内存空间)
#pragma once
//阻塞等待队里的实现
#include <iostream>
#include <queue>
#include<pthread.h>
static int defaultNum=10;

template<class T>//设置成模版类可支持存放其他类型数据
class BlockingQueue
{

    private:
    std::queue<T> blockqueue;//数据交易地点
    int Max_capacity;//代表队列能存放的最大数据个数
    pthread_mutex_t mutex_;//用锁实现互斥关系
    pthread_cond_t cond_p;//用条件变量实现同步关系
    pthread_cond_t cond_c;//消费者到消费者的等待队列、生产者到生产者的等待队列
    private:
    void lock()
    {
        pthread_mutex_lock(&mutex_);
    }
    void unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }
    bool isempty()
    {
        return blockqueue.size()==0;
    }
    public:
    BlockingQueue(int maxcapacity=defaultNum)
    :Max_capacity(maxcapacity)
    {
        //锁和条件变量初始化
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&cond_p,nullptr);
        pthread_cond_init(&cond_c,nullptr);
    }
    void push(const T& data)
    {
        //往队列生产数据
        //如果数据满了,让生产到等待队列里等待
        lock();//访问队列前先上锁
        while(Max_capacity==blockqueue.size())//用while防止伪唤醒
        {
            pthread_cond_wait(&cond_p,&mutex_);//等待并解锁
        }
        blockqueue.push(data);//走到这里说明 1.队列不为满  2.生产者被唤醒且队列不为满
        pthread_cond_signal(&cond_c);//生产一个数据就唤醒一个消费者来消费(同步)
        unlock();//访问结束解锁
    }
    T pop()
    {
        //从队列中消费数据
            lock();//访问前先上锁
            while(isempty())//消费前先判空,while循环防止伪唤醒
            {
                //如果为空到消费者的等待队列里等、
                pthread_cond_wait(&cond_c,&mutex_);
            }
            T out=blockqueue.front();
            blockqueue.pop();
            pthread_cond_signal(&cond_p);//消费一个数据唤醒一次生产者队列
            unlock();
            return out;

    }
    ~BlockingQueue()
    {
        //锁、条件变量销毁
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_c);
        pthread_cond_destroy(&cond_p);
    }
};

 生产消费的任务(数据):

//自定义的运算+-*/%任务类
#pragma once
#include <iostream>
std::string opers="+-*/%";//运算方法
enum{
    Divzero=1,//除0错误码
    Modzero,//模0错误码
    Unknow//未知错误
};
//计算加减乘除的类
class Task
{
    private:
    int data1_;//第一个数据
    int data2_;//第二个数据
    char operator_;//运算方法
    int result_;//运算结果
    int exitcode_;//运算退出码
    public:
    Task(int data1,int data2,char opeator)
    :data1_(data1),data2_(data2),operator_(opeator),result_(0),exitcode_(0)
    {

    }
    void operator()()
    {
        run();
    }
    void run()//运算
    {
        switch(operator_)
        {
            case '+':
            result_=data1_+data2_;
            break;
            case '-':
            result_=data1_-data2_;
            break;
            case '*':result_=data1_*data2_;
            break;
            case '/':if(data2_){result_=data1_/data2_;}else{exitcode_=Divzero;}
            break;
            case '%':if(data2_){result_=data1_/data2_;}else{exitcode_=Modzero;}
            break;
            default:
            exitcode_=Unknow;
            break; 
        }
    
    }
    std::string getTask()
    {
        string str;
        str+=std::to_string(data1_);
        str+=operator_;
        str+=std::to_string(data2_);
        str+='=';
        str+='?';
        return str;
    }
    std::string getResult()
    {
        string str;
        str+=std::to_string(data1_);
        str+=operator_;
        str+=std::to_string(data2_);
        str+='=';
        str+=to_string(result_);
        str+='[';
        str+=to_string(exitcode_);
        str+=']';
        return str;
    }


};

 建立两种角色:生产者消费者:

//main函数 创建生产消费者
#include <iostream>
using namespace std;
#include "BlockingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
void *producter(void *args)
{
    BlockingQueue<Task> *bq = static_cast<BlockingQueue<Task> *>(args);

    // 生产者生产数据
    // 1. 获取数据 2. 产生数据
    int len = opers.size(); //+-*/%
    while (true)            // 不停获取产生数据
    {
        int x = rand() % 10 + 1; //[1,10]
        int y = rand() % 10;     //[0,9]
        char c = opers[rand() % len];
        Task t(x, y, c); // 获取数据
        bq->push(t);     // 产生数据
        cout << "producter tid is: " << pthread_self() << "  i am producter ,procude a task done! task is: " << t.getTask() << endl;
        sleep(1);
    }
    return nullptr;
}
void *consumer(void *args)
{
    BlockingQueue<Task> *bq = static_cast<BlockingQueue<Task> *>(args);
    // 消费者消费数据
    // 1. 获取数据 2.处理数据
    while (true) // 不停获取处理数据
    {
        Task t = bq->pop(); // 获取数据
        t();                // 处理数据
        cout << "consumer tid is: " << pthread_self() << "  i am a consumer, consum a task done! task is: " << t.getTask() << " , task result is: " << t.getResult() << endl;
        sleep(1);
    }
    return nullptr;
}
int main()
{
    srand(time(nullptr)); // 种一个时间种子

    BlockingQueue<Task> *bq = new BlockingQueue<Task>(); // 建立共享特定结构内存空间
    pthread_t c[3], p[5];
    // 创建生产消费者线程
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, consumer, bq); // 传入特定结构内存空间
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, producter, bq);
    }

    // 等待线程
    for (int i = 0; i < 3; i++)
    {

        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }

    return 0;
}

makefile:

mycp:mycp.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f mycp

编译运行:

生产者依次生产任务数据,消费者依次消费任务数据。

二、基于环形队列的生产消费模型

2.1、特点

①、p所指向的位置代表生产者即将生产的数据,c所指向的位置代表消费者即将消费的数据;

②、当p跟c在同一个位置时,代表当前队列为空或者满;

③、消费不能大于生产;

④、指向同一个位置时不能同时访问;

2.2、利用信号量标记资源的数量

信号量的本质是一把计数器,是用来描述资源数目的:

生产者关注的是剩余空间的多少,消费者关注的是剩余数据的多少;

生产者的pv操作:p:空间,v:数据;

消费者的pv操作:p:数据,v:空间;

信号量相关的接口:

#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//1. 这是一个信号量初始化的接口
//2. sem: 初始化的信号量指针  , pshared: 设0,线程间共享,value: 刚开始设定的资源数量
//信号量的销毁
int sem_destory(sem_t* sem);
//信号量的申请
int sem_wart(sem_t *sem);// P操作
//信号量的发布
int sem_post( sem_t* sem); //V操作
2.3、实现

环形队列:

#pragma once
#include <semaphore.h>
const int defaultNum=10;
#include<iostream>
#include<vector>
template<class T>
class RingQueue
{
    private:
   int index_p;//生产者下标
   int index_c;//消费者下标
   std::vector<T> ringqueue_;//数据的存放位置(交易场所)
    sem_t space_sem_p;//生产者关注空间资源
    sem_t data_sem_c;//消费者关注数据资源
    pthread_mutex_t mutex_;
    private:
    void P( sem_t& sem)
    {
        sem_wait(&sem);//--
    }
    void V( sem_t& sem)
    {
        sem_post(&sem);//++
    }
    void lock()
    {
        pthread_mutex_lock(&mutex_);
    }
    void unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }
    public:
    RingQueue(int capacity=defaultNum)
    :index_c(0),index_p(0),ringqueue_(defaultNum)
    {
        //初始化信号量
        sem_init(&space_sem_p,0,defaultNum);
        sem_init(&data_sem_c,0,0);
        pthread_mutex_init(&mutex_,nullptr);
    }
    void push(const T& data)
    {
        //入数据
        //先申请信号量(申请空间资源的信号量)
        P(space_sem_p);//先确保申请成功再让进来
        lock();//进来要上锁

        ringqueue_[index_p]=data;
        index_p++;
        index_p%=ringqueue_.size();
        unlock();
        V(data_sem_c);
    }
    T pop()
    {
        //出数据
        P(data_sem_c);
        lock();
        T out=ringqueue_[index_c];
        index_c++;
        index_c%=ringqueue_.size();
        unlock();
        V(space_sem_p);
        return out;
    }
    ~RingQueue()
    {
        //信号量销毁
        sem_destroy(&space_sem_p);
        sem_destroy(&data_sem_c);
        pthread_mutex_destroy(&mutex_);
    }
};

Task:

#pragma once
#include <iostream>
std::string opers="+-*/%";//运算方法
enum{
    Divzero=1,//除0错误码
    Modzero,//模0错误码
    Unknow//未知错误
};
//计算加减乘除的类
class Task
{
    private:
    int data1_;//第一个数据
    int data2_;//第二个数据
    char operator_;//运算方法
    int result_;//运算结果
    int exitcode_;//运算退出码
    public:
    Task(){};
    Task(int data1,int data2,char opeator)
    :data1_(data1),data2_(data2),operator_(opeator),result_(0),exitcode_(0)
    {

    }
    void operator()()
    {
        run();
    }
    void run()//运算
    {
        switch(operator_)
        {
            case '+':
            result_=data1_+data2_;
            break;
            case '-':
            result_=data1_-data2_;
            break;
            case '*':result_=data1_*data2_;
            break;
            case '/':if(data2_){result_=data1_/data2_;}else{exitcode_=Divzero;}
            break;
            case '%':if(data2_){result_=data1_/data2_;}else{exitcode_=Modzero;}
            break;
            default:
            exitcode_=Unknow;
            break; 
        }
    
    }
    std::string getTask()
    {
        string str;
        str+=std::to_string(data1_);
        str+=operator_;
        str+=std::to_string(data2_);
        str+='=';
        str+='?';
        return str;
    }
    std::string getResult()
    {
        string str;
        str+=std::to_string(data1_);
        str+=operator_;
        str+=std::to_string(data2_);
        str+='=';
        str+=to_string(result_);
        str+='[';
        str+=to_string(exitcode_);
        str+=']';
        return str;
    }


};

main:

#include <iostream>
using namespace std;
#include "RingQueue.hpp"
#include <pthread.h>
#include"Task.hpp"
#include<ctime>
#include <unistd.h>
void* product(void*args)
{
    RingQueue<Task>*rq=static_cast<RingQueue<Task>*>(args);
    int len=opers.size();
    while(true)
    {
        //1.数据从何来 2. 数据到哪去?
      int x=rand()%10+1;
      int y=rand()%10;
      Task t(x,y,opers[rand()%len]);
      rq->push(t);
      cout<<" product a task success!! task is : "<<t.getTask()<<" tid is: ";
        printf("0x%x\n",pthread_self());
        sleep(1);
    }
    return nullptr;
}
void* consume(void*args)
{
    RingQueue<Task>*rq=static_cast<RingQueue<Task>*>(args);
    //1.数据从何而来 2.数据到哪去?
    while(true)
    {
        Task t=rq->pop();
        t();//处理数据
        cout<<" consum a task done!!task rusult is:"<<t.getResult()<<" | tid is: ";
        printf("0x%x\n",pthread_self());

        sleep(1);
    }

    return nullptr;
}
int main()
{
    srand(time(nullptr));
    RingQueue<Task>*rq=new RingQueue<Task>(20);
    pthread_t p[2],c[2];
    for(int i=0;i<2;i++)
    {
    pthread_create(p+i,nullptr,product,rq);
    pthread_create(c+i,nullptr,consume,rq);
    }
    for(int i=0;i<2;i++)
    {
    pthread_join(p[i],nullptr);
    pthread_join(c[i],nullptr);
    }

    return 0;
}

makefile:

ringqueue:Main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ringqueue

运行:

生产者先生产数据,消费者依次消费历史数据;

今天的分享就到这里,如果对你有所帮助记得点赞收藏+关注哦!!谢谢!!!

咱下期见!!


网站公告

今日签到

点亮在社区的每一天
去签到