RabbitMQ是一个开源的消息队列系统,它实现了高级消息队列协议(AMQP),
特点
- 可靠性:通过持久化、镜像队列等机制保证消息不丢失,确保消息可靠传递。
- 灵活的路由:提供多种路由方式,如直连、主题、扇形等,可根据不同的业务需求进行灵活配置。
- 多语言支持:支持多种编程语言,如Java、Python、C++、Ruby等,方便不同技术栈的开发者使用。
- 高可用性:可以通过集群方式实现高可用性,避免单点故障,保证系统的稳定运行。
- 易于管理:提供了可视化的管理界面,方便管理员对队列、交换机、消息等进行监控和管理。
核心概念
- 消息:应用程序之间传递的数据单元。
- 队列:用于存储消息的缓冲区,消息会在队列中等待消费者来获取。
- 交换机:接收生产者发送的消息,并根据路由键将消息路由到相应的队列。
- 绑定:将交换机和队列通过路由键进行关联,建立路由规则。
- 生产者:负责发送消息到RabbitMQ服务器的应用程序。
- 消费者:从RabbitMQ服务器获取消息并进行处理的应用程序。
工作原理
- 生产者将消息发送到交换机,消息中包含路由键等信息。
- 交换机根据路由键和绑定规则,将消息路由到对应的队列。
- 消费者从队列中获取消息并进行处理。
应用场景
- 异步处理:将一些耗时的操作如发送邮件、生成报表等放在消息队列中,由消费者异步处理,提高系统的响应速度。
- 系统解耦:不同模块之间通过消息队列进行通信,降低模块之间的耦合度,使得各个模块可以独立扩展和维护。
- 流量削峰:在高并发场景下,将大量的请求放入消息队列中,由消费者按照一定的速度进行处理,避免系统因瞬间高流量而崩溃。
C++实现rabbitmq生产者消费者
// MFCRABBITMQDlg.h: 头文件
//
#pragma once
#include <iostream>
#include <string>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#pragma comment(lib , "rabbitmq.lib")
// CMFCRABBITMQDlg 对话框
class CMFCRABBITMQDlg : public CDialogEx
{
// 构造
public:
CMFCRABBITMQDlg(CWnd* pParent = nullptr); // 标准构造函数
// 对话框数据
#ifdef AFX_DESIGN_TIME
enum { IDD = IDD_MFCRABBITMQ_DIALOG };
#endif
protected:
virtual void DoDataExchange(CDataExchange* pDX); // DDX/DDV 支持
// 实现
protected:
HICON m_hIcon;
// 生成的消息映射函数
virtual BOOL OnInitDialog();
afx_msg void OnSysCommand(UINT nID, LPARAM lParam);
afx_msg void OnPaint();
afx_msg HCURSOR OnQueryDragIcon();
DECLARE_MESSAGE_MAP()
public:
afx_msg void OnBnClickedButtonProduct();
void CMFCRABBITMQDlg::sendMessageToRabbitMQ(const std::string& message);
void CMFCRABBITMQDlg::receiveMessagesFromRabbitMQ();
afx_msg void OnBnClickedButtonConsumer();
void CMFCRABBITMQDlg::showLog(CString p_str_log);
CString m_CstrLog;
CEdit m_ctrlLog;
};
// MFCRABBITMQDlg.cpp: 实现文件
//
#include "pch.h"
#include "framework.h"
#include "MFCRABBITMQ.h"
#include "MFCRABBITMQDlg.h"
#include "afxdialogex.h"
#ifdef _DEBUG
#define new DEBUG_NEW
#endif
// 用于应用程序“关于”菜单项的 CAboutDlg 对话框
class CAboutDlg : public CDialogEx
{
public:
CAboutDlg();
// 对话框数据
#ifdef AFX_DESIGN_TIME
enum { IDD = IDD_ABOUTBOX };
#endif
protected:
virtual void DoDataExchange(CDataExchange* pDX); // DDX/DDV 支持
// 实现
protected:
DECLARE_MESSAGE_MAP()
};
CAboutDlg::CAboutDlg() : CDialogEx(IDD_ABOUTBOX)
{
}
void CAboutDlg::DoDataExchange(CDataExchange* pDX)
{
CDialogEx::DoDataExchange(pDX);
}
BEGIN_MESSAGE_MAP(CAboutDlg, CDialogEx)
END_MESSAGE_MAP()
// CMFCRABBITMQDlg 对话框
CMFCRABBITMQDlg::CMFCRABBITMQDlg(CWnd* pParent /*=nullptr*/)
: CDialogEx(IDD_MFCRABBITMQ_DIALOG, pParent)
{
m_hIcon = AfxGetApp()->LoadIcon(IDR_MAINFRAME);
}
void CMFCRABBITMQDlg::DoDataExchange(CDataExchange* pDX)
{
CDialogEx::DoDataExchange(pDX);
DDX_Control(pDX, IDC_EDIT_LOG, m_ctrlLog);
}
BEGIN_MESSAGE_MAP(CMFCRABBITMQDlg, CDialogEx)
ON_WM_SYSCOMMAND()
ON_WM_PAINT()
ON_WM_QUERYDRAGICON()
ON_BN_CLICKED(IDC_BUTTON_PRODUCT, &CMFCRABBITMQDlg::OnBnClickedButtonProduct)
ON_BN_CLICKED(IDC_BUTTON_CONSUMER, &CMFCRABBITMQDlg::OnBnClickedButtonConsumer)
END_MESSAGE_MAP()
// CMFCRABBITMQDlg 消息处理程序
BOOL CMFCRABBITMQDlg::OnInitDialog()
{
CDialogEx::OnInitDialog();
// 将“关于...”菜单项添加到系统菜单中。
// IDM_ABOUTBOX 必须在系统命令范围内。
ASSERT((IDM_ABOUTBOX & 0xFFF0) == IDM_ABOUTBOX);
ASSERT(IDM_ABOUTBOX < 0xF000);
CMenu* pSysMenu = GetSystemMenu(FALSE);
if (pSysMenu != nullptr)
{
BOOL bNameValid;
CString strAboutMenu;
bNameValid = strAboutMenu.LoadString(IDS_ABOUTBOX);
ASSERT(bNameValid);
if (!strAboutMenu.IsEmpty())
{
pSysMenu->AppendMenu(MF_SEPARATOR);
pSysMenu->AppendMenu(MF_STRING, IDM_ABOUTBOX, strAboutMenu);
}
}
// 设置此对话框的图标。 当应用程序主窗口不是对话框时,框架将自动
// 执行此操作
SetIcon(m_hIcon, TRUE); // 设置大图标
SetIcon(m_hIcon, FALSE); // 设置小图标
// TODO: 在此添加额外的初始化代码
return TRUE; // 除非将焦点设置到控件,否则返回 TRUE
}
void CMFCRABBITMQDlg::OnSysCommand(UINT nID, LPARAM lParam)
{
if ((nID & 0xFFF0) == IDM_ABOUTBOX)
{
CAboutDlg dlgAbout;
dlgAbout.DoModal();
}
else
{
CDialogEx::OnSysCommand(nID, lParam);
}
}
// 如果向对话框添加最小化按钮,则需要下面的代码
// 来绘制该图标。 对于使用文档/视图模型的 MFC 应用程序,
// 这将由框架自动完成。
void CMFCRABBITMQDlg::OnPaint()
{
if (IsIconic())
{
CPaintDC dc(this); // 用于绘制的设备上下文
SendMessage(WM_ICONERASEBKGND, reinterpret_cast<WPARAM>(dc.GetSafeHdc()), 0);
// 使图标在工作区矩形中居中
int cxIcon = GetSystemMetrics(SM_CXICON);
int cyIcon = GetSystemMetrics(SM_CYICON);
CRect rect;
GetClientRect(&rect);
int x = (rect.Width() - cxIcon + 1) / 2;
int y = (rect.Height() - cyIcon + 1) / 2;
// 绘制图标
dc.DrawIcon(x, y, m_hIcon);
}
else
{
CDialogEx::OnPaint();
}
}
//当用户拖动最小化窗口时系统调用此函数取得光标
//显示。
HCURSOR CMFCRABBITMQDlg::OnQueryDragIcon()
{
return static_cast<HCURSOR>(m_hIcon);
}
void CMFCRABBITMQDlg::sendMessageToRabbitMQ(const std::string& message)
{
amqp_connection_state_t conn;
amqp_socket_t* socket = NULL;
int status;
// 初始化连接
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
std::cerr << "Failed to create TCP socket" << std::endl;
return;
}
// 连接到 RabbitMQ 服务器
status = amqp_socket_open(socket, "localhost", 5672);
if (status) {
std::cerr << "Failed to open TCP socket" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 登录到 RabbitMQ 服务器
amqp_rpc_reply_t loginReply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (loginReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to login to RabbitMQ" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 打开通道
amqp_channel_open(conn, 1);
amqp_rpc_reply_t openReply = amqp_get_rpc_reply(conn);
if (openReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to open channel" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 声明队列
amqp_queue_declare(conn, 1, amqp_cstring_bytes("my_queue"), 0, 0, 0, 0, amqp_empty_table);
amqp_rpc_reply_t queueDeclareReply = amqp_get_rpc_reply(conn);
if (queueDeclareReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to declare queue" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 发布消息到队列
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; // 持久化消息
amqp_basic_publish(conn,
1,
amqp_cstring_bytes(""), // 默认交换器
amqp_cstring_bytes("my_queue"),
0,
0,
&props,
amqp_cstring_bytes(message.c_str()));
// 检查发布结果
amqp_rpc_reply_t publishReply = amqp_get_rpc_reply(conn);
if (publishReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to publish message" << std::endl;
}
else {
std::cout << "Message sent: " << message << std::endl;
}
// 关闭通道和连接
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
}
void CMFCRABBITMQDlg:: receiveMessagesFromRabbitMQ()
{
amqp_connection_state_t conn;
amqp_socket_t* socket = NULL;
int status;
// 初始化连接
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
std::cerr << "Failed to create TCP socket" << std::endl;
return;
}
// 连接到 RabbitMQ 服务器
status = amqp_socket_open(socket, "localhost", 5672);
if (status) {
std::cerr << "Failed to open TCP socket" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 登录到 RabbitMQ 服务器
amqp_rpc_reply_t loginReply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (loginReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to login to RabbitMQ" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 打开通道
amqp_channel_open(conn, 1);
amqp_rpc_reply_t openReply = amqp_get_rpc_reply(conn);
if (openReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to open channel" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 声明队列
amqp_queue_declare(conn, 1, amqp_cstring_bytes("my_queue"), 0, 0, 0, 0, amqp_empty_table);
amqp_rpc_reply_t queueDeclareReply = amqp_get_rpc_reply(conn);
if (queueDeclareReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to declare queue" << std::endl;
amqp_destroy_connection(conn);
return;
}
// 开始消费消息
amqp_basic_consume(conn, 1, amqp_cstring_bytes("my_queue"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_rpc_reply_t consumeReply = amqp_get_rpc_reply(conn);
if (consumeReply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to start consuming messages" << std::endl;
amqp_destroy_connection(conn);
return;
}
std::cout << "Waiting for messages. To exit press CTRL+C" << std::endl;
showLog(_T("消费者开始监听:"));
while (true) {
amqp_rpc_reply_t reply;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
reply = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != reply.reply_type) {
break;
}
static int t_count = 0;
t_count++;
std::cout << "Received message: ";
char* p1 = static_cast<char*>(envelope.message.body.bytes);
char* p = new char[envelope.message.body.len+1]();
// strcpy_s(p , envelope.message.body.len, "11");
for (size_t i = 0; i < envelope.message.body.len; ++i)
{
p[i] = p1[i];
}
std::cout << std::endl;
CString t_Cstr = (CString)p;
CString t_CstrCount;
t_CstrCount.Format(_T("%d ->"), t_count);
CString t_tep = t_CstrCount + t_Cstr;
showLog(t_tep);
delete[]p;
amqp_destroy_envelope(&envelope);
}
// 关闭通道和连接
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
}
// ch:取流线程 | en:Grabbing thread
unsigned int __stdcall SendThread(void* pUser)
{
if (pUser)
{
CMFCRABBITMQDlg* Gthis = (CMFCRABBITMQDlg*)pUser;
CString t_CstrSend;
for (int i = 0; i < 1000000; ++i)
{
Gthis->GetDlgItemText(IDC_EDIT_TEXT, t_CstrSend);
std::string t_strSend = (CT2A)t_CstrSend;
Gthis->sendMessageToRabbitMQ(t_strSend);
Sleep(1);
}
return 0;
}
return -1;
}
void CMFCRABBITMQDlg::OnBnClickedButtonProduct()
{
unsigned int nThreadID = 0;
void* m_hGrabThread = (void*)_beginthreadex(NULL, 0, SendThread, this, 0, &nThreadID);
if (NULL == m_hGrabThread)
{
showLog(_T("消费者创建线程失败"));
return;
}
}
// ch:取流线程 | en:Grabbing thread
unsigned int __stdcall ConsumThread(void* pUser)
{
if (pUser)
{
CMFCRABBITMQDlg* Gthis = (CMFCRABBITMQDlg*)pUser;
Gthis->receiveMessagesFromRabbitMQ();
return 0;
}
return -1;
}
void CMFCRABBITMQDlg::OnBnClickedButtonConsumer()
{
unsigned int nThreadID = 0;
void* m_hGrabThread = (void*)_beginthreadex(NULL, 0, ConsumThread, this, 0, &nThreadID);
if (NULL == m_hGrabThread)
{
showLog(_T("消费者创建线程失败"));
return;
}
}
void CMFCRABBITMQDlg::showLog(CString p_str_log)
{
int t_intLine = m_ctrlLog.GetLineCount();
if (t_intLine == 1000)
{
m_CstrLog = _T("");
}
SYSTEMTIME timeCur;
GetLocalTime(&timeCur);
char t_logbuffer[1024] = { 0 };
sprintf_s(t_logbuffer, ("[%04d%02d%02d_%02d:%02d:%02d:%03d] ")
, timeCur.wYear, timeCur.wMonth, timeCur.wDay
, timeCur.wHour, timeCur.wMinute, timeCur.wSecond, timeCur.wMilliseconds);
p_str_log += "\r\n";
m_CstrLog += t_logbuffer;
m_CstrLog += p_str_log;
m_ctrlLog.SetWindowTextW(m_CstrLog);
m_ctrlLog.LineScroll(m_ctrlLog.GetLineCount() - 1, 0);
}