当两个线程同时对一个变量做五万次加法,但最后结果通常小于十万。
#include <iostream>
#include <unistd.h>
int count = 0;
void* threadStart(void *args)
{
for(int i = 0; i < 50000; i++)
count++;
return NULL;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
int n1 = pthread_create(&tid1, nullptr, threadStart, (void*)"thread1 ");
int n2 = pthread_create(&tid2, nullptr, threadStart, (void*)"thread2 ");
if(n1 != 0 || n2 != 0)
{
std::cerr << "create thread error..." << std::endl;
return 1;
}
pthread_join(tid1,nullptr);
pthread_join(tid2,nullptr);
std::cout << "count: " << count << std::endl;
return 0;
}
关键就在于 count++ 这行代码并不是原子操作,也就是会涉及读取、修改和写入三个操作。
两个线程都对 count 加了1,但其中一个线程的操作被另一个线程的操作覆盖了,看起来只加了一次。
可以通过 对共享资源的访问 进行加锁解决:
通过加锁(lock),确保同一时刻只有一个线程可以进入临界区(访问共享资源的那段程序)。线程A加锁后就可以对共享资源进行操作,其他线程就会被阻塞,直到线程A解锁(unlock)。
加锁时应只锁住真正需要保护的共享数据访问代码。比如只锁住 count++ 这行,因为只有这行对共享资源进行操作,若把整个for循环锁住,会导致不必要的串行化,降低并发性能。
线程同步
线程同步是指协调多个线程的执行顺序和对共享资源的访问,以保证程序的正确性。否则一个线程刚解锁,立马又加锁,其他线程就会一直获取不到锁。
条件变量
条件变量就是一种实现线程同步机制的一种方式。它允许线程等待某个条件成立,并由其他线程在条件成立时通知等待的线程。
假设有如下场景:一个线程(生产者)生产产品,其他线程(消费者)获取产品,产品就是共享资源,就会出现消费者不断获取产品也就是不断加锁,但实际上却获取不到产品。这就会导致生产者获取不到锁,无法将产品放置给生产者获取。现在通过约定,消费者若获取不到产品,就等待,此时生产者就可以获取锁来放置产品,然后通过一种东西(电话或者短信等)来通知消费者(可通知一个线程或多个线程),此时消费者就可以去获取产品了。在这个例子中,加锁是为了对共享资源的操作,东西(电话或短信)就是条件变量。
pthread_cond_wait():
作用:让当前线程在条件变量cond 上等待,直到被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 唤醒。当线程等待后,函数会释放传入的锁,其他线程就可以获取锁并操作共享数据。该线程会加入到条件变量cond 的等待队列中。线程被唤醒后,会重新参与锁的竞争,意味着函数返回时,线程持有锁。
pthread_cond_signal():
作用:唤醒一个正在 cond 上等待的线程。如果没有任何线程在等待,则此调用无任何效果。pthread_cond_broadcast 则是唤醒所有正在 cond 上等待的线程。
生产-消费者模型
BlockQueue.hpp:
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
const static int defaultCap = 5;
template<typename T>
class BlockQueue
{
private:
bool isFull()
{
return _block_queue.size() == _max_cap;
}
bool isEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap = defaultCap): _max_cap(cap)
{
pthread_mutex_init(&_mutex, nullptr); //初始化锁
pthread_cond_init(&_p_cond, nullptr); //初始化条件变量
pthread_cond_init(&_c_cond, nullptr); //初始化条件变量
}
void Pop(T *out)
{
pthread_mutex_lock(&_mutex); //加锁
//使用while循环判断,因为可能存在另一个消费者把数据拿走,导致队列为空
while(isEmpty())
{
//队列为空,等待
pthread_cond_wait(&_c_cond, &_mutex);
}
//队列不空 或者 被唤醒
*out = _block_queue.front();
_block_queue.pop();
pthread_mutex_unlock(&_mutex); //解锁
//消费者取走数据,队列不满,通知生产者生产
pthread_cond_signal(&_p_cond);
}
void Equeue(const T &in)
{
pthread_mutex_lock(&_mutex); //加锁
while(isFull())
{
//队列已满,不能生产,需等待
//等待时,会释放锁,让其他线程获取
//函数返回时,还在临界区,所以会重新参与锁的竞争,重新加上锁后函数才会返回,对临界资源安全操作
pthread_cond_wait(&_p_cond, &_mutex);
}
//队列不满 或者 被唤醒
_block_queue.push(in); //放入到队列
pthread_mutex_unlock(&_mutex); //解锁
//生产者放入数据,队列不空,通知消费者取数据
pthread_cond_signal(&_c_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex); //回收锁
pthread_cond_destroy(&_p_cond); //回收条件变量
pthread_cond_destroy(&_c_cond); //回收条件变量
}
private:
std::queue<T> _block_queue; //临界资源
int _max_cap; //最大容量
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; //生产者条件变量-用来唤醒生产者
pthread_cond_t _c_cond; //消费者条件变量-用来唤醒消费者
};
Task.hpp:
#pragma once
#include <string>
class Task
{
public:
Task()
{}
Task(int x, int y):_x(x), _y(y)
{}
void Excute()
{
_result = _x + _y;
}
std::string Result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
private:
int _x;
int _y;
int _result;
};
main.cc:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
void *consumerFunc(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1 拿数据
Task t;
bq->Pop(&t);
// 2 处理数据
t.Excute();
std::cout << "消费者 ->:" << t.Result() << std::endl;
}
}
void *producerFunc(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
sleep(1);
// 1 生产数据
int x = rand() % 10 + 1; //1~10
int y = rand() % 20 + 1; //1~20
Task t(x, y);
// 2 放数据
bq->Equeue(t);
std::cout << "生产者 ->:" << t.debug() << std::endl;
}
}
int main()
{
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t consumer, producer;
pthread_create(&consumer, nullptr, consumerFunc, bq);
pthread_create(&producer, nullptr, producerFunc, bq);
pthread_join(consumer, nullptr);
pthread_join(producer, nullptr);
return 0;
}