在多线程编程中,生产者-消费者模式是一种常见的设计模式,用于解决线程间的数据同步问题。最近,我在 Linux 和 macOS 上运行同一个生产者-消费者模式的程序时,发现它们表现出不同的行为。本文将介绍这个现象、分析其原因,并提供一些改进建议。
现象描述
在 Linux 上运行程序时,消费者线程能够及时处理生产者线程生成的任务,每个任务几乎在生产后立即被处理。然而,在 macOS 上运行同一程序时,生产者线程连续生成多个任务,而消费者线程似乎没有及时处理它们。
代码实现
以下是我用于测试的代码:
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <list>
#include <semaphore.h>
#include <iostream>
class Task {
public:
Task(int taskid) {
this->taskid = taskid;
}
void doTask() {
std::cout << "taskID: " << taskid << ", threadID: " << pthread_self() << std::endl;
}
private:
int taskid;
};
pthread_mutex_t mymutex;
std::list<Task*> tasks;
sem_t mysemaphore;
void* consumer_thread(void* arg) {
Task* pTask = NULL;
while (true) {
if (sem_wait(&mysemaphore) != 0)
continue;
if (tasks.empty())
continue;
pthread_mutex_lock(&mymutex);
pTask = tasks.front();
tasks.pop_front();
pthread_mutex_unlock(&mymutex);
pTask->doTask();
delete pTask;
}
return NULL;
}
void* producer_thread(void* arg) {
int taskid = 0;
Task* pTask = NULL;
while (true) {
pTask = new Task(taskid);
pthread_mutex_lock(&mymutex);
tasks.push_back(pTask);
std::cout << "produce a task, taskid: " << taskid << ", threadid: " << pthread_self() << std::endl;
pthread_mutex_unlock(&mymutex);
sem_post(&mysemaphore);
taskid++;
sleep(1);
}
return NULL;
}
int main() {
pthread_mutex_init(&mymutex, NULL);
sem_init(&mysemaphore, 0, 0);
pthread_t consumerThreadID[5];
for (int i = 0; i < 5; ++i) {
pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
}
pthread_t producerThreadID;
pthread_create(&producerThreadID, NULL, producer_thread, NULL);
pthread_join(producerThreadID, NULL);
for (int i = 0; i < 5; ++i) {
pthread_join(consumerThreadID[i], NULL);
}
sem_destroy(&mysemaphore);
pthread_mutex_destroy(&mymutex);
return 0;
}
Mac运行结果
Linux运行结果
原因分析
信号量和互斥锁的实现差异:
- macOS 和 Linux 对信号量和互斥锁底层的实现不同,这可能影响线程的调度和同步行为。
线程调度策略:
- Linux 和 macOS 使用不同的调度算法。Linux 通常使用完全公平队列调度器(CFS),而 macOS 使用基于优先级的调度策略。这可能导致线程切换的时机不同。
输出函数的行为:
std::cout
在不同系统上的缓冲行为可能不同。在 macOS 上,std::cout
的缓冲可能导致输出延迟。
系统调度器的影响:
- macOS 的调度器可能更倾向于将同一线程的生产者线程优先调度,导致多个任务快速连续地被生产。
改进建议
调整线程优先级:
- 在 macOS 上,可以尝试设置消费者线程的优先级高于生产者线程。
增加调试输出:
- 在
sem_post
和pthread_mutex_unlock
之后添加调试输出,检查信号量和互斥锁是否被正确释放。
- 在
减少生产者线程的生产速度:
- 在生产者线程中增加更长的
sleep
时间,使消费者线程有更多机会处理任务。
- 在生产者线程中增加更长的
检查编译器和运行时环境:
- 确保在两个系统上使用相同版本的编译器和运行时库。
使用条件变量替代信号量:
- 条件变量可能在不同系统上表现更一致。
使用
std::async
和std::future
替代 POSIX 线程:- 如果可以使用 C++11 或更高版本,可以考虑使用
std::async
和std::future
替代 POSIX 线程。
- 如果可以使用 C++11 或更高版本,可以考虑使用
示例改进代码
以下是一个使用条件变量的改进示例:
#include <pthread.h>
#include <condition_variable>
#include <list>
#include <iostream>
class Task {
public:
Task(int taskid) {
this->taskid = taskid;
}
void doTask() {
std::cout << "taskID: " << taskid << ", threadID: " << pthread_self() << std::endl;
}
private:
int taskid;
};
std::condition_variable cv;
std::mutex cv_m;
std::list<Task*> tasks;
bool task_ready = false;
void* consumer_thread(void* arg) {
Task* pTask = NULL;
while (true) {
std::unique_lock<std::mutex> lock(cv_m);
cv.wait(lock, []{ return task_ready; });
if (!tasks.empty()) {
pTask = tasks.front();
tasks.pop_front();
task_ready = false;
}
lock.unlock();
if (pTask) {
pTask->doTask();
delete pTask;
}
}
return NULL;
}
void* producer_thread(void* arg) {
int taskid = 0;
Task* pTask = NULL;
while (true) {
pTask = new Task(taskid);
{
std::unique_lock<std::mutex> lock(cv_m);
tasks.push_back(pTask);
task_ready = true;
}
cv.notify_one();
taskid++;
sleep(1);
}
return NULL;
}
int main() {
pthread_t consumerThreadID[5];
for (int i = 0; i < 5; ++i) {
pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
}
pthread_t producerThreadID;
pthread_create(&producerThreadID, NULL, producer_thread, NULL);
pthread_join(producerThreadID, NULL);
for (int i = 0; i < 5; ++i) {
pthread_join(consumerThreadID[i], NULL);
}
return 0;
}
结论
通过调整线程优先级、增加调试输出、控制生产速度、使用条件变量等方式,可以更好地确保生产者和消费者线程之间的平衡,使任务能够及时被处理。希望这些建议能帮助你在不同操作系统上实现更稳定和高效的生产者-消费者模式。
参考资源
- C++ 条件变量
- C++服务端开发精髓