一、模拟实现生产消费者模型
1.1 生产消费者模型示意图
1.2 生产消费者三种关系、两种角色、一个交易场所
三种关系:
①、 生产者跟生产者关系:互斥关系:
生产者不能同时往一个内存空间里存放数据;
②、生产者跟消费者关系:互斥关系、同步关系:
生产者在往一个内存空间里放数据时消费者不能同时进行读取,要等生产者存放完之后,且生产者生产了数据要及时告诉消费者来消费,消费者消费完数据要及时告诉生产者来生产;
③、消费者跟消费者关系:互斥关系;
消费者不能同时往一个内存空间获取数据;
两种角色:
生产者、消费者;
一个交易场所:
共享的特定结构的内存空间;
1.3 阻塞等待队列的生产消费者模型
多线程编程中的阻塞等待队列:
其中thread1扮演的是生产者,BlockingQueue扮演的是数据交易场所,thread2扮演的是消费者;
thread1往blockingqueue里生产数据,thread2往BlockingQueue里消费数据;
代码:
特定结构的内存空间(数据交易场所):
//阻塞队列的实现(数据交易场所,特定结构的内存空间)
#pragma once
//阻塞等待队里的实现
#include <iostream>
#include <queue>
#include<pthread.h>
static int defaultNum=10;
template<class T>//设置成模版类可支持存放其他类型数据
class BlockingQueue
{
private:
std::queue<T> blockqueue;//数据交易地点
int Max_capacity;//代表队列能存放的最大数据个数
pthread_mutex_t mutex_;//用锁实现互斥关系
pthread_cond_t cond_p;//用条件变量实现同步关系
pthread_cond_t cond_c;//消费者到消费者的等待队列、生产者到生产者的等待队列
private:
void lock()
{
pthread_mutex_lock(&mutex_);
}
void unlock()
{
pthread_mutex_unlock(&mutex_);
}
bool isempty()
{
return blockqueue.size()==0;
}
public:
BlockingQueue(int maxcapacity=defaultNum)
:Max_capacity(maxcapacity)
{
//锁和条件变量初始化
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&cond_p,nullptr);
pthread_cond_init(&cond_c,nullptr);
}
void push(const T& data)
{
//往队列生产数据
//如果数据满了,让生产到等待队列里等待
lock();//访问队列前先上锁
while(Max_capacity==blockqueue.size())//用while防止伪唤醒
{
pthread_cond_wait(&cond_p,&mutex_);//等待并解锁
}
blockqueue.push(data);//走到这里说明 1.队列不为满 2.生产者被唤醒且队列不为满
pthread_cond_signal(&cond_c);//生产一个数据就唤醒一个消费者来消费(同步)
unlock();//访问结束解锁
}
T pop()
{
//从队列中消费数据
lock();//访问前先上锁
while(isempty())//消费前先判空,while循环防止伪唤醒
{
//如果为空到消费者的等待队列里等、
pthread_cond_wait(&cond_c,&mutex_);
}
T out=blockqueue.front();
blockqueue.pop();
pthread_cond_signal(&cond_p);//消费一个数据唤醒一次生产者队列
unlock();
return out;
}
~BlockingQueue()
{
//锁、条件变量销毁
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_c);
pthread_cond_destroy(&cond_p);
}
};
生产消费的任务(数据):
//自定义的运算+-*/%任务类
#pragma once
#include <iostream>
std::string opers="+-*/%";//运算方法
enum{
Divzero=1,//除0错误码
Modzero,//模0错误码
Unknow//未知错误
};
//计算加减乘除的类
class Task
{
private:
int data1_;//第一个数据
int data2_;//第二个数据
char operator_;//运算方法
int result_;//运算结果
int exitcode_;//运算退出码
public:
Task(int data1,int data2,char opeator)
:data1_(data1),data2_(data2),operator_(opeator),result_(0),exitcode_(0)
{
}
void operator()()
{
run();
}
void run()//运算
{
switch(operator_)
{
case '+':
result_=data1_+data2_;
break;
case '-':
result_=data1_-data2_;
break;
case '*':result_=data1_*data2_;
break;
case '/':if(data2_){result_=data1_/data2_;}else{exitcode_=Divzero;}
break;
case '%':if(data2_){result_=data1_/data2_;}else{exitcode_=Modzero;}
break;
default:
exitcode_=Unknow;
break;
}
}
std::string getTask()
{
string str;
str+=std::to_string(data1_);
str+=operator_;
str+=std::to_string(data2_);
str+='=';
str+='?';
return str;
}
std::string getResult()
{
string str;
str+=std::to_string(data1_);
str+=operator_;
str+=std::to_string(data2_);
str+='=';
str+=to_string(result_);
str+='[';
str+=to_string(exitcode_);
str+=']';
return str;
}
};
建立两种角色:生产者消费者:
//main函数 创建生产消费者
#include <iostream>
using namespace std;
#include "BlockingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
void *producter(void *args)
{
BlockingQueue<Task> *bq = static_cast<BlockingQueue<Task> *>(args);
// 生产者生产数据
// 1. 获取数据 2. 产生数据
int len = opers.size(); //+-*/%
while (true) // 不停获取产生数据
{
int x = rand() % 10 + 1; //[1,10]
int y = rand() % 10; //[0,9]
char c = opers[rand() % len];
Task t(x, y, c); // 获取数据
bq->push(t); // 产生数据
cout << "producter tid is: " << pthread_self() << " i am producter ,procude a task done! task is: " << t.getTask() << endl;
sleep(1);
}
return nullptr;
}
void *consumer(void *args)
{
BlockingQueue<Task> *bq = static_cast<BlockingQueue<Task> *>(args);
// 消费者消费数据
// 1. 获取数据 2.处理数据
while (true) // 不停获取处理数据
{
Task t = bq->pop(); // 获取数据
t(); // 处理数据
cout << "consumer tid is: " << pthread_self() << " i am a consumer, consum a task done! task is: " << t.getTask() << " , task result is: " << t.getResult() << endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr)); // 种一个时间种子
BlockingQueue<Task> *bq = new BlockingQueue<Task>(); // 建立共享特定结构内存空间
pthread_t c[3], p[5];
// 创建生产消费者线程
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, consumer, bq); // 传入特定结构内存空间
}
for (int i = 0; i < 5; i++)
{
pthread_create(p + i, nullptr, producter, bq);
}
// 等待线程
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
return 0;
}
makefile:
mycp:mycp.cpp
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f mycp
编译运行:
生产者依次生产任务数据,消费者依次消费任务数据。
二、基于环形队列的生产消费模型
2.1、特点
①、p所指向的位置代表生产者即将生产的数据,c所指向的位置代表消费者即将消费的数据;
②、当p跟c在同一个位置时,代表当前队列为空或者满;
③、消费不能大于生产;
④、指向同一个位置时不能同时访问;
2.2、利用信号量标记资源的数量
信号量的本质是一把计数器,是用来描述资源数目的:
生产者关注的是剩余空间的多少,消费者关注的是剩余数据的多少;
生产者的pv操作:p:空间,v:数据;
消费者的pv操作:p:数据,v:空间;
信号量相关的接口:
#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//1. 这是一个信号量初始化的接口
//2. sem: 初始化的信号量指针 , pshared: 设0,线程间共享,value: 刚开始设定的资源数量
//信号量的销毁
int sem_destory(sem_t* sem);
//信号量的申请
int sem_wart(sem_t *sem);// P操作
//信号量的发布
int sem_post( sem_t* sem); //V操作
2.3、实现
环形队列:
#pragma once
#include <semaphore.h>
const int defaultNum=10;
#include<iostream>
#include<vector>
template<class T>
class RingQueue
{
private:
int index_p;//生产者下标
int index_c;//消费者下标
std::vector<T> ringqueue_;//数据的存放位置(交易场所)
sem_t space_sem_p;//生产者关注空间资源
sem_t data_sem_c;//消费者关注数据资源
pthread_mutex_t mutex_;
private:
void P( sem_t& sem)
{
sem_wait(&sem);//--
}
void V( sem_t& sem)
{
sem_post(&sem);//++
}
void lock()
{
pthread_mutex_lock(&mutex_);
}
void unlock()
{
pthread_mutex_unlock(&mutex_);
}
public:
RingQueue(int capacity=defaultNum)
:index_c(0),index_p(0),ringqueue_(defaultNum)
{
//初始化信号量
sem_init(&space_sem_p,0,defaultNum);
sem_init(&data_sem_c,0,0);
pthread_mutex_init(&mutex_,nullptr);
}
void push(const T& data)
{
//入数据
//先申请信号量(申请空间资源的信号量)
P(space_sem_p);//先确保申请成功再让进来
lock();//进来要上锁
ringqueue_[index_p]=data;
index_p++;
index_p%=ringqueue_.size();
unlock();
V(data_sem_c);
}
T pop()
{
//出数据
P(data_sem_c);
lock();
T out=ringqueue_[index_c];
index_c++;
index_c%=ringqueue_.size();
unlock();
V(space_sem_p);
return out;
}
~RingQueue()
{
//信号量销毁
sem_destroy(&space_sem_p);
sem_destroy(&data_sem_c);
pthread_mutex_destroy(&mutex_);
}
};
Task:
#pragma once
#include <iostream>
std::string opers="+-*/%";//运算方法
enum{
Divzero=1,//除0错误码
Modzero,//模0错误码
Unknow//未知错误
};
//计算加减乘除的类
class Task
{
private:
int data1_;//第一个数据
int data2_;//第二个数据
char operator_;//运算方法
int result_;//运算结果
int exitcode_;//运算退出码
public:
Task(){};
Task(int data1,int data2,char opeator)
:data1_(data1),data2_(data2),operator_(opeator),result_(0),exitcode_(0)
{
}
void operator()()
{
run();
}
void run()//运算
{
switch(operator_)
{
case '+':
result_=data1_+data2_;
break;
case '-':
result_=data1_-data2_;
break;
case '*':result_=data1_*data2_;
break;
case '/':if(data2_){result_=data1_/data2_;}else{exitcode_=Divzero;}
break;
case '%':if(data2_){result_=data1_/data2_;}else{exitcode_=Modzero;}
break;
default:
exitcode_=Unknow;
break;
}
}
std::string getTask()
{
string str;
str+=std::to_string(data1_);
str+=operator_;
str+=std::to_string(data2_);
str+='=';
str+='?';
return str;
}
std::string getResult()
{
string str;
str+=std::to_string(data1_);
str+=operator_;
str+=std::to_string(data2_);
str+='=';
str+=to_string(result_);
str+='[';
str+=to_string(exitcode_);
str+=']';
return str;
}
};
main:
#include <iostream>
using namespace std;
#include "RingQueue.hpp"
#include <pthread.h>
#include"Task.hpp"
#include<ctime>
#include <unistd.h>
void* product(void*args)
{
RingQueue<Task>*rq=static_cast<RingQueue<Task>*>(args);
int len=opers.size();
while(true)
{
//1.数据从何来 2. 数据到哪去?
int x=rand()%10+1;
int y=rand()%10;
Task t(x,y,opers[rand()%len]);
rq->push(t);
cout<<" product a task success!! task is : "<<t.getTask()<<" tid is: ";
printf("0x%x\n",pthread_self());
sleep(1);
}
return nullptr;
}
void* consume(void*args)
{
RingQueue<Task>*rq=static_cast<RingQueue<Task>*>(args);
//1.数据从何而来 2.数据到哪去?
while(true)
{
Task t=rq->pop();
t();//处理数据
cout<<" consum a task done!!task rusult is:"<<t.getResult()<<" | tid is: ";
printf("0x%x\n",pthread_self());
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr));
RingQueue<Task>*rq=new RingQueue<Task>(20);
pthread_t p[2],c[2];
for(int i=0;i<2;i++)
{
pthread_create(p+i,nullptr,product,rq);
pthread_create(c+i,nullptr,consume,rq);
}
for(int i=0;i<2;i++)
{
pthread_join(p[i],nullptr);
pthread_join(c[i],nullptr);
}
return 0;
}
makefile:
ringqueue:Main.cpp
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ringqueue
运行:
生产者先生产数据,消费者依次消费历史数据;
今天的分享就到这里,如果对你有所帮助记得点赞收藏+关注哦!!谢谢!!!
咱下期见!!