【后端】.NET Core API框架搭建(8) --配置使用RabbitMQ

发布于:2025-07-18 ⋅ 阅读:(13) ⋅ 点赞:(0)

      

目录

1.添加包

2. 连接配置

        2.1.连接字符串

   2.2.连接对象

3.创建连接服务

        3.1.添加配置获取方法

        3.2.服务实现类

        3.3.服务接口

4.创建生产者服务

        4.1.生产者实现类

         4.2.生产者接口

5.创建消费者服务

        5.1.消费者服务接口

5.2.消费者接口

6.注册

7.简单使用案例

        7.1.实现

        7.2.接口

        7.3.控制器


        在 .NET Core 应用程序中使用 RabbitMQ 有许多好处,主要体现在其作为消息队列系统所带来的灵活性、可靠性和扩展性等方面,还能促进微服务架构的实施,是构建现代分布式应用的理想选择之一

1.添加包

        添加 RabbitMQ.Client 包。

2. 连接配置

        2.1.连接字符串

        dbsettings.json文件添加 RabbitMQ 连接配置

//RabbitMQ配置
"RabbitMQSettings": {
  "HostName": "ip地址",             //地址
  "Port": "端口",                   //端口
  "UserName": "RabbitMQ用户名",     //用户名
  "Password": "RabbitMQ密码",       //密码
  "VirtualHost": "/",              //本地虚拟地址
  "RetryCount": 5,                 //最大重试次数
  "RetryInterval": 5,              //断开重连次数
  "PrefetchCount": 5,              //预取消息数量
  "ConsumerCount": 5               //消费者数量
}

   2.2.连接对象

namespace Frame3_DataRepository.RabbitMQRepository.BaseMQ
{
    /// <summary>
    /// 消息队列配置类
    /// </summary>
    public class RabbitMQSettings
    {
        /// <summary>
        /// RabbitMQ 服务器地址
        /// </summary>
        public string HostName { get; set; }

        /// <summary>
        /// 端口号,默认5672
        /// </summary>
        public int Port { get; set; } = 5672;

        /// <summary>
        /// 用户名
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// 密码
        /// </summary>
        public string Password { get; set; }

        /// <summary>
        /// 虚拟主机,默认为/
        /// </summary>
        public string VirtualHost { get; set; } = "/";

        /// <summary>
        /// 连接重试次数
        /// </summary>
        public int RetryCount { get; set; } = 5;

        /// <summary>
        /// 重试间隔(秒)
        /// </summary>
        public int RetryInterval { get; set; } = 5;

        /// <summary>
        /// 预取消息数量
        /// </summary>
        public ushort PrefetchCount { get; set; }

        /// <summary>
        /// 消费者数量
        /// </summary>
        public int ConsumerCount { get; set; }

    }

    /// <summary>
    /// 持久化
    /// </summary>
    public enum DeliveryMode : byte
    {
        NonPersistent = 1,
        Persistent = 2
    }

     /// <summary>
     /// 消费者状态信息
     /// </summary>
     public class ConsumerStatus
     {
         /// <summary>
         /// 当前活跃消费者数量
         /// </summary>
         public int CurrentCount { get; set; }

         /// <summary>
         /// 最大允许消费者数量
         /// </summary>
         public int MaxCount { get; set; }

         /// <summary>
         /// 活跃消费者标签列表
         /// </summary>
         public List<string> ActiveConsumers { get; set; } = new();
     }
}

        案例如下

3.创建连接服务

        先创建配置获取方法,再创建 RabbitMqClient 服务实现类和 IRabbitMqClient 服务接口来MQ连接。

        3.1.添加配置获取方法

using Microsoft.Extensions.Configuration;
 
