RabbitMQ RPC模式Python示例

发布于:2025-06-29 ⋅ 阅读:(22) ⋅ 点赞:(0)

1.服务端

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@File:      rabbitmq_server.py
@Date:      2025/6/26 10:42
@Author:    xxx
@Description:
1. RabbitMQ服务端,支持多节点命令执行
2. 作为被控节点运行,可接收定向命令并返回结果
"""

import ssl
import pika
import time
import json
import socket
import logging
import subprocess
import configparser

# 定义日志模块
logger = logging.getLogger()
# 设置全局日志级别设为最低(DEBUG)
# # 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
logger.setLevel(logging.DEBUG)

# 1. 文件日志(仅输出到文件)
file_handler = logging.FileHandler('rabbitmq_server.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)  # 文件记录DEBUG及以上级别
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)

# 2. 控制台日志(仅输出到控制台)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)  # 控制台仅显示INFO及以上
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)

RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"


class RabbitMQServer:
    """
    RabbitMQ RPC服务器类
    功能:接收并执行来自客户端的定向命令
    """

    def __init__(self, node_name=None, mq_user="rabbitmq", mq_password="rabbitmq@123",
                 mq_virtual_host="/", mq_host=None, mq_port=5671,
                 mq_ca="/opt/ssl/ca_certificate.pem"):
        """
        初始化RabbitMQ服务端
        :param node_name: 节点名称标识(唯一)
        :param mq_user: RabbitMQ用户名
        :param mq_password: RabbitMQ密码
        :param mq_virtual_host: 虚拟主机
        :param mq_host: RabbitMQ服务器IP
        :param mq_port: RabbitMQ服务端口
        :param mq_ca: SSL证书路径
        """
        # 节点配置
        self.NODE_NAME = node_name if node_name else socket.gethostname()

        # 连接配置
        self.RABBITMQ_USER = mq_user
        self.RABBITMQ_UNLOCK_CODE = mq_password
        self.RABBITMQ_VIRTUAL_HOST = mq_virtual_host
        # 如果没有设置RabbitMQ服务器IP,则连接到配置文件中设置的IP节点
        self.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")
        self.RABBITMQ_PORT = mq_port
        self.SSL_CA_PATH = mq_ca

        # 初始化连接
        self._setup_connection()

    def get_option(self, file_path, section, option):
        """
        获取 file_path 配置项值,若配置文件没有,返回空字符串
        :param section: section字符串,例如:'global'
        :param option: key值,例如:'manage_nodes'
        :return: 字符串类型数据
        """
        parser = configparser.ConfigParser()
        parser.read(file_path)
        if not parser.has_option(section, option):
            return ""
        else:
            return parser.get(section, option)

    def _get_ssl_options(self):
        """配置SSL安全连接选项"""
        # 创建一个 SSL/TLS 安全通信的上下文对象
        # 生产环境建议指定协议版本,避免使用不安全的默认值
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        # 在 SSL/TLS 通信中承担核心安全验证功能
        context.load_verify_locations(self.SSL_CA_PATH)
        # 创建 RabbitMQ 的 SSL 连接配置对象
        return pika.SSLOptions(
            context,        # ssl.SSLContext 对象,包含证书、协议版本等SSL配置
            # TODO 可以根据具体证书,设置该选项
            "localhost"     # 服务器主机名,用于证书验证(需匹配证书CN或SAN字段),主机名验证:强制检查证书中的 CN(Common Name) 或 SAN(Subject Alternative Name) 是否匹配"localhost"
        )

    def _setup_connection(self):
        """建立RabbitMQ连接并设置队列"""
        # 创建 RabbitMQ 连接所需的用户名/密码认证对,用于后续建立连接时的身份验证
        credentials = pika.PlainCredentials(
            self.RABBITMQ_USER,         # RabbitMQ 服务认证用户名(字符串类型)
            self.RABBITMQ_UNLOCK_CODE   # RabbitMQ 服务认证密码(字符串类型)
        )

        # RabbitMQ 连接参数设置
        connection_params = pika.ConnectionParameters(
            # RabbitMQ服务器IP地址(字符串类型)
            # - 若未配置则自动读取配置文件中的虚拟IP
            # - 示例:'192.168.120.19' 或 'rabbitmq.example.com'
            host=self.RABBITMQ_HOST,
            # RabbitMQ服务端口(整数类型)
            # - 默认使用加密端口5671
            # - 非加密连接通常用5672
            port=self.RABBITMQ_PORT,
            # 虚拟主机名称(字符串类型)
            # - 默认'/'表示根虚拟主机
            # - 用于多租户隔离场景
            virtual_host=self.RABBITMQ_VIRTUAL_HOST,
            # 认证凭证对象(pika.PlainCredentials)
            # - 包含用户名/密码信息
            # - 必须与RabbitMQ配置的用户权限匹配
            credentials=credentials,
            # SSL配置对象(pika.SSLOptions)
            # - 包含CA证书和主机名验证配置
            # - 为空时建立非加密连接
            ssl_options=self._get_ssl_options(),
            # 心跳检测间隔(秒,整数类型),建议设置为300-1200秒(防止NAT超时断开)
            # - 600秒=10分钟检测一次连接活性
            # - 0表示禁用心跳(不推荐)
            heartbeat=600   # 需 ≥ 客户端配置
        )

        # 建立与 RabbitMQ 服务器的同步阻塞式连接
        # 阻塞式:所有操作(如发送/接收消息)会阻塞当前线程直到完成
        self.connection = pika.BlockingConnection(connection_params)
        # 在连接上创建 AMQP 信道
        self.channel = self.connection.channel()

        # 声明节点专用队列(绑定到该信道)
        # 如果队列不存在则自动创建,如果存在则直接绑定
        self.channel.queue_declare(
            queue=self.NODE_NAME,   # 队列名称(使用节点名作为唯一标识)
            durable=True            # 队列持久化标志,需要配合消息的delivery_mode=2使用才能完全持久化
        )
        # 控制并发数,值越大并发越高,但资源消耗和复杂度也增加
        # TODO 集群需要调研配置并发度
        self.channel.basic_qos(prefetch_count=1)

        # 将当前信道绑定到指定队列,开始监听消息,消费消息(通过该信道)
        self.channel.basic_consume(
            queue=self.NODE_NAME,   # 指定消费的队列名称(当前节点专属队列)
            on_message_callback=self._execute_command,  # 消息处理回调函数
            auto_ack=False          # 手动消息确认模式
        )

    def _execute_command(self, ch, method, props, body):
        """执行接收到的命令并返回结果"""
        try:
            # 解析消息内容(JSON格式)
            message = json.loads(body.decode('utf-8'))
            command = message.get('command', '')    # 获取要执行的命令
            target = message.get('target', '')      # 获取目标节点标识
            logger.info(f" [x] 收到({target})命令:{command}")

            # 校验目标节点(防止误处理其他节点的消息)
            if target != self.NODE_NAME:
                logger.warning(f" [x] 收到非本节点({self.NODE_NAME})命令,已忽略")
                ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息(防止重新投递)
                return
            logger.info(f" [*] 执行命令 【{command}】...")

            try:
                # 通过子进程执行命令(同步阻塞)
                output = subprocess.check_output(
                    command,
                    shell=True,     # 启用shell解析
                    stderr=subprocess.STDOUT,   # 捕获标准错误
                    timeout=60      # 超时时间(秒)
                )
                response = output.decode('utf-8')
            except subprocess.TimeoutExpired:
                # 超时异常
                response = "Error: Command timed out"
            except subprocess.CalledProcessError as e:
                # 命令执行失败
                response = f"Error: {e.output.decode('utf-8')}"
            except Exception as e:
                # 其他系统异常
                response = f"System Error: {str(e)}"

            # 返回执行结果
            ch.basic_publish(
                exchange='',    # 默认交换器
                routing_key=props.reply_to, # 回复到客户端指定的回调队列
                properties=pika.BasicProperties(
                    correlation_id=props.correlation_id,    # 关联ID(匹配请求-响应)
                    delivery_mode=2 # 持久化消息到磁盘,会降低性能(服务重启不丢失)
                ),
                body=response.encode('utf-8')   # 响应内容
            )

            logger.info(f" [*] 命令执行完成")
            # 确认消息处理完成
            ch.basic_ack(delivery_tag=method.delivery_tag)

        except Exception as e:
            # 全局异常处理
            logger.exception(f" [x] 消息处理异常: {str(e)}")
            # 否定确认(可能重新投递)
            ch.basic_nack(delivery_tag=method.delivery_tag)

    def start(self, max_retries=5, retry_delay=10):
        """
        启动RabbitMQ服务并持续监听消息
        功能:管理服务生命周期,处理连接异常和重试逻辑
        :param max_retries: 最大重试次数,默认5次
        :param retry_delay: 重试间隔时间(秒),默认10秒
        :return:
        """
        # 当前重试次数计数器
        retry_count = 0

        # 主服务循环(持续运行直到主动终止)
        while True:
            try:
                # 打印服务状态信息
                logger.info(f" [*] {self.NODE_NAME} 节点服务启动 (尝试 {retry_count + 1}/{max_retries})")
                logger.info(f" [*] 等待队列 {self.NODE_NAME} 中的请求...")

                # 检查并重建连接(如果不存在或已关闭)
                if not hasattr(self, 'connection') or self.connection.is_closed:
                    self._setup_connection()    # 初始化AMQP连接

                # 开始消费消息(阻塞调用)
                self.channel.start_consuming()

            except pika.exceptions.AMQPConnectionError as e:
                # RabbitMQ连接异常处理
                retry_count += 1
                logger.exception(f"连接失败: {str(e)}")

                # 超过最大重试次数则终止服务
                if retry_count >= max_retries:
                    logger.error(" [x] 达到最大重试次数,终止服务")
                    self.close()
                    break   # 退出循环

                # 未达上限则延迟重试
                logger.warning(f" [*] {retry_delay}秒后尝试重新连接...")
                time.sleep(retry_delay)

            except KeyboardInterrupt:
                # 处理用户主动终止(Ctrl + C)
                logger.error("\n [x] 接收到终止信号")
                self.close()
                logger.error(" [x] 服务已停止")
                break   # 退出循环
            except Exception as e:
                # 其他未捕获异常处理
                logger.exception(f"服务异常: {str(e)}")
                time.sleep(retry_delay) # 防止异常时CPU空转

    def close(self):
        """
        安全关闭RabbitMQ连接
        功能:清理资源,确保连接被正确关闭
        :return:
        """
        # 防御式编程:检查连接存在且未关闭
        if hasattr(self, 'connection') and not self.connection.is_closed:
            self.connection.close() # 关闭AMQP连接
            logger.info(" [x] 连接已安全关闭")


if __name__ == '__main__':
    # 服务启动入口(自动获取主机名作为节点名)
    server = RabbitMQServer()
    try:
        server.start()
    except KeyboardInterrupt:
        logger.error("\n [x] 接收到终止信号")
        server.close()
        logger.error(" [x] 服务已停止")


2.客户端

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@File:      rabbitmq_client.py
@Date:      2025/6/26 10:43
@Author:    xxx
@Description:
1. RabbitMQ客户端类,支持向指定节点发送SSH命令
2. 作为控制端运行,可定向发送命令并接收执行结果
"""

