C++实现rabbitmq生产者消费者

发布于:2025-03-20 ⋅ 阅读:(26) ⋅ 点赞:(0)

RabbitMQ是一个开源的消息队列系统,它实现了高级消息队列协议(AMQP),

特点

  • 可靠性:通过持久化、镜像队列等机制保证消息不丢失,确保消息可靠传递。
  • 灵活的路由:提供多种路由方式,如直连、主题、扇形等,可根据不同的业务需求进行灵活配置。
  • 多语言支持:支持多种编程语言,如Java、Python、C++、Ruby等,方便不同技术栈的开发者使用。
  • 高可用性:可以通过集群方式实现高可用性,避免单点故障,保证系统的稳定运行。
  • 易于管理:提供了可视化的管理界面,方便管理员对队列、交换机、消息等进行监控和管理。

核心概念

  • 消息:应用程序之间传递的数据单元。
  • 队列:用于存储消息的缓冲区,消息会在队列中等待消费者来获取。
  • 交换机:接收生产者发送的消息,并根据路由键将消息路由到相应的队列。
  • 绑定:将交换机和队列通过路由键进行关联,建立路由规则。
  • 生产者:负责发送消息到RabbitMQ服务器的应用程序。
  • 消费者:从RabbitMQ服务器获取消息并进行处理的应用程序。

工作原理

  1. 生产者将消息发送到交换机,消息中包含路由键等信息。
  2. 交换机根据路由键和绑定规则,将消息路由到对应的队列。
  3. 消费者从队列中获取消息并进行处理。

应用场景

  • 异步处理:将一些耗时的操作如发送邮件、生成报表等放在消息队列中,由消费者异步处理,提高系统的响应速度。
  • 系统解耦:不同模块之间通过消息队列进行通信,降低模块之间的耦合度,使得各个模块可以独立扩展和维护。
  • 流量削峰:在高并发场景下,将大量的请求放入消息队列中,由消费者按照一定的速度进行处理,避免系统因瞬间高流量而崩溃。

C++实现rabbitmq生产者消费者


// MFCRABBITMQDlg.h: 头文件
//

#pragma once
#include <iostream>
#include <string>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#pragma comment(lib , "rabbitmq.lib")

// CMFCRABBITMQDlg 对话框
class CMFCRABBITMQDlg : public CDialogEx
{
// 构造
public:
	CMFCRABBITMQDlg(CWnd* pParent = nullptr);	// 标准构造函数

// 对话框数据
#ifdef AFX_DESIGN_TIME
	enum { IDD = IDD_MFCRABBITMQ_DIALOG };
#endif

	protected:
	virtual void DoDataExchange(CDataExchange* pDX);	// DDX/DDV 支持


// 实现
protected:
	HICON m_hIcon;

	// 生成的消息映射函数
	virtual BOOL OnInitDialog();
	afx_msg void OnSysCommand(UINT nID, LPARAM lParam);
	afx_msg void OnPaint();
	afx_msg HCURSOR OnQueryDragIcon();
	DECLARE_MESSAGE_MAP()
public:
	afx_msg void OnBnClickedButtonProduct();
	void CMFCRABBITMQDlg::sendMessageToRabbitMQ(const std::string& message);
	void CMFCRABBITMQDlg::receiveMessagesFromRabbitMQ();
	afx_msg void OnBnClickedButtonConsumer();
	void CMFCRABBITMQDlg::showLog(CString p_str_log);
	CString m_CstrLog;
	CEdit m_ctrlLog;
};


// MFCRABBITMQDlg.cpp: 实现文件
//

#include "pch.h"
#include "framework.h"
#include "MFCRABBITMQ.h"
#include "MFCRABBITMQDlg.h"
#include "afxdialogex.h"

#ifdef _DEBUG
#define new DEBUG_NEW
#endif


// 用于应用程序“关于”菜单项的 CAboutDlg 对话框