namespace Frame4_LibraryCore.BaseConfig
{
    /// <summary>
    /// 全局配置
    /// </summary>
    public static class Config
    {
        /// <summary>
        /// 从指定的 JSON 配置文件中读取配置,并反序列化为指定类型
        /// </summary>
        /// <typeparam name="T">目标配置类型(如 RedisSettings、DatabaseSettings 等)</typeparam>
        /// <param name="fileName">JSON 配置文件名(如 "appsettings.json")</param>
        /// <param name="sessions">配置节点名称(如 "RedisSettings")</param>
        /// <returns>返回绑定后的强类型配置对象</returns>
        public static T GetSetting<T>(string fileName, string sessions)
        {
            //创建 ConfigurationBuilder 实例,用于构建配置
            var builder = new ConfigurationBuilder()
                //设置配置文件的基础路径为当前程序运行目录
                .SetBasePath(Directory.GetCurrentDirectory())
                //添加 JSON 文件作为配置源:
                //- fileName: 指定要加载的 JSON 文件
                //- optional: false 表示文件必须存在,否则抛出异常
                //- reloadOnChange: true 表示文件修改时自动重新加载
                .AddJsonFile(fileName, optional: false, reloadOnChange: true);
 
            //构建配置对象(IConfigurationRoot)
            IConfigurationRoot config = builder.Build();
 
            //获取指定配置节点(sessions),并将其反序列化为类型 T
            var conn = config.GetSection(sessions).Get<T>();
 
            //返回反序列化后的配置对象
            return conn;
        }
    }
}

         案例如下

 

        3.2.服务实现类

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Frame3_DataRepository.RabbitMQRepository
{
    /// <summary>
    /// 队列服务实现类
    /// 提供消息队列连接和通道管理功能
    /// </summary>
    public sealed class RabbitMqClient : BaseServiceSingleton, IRabbitMqClient
    {
        /// <summary>
        /// RabbitMQ 连接工厂实例
        /// </summary>
        private readonly IConnectionFactory _connectionFactory;
        /// <summary>
        /// 日志记录器
        /// </summary>
        private readonly ILogger<RabbitMqClient> _logger;
        /// <summary>
        /// 连接重试最大次数
        /// </summary>
        private readonly int _retryCount;
        /// <summary>
        /// 重试间隔时间(秒)
        /// </summary>
        private readonly int _retryInterval;
        /// <summary>
        /// RabbitMQ 连接对象
        /// </summary>
        private IConnection _connection;
        /// <summary>
        /// 标识对象是否已被释放
        /// </summary>
        private bool _disposed;
        /// <summary>
        /// 连接操作的线程锁
        /// </summary>
        private readonly SemaphoreSlim _connectionLock = new(1, 1);
        /// <summary>
        /// 心跳检测定时器,用于定期检查连接状态
        /// </summary>
        private Timer _heartbeatTimer;
        /// <summary>
        /// 心跳检测间隔(秒),默认30秒
        /// </summary>
        private const int HeartbeatInterval = 30;
        /// <summary>
        /// 预取消息数量
        /// </summary>
        private readonly ushort _prefetchCount;
        /// <summary>
        /// 最大允许的消费者数量
        /// </summary>
        private readonly int _maxConsumerCount;

        /// <summary>
        /// 构造函数,初始化RabbitMQ服务
        /// </summary>
        /// <param name="logger">日志记录器,从DI容器注入</param>
        /// <exception cref="ArgumentNullException">当必需参数为null时抛出</exception>
        public RabbitMqClient(ILogger<RabbitMqClient> logger)
        {
            //参数校验,确保依赖注入的参数不为null
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            //读取配置
            //var settingsValue = settings?.Value ?? throw new ArgumentNullException(nameof(settings));
            var settingsValue = Config.GetSetting<RabbitMQSettings>("dbsettings.json", "RabbitMQSettings");

            //从配置中初始化重试参数
            _retryCount = settingsValue.RetryCount;
            _retryInterval = settingsValue.RetryInterval;

            //配置连接工厂参数
            _connectionFactory = new ConnectionFactory
            {
                HostName = settingsValue.HostName,                  // 主机地址
                Port = settingsValue.Port,                          // 端口号
                UserName = settingsValue.UserName,                  // 用户名
                Password = settingsValue.Password,                  // 密码
                VirtualHost = settingsValue.VirtualHost,            // 虚拟主机
                AutomaticRecoveryEnabled = true,                    // 启用自动恢复
                NetworkRecoveryInterval = TimeSpan.FromSeconds(10), // 网络恢复间隔10秒
                RequestedHeartbeat = TimeSpan.FromSeconds(60)        // 设置心跳间隔60秒
            };

            // 初始化心跳检测定时器
            _heartbeatTimer = new Timer(HeartbeatCheck, null, Timeout.Infinite, Timeout.Infinite);

            //初始化预取消息数量
            _prefetchCount = settingsValue.PrefetchCount;
            //初始化消费者数量
            _maxConsumerCount = settingsValue.ConsumerCount;
        }

        /// <summary>
        /// 检查当前是否已建立有效连接
        /// </summary>
        public bool IsConnected => _connection?.IsOpen == true && !_disposed;

        /// <summary>
        /// 预取消息数量
        /// </summary>
        public ushort prefetchCount => _prefetchCount;

        /// <summary>
        /// 消费者数量
        /// </summary>
        public int ConsumerCount => _maxConsumerCount;

        /// <summary>
        /// 尝试建立RabbitMQ连接
        /// </summary>
        /// <returns>是否连接成功</returns>
        public async Task<bool> TryConnectAsync()
        {
            // 已连接则直接返回成功
            if (IsConnected) return true;

            // 加锁防止多线程同时创建连接
            await _connectionLock.WaitAsync();
            try
            {
                // 双重检查锁定模式
                if (IsConnected) return true;
                //记录建立连接
                _logger.LogInformation("正在建立RabbitMQ连接...");
                // 带重试机制的连接逻辑
                for (int i = 0; i < _retryCount; i++)
                {
                    try
                    {
                        //创建新连接
                        _connection = await _connectionFactory.CreateConnectionAsync().ConfigureAwait(false);
                        //订阅连接关闭事件
                        _connection.ConnectionShutdownAsync += OnConnectionShutdown;
                        //验证连接是否成功建立
                        if (IsConnected)
                        {
                            //记录连接成功
                            _logger.LogInformation("RabbitMQ连接已成功建立");
                            //连接成功后启动心跳检测 不用心跳检测可注释
                            //StartHeartbeatCheck();
                            return true;
                        }
                    }
                    catch (BrokerUnreachableException ex)
                    {
                        //专门处理Broker不可达异常
                        _logger.LogWarning(ex, $"RabbitMQ服务不可达,第{i}次重试...");
                    }
                    catch (Exception ex)
                    {
                        //处理其他类型的异常
                        _logger.LogWarning(ex, $"RabbitMQ连接异常,第{i}次重试...");
                    }

                    //如果未达到最大重试次数,等待间隔时间后重试
                    if (i < _retryCount)
                    {
                        //等待间隔时间后重试
                        await Task.Delay(_retryInterval * 1000).ConfigureAwait(false);
                    }
                }
                //记录连接到最大重试数
                _logger.LogError($"RabbitMQ连接失败,已达到最大重试次数{_retryCount}");
                return false;
            }
            finally
            {
                //确保锁被释放
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// 创建通道
        /// </summary>
        /// <returns>创建的通道对象</returns>
        /// <exception cref="InvalidOperationException">当连接不可用时抛出</exception>
        public async Task<IChannel> CreateChannelAsync()
        {
            //确保连接可用
            if (!IsConnected && !await TryConnectAsync().ConfigureAwait(false))
            {
                throw new InvalidOperationException("没有可用的RabbitMQ连接");
            }

            try
            {
                return await _connection.CreateChannelAsync().ConfigureAwait(false);
            }
            catch (OperationInterruptedException ex)
            {
                //处理RabbitMQ操作中断异常
                _logger.LogError(ex, "创建RabbitMQ通道时操作被中断");
                throw;
            }
            catch (Exception ex)
            {
                //处理其他创建通道时的异常
                _logger.LogError(ex, "创建RabbitMQ通道失败");
                throw;
            }
        }

        /// <summary>
        /// 连接关闭事件处理程序,自动尝试重新连接
        /// </summary>
        private async Task OnConnectionShutdown(object sender, ShutdownEventArgs args)
        {
            //记录连接关闭事件,包括关闭原因
            _logger.LogWarning($"RabbitMQ连接已关闭,原因: {args}");

            //如果服务未被释放,尝试自动重新连接
            if (!_disposed)
            {
                try
                {
                    //异步尝试重新连接,不阻塞当前线程
                    await TryConnectAsync().ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    //记录重连失败异常
                    _logger.LogError(ex, "重连失败");
                }
            }
        }

        /// <summary>
        /// 心跳检测回调方法,定期检查连接状态
        /// </summary>
        private async void HeartbeatCheck(object state)
        {
            try
            {
                // 如果连接不存在或已关闭,尝试重新连接
                if (!IsConnected)
                {
                    //记录检测断开重新连接
                    _logger.LogWarning("心跳检测发现连接断开,尝试重新连接...");
                    //建立连接
                    await TryConnectAsync().ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                // 记录心跳检测过程中的异常
                _logger.LogError(ex, "心跳检测异常");
            }
        }

        /// <summary>
        /// 启动心跳检测定时器
        /// </summary>
        private void StartHeartbeatCheck()
        {
            // 设置定时器,定期执行心跳检测
            _heartbeatTimer.Change(TimeSpan.FromSeconds(HeartbeatInterval),
                                 TimeSpan.FromSeconds(HeartbeatInterval));
        }

        /// <summary>
        /// 停止心跳检测定时器
        /// </summary>
        private void StopHeartbeatCheck()
        {
            // 禁用定时器
            _heartbeatTimer.Change(Timeout.Infinite, Timeout.Infinite);
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            //释放
            Dispose(true);
            //优化垃圾回收
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// 受保护的释放方法
        /// </summary>
        /// <param name="disposing">是否主动释放</param>
        private void Dispose(bool disposing)
        {
            //如果已经释放,直接返回
            if (_disposed) return;
            //如果是主动释放,处理托管资源
            if (disposing)
            {
                try
                {
                    //停止心跳检测
                    StopHeartbeatCheck();

                    //释放定时器资源
                    _heartbeatTimer?.Dispose();

                    //取消事件订阅
                    if (_connection != null)
                    {
                        _connection.ConnectionShutdownAsync -= OnConnectionShutdown;
                    }

                    //释放连接资源
                    _connection?.Dispose();
                    //释放线程锁资源
                    _connectionLock.Dispose();
                    //记录连接关闭日志
                    _logger.LogInformation("RabbitMQ连接已关闭");
                }
                catch (Exception ex)
                {
                    //记录资源释放过程中的异常
                    _logger.LogError(ex, "关闭RabbitMQ连接时出错");
                }
                finally
                {
                    //标记为已释放状态
                    _disposed = true;
                }
            }
        }
    }
}

        案例如下

        3.3.服务接口

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace Frame3_DataRepository.RabbitMQRepository
{
    /// <summary>
    /// 队列服务接口
    /// 供生产者和消费者调用
    /// </summary>
    public interface IRabbitMqClient //: IDisposable
    {
        /// <summary>
        /// 获取当前连接状态
        /// true表示已建立有效连接,false表示连接不可用
        /// </summary>
        bool IsConnected { get; }

        /// <summary>
        /// 预取消息数量
        /// </summary>
        ushort prefetchCount { get; }

        /// <summary>
        /// 消费者数量
        /// </summary>
        int ConsumerCount { get; }

        /// <summary>
        /// 尝试建立RabbitMQ连接
        /// </summary>
        /// <returns>是否连接成功</returns>
        Task<bool> TryConnectAsync();

        /// <summary>
        /// 异步创建RabbitMQ通道
        /// 用于消息发布、消费等操作
        /// </summary>
        /// <returns>RabbitMQ通道实例</returns>
        /// <exception cref="InvalidOperationException">当无法创建连接时抛出</exception>
        /// <exception cref="OperationInterruptedException">当RabbitMQ操作被中断时抛出</exception>
        Task<IChannel> CreateChannelAsync();

        void Dispose();

    }
}

         案例如下

4.创建生产者服务

        创建生产者服务实现类 MQProducerService 和接口 IMQProducerService

        4.1.生产者实现类

using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;

namespace Frame3_DataRepository.RabbitMQRepository.Producer
{
    /// <summary>
    /// 生产者服务实现类
    /// RabbitMQ
    /// </summary>
    public sealed class MQProducerService : BaseService, IMQProducerService
    {
        /// <summary>
        /// 基础连接服务
        /// </summary>
        private readonly IRabbitMqClient _iRabbitMQService;
        /// <summary>
        /// 日志记录器
        /// </summary>
        private readonly ILogger<MQProducerService> _logger;

        /// <summary>
        /// 构造函数,注入依赖
        /// </summary>
        /// <param name="iRabbitMQService"></param>
        /// <param name="logger"></param>
        public MQProducerService(IRabbitMqClient iRabbitMQService, ILogger<MQProducerService> logger)
        {
            //参数校验,确保依赖注入的参数不为null
            _iRabbitMQService = iRabbitMQService ?? throw new ArgumentNullException(nameof(iRabbitMQService));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// 发布消息-点对点模式(Point-to-Point)
        /// </summary>
        /// <typeparam name="T">消息类型,必须是class类型</typeparam>
        /// <param name="queueName">目标队列名称</param>
        /// <param name="message">要发布的消息对象</param>
        /// <param name="messageId">可选的消息ID,未提供时自动生成</param>
        /// <param name="exchange">可选交换机名称,默认使用直接交换机</param>
        /// <param name="headers">可选的消息头字典</param>
        /// <param name="withDLX">是否启用死信队列</param>
        /// <param name="maxRetryCount">最大重试次数(仅当启用死信队列时有效)</param>
        /// <returns>异步任务</returns>
        /// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>
        /// <exception cref="InvalidOperationException">当消息序列化失败时抛出</exception>
        public async Task PublishByPTPAsync<T>(string queueName, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class
        {
            //参数校验
            if (string.IsNullOrWhiteSpace(queueName))
                throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");

            if (message == null)
                throw new ArgumentNullException(nameof(message), "消息内容不能为空");
            //生成消息ID(如果未提供则使用Guid)
            messageId = messageId.IsEmpty() ? messageId = Guid.NewGuid().ToString() : messageId;
            //声明变量用于存储序列化后的消息,便于错误处理
            var jsonMessage = string.Empty;
            //声明RabbitMQ通道
            IChannel? channel = null;

            try
            {

                //创建RabbitMQ通道
                channel = await _iRabbitMQService.CreateChannelAsync();

                // 死信队列配置
                var arguments = new Dictionary<string, object>();

                if (withDLX)
                {
                    // 死信交换机配置
                    var dlxExchange = $"{queueName}.DLX";
                    var dlxQueue = $"{queueName}.DLQ";

                    // 声明死信交换机和队列
                    await channel.ExchangeDeclareAsync(dlxExchange, ExchangeType.Direct, durable: true);
                    await channel.QueueDeclareAsync(
                        queue: dlxQueue,
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
                    await channel.QueueBindAsync(dlxQueue, dlxExchange, dlxQueue);

                    // 设置死信队列参数
                    arguments.Add("x-dead-letter-exchange", dlxExchange);
                    arguments.Add("x-dead-letter-routing-key", dlxQueue);
                    arguments.Add("x-max-retry-count", maxRetryCount); // 自定义属性,记录最大重试次数
                    arguments.Add("x-max-length", 100000);
                    arguments.Add("x-queue-mode", "lazy");
                }

                // 添加消费者数量限制
                arguments["x-max-consumers"] = _iRabbitMQService.ConsumerCount;

                //声明队列(确保队列存在)
                var queueDeclareOk = await channel.QueueDeclareAsync
                    (
                        queue: queueName,       //队列名称
                        durable: true,          //队列持久化(服务器重启后仍然存在)
                        exclusive: false,       //非独占队列
                        autoDelete: false,      //不自动删除
                        arguments: arguments//new Dictionary<string, object>
                                            //{
                                            //    // 只允许一个活跃消费者
                                            //    //["x-single-active-consumer"] = true,
                                            //    ["x-max-consumers"] = _iRabbitMQService.ConsumerCount,
                                            //}
                    );

                //序列化消息为JSON格式
                jsonMessage = JsonSerializer.Serialize(message);
                //将消息内容转换为UTF-8字节数组
                var body = Encoding.UTF8.GetBytes(jsonMessage);

                //创建消息属性
                var properties = new BasicProperties
                {
                    Persistent = true,                                                          //消息持久化(需要队列也是持久化的才有效)
                    MessageId = messageId,                                                      //设置唯一消息ID用于追踪
                    ContentType = "application/json",                                           //明确指定内容类型为JSON
                    Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())    //添加时间戳
                };

                //设置消息头(如果提供)
                if (headers != null && headers.Any())
                {
                    properties.Headers = new Dictionary<string, object>(headers);
                }

                // 发布消息到队列
                await channel.BasicPublishAsync
                    (
                        exchange: exchange ?? string.Empty,         //交换机名称(空字符串表示默认direct交换机)
                        routingKey: queueName,                      //路由键(对于默认交换机就是队列名)
                        mandatory: false,                           //不强制要求消息被路由到队列
                        basicProperties: properties,                //消息属性
                        body: body                                  //消息体
                    );

                //记录成功日志(结构化日志)
                _logger.LogInformation("消息发布成功。\r\n交换机: {Exchange}\r\n消息ID: {MessageId}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", messageId, queueName, jsonMessage);
            }
            catch (JsonException jsonEx)
            {
                // 专门处理JSON序列化错误
                _logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);
                throw new InvalidOperationException("消息序列化失败", jsonEx);
            }
            catch (OperationInterruptedException opEx)
            {
                // 处理RabbitMQ操作中断异常
                _logger.LogError(opEx, "RabbitMQ操作中断。\r\n消息: {jsonMessage}\r\n队列: {queueName}", jsonMessage, queueName);
                throw;
            }
            catch (Exception ex)
            {
                // 处理其他所有异常
                _logger.LogError(ex, "消息发布失败。\r\n交换机: {Exchange}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", queueName, jsonMessage);
                throw;
            }
            finally
            {
                // 确保通道资源被释放
                await channel?.CloseAsync();
            }
        }

        /// <summary>
        /// 发布消息-发布/订阅模式(Pub/Sub)
        /// </summary>
        /// <typeparam name="T">消息类型,必须是 class 类型</typeparam>
        /// <param name="exchangeName">目标 Exchange 名称</param>
        /// <param name="message">要发布的消息对象</param>
        /// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>
        /// <param name="headers">可选的消息头字典</param>
        /// <returns>异步任务</returns>
        public async Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class
        {
            //校验 Exchange 名称是否为空或空白字符串
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");

            //校验消息内容是否为 null
            if (message == null)
                throw new ArgumentNullException(nameof(message), "消息内容不能为空");

            //如果未提供消息ID,则使用 Guid 生成唯一的 ID
            messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;

            //用于存储序列化后的 JSON 消息(便于日志和异常处理)
            var jsonMessage = string.Empty;

            //声明 RabbitMQ 通道变量,初始为 null
            IChannel? channel = null;

            try
            {
                //创建 RabbitMQ 通道
                channel = await _iRabbitMQService.CreateChannelAsync();

                //声明 Fanout 类型的 Exchange(广播模式)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Fanout,   //扇出类型,广播给所有绑定队列
                    durable: true,               //可持久化
                    autoDelete: false);          //不自动删除

                //将消息对象序列化为 JSON 字符串
                jsonMessage = JsonSerializer.Serialize(message);

                //将 JSON 消息转换为 UTF-8 编码的字节数组
                var body = Encoding.UTF8.GetBytes(jsonMessage);

                //创建消息属性
                var properties = new BasicProperties
                {
                    Persistent = true,                                                //消息持久化
                    //DeliveryMode = (DeliveryModes)DeliveryMode.Persistent,
                    MessageId = messageId,                                            //设置唯一消息ID
                    ContentType = "application/json",                                 //内容类型为 JSON
                    Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加时间戳
                };

                //如果提供了 Headers,则设置到消息属性中
                if (headers != null && headers.Any())
                {
                    properties.Headers = new Dictionary<string, object>(headers);
                }

                //向 Exchange 发送消息(不指定 Routing Key,Fanout 类型忽略此参数)
                await channel.BasicPublishAsync(
                    exchange: exchangeName,      //目标 Exchange 名称
                    routingKey: string.Empty,    //Fanout 类型不需要路由键
                    mandatory: false,            //不要求消息必须被投递
                    basicProperties: properties, //消息属性
                    body: body);                 //消息体字节数据

                //记录消息发布成功的日志信息
                _logger.LogInformation("消息已发布到Exchange。\r\nExchange: {Exchange}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}", exchangeName, messageId, jsonMessage);
            }
            catch (JsonException jsonEx)
            {
                //捕获 JSON 序列化异常并记录错误日志
                _logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);

                //抛出自定义异常,包含原始异常信息
                throw new InvalidOperationException("消息序列化失败", jsonEx);
            }
            catch (Exception ex)
            {
                //捕获其他所有异常并记录错误日志
                _logger.LogError(ex, "消息发布到Exchange失败。\r\nExchange: {Exchange}\r\n消息: {jsonMessage}", exchangeName, jsonMessage);

                //抛出异常
                throw;
            }
            finally
            {
                // 确保通道资源被释放
                await channel?.CloseAsync();
            }
        }

        /// <summary>
        /// 发布消息-路由模式(Routing)
        /// </summary>
        /// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>
        /// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
        /// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>
        /// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
        /// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
        /// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
        /// <returns>异步任务</returns>
        public async Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class
        {
            //检查 Exchange 名称是否为空或空白字符
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");

            //检查路由键是否为空或空白字符
            if (string.IsNullOrWhiteSpace(routingKey))
                throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");

            //检查消息对象是否为 null
            if (message == null)
                throw new ArgumentNullException(nameof(message), "消息内容不能为空");

            //如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识
            messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;

            //用于记录日志的消息 JSON 字符串
            var jsonMessage = string.Empty;

            //声明一个 IChannel 对象,初始为 null
            IChannel? channel = null;

            try
            {
                //创建 RabbitMQ Channel(通道)
                channel = await _iRabbitMQService.CreateChannelAsync();

                //声明一个 Direct 类型的 Exchange(如果不存在则创建)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,     //指定 Exchange 的名称
                    type: ExchangeType.Direct,  //指定 Exchange 的类型为 Direct(直连模式)
                    durable: true,              //设置为持久化 Exchange,RabbitMQ 重启后不会丢失
                    autoDelete: false           //不自动删除 Exchange,即使最后一个队列被解绑也不会自动删除
                    );

                //将消息对象序列化为 JSON 字符串
                jsonMessage = JsonSerializer.Serialize(message);

                //将 JSON 字符串转换为 UTF-8 编码的字节数组
                var body = Encoding.UTF8.GetBytes(jsonMessage);

                //创建并初始化 BasicProperties(消息属性)
                var properties = new BasicProperties
                {
                    Persistent = true, //设置消息持久化
                    MessageId = messageId, //设置消息 ID
                    ContentType = "application/json", //内容类型为 JSON
                    Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳
                };

                //如果提供了自定义 Headers,则复制到消息属性中
                if (headers != null && headers.Any())
                {
                    properties.Headers = new Dictionary<string, object>(headers);
                }

                //发布消息到指定 Exchange 和路由键
                await channel.BasicPublishAsync(
                    exchange: exchangeName,      //指定消息要发送到的 Exchange 名称
                    routingKey: routingKey,      //指定消息的路由键,用于 Exchange 路由决策
                    mandatory: false,            //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃
                    basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等
                    body: body                   //消息的实际内容(字节数组),通常是序列化后的 JSON 数据
                    );

                //记录日志:消息发送成功
                _logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",
                    exchangeName, routingKey, messageId, jsonMessage);
            }
            catch (Exception ex)
            {
                //捕获异常并记录日志
                _logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",
                    exchangeName, routingKey, jsonMessage);
                //抛出异常以便上层处理
                throw;
            }
            finally
            {
                // 确保通道资源被释放
                await channel?.CloseAsync();
            }
        }

        /// <summary>
        /// 发布消息-主题模式(Topic)
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
        /// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>
        /// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
        /// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
        /// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
        /// <returns>异步任务</returns>
        public async Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class
        {
            //检查 Exchange 名称是否为空或空白字符
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");

            //检查路由键是否为空或空白字符
            if (string.IsNullOrWhiteSpace(routingKey))
                throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");

            //检查消息对象是否为 null
            if (message == null)
                throw new ArgumentNullException(nameof(message), "消息内容不能为空");

            //如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识
            messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;

            //用于记录日志的消息 JSON 字符串
            var jsonMessage = string.Empty;

            //声明一个 IChannel 对象,初始为 null
            IChannel? channel = null;

            try
            {
                //创建 RabbitMQ Channel(通道)
                channel = await _iRabbitMQService.CreateChannelAsync();

                // 删除已存在的 Exchange(如果不需要保留消息)
                //await channel.ExchangeDeleteAsync("TopicTest");

                //声明一个 Topic 类型的 Exchange(如果不存在则创建)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,     //指定要声明的 Exchange 名称,名称由外部传入的 exchangeName 变量指定
                    type: ExchangeType.Topic,   //设置 Exchange 的类型为 Topic(主题模式),支持通配符路由键匹配
                    durable: true,              //设置 Exchange 为持久化,即使 RabbitMQ 重启也不会丢失
                    autoDelete: false           //设置 Exchange 不自动删除,即使最后一个绑定被移除后仍保留
                    );

                //将消息对象序列化为 JSON 字符串
                jsonMessage = JsonSerializer.Serialize(message);

                //将 JSON 字符串转换为 UTF-8 编码的字节数组
                var body = Encoding.UTF8.GetBytes(jsonMessage);

                //创建并初始化 BasicProperties(消息属性)
                var properties = new BasicProperties
                {
                    Persistent = true, //设置消息持久化
                    MessageId = messageId, //设置消息 ID
                    ContentType = "application/json", //内容类型为 JSON
                    Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳
                };

                //如果提供了自定义 Headers,则复制到消息属性中
                if (headers != null && headers.Any())
                {
                    properties.Headers = new Dictionary<string, object>(headers);
                }

                //发布消息到指定 Exchange 和路由键
                await channel.BasicPublishAsync(
                    exchange: exchangeName,      //指定消息要发送到的 Exchange 名称
                    routingKey: routingKey,      //指定消息的路由键,用于 Exchange 路由决策
                    mandatory: false,            //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃
                    basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等
                    body: body                   //消息的实际内容(字节数组),通常是序列化后的 JSON 数据
                    );

                //记录日志:消息发送成功
                _logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",
                    exchangeName, routingKey, messageId, jsonMessage);
            }
            catch (Exception ex)
            {
                //捕获异常并记录日志
                _logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",
                    exchangeName, routingKey, jsonMessage);
                //抛出异常以便上层处理
                throw;
            }
            finally
            {
                // 确保通道资源被释放
                await channel?.CloseAsync();
            }
        }

        /// <summary>
        /// 发布消息-请求/响应(RPC)
        /// </summary>
        /// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>
        /// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>
        /// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>
        /// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>
        /// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>
        /// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>
        /// <returns>异步任务</returns>
        public async Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class
        {
            // 参数校验
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));

            if (string.IsNullOrWhiteSpace(routingKey))
                throw new ArgumentException("路由键不能为空", nameof(routingKey));

            if (request == null)
                throw new ArgumentNullException(nameof(request));

            // 设置默认超时时间(30秒)
            var actualTimeout = timeout == default ? TimeSpan.FromSeconds(5) : timeout;
            if (actualTimeout <= TimeSpan.Zero)
                throw new ArgumentException("超时时间必须大于0", nameof(timeout));

            // 生成唯一 CorrelationId
            var correlationId = Guid.NewGuid().ToString();

            // 创建 TaskCompletionSource
            var tcs = new TaskCompletionSource<TResponse>();

            // 创建独立 Channel
            var channel = await _iRabbitMQService.CreateChannelAsync();

            try
            {



                // 在 PublishByPRCAsync 方法中,发送请求前添加:
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Direct, // RPC通常使用Direct
                    durable: true,            // 持久化
                    autoDelete: false
                    );

                // 声明临时队列用于接收响应
                var replyQueue = await channel.QueueDeclareAsync(
                    queue: "",
                    durable: false,
                    exclusive: true,
                    autoDelete: true
                    );

                // 创建消费者
                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.ReceivedAsync += (model, ea) =>
                {
                    try
                    {
                        if (ea.BasicProperties?.CorrelationId == correlationId)
                        {
                            var response = JsonSerializer.Deserialize<TResponse>(ea.Body.Span);
                            tcs.TrySetResult(response);
                        }
                    }
                    catch (Exception ex)
                    {
                        tcs.TrySetException(ex);
                    }

                    return Task.CompletedTask;
                };

                // 开始监听回复队列
                var consumerTag = await channel.BasicConsumeAsync(
                    queue: replyQueue,
                    autoAck: true,
                    consumer: consumer);

                // 构建消息属性
                var props = new BasicProperties();
                props.ReplyTo = replyQueue;
                props.CorrelationId = correlationId;
                props.ContentType = "application/json";

                // 序列化请求体
                var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request));

                // 发送请求
                await channel.BasicPublishAsync(
                    exchange: exchangeName,
                    routingKey: routingKey,
                    mandatory: false,
                    basicProperties: props,
                    body: body);

                // 设置超时取消
                using var cts = new CancellationTokenSource(actualTimeout);
                cts.Token.Register(() =>
                {
                    if (!tcs.Task.IsCompleted)
                    {
                        tcs.TrySetException(new TimeoutException($"RPC请求超时({actualTimeout.TotalSeconds}秒)"));
                        channel.BasicCancelAsync(consumerTag);
                    }
                });

                return await tcs.Task;
            }
            finally
            {
                // 确保通道资源被释放
                await channel?.CloseAsync();
            }
        }

        /// <summary>
        /// 将死信队列中的消息重新发布到原始队列(泛型版本)
        /// </summary>
        /// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>
        /// <param name="queueName">原始队列名称</param>
        /// <param name="batchSize">每次处理的消息批大小</param>
        /// <param name="delay">重发延迟时间(毫秒)</param>
        /// <returns>成功处理的消息数量</returns>
        public async Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class
        {
            // 检查传入的队列名是否为空或空白字符串,若为空则抛出异常
            if (string.IsNullOrWhiteSpace(queueName))
                throw new ArgumentNullException(nameof(queueName));

            // 构造死信队列(DLQ)名称,格式为:{原始队列名}.DLQ
            var dlxQueueName = $"{queueName}.DLQ";

            // 声明一个 RabbitMQ 的 Channel 对象,用于后续操作
            IChannel? channel = null;

            // 记录已处理的消息数量
            int processedCount = 0;

            try
            {
                // 创建一个新的 Channel 实例(通过服务注入的 _iRabbitMQService)
                channel = await _iRabbitMQService.CreateChannelAsync();

                // 检查死信队列是否存在(被动声明方式)
                try
                {
                    // 如果不存在会抛出异常,catch 中捕获并记录日志后返回 0
                    await channel.QueueDeclarePassiveAsync(dlxQueueName);
                }
                catch
                {
                    // 日志记录:如果 DLQ 不存在,则直接返回 0
                    _logger.LogWarning("死信队列 {DLXQueueName} 不存在", dlxQueueName);
                    return 0;
                }

                // 循环获取最多 batchSize 条消息
                for (int i = 0; i < batchSize; i++)
                {
                    // 使用 BasicGet 从 DLQ 获取一条消息(不自动确认)
                    var result = await channel.BasicGetAsync(dlxQueueName, autoAck: false);

                    // 如果没有更多消息了,跳出循环
                    if (result == null)
                        break;

                    // 获取消息体内容,并转为 byte[] 数组
                    var body = result.Body.ToArray();

                    // 获取原始消息属性(BasicProperties),用于后续操作
                    var originalProperties = result.BasicProperties;

                    // 获取当前消息的 DeliveryTag,用于确认或拒绝消息
                    var deliveryTag = result.DeliveryTag;

                    try
                    {
                        // 如果设置了 delay > 0,则等待指定时间(模拟延迟重试)
                        if (delay > 0)
                            await Task.Delay(delay);

                        // 生成新的唯一 MessageId,用于追踪消息
                        var messageId = Guid.NewGuid().ToString();

                        // 创建新的 BasicProperties 实例,用于新消息的属性设置
                        var properties = new BasicProperties
                        {
                            Persistent = true,                                                          // 设置消息持久化(需队列也持久化才生效)
                            MessageId = messageId,                                                      // 设置唯一消息 ID
                            ContentType = "application/json",                                           // 明确内容类型为 JSON
                            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())    // 添加当前时间戳
                        };

                        // 将消息重新发布到原始队列,使用默认交换机(exchange 为空)
                        await channel.BasicPublishAsync(
                            exchange: string.Empty,
                            routingKey: queueName,
                            mandatory: false,
                            basicProperties: properties,
                            body: body);

                        // 确认 DLQ 中该消息已被正确处理(ACK)
                        await channel.BasicAckAsync(deliveryTag, multiple: false);

                        // 成功处理计数器 +1
                        processedCount++;

                        // 日志记录:消息已成功重新发布
                        _logger.LogInformation("已重新发布死信消息 {MessageId} 到队列 {QueueName}",
                            properties.MessageId ?? "未知ID", queueName);
                    }
                    catch (Exception ex)
                    {
                        // 日志记录:消息重发失败
                        _logger.LogError(ex, "重新发布死信消息失败,DeliveryTag={DeliveryTag}", deliveryTag);

                        // 拒绝消息并重新入队(回到 DLQ),requeue: true 表示重新入队
                        await channel.BasicNackAsync(deliveryTag, multiple: false, requeue: true);
                    }
                }

                // 返回成功处理的消息数量
                return processedCount;
            }
            catch (Exception ex)
            {
                // 日志记录:整个处理过程中发生错误
                _logger.LogError(ex, "处理死信队列 {DLXQueueName} 时发生错误", dlxQueueName);

                // 抛出异常,供上层调用者捕获处理
                throw;
            }
            finally
            {
                // 确保通道资源被释放
                await channel?.CloseAsync();
            }
        }

    }
}

        案例如下

         4.2.生产者接口

