前言
在多线程编程中,生产者-消费者模型 是最常见的并发模式之一。生产者线程负责不断将数据放入队列,而消费者线程则从队列中取出数据进行处理。
C++ 标准库为我们提供了多种队列相关容器和工具,可以轻松实现这一模型。本文实践一步步掌握 C++ 队列的使用。
一、C++ 中常见的队列容器
1. std::queue
底层默认基于
std::deque
实现先进先出(FIFO)结构
常用方法:
push()
:入队pop()
:出队(不返回值,只移除)front()
:取队首元素back()
:取队尾元素empty()
:判空size()
:元素数量
示例代码:
#include <iostream>
#include <queue>
int main() {
std::queue<int> q;
q.push(10);
q.push(20);
q.push(30);
std::cout << "队首: " << q.front() << std::endl; // 10
std::cout << "队尾: " << q.back() << std::endl; // 30
q.pop(); // 移除 10
std::cout << "新的队首: " << q.front() << std::endl; // 20
}
执行结果
2. std::deque
(双端队列)
- 既可以从队首插入/删除,也可以从队尾插入/删除
std::queue
的底层就是基于std::deque
示例:
#include <deque>
#include <iostream>
int main() {
std::deque<int> dq;
dq.push_back(1); // 尾部插入
dq.push_front(2); // 头部插入
std::cout << dq.front() << " " << dq.back() << std::endl; // 2 1
}
执行结果
3. std::priority_queue
(优先队列)
- 默认大顶堆(最大值优先)
- 适用于需要任务调度、优先级处理的场景
示例:
#include <queue>
#include <iostream>
#include <vector>
int main() {
std::priority_queue<int> pq;
pq.push(5);
pq.push(1);
pq.push(10);
std::cout << "最大元素: " << pq.top() << std::endl; // 10
}
执行结果
对的 👍 你这段话是非常关键的知识点,可以在博客里单独拉出来,做成一个 小节重点提示。
我建议可以这样组织,读者会更容易理解:
二、为什么 std::queue
不是线程安全的?
C++ 标准库中的 std::queue
只是一个容器适配器,它本身不包含任何并发控制机制。
在单线程环境中直接使用没有问题,但在多线程场景下,如果生产者线程在 push
的同时消费者线程在 pop
,就会导致数据竞争 (data race),甚至造成程序崩溃。
如何解决?
我们需要引入额外的同步原语:
std::mutex
(互斥锁)- 用于保证对队列的操作是原子性的
- 在同一时刻,只有一个线程可以修改队列
std::condition_variable
(条件变量)- 用于在队列为空时阻塞消费者线程,避免忙等 (busy waiting)
- 当生产者放入数据时,调用
notify_one()
或notify_all()
来唤醒等待的消费者
三、使用队列实现生产者消费者模型
下面我们实现一个 典型的生产者消费者模型,其中:
- 生产者线程不断往队列中放入数据
- 消费者线程不断从队列中取出数据
示例代码
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;
bool done = false; // 结束标志
// 生产者
void producer(int id) {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mtx);
q.push(i + id * 100);
std::cout << "生产者 " << id << " 生产数据: " << (i + id * 100) << std::endl;
cv.notify_one(); // 通知消费者
}
// 通知结束
std::unique_lock<std::mutex> lock(mtx);
done = true;
cv.notify_all();
}
// 消费者
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !q.empty() || done; });
if (!q.empty()) {
int value = q.front();
q.pop();
lock.unlock();
std::cout << "消费者 " << id << " 消费数据: " << value << std::endl;
} else if (done) {
break;
}
}
}
int main() {
std::thread p1(producer, 1);
std::thread p2(producer, 2);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p1.join();
p2.join();
c1.join();
c2.join();
std::cout << "生产者消费者模型结束。" << std::endl;
return 0;
}
运行结果
四、可直接复用的线程安全队列实现
把 std::mutex
和 std::condition_variable
封装到一个类里,避免他们每次都要写锁和条件变量逻辑。编写一个可复用的ThreadSafeQueue<T>
模板实现:
模板实现
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() = default;
ThreadSafeQueue(const ThreadSafeQueue&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
// 入队
void push(T value) {
{
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(std::move(value));
}
cv_.notify_one(); // 通知一个等待线程
}
// 出队(阻塞)
T wait_and_pop() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return !queue_.empty(); });
T value = std::move(queue_.front());
queue_.pop();
return value;
}
// 出队(非阻塞,返回是否成功)
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty())
return false;
value = std::move(queue_.front());
queue_.pop();
return true;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.size();
}
private:
mutable std::mutex mtx_;
std::condition_variable cv_;
std::queue<T> queue_;
};
调用代码
#include <iostream>
#include <thread>
#include "ThreadSafeQueue.h"
ThreadSafeQueue<int> tsq;
void producer() {
for (int i = 0; i < 10; i++) {
tsq.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "生产: " << i << "\r\n";
}
}
void consumer() {
for (int i = 0; i < 10; i++) {
int value = tsq.wait_and_pop();
std::cout << "消费: " << value << "\r\n";
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
运行结果
这样,在实际项目里就可以直接用 ThreadSafeQueue<T>
代替裸 std::queue<T>
,代码会简洁很多。
五、总结
std::queue
:最常用的队列容器,适合 FIFO 场景std::deque
:双端队列,支持从头尾高效操作std::priority_queue
:适合任务调度与优先级处理- 多线程场景下必须加锁,并配合
std::condition_variable
使用,才能实现高效的生产者-消费者模型