class CAboutDlg : public CDialogEx
{
public:
	CAboutDlg();

// 对话框数据
#ifdef AFX_DESIGN_TIME
	enum { IDD = IDD_ABOUTBOX };
#endif

	protected:
	virtual void DoDataExchange(CDataExchange* pDX);    // DDX/DDV 支持

// 实现
protected:
	DECLARE_MESSAGE_MAP()
};

CAboutDlg::CAboutDlg() : CDialogEx(IDD_ABOUTBOX)
{
}

void CAboutDlg::DoDataExchange(CDataExchange* pDX)
{
	CDialogEx::DoDataExchange(pDX);
}

BEGIN_MESSAGE_MAP(CAboutDlg, CDialogEx)
END_MESSAGE_MAP()


// CMFCRABBITMQDlg 对话框



CMFCRABBITMQDlg::CMFCRABBITMQDlg(CWnd* pParent /*=nullptr*/)
	: CDialogEx(IDD_MFCRABBITMQ_DIALOG, pParent)
{
	m_hIcon = AfxGetApp()->LoadIcon(IDR_MAINFRAME);
}

void CMFCRABBITMQDlg::DoDataExchange(CDataExchange* pDX)
{
    CDialogEx::DoDataExchange(pDX);
    DDX_Control(pDX, IDC_EDIT_LOG, m_ctrlLog);
}

BEGIN_MESSAGE_MAP(CMFCRABBITMQDlg, CDialogEx)
	ON_WM_SYSCOMMAND()
	ON_WM_PAINT()
	ON_WM_QUERYDRAGICON()
	ON_BN_CLICKED(IDC_BUTTON_PRODUCT, &CMFCRABBITMQDlg::OnBnClickedButtonProduct)
    ON_BN_CLICKED(IDC_BUTTON_CONSUMER, &CMFCRABBITMQDlg::OnBnClickedButtonConsumer)
END_MESSAGE_MAP()


// CMFCRABBITMQDlg 消息处理程序

BOOL CMFCRABBITMQDlg::OnInitDialog()
{
	CDialogEx::OnInitDialog();

	// 将“关于...”菜单项添加到系统菜单中。

	// IDM_ABOUTBOX 必须在系统命令范围内。
	ASSERT((IDM_ABOUTBOX & 0xFFF0) == IDM_ABOUTBOX);
	ASSERT(IDM_ABOUTBOX < 0xF000);

	CMenu* pSysMenu = GetSystemMenu(FALSE);
	if (pSysMenu != nullptr)
	{
		BOOL bNameValid;
		CString strAboutMenu;
		bNameValid = strAboutMenu.LoadString(IDS_ABOUTBOX);
		ASSERT(bNameValid);
		if (!strAboutMenu.IsEmpty())
		{
			pSysMenu->AppendMenu(MF_SEPARATOR);
			pSysMenu->AppendMenu(MF_STRING, IDM_ABOUTBOX, strAboutMenu);
		}
	}

	// 设置此对话框的图标。  当应用程序主窗口不是对话框时,框架将自动
	//  执行此操作
	SetIcon(m_hIcon, TRUE);			// 设置大图标
	SetIcon(m_hIcon, FALSE);		// 设置小图标

	// TODO: 在此添加额外的初始化代码

	return TRUE;  // 除非将焦点设置到控件,否则返回 TRUE
}

void CMFCRABBITMQDlg::OnSysCommand(UINT nID, LPARAM lParam)
{
	if ((nID & 0xFFF0) == IDM_ABOUTBOX)
	{
		CAboutDlg dlgAbout;
		dlgAbout.DoModal();
	}
	else
	{
		CDialogEx::OnSysCommand(nID, lParam);
	}
}

// 如果向对话框添加最小化按钮,则需要下面的代码
//  来绘制该图标。  对于使用文档/视图模型的 MFC 应用程序,
//  这将由框架自动完成。