namespace Frame3_DataRepository.RabbitMQRepository.Producer
{
    /// <summary>
    /// 生产者服务接口
    /// RabbitMQ
    /// </summary>
    public interface IMQProducerService
    {
        /// <summary>
        /// 发布消息-点对点模式(Point-to-Point)
        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <typeparam name="T">消息类型,必须是引用类型</typeparam>
        /// <param name="message">要发布的消息对象</param>
        /// <param name="messageId">消息ID(可选,未提供时自动生成GUID)</param>
        /// <param name="exchange">交换机名称(空字符串表示默认交换机)</param>
        /// <returns>异步任务</returns>
        /// <exception cref="ArgumentNullException">当队列名或消息为空时抛出</exception>
        Task PublishByPTPAsync<T>(string queue, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class;

        /// <summary>
        /// 发布消息-发布/订阅模式(Pub/Sub)
        /// </summary>
        /// <typeparam name="T">消息类型,必须是 class 类型</typeparam>
        /// <param name="exchangeName">目标 Exchange 名称</param>
        /// <param name="message">要发布的消息对象</param>
        /// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>
        /// <param name="headers">可选的消息头字典</param>
        /// <returns>异步任务</returns>
        Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;

        /// <summary>
        /// 发布消息-路由模式(Routing)
        /// </summary>
        /// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>
        /// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
        /// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>
        /// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
        /// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
        /// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
        /// <returns>异步任务</returns>
        Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;

        /// <summary>
        /// 发布消息-主题模式(Topic)
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
        /// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>
        /// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
        /// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
        /// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
        /// <returns>异步任务</returns>
        Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;

        /// <summary>
        /// 发布消息-请求/响应(RPC)
        /// </summary>
        /// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>
        /// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>
        /// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>
        /// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>
        /// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>
        /// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>
        /// <returns>异步任务</returns>
        Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class;

        /// <summary>
        /// 将死信队列中的消息重新发布到原始队列(泛型版本)
        /// </summary>
        /// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>
        /// <param name="queueName">原始队列名称</param>
        /// <param name="batchSize">每次处理的消息批大小</param>
        /// <param name="delay">重发延迟时间(毫秒)</param>
        /// <returns>成功处理的消息数量</returns>
        Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class;
    }
}

