线程(三) linux 同步

发布于:2025-07-22 ⋅ 阅读:(14) ⋅ 点赞:(0)

目录

概念补充

条件变量

操作

例:多线程抢票

封装

生产者消费者模型

生产者和消费者之间的关系

BlockQueue(阻塞队列)

单生产单消费

信号量

简介

操作

多生产者多消费者RingQueue(环形队列)代码

sem封装   

信号量与锁

小知识


概念补充

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步

竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件.

条件变量

简单来说,条件变量相当于一个阻塞队列,将线程入队列(等待),满足条件就出队列(唤醒)

操作

全局条件变量

和mutex相似,使用宏初始化一个全局的条件变量

局部条件变量

attr传nullptr即可

等待

cond为条件变量,mutex为锁

唤醒

signal为唤醒一个线程

broadcast为唤醒所有线程

例:多线程抢票

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

int tickets = 100;
void* func(void* args)
{
    std::string name(static_cast<char*>(args));
    while(true)
    {
        pthread_mutex_lock(&lock);
        pthread_cond_wait(&cond,&lock);
        if(tickets==0) {pthread_mutex_unlock(&lock);break;}
        std::cout<<name<<"  "<<--tickets<<std::endl;
        pthread_mutex_unlock(&lock);
    }
    return (void*)0;
}

int main()
{
    pthread_t t1,t2,t3;

    pthread_create(&t1,nullptr,func,(void*)"t1");
    pthread_create(&t2,nullptr,func,(void*)"t2");
    pthread_create(&t3,nullptr,func,(void*)"t3");

    while(true)
    {
        pthread_cond_signal(&cond);
        sleep(0.3);
        if(tickets==0)
        {
            pthread_cond_broadcast(&cond);
            break;
        }
    }

    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);
    pthread_join(t3,nullptr);
    return 0;
}

每个线程执行到wait时会阻塞住,

为什么wait要传锁,

因为wait会让线程等待,如果线程申请到了锁等待,那么其他线程就无法申请到锁,所以wait会先释放锁再等待,等被唤醒后先申请锁再向下执行,保证互斥.

封装

#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"

class Cond
{
    pthread_cond_t _cond;
public:    
    Cond()
    {
        pthread_cond_init(&_cond,nullptr);
    }
    void Wait(LockGuard& lock)
    {
        int n = pthread_cond_wait(&_cond,lock.Get());
        (void)n;
    }
    void Notifyone()
    {
        int n = pthread_cond_signal(&_cond);
        (void)n;
    }
    void Notifyall()
    {
        int n = pthread_cond_broadcast(&_cond);
        (void)n;
    }
    ~Cond()
    {
        pthread_cond_destroy(&_cond);
    }
};

生产者消费者模型

和生活中的超市类似,就是让不同线程作为生产产品者和消费产品者,临界区作为厂家和消费者的中转站超市.

为什么存在这样的模型:

1.减少生产和消费过程中产生的成本

2.支持生产和消费的忙闲不均

3.维护松耦合关系


减少成本问题:   比如一个方便面厂家,生产方便面都是成批生产,而消费者一般只会按个按箱购买,如果两者直接交易,厂家生产一袋方便面,消费者购买,这样厂家的成本会很高;如果厂家生产一批,消费者也买不了一批产品,消费者成本会很高.

支持忙闲不均和松耦合关系问题:    还是上面的例子,厂家可以生产一批放到超市,然后等消费者慢慢消费,生产和消费没有直接关系

生产者和消费者之间的关系

生产者之间 : 互斥关系

消费者之间 : 互斥关系

生产者与消费者之间 : 互斥+同步关系

BlockQueue(阻塞队列)

在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

//<BlockQueue.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "mutex.hpp"
#include "cond.hpp"//上面封装的条件变量

const static u_int32_t CAP = 5;

template<typename T>
class BlockQueue
{
    std::queue<T> _q;
    u_int32_t _cap;

    pthread_mutex_t _lock;

    Cond _c_cond;//消费者
    Cond _p_cond;//生产者

    unsigned int _c_wait_num;//当前消费者等待的个数
    unsigned int _p_wait_num;//当前生产者等待的个数  为了防止唤醒次数过多或唤醒无意义

    bool isFull()
    {
        return _q.size()>=_cap;
    }

    bool isEmpty()
    {
       return _q.empty();
    }

public:    
    BlockQueue(u_int32_t cap = CAP) :_cap(cap)
    {}

    void Enqueue(const T& data)
    {
        {
            LockGuard lock(&_lock);
            while(isFull())   //while循环防止wait调用失败和伪唤醒问题
            {
                _p_wait_num++;
                _p_cond.Wait(lock);
                _p_wait_num--;
            }
            _q.push(data);
            if(_c_wait_num>0)
               _c_cond.Notifyone();

        }
    }

    void Pop(T* retval)
    {

        {

            LockGuard lock(&_lock);

            while(isEmpty())     //while循环防止wait调用失败和伪唤醒问题

            {

                _c_wait_num++;

                _c_cond.Wait(lock);

                _c_wait_num--;

            }

            *retval = _q.front();

            _q.pop();

            if(_p_wait_num>0)

                _p_cond.Notifyone();

        }

    }

    ~BlockQueue()

    {}

};
单生产单消费
#include "blockqueue.hpp"
#include <iostream>
#include <string>
#include <unistd.h>

struct Data
{
    BlockQueue<int>* bq;
    std::string name;
};

void *consumer(void* args)
{
    Data* data = static_cast<Data*>(args);
    int retval = 0;
    while(true)
    {
        data->bq->Pop(&retval);
        std::cout<<data->name<<"消费了   " <<retval<<std::endl;   
        //sleep(1);
    } 
    return (void*)0;
}