void CMFCRABBITMQDlg::OnPaint()
{
	if (IsIconic())
	{
		CPaintDC dc(this); // 用于绘制的设备上下文

		SendMessage(WM_ICONERASEBKGND, reinterpret_cast<WPARAM>(dc.GetSafeHdc()), 0);

		// 使图标在工作区矩形中居中
		int cxIcon = GetSystemMetrics(SM_CXICON);
		int cyIcon = GetSystemMetrics(SM_CYICON);
		CRect rect;
		GetClientRect(&rect);
		int x = (rect.Width() - cxIcon + 1) / 2;
		int y = (rect.Height() - cyIcon + 1) / 2;

		// 绘制图标
		dc.DrawIcon(x, y, m_hIcon);
	}
	else
	{
		CDialogEx::OnPaint();
	}
}

//当用户拖动最小化窗口时系统调用此函数取得光标
//显示。
HCURSOR CMFCRABBITMQDlg::OnQueryDragIcon()
{
	return static_cast<HCURSOR>(m_hIcon);
}

void CMFCRABBITMQDlg::sendMessageToRabbitMQ(const std::string& message)
{
    amqp_connection_state_t conn;
    amqp_socket_t* socket = NULL;
    int status;

    // 初始化连接
    conn = amqp_new_connection();
    socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        std::cerr << "Failed to create TCP socket" << std::endl;
        return;
    }

    // 连接到 RabbitMQ 服务器
    status = amqp_socket_open(socket, "localhost", 5672);
    if (status) {
        std::cerr << "Failed to open TCP socket" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 登录到 RabbitMQ 服务器
    amqp_rpc_reply_t loginReply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (loginReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to login to RabbitMQ" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 打开通道
    amqp_channel_open(conn, 1);
    amqp_rpc_reply_t openReply = amqp_get_rpc_reply(conn);
    if (openReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to open channel" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 声明队列
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("my_queue"), 0, 0, 0, 0, amqp_empty_table);
    amqp_rpc_reply_t queueDeclareReply = amqp_get_rpc_reply(conn);
    if (queueDeclareReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to declare queue" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 发布消息到队列
    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; // 持久化消息

    amqp_basic_publish(conn,
        1,
        amqp_cstring_bytes(""),  // 默认交换器
        amqp_cstring_bytes("my_queue"),
        0,
        0,
        &props,
        amqp_cstring_bytes(message.c_str()));

    // 检查发布结果
    amqp_rpc_reply_t publishReply = amqp_get_rpc_reply(conn);
    if (publishReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to publish message" << std::endl;
    }
    else {
        std::cout << "Message sent: " << message << std::endl;
    }

    // 关闭通道和连接
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
}


void CMFCRABBITMQDlg:: receiveMessagesFromRabbitMQ()
{
    amqp_connection_state_t conn;
    amqp_socket_t* socket = NULL;
    int status;

    // 初始化连接
    conn = amqp_new_connection();
    socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        std::cerr << "Failed to create TCP socket" << std::endl;
        return;
    }

    // 连接到 RabbitMQ 服务器
    status = amqp_socket_open(socket, "localhost", 5672);
    if (status) {
        std::cerr << "Failed to open TCP socket" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 登录到 RabbitMQ 服务器
    amqp_rpc_reply_t loginReply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (loginReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to login to RabbitMQ" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 打开通道
    amqp_channel_open(conn, 1);
    amqp_rpc_reply_t openReply = amqp_get_rpc_reply(conn);
    if (openReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to open channel" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 声明队列
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("my_queue"), 0, 0, 0, 0, amqp_empty_table);
    amqp_rpc_reply_t queueDeclareReply = amqp_get_rpc_reply(conn);
    if (queueDeclareReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to declare queue" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    // 开始消费消息
    amqp_basic_consume(conn, 1, amqp_cstring_bytes("my_queue"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    amqp_rpc_reply_t consumeReply = amqp_get_rpc_reply(conn);
    if (consumeReply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to start consuming messages" << std::endl;
        amqp_destroy_connection(conn);
        return;
    }

    std::cout << "Waiting for messages. To exit press CTRL+C" << std::endl;
    showLog(_T("消费者开始监听:"));
    while (true) {
        amqp_rpc_reply_t reply;
        amqp_envelope_t envelope;

        amqp_maybe_release_buffers(conn);
        reply = amqp_consume_message(conn, &envelope, NULL, 0);

        if (AMQP_RESPONSE_NORMAL != reply.reply_type) {
            break;
        }
        static int t_count = 0;
        t_count++;
        std::cout << "Received message: ";
        char* p1 = static_cast<char*>(envelope.message.body.bytes);
        char* p = new char[envelope.message.body.len+1]();
        // strcpy_s(p , envelope.message.body.len, "11");
        for (size_t i = 0; i < envelope.message.body.len; ++i)
        {
            p[i] = p1[i];
        }
        std::cout << std::endl;
        CString t_Cstr = (CString)p;
        CString t_CstrCount;
        t_CstrCount.Format(_T("%d  ->"), t_count);
        

        CString t_tep = t_CstrCount + t_Cstr;
        showLog(t_tep);
        delete[]p;
        amqp_destroy_envelope(&envelope);
    }

    // 关闭通道和连接
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
}

// ch:取流线程 | en:Grabbing thread
unsigned int __stdcall SendThread(void* pUser)
{
    if (pUser)
    {
        CMFCRABBITMQDlg* Gthis = (CMFCRABBITMQDlg*)pUser;
        CString t_CstrSend;
       
        for (int i = 0; i < 1000000; ++i)
        {
            Gthis->GetDlgItemText(IDC_EDIT_TEXT, t_CstrSend);
            std::string t_strSend = (CT2A)t_CstrSend;
            Gthis->sendMessageToRabbitMQ(t_strSend);
            Sleep(1);
        }
        

        return 0;
    }

    return -1;
}

void CMFCRABBITMQDlg::OnBnClickedButtonProduct()
{
        unsigned int nThreadID = 0;
        void* m_hGrabThread = (void*)_beginthreadex(NULL, 0, SendThread, this, 0, &nThreadID);
        if (NULL == m_hGrabThread)
        {
            showLog(_T("消费者创建线程失败"));
            return;
        }

}

// ch:取流线程 | en:Grabbing thread
unsigned int __stdcall ConsumThread(void* pUser)
{
    if (pUser)
    {
        CMFCRABBITMQDlg* Gthis = (CMFCRABBITMQDlg*)pUser;
       
        Gthis->receiveMessagesFromRabbitMQ();

        return 0;
    }

    return -1;
}

void CMFCRABBITMQDlg::OnBnClickedButtonConsumer()
{
    unsigned int nThreadID = 0;
    void* m_hGrabThread = (void*)_beginthreadex(NULL, 0, ConsumThread, this, 0, &nThreadID);
    if (NULL == m_hGrabThread)
    {
        showLog(_T("消费者创建线程失败"));
        return;
    }
}

void CMFCRABBITMQDlg::showLog(CString p_str_log)
{
    int t_intLine = m_ctrlLog.GetLineCount();
    if (t_intLine == 1000)
    {
        m_CstrLog = _T("");
    }
    SYSTEMTIME timeCur;
    GetLocalTime(&timeCur);
    char t_logbuffer[1024] = { 0 };
    sprintf_s(t_logbuffer, ("[%04d%02d%02d_%02d:%02d:%02d:%03d] ")
        , timeCur.wYear, timeCur.wMonth, timeCur.wDay
        , timeCur.wHour, timeCur.wMinute, timeCur.wSecond, timeCur.wMilliseconds);

    p_str_log += "\r\n";
    m_CstrLog += t_logbuffer;
    m_CstrLog += p_str_log;
    m_ctrlLog.SetWindowTextW(m_CstrLog);
    m_ctrlLog.LineScroll(m_ctrlLog.GetLineCount() - 1, 0);
 
}

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到