        案例如下

5.创建消费者服务

         创建消费者服务实现类 MQConsumerService 和接口 IMQConsumerService

        5.1.消费者服务接口

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;

namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{
    /// <summary>
    /// 消费者服务实现类
    /// 提供可靠的消息消费功能,支持自动重试和错误处理
    /// </summary>
    public sealed class MQConsumerService : BaseServiceSingleton, IMQConsumerService
    {
        /// <summary>
        /// RabbitMQ 基础服务
        /// </summary>
        private readonly IRabbitMqClient _rabbitMQService;

        /// <summary>
        /// 日志记录器
        /// </summary>
        private readonly ILogger<MQConsumerService> _logger;

        /// <summary>
        /// 最大消费者数量限制
        /// </summary>
        private readonly int _maxConsumerCount;

        /// <summary>
        /// 当前消费者数量计数器(线程安全)
        /// </summary>
        private int _currentConsumerCount;

        /// <summary>
        /// 用于限制消费者数量的信号量
        /// </summary>
        private readonly SemaphoreSlim _consumerLimitSemaphore;

        /// <summary>
        /// 当前活跃的消费者字典,线程安全集合
        /// Key: 消费者标签
        /// Value: (通道对象, 消费者对象)
        /// </summary>
        private readonly ConcurrentDictionary<string, (IChannel Channel, AsyncEventingBasicConsumer Consumer)> _activeConsumers;

        /// <summary>
        /// 构造函数,依赖注入初始化
        /// </summary>
        /// <param name="rabbitMQService">RabbitMQ基础服务</param>
        /// <param name="logger">日志记录器</param>
        /// <exception cref="ArgumentNullException">当参数为null时抛出</exception>
        public MQConsumerService(IRabbitMqClient rabbitMQService, ILogger<MQConsumerService> logger)
        {
            _rabbitMQService = rabbitMQService ?? throw new ArgumentNullException(nameof(rabbitMQService));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _activeConsumers = new ConcurrentDictionary<string, (IChannel, AsyncEventingBasicConsumer)>();
            _maxConsumerCount = _rabbitMQService.ConsumerCount;
            _currentConsumerCount = 0;
            _consumerLimitSemaphore = new SemaphoreSlim(_maxConsumerCount, _maxConsumerCount);
        }

        /// <summary>
        /// 消费消息-点对点(Point-to-Point)
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="queueName">要消费的队列名称</param>
        /// <param name="messageHandler">消息处理委托</param>
        /// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
        /// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>
        /// <param name="withDLX">是否启用死信队列</param>
        /// <returns>取消令牌源,用于停止消费</returns>
        public async Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class
        {
            //参数校验
            if (string.IsNullOrWhiteSpace(queueName))
                throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");

            if (messageHandler == null)
                throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");

            //等待获取消费者槽位(带超时防止死锁)
            if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
            {
                throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
            }
            //当前消费者数量+1
            Interlocked.Increment(ref _currentConsumerCount);
            //创建取消令牌源
            var cancellationTokenSource = new CancellationTokenSource();

            //创建通道
            IChannel? channel = null;

            try
            {
                //赋值预读取数量
                if (prefetchCount == 0)
                {
                    prefetchCount = _rabbitMQService.prefetchCount;
                }
                //创建通道
                channel = await _rabbitMQService.CreateChannelAsync();
                //设置QoS(服务质量),控制预取消息数量
                await channel.BasicQosAsync
                    (
                        prefetchSize: 0,                //不限制预取消息总大小
                        prefetchCount: prefetchCount,   //prefetchCount > 0 ? prefetchCount : _rabbitMQService.prefetchCount,   //每次预取的消息数量
                        global: false                   //应用于当前消费者而非整个通道
                    );

                //检查队列是否存在
                try
                {
                    await channel.QueueDeclarePassiveAsync(queueName);
                }
                catch
                {
                    _logger.LogError($"队列 {queueName} 不存在");
                    throw;
                }

                //创建消费者
                var consumer = new AsyncEventingBasicConsumer(channel);

                //注册消息接收事件
                consumer.ReceivedAsync += async (model, ea) =>
                {
                    try
                    {
                        //反序列化消息
                        var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));

                        //记录接收日志
                        _logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\n队列: {QueueName}\r\n消息体:{message}", ea.BasicProperties.MessageId, queueName, message?.ToJson());

                        //处理消息
                        await messageHandler(message);

                        //如果不是自动确认模式,手动确认消息
                        if (!autoAck)
                        {
                            await channel.BasicAckAsync
                            (
                                deliveryTag: ea.DeliveryTag,    //消息投递标签
                                multiple: false                 //不批量确认
                            );
                        }
                    }
                    catch (JsonException jsonEx)
                    {
                        //处理反序列化错误
                        _logger.LogError(jsonEx, "消息反序列化失败。\r\n队列: {QueueName}", queueName);

                        //拒绝消息,不重新入队
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                    }
                    catch (Exception ex)
                    {
                        //处理业务逻辑错误
                        _logger.LogError(ex, "消息处理失败。\r\n队列: {QueueName}", queueName);

                        //如果不是自动确认模式(autoAck=false),需要手动处理消息确认和重试逻辑
                        if (!autoAck)
                        {
                            //获取当前消息的属性对象
                            var properties = ea.BasicProperties;

                            //获取消息头,如果headers为null则创建新的字典
                            var headers = properties.Headers ?? new Dictionary<string, object>();

                            //获取当前重试次数,如果不存在x-retry-count头则默认为0
                            int retryCount = headers.TryGetValue("x-retry-count", out var retryObj) ? Convert.ToInt32(retryObj) : 0;

                            //获取最大重试次数,如果不存在x-max-retry-count头则默认为1
                            int maxRetryCount = headers.TryGetValue("x-max-retry-count", out var maxRetryObj) ? Convert.ToInt32(maxRetryObj) : 1;

                            //如果启用了死信队列(withDLX=true)且当前重试次数已达最大重试次数
                            if (withDLX && retryCount >= maxRetryCount)
                            {
                                //记录警告日志,说明消息已达到最大重试次数
                                _logger.LogWarning("消息已达到最大重试次数 {MaxRetryCount},将被移入死信队列", maxRetryCount);

                                //拒绝消息,requeue=false表示不重新入队,消息将被路由到死信队列
                                await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
                            }
                            else
                            {
                                //创建新的消息属性对象,复制原始消息的所有属性
                                var newProperties = new BasicProperties
                                {
                                    ContentType = properties.ContentType,       //复制内容类型
                                    ContentEncoding = properties.ContentEncoding,   //复制内容编码
                                    DeliveryMode = properties.DeliveryMode,         //复制投递模式(1-非持久化,2-持久化)
                                    Priority = properties.Priority,                 //复制消息优先级
                                    CorrelationId = properties.CorrelationId,       //复制关联ID(用于请求 - 响应模式)
                                    ReplyTo = properties.ReplyTo,                   //复制回复队列名称
                                    Expiration = properties.Expiration,             //复制消息过期时间
                                    MessageId = properties.MessageId,               //复制消息ID
                                    Timestamp = properties.Timestamp,               //复制时间戳
                                    Type = properties.Type,                         //复制消息类型
                                    UserId = properties.UserId,                     //复制用户ID
                                    AppId = properties.AppId,                       //复制应用ID
                                    ClusterId = properties.ClusterId,               //复制集群ID
                                    //复制消息头,并更新重试次数
                                    Headers = new Dictionary<string, object>(headers)
                                    {
                                        ["x-retry-count"] = retryCount + 1          //重试次数+1
                                    }
                                };

                                //重新发布消息到原始队列
                                await channel.BasicPublishAsync(
                                    exchange: string.Empty,             //exchange: 空字符串表示默认direct交换机
                                    routingKey: queueName,              //使用原始队列名称
                                    mandatory: false,                   //false表示如果无法路由则丢弃消息
                                    basicProperties: newProperties,     //使用更新后的属性(包含新的重试次数)
                                    body: ea.Body                       //原始消息体
                                    );

                                //确认原始消息已被处理(从队列中移除)
                                //- deliveryTag: 消息投递标签
                                //- multiple: false表示只确认单条消息
                                await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
                            }
                        }
                    }
                };

                //开始消费队列消息
                var consumerTag = await channel.BasicConsumeAsync
                    (
                        queue: queueName,   //队列名称
                        autoAck: autoAck,   //自动确认设置
                        consumer: consumer  //消费者实例
                    );

                //将消费者添加到活跃集合
                _activeConsumers.TryAdd(consumerTag, (channel, consumer));

                //注册取消令牌回调,当取消时停止消费者
                cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));