void *productor(void* args)
{
    Data* data = static_cast<Data*>(args);
    int num = 1;
    while(true)
    {
        data->bq->Enqueue(num);
        std::cout<<data->name<<"生产了   "<<num++<<std::endl;
        sleep(1);
    }
    return (void*)0;
}

int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c,p;
    
    Data ctd = {bq,"消费者"};
    pthread_create(&c,nullptr,consumer,(void*)&ctd);

    Data ptd = {bq,"生产者"};
    pthread_create(&p,nullptr,productor,(void*)&ptd);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

对于多生产和多消费,BlockQueue.hpp也支持

信号量

简介

信号量针对多线程并发访问一块资源中的不同部分.

本质上,就是一个描述资源数量的计数器

操作

sem_init   初始化信号量

pshared   用来表示线程使用还是进程使用,0表示线程间使用

value  信号量的初始值

sem_destroy  销毁信号量

sem_wait   信号量--

sem_post  信号量++

多生产者多消费者RingQueue(环形队列)代码

生产者关注空余空间资源,消费者关注数据资源

sem封装   <sem.hpp>
#pragma once
#include <iostream>
#include <semaphore.h>

#define NUM 5

class Sem
{
    sem_t _sem;
    int _initnum;
public:
    Sem(int num = NUM):_initnum(num)
    {
        sem_init(&_sem,0,_initnum);
    }
    void P()
    {
        int n = sem_wait(&_sem);
        (void)n;
    }
    void V()
    {
        int n = sem_post(&_sem);
        (void)n;
    }
    ~Sem()
    {
        sem_destroy(&_sem);
    }
};
<mutex.hpp>
#pragma once
#include <iostream>
#include <unistd.h>

class LockGuard
{
    pthread_mutex_t* _lock;

public:
    LockGuard(pthread_mutex_t* lock)
        :_lock(lock)
    {
        pthread_mutex_lock(_lock);
    }    
    pthread_mutex_t* Get()
    {
        return _lock;
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_lock);
    }
};
<cond.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"

class Cond
{
    pthread_cond_t _cond;
public:    
    Cond()
    {
        pthread_cond_init(&_cond,nullptr);
    }
    void Wait(LockGuard& lock)
    {
        int n = pthread_cond_wait(&_cond,lock.Get());
        (void)n;
    }
    void Notifyone()
    {
        int n = pthread_cond_signal(&_cond);
        (void)n;
    }
    void Notifyall()
    {
        int n = pthread_cond_broadcast(&_cond);
        (void)n;
    }
    ~Cond()
    {
        pthread_cond_destroy(&_cond);
    }
};
<RingQueue.hpp>
#pragma once
#include <iostream>
#include "sem.hpp"
#include <vector>
#include "mutex.hpp"

#define CAP 10

template<typename T>
class RingQueue
{
    std::vector<T> _rq;
    int _cap;

    Sem _data_sem;
    Sem _space_sem;

    int _c_step;
    int _p_step;

    pthread_mutex_t _p_lock;
    pthread_mutex_t _c_lock;
public:
    RingQueue(int cap = CAP)
    :_cap(cap) ,_rq(cap) ,_data_sem(0) ,_space_sem(cap),_c_step(0),_p_step(0)
    {
        pthread_mutex_init(&_p_lock,nullptr);
        pthread_mutex_init(&_c_lock,nullptr);
    }
    void Enqueue(const T& data)
    {   
        _space_sem.P();
        {
            LockGuard lock(&_p_lock);
            _rq[_c_step++] = data;
            _c_step%=_cap;
        }
        _data_sem.V();
    }
    void Pop(T* data)
    {
        _data_sem.P();
        {
            LockGuard lock(&_c_lock);
            *data = _rq[_p_step++];
            _p_step%=_cap;
        }
        _space_sem.V();
    }
    ~RingQueue()
    {
        pthread_mutex_destroy(&_p_lock);
        pthread_mutex_destroy(&_c_lock);
    }
};

<RingQueue.cpp>

#include <iostream>
#include "sem.hpp"
#include "mutex.hpp"
#include "cond.hpp"
#include <pthread.h>
#include "RingQueue.hpp"

RingQueue<int> rq(10);

void* consumer(void* args)
{
    
    while(true)
    {
        int data = 0;
        rq.Pop(&data);
        std::cout<<"消费  "<<data<<std::endl;
    }
    return (void*)0;
}

void* productor(void* args)
{
    int data = 1;
    while(true)
    {
        
        rq.Enqueue(data);
        std::cout<<"生产  "<<data++<<std::endl;
    }
    return (void*)0;
}

int main()
{
    pthread_t c[2],p[2];
    pthread_create(c,nullptr,consumer,nullptr);
    pthread_create(c+1,nullptr,consumer,nullptr);
    pthread_create(p,nullptr,productor,nullptr);
    pthread_create(p+1,nullptr,productor,nullptr);

    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);

    return 0;
}

信号量与锁

为什么信号量不需要加锁,信号量为空或为满时,信号量完成了同步和互斥动作,信号量不为空和不为满时,生产者和消费者访问不同位置,也不需要加锁


二元信号量(信号量初始值为1) :就是锁

锁:认为资源只有一份,申请锁相当于信号量P操作(--),释放锁相当于信号量V操作(++),所以锁是信号量的一种特殊情况

小知识

1.pthread开头的函数一般返回值为0表示成功

2.生产和消费还可以传递函数,用来给线程派发任务

3.生产消费模型高效在哪里:生产者生产数据和消费者处理数据是并发的,两者是松耦合关系,可以忙闲不均,这里效率高,而二者向阻塞队列中进行放入和拿出属于同步和互斥,效率不高


网站公告

今日签到

点亮在社区的每一天
去签到