import ssl
import pika
import time
import uuid
import json
import socket
import logging
import configparser

# 定义日志模块
logger = logging.getLogger()
# 设置全局日志级别设为最低(DEBUG)
# # 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
logger.setLevel(logging.DEBUG)

# 1. 文件日志(仅输出到文件)
file_handler = logging.FileHandler('rabbitmq_client.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)  # 文件记录DEBUG及以上级别
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)

# 2. 控制台日志(仅输出到控制台)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)  # 控制台仅显示INFO及以上
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)

RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"

class RabbitMQClient:
    """
    RabbitMQ RPC客户端类
    功能:向指定节点发送命令并获取执行结果
    """

    def __init__(self, mq_user="rabbitmq", mq_password="rabbitmq@123", mq_virtual_host="/",
                 mq_host=None, mq_port=5671, mq_ca="/opt/ssl/ca_certificate.pem"):
        """
        初始化RabbitMQ客户端
        :param mq_user: RabbitMQ用户名
        :param mq_password: RabbitMQ密码
        :param mq_virtual_host: 虚拟主机
        :param mq_host: RabbitMQ服务器IP
        :param mq_port: RabbitMQ服务端口
        :param mq_ca: SSL证书路径
        """
        # 连接配置
        self.RABBITMQ_USER = mq_user
        self.RABBITMQ_UNLOCK_CODE = mq_password
        self.RABBITMQ_VIRTUAL_HOST = mq_virtual_host
        # 如果没有设置RabbitMQ服务器IP,则连接到配置文件中设置的IP节点
        self.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")
        self.RABBITMQ_PORT = mq_port
        self.SSL_CA_PATH = mq_ca

        # 响应相关变量
        self.response = None
        self.corr_id = None

        # 建立连接
        logger.info(" [x] 正在建立连接 ...")
        self._connect()
        logger.info(" [x] 连接建立成功")

    def get_option(self, file_path, section, option):
        """
        获取 file_path 配置项值,若配置文件没有,返回空字符串
        :param section: section字符串,例如:'global'
        :param option: key值,例如:'manage_nodes'
        :return: 字符串类型数据
        """
        parser = configparser.ConfigParser()
        parser.read(file_path)
        if not parser.has_option(section, option):
            return ""
        else:
            return parser.get(section, option)

    def _connect(self):
        """
        建立RabbitMQ连接并初始化回调队列
        功能:配置安全连接参数、创建通信信道、设置消息回调处理
        :return:
        """
        # 创建SSL安全上下文,强制使用TLS 1.2协议(禁用不安全的老版本协议)
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        # 加载CA根证书用于验证服务端证书(防止中间人攻击)
        ssl_context.load_verify_locations(self.SSL_CA_PATH)
        # 将SSL配置封装为RabbitMQ专用的SSLOptions对象
        # "localhost"参数要求服务端证书必须包含该主机名(CN或SAN字段)
        ssl_options = pika.SSLOptions(ssl_context, "localhost")

        # 创建认证凭证对象(明文用户名/密码,实际传输时会通过SSL加密)
        credentials = pika.PlainCredentials(
            self.RABBITMQ_USER,         # RabbitMQ服务用户名
            self.RABBITMQ_UNLOCK_CODE   # RabbitMQ服务密码
        )
        # 配置连接参数
        connection_params = pika.ConnectionParameters(
            host=self.RABBITMQ_HOST,    # RabbitMQ服务器地址
            port=self.RABBITMQ_PORT,    # 默认使用5671加密端口
            virtual_host=self.RABBITMQ_VIRTUAL_HOST,    # 虚拟主机隔离环境
            credentials=credentials,    # 用户名密码凭证
            ssl_options=ssl_options,    # SSL安全配置
            heartbeat=60                # 1分钟心跳检测(防连接中断),更频繁的心跳检测(如AWS ELB默认60秒空闲超时)
        )

        # 建立阻塞式连接(同步操作,会阻塞直到连接成功或抛出异常)
        self.connection = pika.BlockingConnection(connection_params)
        # 创建AMQP信道(单个连接可创建多个信道,减少TCP连接开销)
        self.channel = self.connection.channel()

        # 声明临时回调队列(exclusive=True表示连接关闭时自动删除队列)
        result = self.channel.queue_declare(
            queue='',       # 空队列名表示由RabbitMQ自动生成
            exclusive=True  # 独占队列,仅当前连接可用
        )
        # 保存自动生成的队列名称(用于接收服务端响应)
        self.callback_queue = result.method.queue

        # 绑定消息消费回调
        self.channel.basic_consume(
            queue=self.callback_queue,  # 监听回调队列
            on_message_callback=self._on_response,  # 响应消息处理函数
            # TODO 生产环境建议:auto_ack=False  改为手动ACK确保消息可靠处理
            auto_ack=False   # auto_ack=True 自动确认消息(不推荐生产环境使用)
        )

    def _on_response(self, ch, method, props, body):
        """
        RPC模式下的响应消息回调处理函数
        功能:匹配并接收服务端返回的命令执行结果
        处理逻辑:
            1.通过correlation_id匹配对应的请求
            2.将二进制消息体解码为字符串
            3.存储结果供execute_command方法获取
        :param ch: (pika.channel.Channel): 接收到消息的信道对象
        :param method: (pika.spec.Basic.Deliver): 包含投递信息(如delivery_tag)
        :param props: (pika.spec.BasicProperties): 消息属性(含correlation_id等)
        :param body: (bytes): 消息体内容(服务端返回的执行结果)
        :return:
        """
        # 校验消息关联ID(确保是本请求的响应)
        try:
            if self.corr_id == props.correlation_id:
                # 解码服务端返回的消息内容(UTF-8编码)
                self.response = body.decode('utf-8')
                # 注意:此处不需要手动ack,因为消息已在服务端处理时ack
        except UnicodeDecodeError as e:
            self.response = f"解码失败: {str(e)}"

    def execute_command(self, command, target_node=None, timeout=60):
        """
        向指定RabbitMQ节点发送命令并获取执行结果(RPC模式)

        :param command (str): 要执行的shell命令字符串(如"ls -l")
        :param target_node (str): 目标节点标识,对应服务端的队列名
                                  - 默认None表示发送到当前主机节点
        :param timeout (int): 等待响应的超时时间(秒),默认60秒
        :return str: 命令执行结果文本
        异常:
            TimeoutError: 超过指定时间未收到响应时抛出
            AMQP相关异常: 消息发送失败时抛出
            向指定节点执行远程命令
        """
        # 初始化响应存储和请求ID
        self.response = None    # 清空之前的响应
        self.corr_id = str(uuid.uuid4())    # 生成唯一请求标识

        # 确定目标节点(如果执行命令没有指定节点,默认发送到当前主机)
        if not target_node:
            target_node = socket.gethostname()

        # 构建RPC请求消息体(JSON格式)(包含命令和目标节点信息)
        message = {
            "command": command,         # 要执行的命令
            "target": target_node,      # 目标节点标识
            "timestamp": time.time()    # 请求时间戳
        }

        # 发送消息到目标节点的专属队列
        self.channel.basic_publish(
            exchange='',    # 使用默认直连交换机
            routing_key=target_node,  # 通过队列名路由到指定节点
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,   # 设置回调队列名
                correlation_id=self.corr_id,    # 标记请求ID
            ),
            # JSON序列化消息
            body=json.dumps(message).encode('utf-8')
        )

        # 等待响应(带超时机制)
        start_time = time.time()
        while self.response is None:
            # 处理网络事件(非阻塞)
            self.connection.process_data_events()

            # 超时检查
            if time.time() - start_time > timeout:
                raise TimeoutError(f"等待节点 {target_node} 响应超时")

            # 避免CPU空转,此处sleep不能太长,影响命令返回时效性
            time.sleep(0.1)

        return self.response    # 返回执行结果

    def close(self):
        """
        安全关闭RabbitMQ连接

        功能:
            1. 清理网络连接资源
            2. 自动删除临时队列(exclusive队列)
            3. 防止资源泄漏
        :return:
        """
        if self.connection and not self.connection.is_closed:
            # 关闭连接(会触发信道关闭)
            self.connection.close()
            logger.warning(" [x] 连接已关闭")


if __name__ == '__main__':
    # 使用示例
    client = RabbitMQClient()
    try:
        # 向不同节点发送命令
        nodes = ["node247", "node248", "node249"]
        for node in nodes:
            try:
                logger.info(f"\n向节点 {node} 执行命令: hostname")
                logger.info(client.execute_command(command="hostname", target_node=node))
            except Exception as e:
                logger.exception(f"节点 {node} 执行失败: {str(e)}")

            try:
                logger.info(f"\n向节点 {node} 执行命令: ls -l /opt/")
                logger.info(client.execute_command(command="ls -l /opt/", target_node=node))
            except Exception as e:
                logger.exception(f"节点 {node} 执行失败: {str(e)}")

            try:
                logger.info(f"\n向节点 {node} 执行命令: date")
                logger.info(client.execute_command(command="date", target_node=node))
            except Exception as e:
                logger.exception(f"节点 {node} 执行失败: {str(e)}")

    finally:
        client.close()


3.调用结果

# 服务端,节点启动
192.168.120.17 node17
192.168.120.18 node18
192.168.120.19 node19

python3 rabbitmq_server.py



# 客户端,节点启动
192.168.120.17 node17

python3 rabbitmq_client.py


网站公告

今日签到

点亮在社区的每一天
去签到