                //记录消费者/总消费者数量
                _logger.LogInformation("成功启动消费者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount},当前消费者标记: {consumerTag}", _currentConsumerCount, _maxConsumerCount, consumerTag);

                return cancellationTokenSource;
            }
            catch (Exception ex)
            {
                //清理创建失败的通道资源
                channel?.Dispose();

                //当前消费者数量-1
                Interlocked.Decrement(ref _currentConsumerCount);

                _consumerLimitSemaphore.Release();

                //记录启动失败日志
                _logger.LogError(ex, "启动消费者失败。队列: {QueueName}", queueName);
                throw;
            }
            //finally
            //{
            //    // 如果出错或任务完成,确保 channel 被释放
            //    channel?.Dispose();
            //}
        }

        /// <summary>
        /// 消费消息-发布/订阅模式(Pub/Sub)
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="exchangeName">要订阅的Exchange名称</param>
        /// <param name="messageHandler">消息处理委托</param>
        /// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
        /// <param name="autoAck">是否自动确认消息</param>
        /// <returns>取消令牌源,用于停止消费</returns>
        public async Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class
        {
            //校验exchange名称是否为空
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");

            //校验消息处理器是否为空
            if (messageHandler == null)
                throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");

            //等待获取消费者槽位(防止并发消费者过多)
            if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
            {
                //获取失败则抛出超时异常
                throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
            }

            //原子增加当前消费者数量
            Interlocked.Increment(ref _currentConsumerCount);

            //创建取消令牌源用于后续停止消费
            var cancellationTokenSource = new CancellationTokenSource();
            IChannel? channel = null;

            try
            {
                //如果未指定prefetchCount,则使用默认值
                if (prefetchCount == 0)
                {
                    prefetchCount = _rabbitMQService.prefetchCount;
                }

                //创建 RabbitMQ 通道
                channel = await _rabbitMQService.CreateChannelAsync();

                //设置QoS(服务质量),限制预取的消息数量
                await channel.BasicQosAsync(
                    prefetchSize: 0,
                    prefetchCount: prefetchCount,
                    global: false);

                //声明一个 fanout 类型的 Exchange(广播模式)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Fanout,
                    durable: true,   //可持久化
                    autoDelete: false); //不自动删除

                //自定义临时队列名称
                var queueName = "PubSub-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");
                //创建临时队列(由RabbitMQ自动生成名字)
                //var queueResult = await channel.QueueDeclareAsync();
                var queueResult = await channel.QueueDeclareAsync(
                    queue: queueName,   //队列名称
                    durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
                    exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)
                    autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
                    );

                //将队列绑定到Exchange(fanout类型忽略routingKey)
                await channel.QueueBindAsync(
                    queue: queueName,
                    exchange: exchangeName,
                    routingKey: "");

                //创建异步消费者对象
                var consumer = new AsyncEventingBasicConsumer(channel);

                //注册消息接收事件处理逻辑
                consumer.ReceivedAsync += async (model, ea) =>
                {
                    try
                    {
                        //反序列化消息体为泛型对象T
                        var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));

                        //记录收到消息的日志
                        _logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, message?.ToJson());

                        //调用用户定义的消息处理方法
                        await messageHandler(message);

                        //如果不是自动确认,则手动发送 Ack 确认消息已处理
                        if (!autoAck)
                        {
                            await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
                        }
                    }
                    catch (JsonException jsonEx)
                    {
                        //消息反序列化失败,记录错误日志
                        _logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);

                        //非自动确认模式下拒绝消息,不重入队列
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
                        }
                    }
                    catch (Exception ex)
                    {
                        //消息处理过程中发生其他异常,记录错误日志
                        _logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);

                        //非自动确认模式下拒绝消息,并尝试重新入队
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);
                        }
                    }
                };

                //启动消费,开始监听消息
                var consumerTag = await channel.BasicConsumeAsync(
                    queue: queueName,
                    autoAck: autoAck,
                    consumer: consumer);

                //将消费者和通道保存起来以便后续取消操作
                _activeConsumers.TryAdd(consumerTag, (channel, consumer));

                //注册取消回调,当取消令牌被触发时调用StopConsuming
                cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));

                //记录启动成功日志
                _logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);

                //返回取消令牌源,供外部控制消费终止
                return cancellationTokenSource;
            }
            catch (Exception ex)
            {
                //出现异常时释放资源
                channel?.Dispose();

                //原子减少消费者计数
                Interlocked.Decrement(ref _currentConsumerCount);

                //释放信号量槽位
                _consumerLimitSemaphore.Release();

                //记录启动失败日志
                _logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);

                //抛出异常
                throw;
            }
        }

        /// <summary>
        /// 消费消息-路由模式(Routing)
        /// </summary>
        /// <typeparam name="T">消息反序列化的目标类型</typeparam>
        /// <param name="exchangeName">要绑定的 Exchange 名称</param>
        /// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>
        /// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
        /// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
        /// <param name="autoAck">是否自动确认消息</param>
        /// <returns>CancellationTokenSource,用于取消消费操作</returns>
        public async Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class
        {
            //校验exchange名称是否为空
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");

            //校验路由键是否为空
            if (routingKey == null)
                throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");

            //校验消息处理器是否为空
            if (messageHandler == null)
                throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");

            //等待获取消费者槽位(防止并发消费者过多)
            if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
            {
                //获取失败则抛出超时异常
                throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
            }

            //原子增加当前消费者数量
            Interlocked.Increment(ref _currentConsumerCount);

            // 创建 CancellationTokenSource,用于后续控制取消消费
            var cancellationTokenSource = new CancellationTokenSource();
            IChannel? channel = null;

            try
            {
                // 创建一个新的 RabbitMQ Channel
                channel = await _rabbitMQService.CreateChannelAsync();

                // 声明一个 Direct 类型的 Exchange(如果不存在则创建)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Direct,
                    durable: true,          // Exchange 持久化
                    autoDelete: false);     // 不自动删除

                //自定义临时队列名称
                var queueName = "Routing-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");
                //创建临时队列(由RabbitMQ自动生成名字)
                //var queueResult = await channel.QueueDeclareAsync();
                var queueResult = await channel.QueueDeclareAsync(
                    queue: queueName,   //队列名称
                    durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
                    exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)
                    autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
                    );

                // 将队列绑定到指定的 Exchange,并使用给定的 routingKey
                await channel.QueueBindAsync(queueName, exchangeName, routingKey);

                // 创建异步消费者
                var consumer = new AsyncEventingBasicConsumer(channel);

                // 注册消息接收事件处理逻辑
                consumer.ReceivedAsync += async (model, ea) =>
                {
                    try
                    {
                        // 反序列化消息体为泛型 T 对象
                        var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));

                        //记录收到消息的日志
                        _logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());

                        // 调用用户定义的消息处理函数
                        await messageHandler(msg);

                        // 如果不是自动确认,则手动发送 Ack 确认消息已处理成功
                        if (!autoAck)
                            await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
                    }
                    catch (JsonException jsonEx)
                    {
                        //消息反序列化失败,记录错误日志
                        _logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);

                        //非自动确认模式下拒绝消息,不重入队列
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
                        }
                    }
                    catch (Exception ex)
                    {
                        //消息处理过程中发生其他异常,记录错误日志
                        _logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);

                        //非自动确认模式下拒绝消息,并尝试重新入队
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);
                        }
                    }
                };

                // 开始消费队列中的消息
                var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);

                // 记录当前消费者信息以便后续取消或释放资源
                _activeConsumers.TryAdd(consumerTag, (channel, consumer));

                // 注册取消令牌,在取消时调用 StopConsuming 方法停止消费
                cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));

                //记录启动成功日志
                _logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);

                // 返回 CancellationTokenSource,供外部控制取消
                return cancellationTokenSource;
            }
            catch (Exception ex)
            {
                //出现异常时释放资源
                channel?.Dispose();

                //原子减少消费者计数
                Interlocked.Decrement(ref _currentConsumerCount);

                //释放信号量槽位
                _consumerLimitSemaphore.Release();

                //记录启动失败日志
                _logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);

                //抛出异常
                throw;
            }
        }

        /// <summary>
        /// 消费消息-主题模式(Topic)
        /// </summary>
        /// <typeparam name="T">消息反序列化的目标类型</typeparam>
        /// <param name="exchangeName">要绑定的 Exchange 名称</param>
        /// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>
        /// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
        /// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
        /// <param name="autoAck">是否自动确认消息</param>
        /// <returns>CancellationTokenSource,用于取消消费操作</returns>
        public async Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class
        {
            //校验exchange名称是否为空
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");

            //校验匹配规则是否为空
            if (topicPattern == null)
                throw new ArgumentNullException(nameof(topicPattern), "topicPattern不能为空");

            //校验消息处理器是否为空
            if (messageHandler == null)
                throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");

            //等待获取消费者槽位(防止并发消费者过多)
            if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
            {
                //获取失败则抛出超时异常
                throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
            }

            //原子增加当前消费者数量
            Interlocked.Increment(ref _currentConsumerCount);

            // 创建 CancellationTokenSource,用于后续控制取消消费
            var cancellationTokenSource = new CancellationTokenSource();
            IChannel? channel = null;

            try
            {
                // 创建一个新的 RabbitMQ Channel
                channel = await _rabbitMQService.CreateChannelAsync();

                // 声明一个 Topic 类型的 Exchange(如果不存在则创建)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Topic,
                    durable: true,          // Exchange 持久化
                    autoDelete: false);     // 不自动删除

                //自定义临时队列名称
                var queueName = "Topic-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");
                //创建临时队列(由RabbitMQ自动生成名字)
                //var queueResult = await channel.QueueDeclareAsync();
                var queueResult = await channel.QueueDeclareAsync(
                    queue: queueName,   //队列名称
                    durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
                    exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)
                    autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
                    );

                // 将队列绑定到指定的 Exchange,并使用 Topic 模式匹配规则
                await channel.QueueBindAsync(queueName, exchangeName, topicPattern);

                // 创建异步消费者
                var consumer = new AsyncEventingBasicConsumer(channel);

                // 注册消息接收事件处理逻辑
                consumer.ReceivedAsync += async (model, ea) =>
                {
                    try
                    {
                        // 反序列化消息体为泛型 T 对象
                        var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));

                        //记录收到消息的日志
                        _logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());

                        // 调用用户定义的消息处理函数
                        await messageHandler(msg);

                        // 如果不是自动确认,则手动发送 Ack 确认消息已处理成功
                        if (!autoAck)
                            await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
                    }
                    catch (JsonException jsonEx)
                    {
                        //消息反序列化失败,记录错误日志
                        _logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);

                        //非自动确认模式下拒绝消息,不重入队列
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
                        }
                    }
                    catch (Exception ex)
                    {
                        //消息处理过程中发生其他异常,记录错误日志
                        _logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);

                        //非自动确认模式下拒绝消息,并尝试重新入队
                        if (!autoAck)
                        {
                            await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);
                        }
                    }
                };

                // 开始消费队列中的消息
                var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);

                // 记录当前消费者信息以便后续取消或释放资源
                _activeConsumers.TryAdd(consumerTag, (channel, consumer));

                // 注册取消令牌,在取消时调用 StopConsuming 方法停止消费
                cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));

                //记录启动成功日志
                _logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);

                // 返回 CancellationTokenSource,供外部控制取消
                return cancellationTokenSource;
            }
            catch (Exception ex)
            {
                //出现异常时释放资源
                channel?.Dispose();

                //原子减少消费者计数
                Interlocked.Decrement(ref _currentConsumerCount);

                //释放信号量槽位
                _consumerLimitSemaphore.Release();

                //记录启动失败日志
                _logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);

                //抛出异常
                throw;
            }
        }

        /// <summary>
        /// 消费消息-请求/响应(RPC)
        /// </summary>
        /// <typeparam name="TRequest">请求消息的类型</typeparam>
        /// <typeparam name="TResponse">响应消息的类型</typeparam>
        /// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>
        /// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>
        /// <param name="handler">处理请求并返回响应的异步回调函数</param>
        /// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
        /// <returns>CancellationTokenSource,用于取消消费操作</returns>
        public async Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class
        {
            // 参数校验
            if (string.IsNullOrWhiteSpace(exchangeName))
                throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));

            if (string.IsNullOrWhiteSpace(routingKey))
                throw new ArgumentException("路由键不能为空", nameof(routingKey));

            if (handler == null)
                throw new ArgumentNullException(nameof(handler), "消息处理器不能为空");

            // 等待获取消费者槽位
            if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
            {
                throw new InvalidOperationException(
                    $"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
            }

            try
            {
                Interlocked.Increment(ref _currentConsumerCount);

                // 创建新的 RabbitMQ Channel
                var channel = await _rabbitMQService.CreateChannelAsync();

                try
                {
                    // 设置预取数量(控制并发处理能力)
                    if (prefetchCount > 0)
                    {
                        await channel.BasicQosAsync(0, prefetchCount, false);
                    }

                    // 声明队列(与生产者保持一致)
                    var queueDeclareOk = await channel.QueueDeclareAsync(
                        queue: routingKey,   //队列名称
                        durable: true,       //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
                        exclusive: false,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)
                        autoDelete: false,   //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
                        arguments: null
                        );

                    // 绑定队列到Exchange
                    await channel.QueueBindAsync(
                        queue: routingKey,
                        exchange: exchangeName,
                        routingKey: routingKey);

                    // 创建异步消费者
                    var consumer = new AsyncEventingBasicConsumer(channel);

                    // 消息处理逻辑
                    consumer.ReceivedAsync += async (model, ea) =>
                    {
                        try
                        {
                            // 反序列化请求
                            var request = JsonSerializer.Deserialize<TRequest>(ea.Body.Span);

                            // 处理请求
                            var response = await handler(request);

                            // 准备响应属性
                            var replyProps = new BasicProperties();
                            replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
                            replyProps.ContentType = "application/json";

                            // 发送响应
                            await channel.BasicPublishAsync(
                                exchange: "", // 默认Exchange
                                routingKey: ea.BasicProperties.ReplyTo,
                                mandatory: false,
                                basicProperties: replyProps,
                                body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));

                            // 确认消息
                            await channel.BasicAckAsync(ea.DeliveryTag, false);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "处理RPC请求失败: {CorrelationId}",
                                ea.BasicProperties?.CorrelationId);

                            // 拒绝消息且不重新入队
                            await channel.BasicNackAsync(ea.DeliveryTag, false, false);
                        }
                    };

                    // 开始消费
                    var consumerTag = await channel.BasicConsumeAsync(
                        queue: routingKey,
                        autoAck: false, // 手动确认
                        consumer: consumer);

                    // 创建取消令牌
                    var cts = new CancellationTokenSource();

                    // 注册取消回调
                    cts.Token.Register(async () =>
                    {
                        try
                        {
                            await channel.BasicCancelAsync(consumerTag);
                            await channel.CloseAsync();
                        }
                        catch (Exception ex)
                        {
                            _logger.LogWarning(ex, "取消消费者时发生错误");
                        }
                        finally
                        {
                            channel.Dispose();
                            Interlocked.Decrement(ref _currentConsumerCount);
                            _consumerLimitSemaphore.Release();
                        }
                    });

                    return cts;
                }
                catch
                {
                    // 发生异常时确保通道被关闭
                    channel?.Dispose();
                    throw;
                }
            }
            catch
            {
                // 发生异常时释放信号量
                Interlocked.Decrement(ref _currentConsumerCount);
                _consumerLimitSemaphore.Release();
                throw;
            }
        }

        /// <summary>
        /// 停止指定消费者的消息消费
        /// </summary>
        /// <param name="consumerTag">消费者标签</param>
        /// <returns>异步任务</returns>
        public async Task StopConsuming(string consumerTag)
        {
            //从活跃集合中移除消费者
            if (_activeConsumers.TryRemove(consumerTag, out var consumerInfo))
            {
                try
                {
                    //取消消费者订阅
                    await consumerInfo.Channel.BasicCancelAsync(consumerTag);

                    //异步释放通道资源
                    await consumerInfo.Channel.DisposeAsync();

                    //记录停止成功日志
                    _logger.LogInformation("已停止消费者。消费者标签: {ConsumerTag}", consumerTag);
                }
                catch (OperationInterruptedException opEx)
                {
                    //记录操作中断警告日志
                    _logger.LogWarning(opEx, "消费者取消操作被中断。消费者标签: {ConsumerTag}", consumerTag);
                }
                catch (Exception ex)
                {
                    //记录停止失败错误日志
                    _logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);
                    throw;
                }
                finally
                {
                    Interlocked.Decrement(ref _currentConsumerCount);
                    _consumerLimitSemaphore.Release();
                    _logger.LogInformation("当前消费者数: {CurrentCount}/{MaxCount}", _currentConsumerCount, _maxConsumerCount);
                }
            }
            else
            {
                // 记录未找到消费者警告日志
                _logger.LogWarning("未找到对应的消费者。消费者标签: {ConsumerTag}", consumerTag);
            }
        }

        /// <summary>
        /// 停止所有消费者的消息消费
        /// </summary>
        /// <returns>异步任务</returns>
        public async Task StopAllConsuming()
        {
            // 遍历所有消费者标签并停止
            foreach (var consumerTag in _activeConsumers.Keys.ToList())
            {
                try
                {
                    await StopConsuming(consumerTag).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // 记录单个消费者停止失败日志,继续停止其他消费者
                    _logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);
                }
            }
        }

        /// <summary>
        /// 获取当前消费者状态
        /// </summary>
        public ConsumerStatus GetConsumerStatus()
        {
            // 创建并返回一个新的 ConsumerStatus 对象,用于封装当前消费者的运行状态
            return new ConsumerStatus
            {
                // 设置当前消费者数量
                CurrentCount = _currentConsumerCount,
                // 设置最大消费者数量
                MaxCount = _maxConsumerCount,
                // 获取当前所有活跃消费者的标识符列表
                ActiveConsumers = _activeConsumers.Keys.ToList()
            };
        }

    }

}

        案例如下

5.2.消费者接口

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;

namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{
    /// <summary>
    /// 消费者服务接口
    /// RabbitMQ
    /// </summary>
    public interface IMQConsumerService
    {
        /// <summary>
        /// 消费消息-点对点(Point-to-Point)
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="queueName">要消费的队列名称</param>
        /// <param name="messageHandler">消息处理委托</param>
        /// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
        /// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>
        /// <returns>取消令牌源,用于停止消费</returns>
        /// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>
        Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class;

        /// <summary>
        /// 消费消息-发布/订阅模式(Pub/Sub)
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="exchangeName">要订阅的Exchange名称</param>
        /// <param name="messageHandler">消息处理委托</param>
        /// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
        /// <param name="autoAck">是否自动确认消息</param>
        /// <returns>取消令牌源,用于停止消费</returns>
        Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;

        /// <summary>
        /// 消费消息-路由模式(Routing)
        /// </summary>
        /// <typeparam name="T">消息反序列化的目标类型</typeparam>
        /// <param name="exchangeName">要绑定的 Exchange 名称</param>
        /// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>
        /// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
        /// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
        /// <param name="autoAck">是否自动确认消息</param>
        /// <returns>CancellationTokenSource,用于取消消费操作</returns>
        Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;

        /// <summary>
        /// 消费消息-主题模式(Topic)
        /// </summary>
        /// <typeparam name="T">消息反序列化的目标类型</typeparam>
        /// <param name="exchangeName">要绑定的 Exchange 名称</param>
        /// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>
        /// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
        /// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
        /// <param name="autoAck">是否自动确认消息</param>
        /// <returns>CancellationTokenSource,用于取消消费操作</returns>
        Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;

        /// <summary>
        /// 消费消息-请求/响应(RPC)
        /// </summary>
        /// <typeparam name="TRequest">请求消息的类型</typeparam>
        /// <typeparam name="TResponse">响应消息的类型</typeparam>
        /// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>
        /// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>
        /// <param name="handler">处理请求并返回响应的异步回调函数</param>
        /// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
        /// <returns>CancellationTokenSource,用于取消消费操作</returns>
        Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class;

        /// <summary>
        /// 停止指定消费者的消息消费
        /// </summary>
        /// <param name="consumerTag">消费者标签</param>
        /// <returns>异步任务</returns>
        Task StopConsuming(string consumerTag);

        /// <summary>
        /// 停止所有消费者的消息消费
        /// </summary>
        /// <returns>异步任务</returns>
        Task StopAllConsuming();

        /// <summary>
        /// 获取当前消费者状态
        /// </summary>
        /// <returns></returns>
        ConsumerStatus GetConsumerStatus();

    }
}

        案例如下

6.注册

         在 Program 或 Startup 中注册队列。

 // 注册 RabbitMQ 连接服务为单例(Singleton)
 // IRabbitMqClient 是一个接口,代表 RabbitMQ 客户端连接的抽象
 // RabbitMqClient 是其具体实现类
 // 单例生命周期意味着在整个应用程序生命周期中只创建一次该实例,所有请求共享同一个实例
 builder.Services.AddSingleton<IRabbitMqClient, RabbitMqClient>();

 // 注册 RabbitMQ 消息生产者服务为作用域(Scoped)
 // IMQProducerService 是用于发送消息的接口
 // MQProducerService 是其实现类
 // Scoped 生命周期表示在同一个请求上下文中使用同一个实例(适用于 Web 应用场景)
 builder.Services.AddScoped<IMQProducerService, MQProducerService>();

 // 注册 RabbitMQ 消息消费者服务为单例(Singleton)
 // IMQConsumerService 是用于消费消息(接收并处理消息)的接口
 // MQConsumerService 是其实现类
 // 使用 Singleton 是因为消费者通常需要长时间运行、持续监听队列,适合整个应用周期内保持一个实例
 builder.Services.AddSingleton<IMQConsumerService, MQConsumerService>();

         案例如下

7.简单使用案例

        下面是 实现、接口和控制器的使用案例

        7.1.实现

using Frame1_Service.IService.Product;
using Frame2_DataModel.Entity.Products;
using Frame3_DataRepository.RabbitMQRepository.Consumer;
using Frame3_DataRepository.RabbitMQRepository.Producer;
using Frame6_LibraryUtility;
using RabbitMQ.Client.Exceptions;

namespace Frame1_Service.Service.Product
{
    public class RabbitMQTestSvr : BaseService, IRabbitMQTestSvr
    {

        /// <summary>
        /// 生产者
        /// </summary>
        private readonly IMQProducerService _iRabbitMQProducer;
        /// <summary>
        /// 消费者
        /// </summary>
        private readonly IMQConsumerService _iRabbitMQConsumer;

        /// <summary>
        /// 构造
        /// </summary>
        /// <param name="iRabbitMQProducer"></param>
        /// <param name="iRabbitMQConsumer"></param>
        public RabbitMQTestSvr(IMQProducerService iRabbitMQProducer, IMQConsumerService iRabbitMQConsumer)
        {
            _iRabbitMQConsumer = iRabbitMQConsumer;
            _iRabbitMQProducer = iRabbitMQProducer;
        }

        /// <summary>
        /// 模拟消费逻辑
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        private async Task ProcessOrderAsync(ProductsEntity model)
        {
            Console.WriteLine("消费成功:" + model.ToJson());
        }

        /// <summary>
        /// 生产者-点对点(Point-to-Point)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model)
        {
            var result = new ResultModel<bool>() { Data = false };

            // 创建 Random 实例
            Random random = new Random();
            model.Id = Guid.NewGuid().ToString();
            model.ProductName = "测试" + (random.Next(1, 999)).ToString();
            model.Price = random.Next(1000, 9999);
            model.Stock = random.Next(1, 99);

            await _iRabbitMQProducer.PublishByPTPAsync<ProductsEntity>("ProducerTest", model);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 消费者-点对点(Point-to-Point)
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ConsumerTest()
        {
            var result = new ResultModel<bool>() { Data = false };

            await _iRabbitMQConsumer.StartConsumingByPTPAsync<ProductsEntity>("ProducerTest", ProcessOrderAsync);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 生产者-发布订阅(Pub/Sub)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model)
        {
            var result = new ResultModel<bool>() { Data = false };

            // 创建 Random 实例
            Random random = new Random();
            model.Id = Guid.NewGuid().ToString();
            model.ProductName = "测试" + (random.Next(1, 999)).ToString();
            model.Price = random.Next(1000, 9999);
            model.Stock = random.Next(1, 99);

            await _iRabbitMQProducer.PublishByPubSubAsync<ProductsEntity>("PubSubTest", model);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 消费者-发布订阅(Pub/Sub)
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ConsumerPubSub()
        {
            var result = new ResultModel<bool>() { Data = false };

            await _iRabbitMQConsumer.StartConsumingByPubSubAsync<ProductsEntity>("PubSubTest", ProcessOrderAsync);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 生产者-路由模式(Routing)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model)
        {
            var result = new ResultModel<bool>() { Data = false };

            // 创建 Random 实例
            Random random = new Random();
            model.Id = Guid.NewGuid().ToString();
            model.ProductName = "测试" + (random.Next(1, 999)).ToString();
            model.Price = random.Next(1000, 9999);
            model.Stock = random.Next(1, 99);

            await _iRabbitMQProducer.PublishByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", model);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 消费者-路由模式(Routing)
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ConsumerRouting()
        {
            var result = new ResultModel<bool>() { Data = false };

            await _iRabbitMQConsumer.StartConsumingByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", ProcessOrderAsync);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 生产者-主题模式(Topic)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model)
        {
            var result = new ResultModel<bool>() { Data = false };

            // 创建 Random 实例
            Random random = new Random();
            model.Id = Guid.NewGuid().ToString();
            model.ProductName = "测试" + (random.Next(1, 999)).ToString();
            model.Price = random.Next(1000, 9999);
            model.Stock = random.Next(1, 99);

            await _iRabbitMQProducer.PublishByTopicAsync<ProductsEntity>("TopicTest", "Topic.test", model);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 消费者-主题模式(Topic)
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ConsumerTopic()
        {
            var result = new ResultModel<bool>() { Data = false };

            await _iRabbitMQConsumer.StartConsumingByTopicAsync<ProductsEntity>("TopicTest", "Topic.*", ProcessOrderAsync);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 生产者-请求响应模式(RPC)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model)
        {
            var result = new ResultModel<CalculateResponse>();

            var request = new CalculateRequest { X = 5, Y = 7 };

            // 创建 Random 实例
            Random random = new Random();
            model.Id = Guid.NewGuid().ToString();
            model.ProductName = "测试" + (random.Next(1, 999)).ToString();
            model.Price = random.Next(1000, 9999);
            model.Stock = random.Next(1, 99);

            var response = await _iRabbitMQProducer.PublishByPRCAsync<CalculateRequest, CalculateResponse>("RPCTest", "RPC", request);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = response;
            return result;
        }

        /// <summary>
        /// 消费者-请求响应模式(RPC)
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> ConsumerRPC()
        {
            var result = new ResultModel<bool> { Data = false };

            try
            {
                //示例:模拟一个计算器服务(可替换为真实的 ICalculatorService)
                Func<CalculateRequest, Task<CalculateResponse>> handler = async req =>
                {
                    await Task.Delay(10); //模拟异步处理
                    return new CalculateResponse { Result = req.X + req.Y };
                };

                //启动消费者
                var cts = await _iRabbitMQConsumer.StartConsumingByPRCAsync<CalculateRequest, CalculateResponse>(exchangeName: "RPCTest", routingKey: "RPC", handler);

                result.Data = true;
                result.Code = ResultCodeEnum.Success;
                result.Msg = "RPC消费者已启动";
            }
            catch (OperationInterruptedException ex)
            {
                result.Msg = "消息队列服务不可用";
            }
            catch (Exception ex)
            {
                result.Msg = "消费者初始化失败";
            }

            return result;
        }

        /// <summary>
        /// 死信队列重抛
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> Republish(string queueName)
        {
            var result = new ResultModel<bool>() { Data = false };

            await _iRabbitMQProducer.RepublishDeadLetterMessagesAsync<ProductsEntity>(queueName);

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

        /// <summary>
        /// 停止消费者
        /// </summary>
        /// <returns></returns>
        public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag)
        {
            var result = new ResultModel<bool>() { Data = false };
            if (consumerTag.Equals("0"))
            {
                await _iRabbitMQConsumer.StopAllConsuming();
            }
            else
            {
                await _iRabbitMQConsumer.StopConsuming(consumerTag);
            }

            result.Code = ResultCodeEnum.Success;
            result.Msg = "操作成功";
            result.Data = true;
            return result;
        }

    }
}

        案例如下

 

        7.2.接口

using Frame2_DataModel.Entity.Products;
using Frame6_LibraryUtility;

namespace Frame1_Service.IService.Product
{
    public interface IRabbitMQTestSvr
    {
        /// <summary>
        /// 生产者-点对点(Point-to-Point)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        Task<ResultModel<bool>> ProducerTest(ProductsEntity model);

        /// <summary>
        /// 消费者-点对点(Point-to-Point)
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> ConsumerTest();

        /// <summary>
        /// 生产者-发布订阅(Pub/Sub)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model);

        /// <summary>
        /// 消费者-发布订阅(Pub/Sub)
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> ConsumerPubSub();

        /// <summary>
        /// 生产者-路由模式(Routing)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        Task<ResultModel<bool>> ProducerRouting(ProductsEntity model);

        /// <summary>
        /// 消费者-路由模式(Routing)
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> ConsumerRouting();

        /// <summary>
        /// 生产者-主题模式(Topic)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        Task<ResultModel<bool>> ProducerTopic(ProductsEntity model);

        /// <summary>
        /// 消费者-主题模式(Topic)
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> ConsumerTopic();

        /// <summary>
        /// 生产者-请求响应模式(RPC)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model);

        /// <summary>
        /// 消费者-请求响应模式(RPC)
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> ConsumerRPC();

        /// <summary>
        /// 死信队列重抛
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> Republish(string queueName);

        /// <summary>
        /// 停止消费者
        /// </summary>
        /// <returns></returns>
        Task<ResultModel<bool>> StopAllConsumer(string consumerTag);
    }
}

        案例如下

        7.3.控制器

using Frame1_Service.IService.Product;
using Frame1_Service.Service.Product;
using Frame2_DataModel.Entity.Products;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.AspNetCore.Mvc;

namespace DemoAPI.Controllers
{
    /// <summary>
    /// 消息队列控制器 -RabbitMQ
    /// </summary>
    //[Authorize]// 保护整个控制器
    [Route("api/[controller]/[action]")]//标记路由地址规格
    [ApiController] // 标记该类为 API 控制器,启用一些默认的行为,如模型绑定、输入验证等
    [ApiExplorerSettings(GroupName = nameof(ApiVersionInfo.V1))]//设置控制器的API版本
    public class RabbitMQTestController : BaseController
    {
        private IRabbitMQTestSvr _iRabbitMQTestSvr;

        /// <summary>
        /// 构造
        /// </summary>
        /// <param name="iRabbitMQTestSvr"></param>
        public RabbitMQTestController(IRabbitMQTestSvr iRabbitMQTestSvr) 
        {
            _iRabbitMQTestSvr = iRabbitMQTestSvr;
        }

        /// <summary>
        /// 生产者-点对点(Point-to-Point)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTest(model);

        /// <summary>
        /// 消费者-点对点(Point-to-Point)
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> ConsumerTest() => await _iRabbitMQTestSvr.ConsumerTest();

        /// <summary>
        /// 生产者-发布订阅(Pub/Sub)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerPubSub(model);

        /// <summary>
        /// 消费者-发布订阅(Pub/Sub)
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> ConsumerPubSub() => await _iRabbitMQTestSvr.ConsumerPubSub();

        /// <summary>
        /// 生产者-路由模式(Routing)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRouting(model);

        /// <summary>
        /// 消费者-路由模式(Routing)
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> ConsumerRouting() => await _iRabbitMQTestSvr.ConsumerRouting();

        /// <summary>
        /// 生产者-主题模式(Topic)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTopic(model);

        /// <summary>
        /// 消费者-主题模式(Topic)
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> ConsumerTopic() => await _iRabbitMQTestSvr.ConsumerTopic();

        /// <summary>
        /// 生产者-请求响应模式(RPC)
        /// </summary>
        /// <param name="model"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRPC(model);

        /// <summary>
        /// 消费者-请求响应模式(RPC)
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> ConsumerRPC() => await _iRabbitMQTestSvr.ConsumerRPC();

        /// <summary>
        /// 死信队列重抛
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> Republish(string queueName) => await _iRabbitMQTestSvr.Republish(queueName);

        /// <summary>
        /// 停止消费者
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag) => await _iRabbitMQTestSvr.StopAllConsumer(consumerTag);

    }
}

        案例如下


网站公告

今日签到

点亮在社区的每一天
去签到