目录
在 .NET Core 应用程序中使用 RabbitMQ 有许多好处,主要体现在其作为消息队列系统所带来的灵活性、可靠性和扩展性等方面,还能促进微服务架构的实施,是构建现代分布式应用的理想选择之一
1.添加包
添加 RabbitMQ.Client 包。
2. 连接配置
2.1.连接字符串
dbsettings.json文件添加 RabbitMQ 连接配置
//RabbitMQ配置
"RabbitMQSettings": {
"HostName": "ip地址", //地址
"Port": "端口", //端口
"UserName": "RabbitMQ用户名", //用户名
"Password": "RabbitMQ密码", //密码
"VirtualHost": "/", //本地虚拟地址
"RetryCount": 5, //最大重试次数
"RetryInterval": 5, //断开重连次数
"PrefetchCount": 5, //预取消息数量
"ConsumerCount": 5 //消费者数量
}
2.2.连接对象
namespace Frame3_DataRepository.RabbitMQRepository.BaseMQ
{
/// <summary>
/// 消息队列配置类
/// </summary>
public class RabbitMQSettings
{
/// <summary>
/// RabbitMQ 服务器地址
/// </summary>
public string HostName { get; set; }
/// <summary>
/// 端口号,默认5672
/// </summary>
public int Port { get; set; } = 5672;
/// <summary>
/// 用户名
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; }
/// <summary>
/// 虚拟主机,默认为/
/// </summary>
public string VirtualHost { get; set; } = "/";
/// <summary>
/// 连接重试次数
/// </summary>
public int RetryCount { get; set; } = 5;
/// <summary>
/// 重试间隔(秒)
/// </summary>
public int RetryInterval { get; set; } = 5;
/// <summary>
/// 预取消息数量
/// </summary>
public ushort PrefetchCount { get; set; }
/// <summary>
/// 消费者数量
/// </summary>
public int ConsumerCount { get; set; }
}
/// <summary>
/// 持久化
/// </summary>
public enum DeliveryMode : byte
{
NonPersistent = 1,
Persistent = 2
}
/// <summary>
/// 消费者状态信息
/// </summary>
public class ConsumerStatus
{
/// <summary>
/// 当前活跃消费者数量
/// </summary>
public int CurrentCount { get; set; }
/// <summary>
/// 最大允许消费者数量
/// </summary>
public int MaxCount { get; set; }
/// <summary>
/// 活跃消费者标签列表
/// </summary>
public List<string> ActiveConsumers { get; set; } = new();
}
}
案例如下
3.创建连接服务
先创建配置获取方法,再创建 RabbitMqClient 服务实现类和 IRabbitMqClient 服务接口来MQ连接。
3.1.添加配置获取方法
using Microsoft.Extensions.Configuration;
namespace Frame4_LibraryCore.BaseConfig
{
/// <summary>
/// 全局配置
/// </summary>
public static class Config
{
/// <summary>
/// 从指定的 JSON 配置文件中读取配置,并反序列化为指定类型
/// </summary>
/// <typeparam name="T">目标配置类型(如 RedisSettings、DatabaseSettings 等)</typeparam>
/// <param name="fileName">JSON 配置文件名(如 "appsettings.json")</param>
/// <param name="sessions">配置节点名称(如 "RedisSettings")</param>
/// <returns>返回绑定后的强类型配置对象</returns>
public static T GetSetting<T>(string fileName, string sessions)
{
//创建 ConfigurationBuilder 实例,用于构建配置
var builder = new ConfigurationBuilder()
//设置配置文件的基础路径为当前程序运行目录
.SetBasePath(Directory.GetCurrentDirectory())
//添加 JSON 文件作为配置源:
//- fileName: 指定要加载的 JSON 文件
//- optional: false 表示文件必须存在,否则抛出异常
//- reloadOnChange: true 表示文件修改时自动重新加载
.AddJsonFile(fileName, optional: false, reloadOnChange: true);
//构建配置对象(IConfigurationRoot)
IConfigurationRoot config = builder.Build();
//获取指定配置节点(sessions),并将其反序列化为类型 T
var conn = config.GetSection(sessions).Get<T>();
//返回反序列化后的配置对象
return conn;
}
}
}
案例如下
3.2.服务实现类
using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace Frame3_DataRepository.RabbitMQRepository
{
/// <summary>
/// 队列服务实现类
/// 提供消息队列连接和通道管理功能
/// </summary>
public sealed class RabbitMqClient : BaseServiceSingleton, IRabbitMqClient
{
/// <summary>
/// RabbitMQ 连接工厂实例
/// </summary>
private readonly IConnectionFactory _connectionFactory;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<RabbitMqClient> _logger;
/// <summary>
/// 连接重试最大次数
/// </summary>
private readonly int _retryCount;
/// <summary>
/// 重试间隔时间(秒)
/// </summary>
private readonly int _retryInterval;
/// <summary>
/// RabbitMQ 连接对象
/// </summary>
private IConnection _connection;
/// <summary>
/// 标识对象是否已被释放
/// </summary>
private bool _disposed;
/// <summary>
/// 连接操作的线程锁
/// </summary>
private readonly SemaphoreSlim _connectionLock = new(1, 1);
/// <summary>
/// 心跳检测定时器,用于定期检查连接状态
/// </summary>
private Timer _heartbeatTimer;
/// <summary>
/// 心跳检测间隔(秒),默认30秒
/// </summary>
private const int HeartbeatInterval = 30;
/// <summary>
/// 预取消息数量
/// </summary>
private readonly ushort _prefetchCount;
/// <summary>
/// 最大允许的消费者数量
/// </summary>
private readonly int _maxConsumerCount;
/// <summary>
/// 构造函数,初始化RabbitMQ服务
/// </summary>
/// <param name="logger">日志记录器,从DI容器注入</param>
/// <exception cref="ArgumentNullException">当必需参数为null时抛出</exception>
public RabbitMqClient(ILogger<RabbitMqClient> logger)
{
//参数校验,确保依赖注入的参数不为null
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
//读取配置
//var settingsValue = settings?.Value ?? throw new ArgumentNullException(nameof(settings));
var settingsValue = Config.GetSetting<RabbitMQSettings>("dbsettings.json", "RabbitMQSettings");
//从配置中初始化重试参数
_retryCount = settingsValue.RetryCount;
_retryInterval = settingsValue.RetryInterval;
//配置连接工厂参数
_connectionFactory = new ConnectionFactory
{
HostName = settingsValue.HostName, // 主机地址
Port = settingsValue.Port, // 端口号
UserName = settingsValue.UserName, // 用户名
Password = settingsValue.Password, // 密码
VirtualHost = settingsValue.VirtualHost, // 虚拟主机
AutomaticRecoveryEnabled = true, // 启用自动恢复
NetworkRecoveryInterval = TimeSpan.FromSeconds(10), // 网络恢复间隔10秒
RequestedHeartbeat = TimeSpan.FromSeconds(60) // 设置心跳间隔60秒
};
// 初始化心跳检测定时器
_heartbeatTimer = new Timer(HeartbeatCheck, null, Timeout.Infinite, Timeout.Infinite);
//初始化预取消息数量
_prefetchCount = settingsValue.PrefetchCount;
//初始化消费者数量
_maxConsumerCount = settingsValue.ConsumerCount;
}
/// <summary>
/// 检查当前是否已建立有效连接
/// </summary>
public bool IsConnected => _connection?.IsOpen == true && !_disposed;
/// <summary>
/// 预取消息数量
/// </summary>
public ushort prefetchCount => _prefetchCount;
/// <summary>
/// 消费者数量
/// </summary>
public int ConsumerCount => _maxConsumerCount;
/// <summary>
/// 尝试建立RabbitMQ连接
/// </summary>
/// <returns>是否连接成功</returns>
public async Task<bool> TryConnectAsync()
{
// 已连接则直接返回成功
if (IsConnected) return true;
// 加锁防止多线程同时创建连接
await _connectionLock.WaitAsync();
try
{
// 双重检查锁定模式
if (IsConnected) return true;
//记录建立连接
_logger.LogInformation("正在建立RabbitMQ连接...");
// 带重试机制的连接逻辑
for (int i = 0; i < _retryCount; i++)
{
try
{
//创建新连接
_connection = await _connectionFactory.CreateConnectionAsync().ConfigureAwait(false);
//订阅连接关闭事件
_connection.ConnectionShutdownAsync += OnConnectionShutdown;
//验证连接是否成功建立
if (IsConnected)
{
//记录连接成功
_logger.LogInformation("RabbitMQ连接已成功建立");
//连接成功后启动心跳检测 不用心跳检测可注释
//StartHeartbeatCheck();
return true;
}
}
catch (BrokerUnreachableException ex)
{
//专门处理Broker不可达异常
_logger.LogWarning(ex, $"RabbitMQ服务不可达,第{i}次重试...");
}
catch (Exception ex)
{
//处理其他类型的异常
_logger.LogWarning(ex, $"RabbitMQ连接异常,第{i}次重试...");
}
//如果未达到最大重试次数,等待间隔时间后重试
if (i < _retryCount)
{
//等待间隔时间后重试
await Task.Delay(_retryInterval * 1000).ConfigureAwait(false);
}
}
//记录连接到最大重试数
_logger.LogError($"RabbitMQ连接失败,已达到最大重试次数{_retryCount}");
return false;
}
finally
{
//确保锁被释放
_connectionLock.Release();
}
}
/// <summary>
/// 创建通道
/// </summary>
/// <returns>创建的通道对象</returns>
/// <exception cref="InvalidOperationException">当连接不可用时抛出</exception>
public async Task<IChannel> CreateChannelAsync()
{
//确保连接可用
if (!IsConnected && !await TryConnectAsync().ConfigureAwait(false))
{
throw new InvalidOperationException("没有可用的RabbitMQ连接");
}
try
{
return await _connection.CreateChannelAsync().ConfigureAwait(false);
}
catch (OperationInterruptedException ex)
{
//处理RabbitMQ操作中断异常
_logger.LogError(ex, "创建RabbitMQ通道时操作被中断");
throw;
}
catch (Exception ex)
{
//处理其他创建通道时的异常
_logger.LogError(ex, "创建RabbitMQ通道失败");
throw;
}
}
/// <summary>
/// 连接关闭事件处理程序,自动尝试重新连接
/// </summary>
private async Task OnConnectionShutdown(object sender, ShutdownEventArgs args)
{
//记录连接关闭事件,包括关闭原因
_logger.LogWarning($"RabbitMQ连接已关闭,原因: {args}");
//如果服务未被释放,尝试自动重新连接
if (!_disposed)
{
try
{
//异步尝试重新连接,不阻塞当前线程
await TryConnectAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
//记录重连失败异常
_logger.LogError(ex, "重连失败");
}
}
}
/// <summary>
/// 心跳检测回调方法,定期检查连接状态
/// </summary>
private async void HeartbeatCheck(object state)
{
try
{
// 如果连接不存在或已关闭,尝试重新连接
if (!IsConnected)
{
//记录检测断开重新连接
_logger.LogWarning("心跳检测发现连接断开,尝试重新连接...");
//建立连接
await TryConnectAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
{
// 记录心跳检测过程中的异常
_logger.LogError(ex, "心跳检测异常");
}
}
/// <summary>
/// 启动心跳检测定时器
/// </summary>
private void StartHeartbeatCheck()
{
// 设置定时器,定期执行心跳检测
_heartbeatTimer.Change(TimeSpan.FromSeconds(HeartbeatInterval),
TimeSpan.FromSeconds(HeartbeatInterval));
}
/// <summary>
/// 停止心跳检测定时器
/// </summary>
private void StopHeartbeatCheck()
{
// 禁用定时器
_heartbeatTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
//释放
Dispose(true);
//优化垃圾回收
GC.SuppressFinalize(this);
}
/// <summary>
/// 受保护的释放方法
/// </summary>
/// <param name="disposing">是否主动释放</param>
private void Dispose(bool disposing)
{
//如果已经释放,直接返回
if (_disposed) return;
//如果是主动释放,处理托管资源
if (disposing)
{
try
{
//停止心跳检测
StopHeartbeatCheck();
//释放定时器资源
_heartbeatTimer?.Dispose();
//取消事件订阅
if (_connection != null)
{
_connection.ConnectionShutdownAsync -= OnConnectionShutdown;
}
//释放连接资源
_connection?.Dispose();
//释放线程锁资源
_connectionLock.Dispose();
//记录连接关闭日志
_logger.LogInformation("RabbitMQ连接已关闭");
}
catch (Exception ex)
{
//记录资源释放过程中的异常
_logger.LogError(ex, "关闭RabbitMQ连接时出错");
}
finally
{
//标记为已释放状态
_disposed = true;
}
}
}
}
}
案例如下
3.3.服务接口
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
namespace Frame3_DataRepository.RabbitMQRepository
{
/// <summary>
/// 队列服务接口
/// 供生产者和消费者调用
/// </summary>
public interface IRabbitMqClient //: IDisposable
{
/// <summary>
/// 获取当前连接状态
/// true表示已建立有效连接,false表示连接不可用
/// </summary>
bool IsConnected { get; }
/// <summary>
/// 预取消息数量
/// </summary>
ushort prefetchCount { get; }
/// <summary>
/// 消费者数量
/// </summary>
int ConsumerCount { get; }
/// <summary>
/// 尝试建立RabbitMQ连接
/// </summary>
/// <returns>是否连接成功</returns>
Task<bool> TryConnectAsync();
/// <summary>
/// 异步创建RabbitMQ通道
/// 用于消息发布、消费等操作
/// </summary>
/// <returns>RabbitMQ通道实例</returns>
/// <exception cref="InvalidOperationException">当无法创建连接时抛出</exception>
/// <exception cref="OperationInterruptedException">当RabbitMQ操作被中断时抛出</exception>
Task<IChannel> CreateChannelAsync();
void Dispose();
}
}
案例如下
4.创建生产者服务
创建生产者服务实现类 MQProducerService 和接口 IMQProducerService
4.1.生产者实现类
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;
namespace Frame3_DataRepository.RabbitMQRepository.Producer
{
/// <summary>
/// 生产者服务实现类
/// RabbitMQ
/// </summary>
public sealed class MQProducerService : BaseService, IMQProducerService
{
/// <summary>
/// 基础连接服务
/// </summary>
private readonly IRabbitMqClient _iRabbitMQService;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<MQProducerService> _logger;
/// <summary>
/// 构造函数,注入依赖
/// </summary>
/// <param name="iRabbitMQService"></param>
/// <param name="logger"></param>
public MQProducerService(IRabbitMqClient iRabbitMQService, ILogger<MQProducerService> logger)
{
//参数校验,确保依赖注入的参数不为null
_iRabbitMQService = iRabbitMQService ?? throw new ArgumentNullException(nameof(iRabbitMQService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// 发布消息-点对点模式(Point-to-Point)
/// </summary>
/// <typeparam name="T">消息类型,必须是class类型</typeparam>
/// <param name="queueName">目标队列名称</param>
/// <param name="message">要发布的消息对象</param>
/// <param name="messageId">可选的消息ID,未提供时自动生成</param>
/// <param name="exchange">可选交换机名称,默认使用直接交换机</param>
/// <param name="headers">可选的消息头字典</param>
/// <param name="withDLX">是否启用死信队列</param>
/// <param name="maxRetryCount">最大重试次数(仅当启用死信队列时有效)</param>
/// <returns>异步任务</returns>
/// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>
/// <exception cref="InvalidOperationException">当消息序列化失败时抛出</exception>
public async Task PublishByPTPAsync<T>(string queueName, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class
{
//参数校验
if (string.IsNullOrWhiteSpace(queueName))
throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");
if (message == null)
throw new ArgumentNullException(nameof(message), "消息内容不能为空");
//生成消息ID(如果未提供则使用Guid)
messageId = messageId.IsEmpty() ? messageId = Guid.NewGuid().ToString() : messageId;
//声明变量用于存储序列化后的消息,便于错误处理
var jsonMessage = string.Empty;
//声明RabbitMQ通道
IChannel? channel = null;
try
{
//创建RabbitMQ通道
channel = await _iRabbitMQService.CreateChannelAsync();
// 死信队列配置
var arguments = new Dictionary<string, object>();
if (withDLX)
{
// 死信交换机配置
var dlxExchange = $"{queueName}.DLX";
var dlxQueue = $"{queueName}.DLQ";
// 声明死信交换机和队列
await channel.ExchangeDeclareAsync(dlxExchange, ExchangeType.Direct, durable: true);
await channel.QueueDeclareAsync(
queue: dlxQueue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
await channel.QueueBindAsync(dlxQueue, dlxExchange, dlxQueue);
// 设置死信队列参数
arguments.Add("x-dead-letter-exchange", dlxExchange);
arguments.Add("x-dead-letter-routing-key", dlxQueue);
arguments.Add("x-max-retry-count", maxRetryCount); // 自定义属性,记录最大重试次数
arguments.Add("x-max-length", 100000);
arguments.Add("x-queue-mode", "lazy");
}
// 添加消费者数量限制
arguments["x-max-consumers"] = _iRabbitMQService.ConsumerCount;
//声明队列(确保队列存在)
var queueDeclareOk = await channel.QueueDeclareAsync
(
queue: queueName, //队列名称
durable: true, //队列持久化(服务器重启后仍然存在)
exclusive: false, //非独占队列
autoDelete: false, //不自动删除
arguments: arguments//new Dictionary<string, object>
//{
// // 只允许一个活跃消费者
// //["x-single-active-consumer"] = true,
// ["x-max-consumers"] = _iRabbitMQService.ConsumerCount,
//}
);
//序列化消息为JSON格式
jsonMessage = JsonSerializer.Serialize(message);
//将消息内容转换为UTF-8字节数组
var body = Encoding.UTF8.GetBytes(jsonMessage);
//创建消息属性
var properties = new BasicProperties
{
Persistent = true, //消息持久化(需要队列也是持久化的才有效)
MessageId = messageId, //设置唯一消息ID用于追踪
ContentType = "application/json", //明确指定内容类型为JSON
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加时间戳
};
//设置消息头(如果提供)
if (headers != null && headers.Any())
{
properties.Headers = new Dictionary<string, object>(headers);
}
// 发布消息到队列
await channel.BasicPublishAsync
(
exchange: exchange ?? string.Empty, //交换机名称(空字符串表示默认direct交换机)
routingKey: queueName, //路由键(对于默认交换机就是队列名)
mandatory: false, //不强制要求消息被路由到队列
basicProperties: properties, //消息属性
body: body //消息体
);
//记录成功日志(结构化日志)
_logger.LogInformation("消息发布成功。\r\n交换机: {Exchange}\r\n消息ID: {MessageId}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", messageId, queueName, jsonMessage);
}
catch (JsonException jsonEx)
{
// 专门处理JSON序列化错误
_logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);
throw new InvalidOperationException("消息序列化失败", jsonEx);
}
catch (OperationInterruptedException opEx)
{
// 处理RabbitMQ操作中断异常
_logger.LogError(opEx, "RabbitMQ操作中断。\r\n消息: {jsonMessage}\r\n队列: {queueName}", jsonMessage, queueName);
throw;
}
catch (Exception ex)
{
// 处理其他所有异常
_logger.LogError(ex, "消息发布失败。\r\n交换机: {Exchange}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", queueName, jsonMessage);
throw;
}
finally
{
// 确保通道资源被释放
await channel?.CloseAsync();
}
}
/// <summary>
/// 发布消息-发布/订阅模式(Pub/Sub)
/// </summary>
/// <typeparam name="T">消息类型,必须是 class 类型</typeparam>
/// <param name="exchangeName">目标 Exchange 名称</param>
/// <param name="message">要发布的消息对象</param>
/// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>
/// <param name="headers">可选的消息头字典</param>
/// <returns>异步任务</returns>
public async Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class
{
//校验 Exchange 名称是否为空或空白字符串
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");
//校验消息内容是否为 null
if (message == null)
throw new ArgumentNullException(nameof(message), "消息内容不能为空");
//如果未提供消息ID,则使用 Guid 生成唯一的 ID
messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;
//用于存储序列化后的 JSON 消息(便于日志和异常处理)
var jsonMessage = string.Empty;
//声明 RabbitMQ 通道变量,初始为 null
IChannel? channel = null;
try
{
//创建 RabbitMQ 通道
channel = await _iRabbitMQService.CreateChannelAsync();
//声明 Fanout 类型的 Exchange(广播模式)
await channel.ExchangeDeclareAsync(
exchange: exchangeName,
type: ExchangeType.Fanout, //扇出类型,广播给所有绑定队列
durable: true, //可持久化
autoDelete: false); //不自动删除
//将消息对象序列化为 JSON 字符串
jsonMessage = JsonSerializer.Serialize(message);
//将 JSON 消息转换为 UTF-8 编码的字节数组
var body = Encoding.UTF8.GetBytes(jsonMessage);
//创建消息属性
var properties = new BasicProperties
{
Persistent = true, //消息持久化
//DeliveryMode = (DeliveryModes)DeliveryMode.Persistent,
MessageId = messageId, //设置唯一消息ID
ContentType = "application/json", //内容类型为 JSON
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加时间戳
};
//如果提供了 Headers,则设置到消息属性中
if (headers != null && headers.Any())
{
properties.Headers = new Dictionary<string, object>(headers);
}
//向 Exchange 发送消息(不指定 Routing Key,Fanout 类型忽略此参数)
await channel.BasicPublishAsync(
exchange: exchangeName, //目标 Exchange 名称
routingKey: string.Empty, //Fanout 类型不需要路由键
mandatory: false, //不要求消息必须被投递
basicProperties: properties, //消息属性
body: body); //消息体字节数据
//记录消息发布成功的日志信息
_logger.LogInformation("消息已发布到Exchange。\r\nExchange: {Exchange}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}", exchangeName, messageId, jsonMessage);
}
catch (JsonException jsonEx)
{
//捕获 JSON 序列化异常并记录错误日志
_logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);
//抛出自定义异常,包含原始异常信息
throw new InvalidOperationException("消息序列化失败", jsonEx);
}
catch (Exception ex)
{
//捕获其他所有异常并记录错误日志
_logger.LogError(ex, "消息发布到Exchange失败。\r\nExchange: {Exchange}\r\n消息: {jsonMessage}", exchangeName, jsonMessage);
//抛出异常
throw;
}
finally
{
// 确保通道资源被释放
await channel?.CloseAsync();
}
}
/// <summary>
/// 发布消息-路由模式(Routing)
/// </summary>
/// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>
/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
/// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>
/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
/// <returns>异步任务</returns>
public async Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class
{
//检查 Exchange 名称是否为空或空白字符
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");
//检查路由键是否为空或空白字符
if (string.IsNullOrWhiteSpace(routingKey))
throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");
//检查消息对象是否为 null
if (message == null)
throw new ArgumentNullException(nameof(message), "消息内容不能为空");
//如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识
messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;
//用于记录日志的消息 JSON 字符串
var jsonMessage = string.Empty;
//声明一个 IChannel 对象,初始为 null
IChannel? channel = null;
try
{
//创建 RabbitMQ Channel(通道)
channel = await _iRabbitMQService.CreateChannelAsync();
//声明一个 Direct 类型的 Exchange(如果不存在则创建)
await channel.ExchangeDeclareAsync(
exchange: exchangeName, //指定 Exchange 的名称
type: ExchangeType.Direct, //指定 Exchange 的类型为 Direct(直连模式)
durable: true, //设置为持久化 Exchange,RabbitMQ 重启后不会丢失
autoDelete: false //不自动删除 Exchange,即使最后一个队列被解绑也不会自动删除
);
//将消息对象序列化为 JSON 字符串
jsonMessage = JsonSerializer.Serialize(message);
//将 JSON 字符串转换为 UTF-8 编码的字节数组
var body = Encoding.UTF8.GetBytes(jsonMessage);
//创建并初始化 BasicProperties(消息属性)
var properties = new BasicProperties
{
Persistent = true, //设置消息持久化
MessageId = messageId, //设置消息 ID
ContentType = "application/json", //内容类型为 JSON
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳
};
//如果提供了自定义 Headers,则复制到消息属性中
if (headers != null && headers.Any())
{
properties.Headers = new Dictionary<string, object>(headers);
}
//发布消息到指定 Exchange 和路由键
await channel.BasicPublishAsync(
exchange: exchangeName, //指定消息要发送到的 Exchange 名称
routingKey: routingKey, //指定消息的路由键,用于 Exchange 路由决策
mandatory: false, //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃
basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等
body: body //消息的实际内容(字节数组),通常是序列化后的 JSON 数据
);
//记录日志:消息发送成功
_logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",
exchangeName, routingKey, messageId, jsonMessage);
}
catch (Exception ex)
{
//捕获异常并记录日志
_logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",
exchangeName, routingKey, jsonMessage);
//抛出异常以便上层处理
throw;
}
finally
{
// 确保通道资源被释放
await channel?.CloseAsync();
}
}
/// <summary>
/// 发布消息-主题模式(Topic)
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
/// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>
/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
/// <returns>异步任务</returns>
public async Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class
{
//检查 Exchange 名称是否为空或空白字符
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");
//检查路由键是否为空或空白字符
if (string.IsNullOrWhiteSpace(routingKey))
throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");
//检查消息对象是否为 null
if (message == null)
throw new ArgumentNullException(nameof(message), "消息内容不能为空");
//如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识
messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;
//用于记录日志的消息 JSON 字符串
var jsonMessage = string.Empty;
//声明一个 IChannel 对象,初始为 null
IChannel? channel = null;
try
{
//创建 RabbitMQ Channel(通道)
channel = await _iRabbitMQService.CreateChannelAsync();
// 删除已存在的 Exchange(如果不需要保留消息)
//await channel.ExchangeDeleteAsync("TopicTest");
//声明一个 Topic 类型的 Exchange(如果不存在则创建)
await channel.ExchangeDeclareAsync(
exchange: exchangeName, //指定要声明的 Exchange 名称,名称由外部传入的 exchangeName 变量指定
type: ExchangeType.Topic, //设置 Exchange 的类型为 Topic(主题模式),支持通配符路由键匹配
durable: true, //设置 Exchange 为持久化,即使 RabbitMQ 重启也不会丢失
autoDelete: false //设置 Exchange 不自动删除,即使最后一个绑定被移除后仍保留
);
//将消息对象序列化为 JSON 字符串
jsonMessage = JsonSerializer.Serialize(message);
//将 JSON 字符串转换为 UTF-8 编码的字节数组
var body = Encoding.UTF8.GetBytes(jsonMessage);
//创建并初始化 BasicProperties(消息属性)
var properties = new BasicProperties
{
Persistent = true, //设置消息持久化
MessageId = messageId, //设置消息 ID
ContentType = "application/json", //内容类型为 JSON
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳
};
//如果提供了自定义 Headers,则复制到消息属性中
if (headers != null && headers.Any())
{
properties.Headers = new Dictionary<string, object>(headers);
}
//发布消息到指定 Exchange 和路由键
await channel.BasicPublishAsync(
exchange: exchangeName, //指定消息要发送到的 Exchange 名称
routingKey: routingKey, //指定消息的路由键,用于 Exchange 路由决策
mandatory: false, //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃
basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等
body: body //消息的实际内容(字节数组),通常是序列化后的 JSON 数据
);
//记录日志:消息发送成功
_logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",
exchangeName, routingKey, messageId, jsonMessage);
}
catch (Exception ex)
{
//捕获异常并记录日志
_logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",
exchangeName, routingKey, jsonMessage);
//抛出异常以便上层处理
throw;
}
finally
{
// 确保通道资源被释放
await channel?.CloseAsync();
}
}
/// <summary>
/// 发布消息-请求/响应(RPC)
/// </summary>
/// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>
/// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>
/// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>
/// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>
/// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>
/// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>
/// <returns>异步任务</returns>
public async Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class
{
// 参数校验
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));
if (string.IsNullOrWhiteSpace(routingKey))
throw new ArgumentException("路由键不能为空", nameof(routingKey));
if (request == null)
throw new ArgumentNullException(nameof(request));
// 设置默认超时时间(30秒)
var actualTimeout = timeout == default ? TimeSpan.FromSeconds(5) : timeout;
if (actualTimeout <= TimeSpan.Zero)
throw new ArgumentException("超时时间必须大于0", nameof(timeout));
// 生成唯一 CorrelationId
var correlationId = Guid.NewGuid().ToString();
// 创建 TaskCompletionSource
var tcs = new TaskCompletionSource<TResponse>();
// 创建独立 Channel
var channel = await _iRabbitMQService.CreateChannelAsync();
try
{
// 在 PublishByPRCAsync 方法中,发送请求前添加:
await channel.ExchangeDeclareAsync(
exchange: exchangeName,
type: ExchangeType.Direct, // RPC通常使用Direct
durable: true, // 持久化
autoDelete: false
);
// 声明临时队列用于接收响应
var replyQueue = await channel.QueueDeclareAsync(
queue: "",
durable: false,
exclusive: true,
autoDelete: true
);
// 创建消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
try
{
if (ea.BasicProperties?.CorrelationId == correlationId)
{
var response = JsonSerializer.Deserialize<TResponse>(ea.Body.Span);
tcs.TrySetResult(response);
}
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
return Task.CompletedTask;
};
// 开始监听回复队列
var consumerTag = await channel.BasicConsumeAsync(
queue: replyQueue,
autoAck: true,
consumer: consumer);
// 构建消息属性
var props = new BasicProperties();
props.ReplyTo = replyQueue;
props.CorrelationId = correlationId;
props.ContentType = "application/json";
// 序列化请求体
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request));
// 发送请求
await channel.BasicPublishAsync(
exchange: exchangeName,
routingKey: routingKey,
mandatory: false,
basicProperties: props,
body: body);
// 设置超时取消
using var cts = new CancellationTokenSource(actualTimeout);
cts.Token.Register(() =>
{
if (!tcs.Task.IsCompleted)
{
tcs.TrySetException(new TimeoutException($"RPC请求超时({actualTimeout.TotalSeconds}秒)"));
channel.BasicCancelAsync(consumerTag);
}
});
return await tcs.Task;
}
finally
{
// 确保通道资源被释放
await channel?.CloseAsync();
}
}
/// <summary>
/// 将死信队列中的消息重新发布到原始队列(泛型版本)
/// </summary>
/// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>
/// <param name="queueName">原始队列名称</param>
/// <param name="batchSize">每次处理的消息批大小</param>
/// <param name="delay">重发延迟时间(毫秒)</param>
/// <returns>成功处理的消息数量</returns>
public async Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class
{
// 检查传入的队列名是否为空或空白字符串,若为空则抛出异常
if (string.IsNullOrWhiteSpace(queueName))
throw new ArgumentNullException(nameof(queueName));
// 构造死信队列(DLQ)名称,格式为:{原始队列名}.DLQ
var dlxQueueName = $"{queueName}.DLQ";
// 声明一个 RabbitMQ 的 Channel 对象,用于后续操作
IChannel? channel = null;
// 记录已处理的消息数量
int processedCount = 0;
try
{
// 创建一个新的 Channel 实例(通过服务注入的 _iRabbitMQService)
channel = await _iRabbitMQService.CreateChannelAsync();
// 检查死信队列是否存在(被动声明方式)
try
{
// 如果不存在会抛出异常,catch 中捕获并记录日志后返回 0
await channel.QueueDeclarePassiveAsync(dlxQueueName);
}
catch
{
// 日志记录:如果 DLQ 不存在,则直接返回 0
_logger.LogWarning("死信队列 {DLXQueueName} 不存在", dlxQueueName);
return 0;
}
// 循环获取最多 batchSize 条消息
for (int i = 0; i < batchSize; i++)
{
// 使用 BasicGet 从 DLQ 获取一条消息(不自动确认)
var result = await channel.BasicGetAsync(dlxQueueName, autoAck: false);
// 如果没有更多消息了,跳出循环
if (result == null)
break;
// 获取消息体内容,并转为 byte[] 数组
var body = result.Body.ToArray();
// 获取原始消息属性(BasicProperties),用于后续操作
var originalProperties = result.BasicProperties;
// 获取当前消息的 DeliveryTag,用于确认或拒绝消息
var deliveryTag = result.DeliveryTag;
try
{
// 如果设置了 delay > 0,则等待指定时间(模拟延迟重试)
if (delay > 0)
await Task.Delay(delay);
// 生成新的唯一 MessageId,用于追踪消息
var messageId = Guid.NewGuid().ToString();
// 创建新的 BasicProperties 实例,用于新消息的属性设置
var properties = new BasicProperties
{
Persistent = true, // 设置消息持久化(需队列也持久化才生效)
MessageId = messageId, // 设置唯一消息 ID
ContentType = "application/json", // 明确内容类型为 JSON
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) // 添加当前时间戳
};
// 将消息重新发布到原始队列,使用默认交换机(exchange 为空)
await channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
mandatory: false,
basicProperties: properties,
body: body);
// 确认 DLQ 中该消息已被正确处理(ACK)
await channel.BasicAckAsync(deliveryTag, multiple: false);
// 成功处理计数器 +1
processedCount++;
// 日志记录:消息已成功重新发布
_logger.LogInformation("已重新发布死信消息 {MessageId} 到队列 {QueueName}",
properties.MessageId ?? "未知ID", queueName);
}
catch (Exception ex)
{
// 日志记录:消息重发失败
_logger.LogError(ex, "重新发布死信消息失败,DeliveryTag={DeliveryTag}", deliveryTag);
// 拒绝消息并重新入队(回到 DLQ),requeue: true 表示重新入队
await channel.BasicNackAsync(deliveryTag, multiple: false, requeue: true);
}
}
// 返回成功处理的消息数量
return processedCount;
}
catch (Exception ex)
{
// 日志记录:整个处理过程中发生错误
_logger.LogError(ex, "处理死信队列 {DLXQueueName} 时发生错误", dlxQueueName);
// 抛出异常,供上层调用者捕获处理
throw;
}
finally
{
// 确保通道资源被释放
await channel?.CloseAsync();
}
}
}
}
案例如下
4.2.生产者接口
namespace Frame3_DataRepository.RabbitMQRepository.Producer
{
/// <summary>
/// 生产者服务接口
/// RabbitMQ
/// </summary>
public interface IMQProducerService
{
/// <summary>
/// 发布消息-点对点模式(Point-to-Point)
/// </summary>
/// <param name="queue">队列名称</param>
/// <typeparam name="T">消息类型,必须是引用类型</typeparam>
/// <param name="message">要发布的消息对象</param>
/// <param name="messageId">消息ID(可选,未提供时自动生成GUID)</param>
/// <param name="exchange">交换机名称(空字符串表示默认交换机)</param>
/// <returns>异步任务</returns>
/// <exception cref="ArgumentNullException">当队列名或消息为空时抛出</exception>
Task PublishByPTPAsync<T>(string queue, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class;
/// <summary>
/// 发布消息-发布/订阅模式(Pub/Sub)
/// </summary>
/// <typeparam name="T">消息类型,必须是 class 类型</typeparam>
/// <param name="exchangeName">目标 Exchange 名称</param>
/// <param name="message">要发布的消息对象</param>
/// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>
/// <param name="headers">可选的消息头字典</param>
/// <returns>异步任务</returns>
Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;
/// <summary>
/// 发布消息-路由模式(Routing)
/// </summary>
/// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>
/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
/// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>
/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
/// <returns>异步任务</returns>
Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;
/// <summary>
/// 发布消息-主题模式(Topic)
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>
/// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>
/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>
/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>
/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>
/// <returns>异步任务</returns>
Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;
/// <summary>
/// 发布消息-请求/响应(RPC)
/// </summary>
/// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>
/// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>
/// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>
/// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>
/// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>
/// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>
/// <returns>异步任务</returns>
Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class;
/// <summary>
/// 将死信队列中的消息重新发布到原始队列(泛型版本)
/// </summary>
/// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>
/// <param name="queueName">原始队列名称</param>
/// <param name="batchSize">每次处理的消息批大小</param>
/// <param name="delay">重发延迟时间(毫秒)</param>
/// <returns>成功处理的消息数量</returns>
Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class;
}
}
案例如下
5.创建消费者服务
创建消费者服务实现类 MQConsumerService 和接口 IMQConsumerService
5.1.消费者服务接口
using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{
/// <summary>
/// 消费者服务实现类
/// 提供可靠的消息消费功能,支持自动重试和错误处理
/// </summary>
public sealed class MQConsumerService : BaseServiceSingleton, IMQConsumerService
{
/// <summary>
/// RabbitMQ 基础服务
/// </summary>
private readonly IRabbitMqClient _rabbitMQService;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<MQConsumerService> _logger;
/// <summary>
/// 最大消费者数量限制
/// </summary>
private readonly int _maxConsumerCount;
/// <summary>
/// 当前消费者数量计数器(线程安全)
/// </summary>
private int _currentConsumerCount;
/// <summary>
/// 用于限制消费者数量的信号量
/// </summary>
private readonly SemaphoreSlim _consumerLimitSemaphore;
/// <summary>
/// 当前活跃的消费者字典,线程安全集合
/// Key: 消费者标签
/// Value: (通道对象, 消费者对象)
/// </summary>
private readonly ConcurrentDictionary<string, (IChannel Channel, AsyncEventingBasicConsumer Consumer)> _activeConsumers;
/// <summary>
/// 构造函数,依赖注入初始化
/// </summary>
/// <param name="rabbitMQService">RabbitMQ基础服务</param>
/// <param name="logger">日志记录器</param>
/// <exception cref="ArgumentNullException">当参数为null时抛出</exception>
public MQConsumerService(IRabbitMqClient rabbitMQService, ILogger<MQConsumerService> logger)
{
_rabbitMQService = rabbitMQService ?? throw new ArgumentNullException(nameof(rabbitMQService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_activeConsumers = new ConcurrentDictionary<string, (IChannel, AsyncEventingBasicConsumer)>();
_maxConsumerCount = _rabbitMQService.ConsumerCount;
_currentConsumerCount = 0;
_consumerLimitSemaphore = new SemaphoreSlim(_maxConsumerCount, _maxConsumerCount);
}
/// <summary>
/// 消费消息-点对点(Point-to-Point)
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="queueName">要消费的队列名称</param>
/// <param name="messageHandler">消息处理委托</param>
/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
/// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>
/// <param name="withDLX">是否启用死信队列</param>
/// <returns>取消令牌源,用于停止消费</returns>
public async Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class
{
//参数校验
if (string.IsNullOrWhiteSpace(queueName))
throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");
if (messageHandler == null)
throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");
//等待获取消费者槽位(带超时防止死锁)
if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
{
throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
}
//当前消费者数量+1
Interlocked.Increment(ref _currentConsumerCount);
//创建取消令牌源
var cancellationTokenSource = new CancellationTokenSource();
//创建通道
IChannel? channel = null;
try
{
//赋值预读取数量
if (prefetchCount == 0)
{
prefetchCount = _rabbitMQService.prefetchCount;
}
//创建通道
channel = await _rabbitMQService.CreateChannelAsync();
//设置QoS(服务质量),控制预取消息数量
await channel.BasicQosAsync
(
prefetchSize: 0, //不限制预取消息总大小
prefetchCount: prefetchCount, //prefetchCount > 0 ? prefetchCount : _rabbitMQService.prefetchCount, //每次预取的消息数量
global: false //应用于当前消费者而非整个通道
);
//检查队列是否存在
try
{
await channel.QueueDeclarePassiveAsync(queueName);
}
catch
{
_logger.LogError($"队列 {queueName} 不存在");
throw;
}
//创建消费者
var consumer = new AsyncEventingBasicConsumer(channel);
//注册消息接收事件
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
//反序列化消息
var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));
//记录接收日志
_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\n队列: {QueueName}\r\n消息体:{message}", ea.BasicProperties.MessageId, queueName, message?.ToJson());
//处理消息
await messageHandler(message);
//如果不是自动确认模式,手动确认消息
if (!autoAck)
{
await channel.BasicAckAsync
(
deliveryTag: ea.DeliveryTag, //消息投递标签
multiple: false //不批量确认
);
}
}
catch (JsonException jsonEx)
{
//处理反序列化错误
_logger.LogError(jsonEx, "消息反序列化失败。\r\n队列: {QueueName}", queueName);
//拒绝消息,不重新入队
if (!autoAck)
{
await channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: false);
}
}
catch (Exception ex)
{
//处理业务逻辑错误
_logger.LogError(ex, "消息处理失败。\r\n队列: {QueueName}", queueName);
//如果不是自动确认模式(autoAck=false),需要手动处理消息确认和重试逻辑
if (!autoAck)
{
//获取当前消息的属性对象
var properties = ea.BasicProperties;
//获取消息头,如果headers为null则创建新的字典
var headers = properties.Headers ?? new Dictionary<string, object>();
//获取当前重试次数,如果不存在x-retry-count头则默认为0
int retryCount = headers.TryGetValue("x-retry-count", out var retryObj) ? Convert.ToInt32(retryObj) : 0;
//获取最大重试次数,如果不存在x-max-retry-count头则默认为1
int maxRetryCount = headers.TryGetValue("x-max-retry-count", out var maxRetryObj) ? Convert.ToInt32(maxRetryObj) : 1;
//如果启用了死信队列(withDLX=true)且当前重试次数已达最大重试次数
if (withDLX && retryCount >= maxRetryCount)
{
//记录警告日志,说明消息已达到最大重试次数
_logger.LogWarning("消息已达到最大重试次数 {MaxRetryCount},将被移入死信队列", maxRetryCount);
//拒绝消息,requeue=false表示不重新入队,消息将被路由到死信队列
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
}
else
{
//创建新的消息属性对象,复制原始消息的所有属性
var newProperties = new BasicProperties
{
ContentType = properties.ContentType, //复制内容类型
ContentEncoding = properties.ContentEncoding, //复制内容编码
DeliveryMode = properties.DeliveryMode, //复制投递模式(1-非持久化,2-持久化)
Priority = properties.Priority, //复制消息优先级
CorrelationId = properties.CorrelationId, //复制关联ID(用于请求 - 响应模式)
ReplyTo = properties.ReplyTo, //复制回复队列名称
Expiration = properties.Expiration, //复制消息过期时间
MessageId = properties.MessageId, //复制消息ID
Timestamp = properties.Timestamp, //复制时间戳
Type = properties.Type, //复制消息类型
UserId = properties.UserId, //复制用户ID
AppId = properties.AppId, //复制应用ID
ClusterId = properties.ClusterId, //复制集群ID
//复制消息头,并更新重试次数
Headers = new Dictionary<string, object>(headers)
{
["x-retry-count"] = retryCount + 1 //重试次数+1
}
};
//重新发布消息到原始队列
await channel.BasicPublishAsync(
exchange: string.Empty, //exchange: 空字符串表示默认direct交换机
routingKey: queueName, //使用原始队列名称
mandatory: false, //false表示如果无法路由则丢弃消息
basicProperties: newProperties, //使用更新后的属性(包含新的重试次数)
body: ea.Body //原始消息体
);
//确认原始消息已被处理(从队列中移除)
//- deliveryTag: 消息投递标签
//- multiple: false表示只确认单条消息
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
}
}
}
};
//开始消费队列消息
var consumerTag = await channel.BasicConsumeAsync
(
queue: queueName, //队列名称
autoAck: autoAck, //自动确认设置
consumer: consumer //消费者实例
);
//将消费者添加到活跃集合
_activeConsumers.TryAdd(consumerTag, (channel, consumer));
//注册取消令牌回调,当取消时停止消费者
cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));
//记录消费者/总消费者数量
_logger.LogInformation("成功启动消费者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount},当前消费者标记: {consumerTag}", _currentConsumerCount, _maxConsumerCount, consumerTag);
return cancellationTokenSource;
}
catch (Exception ex)
{
//清理创建失败的通道资源
channel?.Dispose();
//当前消费者数量-1
Interlocked.Decrement(ref _currentConsumerCount);
_consumerLimitSemaphore.Release();
//记录启动失败日志
_logger.LogError(ex, "启动消费者失败。队列: {QueueName}", queueName);
throw;
}
//finally
//{
// // 如果出错或任务完成,确保 channel 被释放
// channel?.Dispose();
//}
}
/// <summary>
/// 消费消息-发布/订阅模式(Pub/Sub)
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="exchangeName">要订阅的Exchange名称</param>
/// <param name="messageHandler">消息处理委托</param>
/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
/// <param name="autoAck">是否自动确认消息</param>
/// <returns>取消令牌源,用于停止消费</returns>
public async Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class
{
//校验exchange名称是否为空
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");
//校验消息处理器是否为空
if (messageHandler == null)
throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");
//等待获取消费者槽位(防止并发消费者过多)
if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
{
//获取失败则抛出超时异常
throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
}
//原子增加当前消费者数量
Interlocked.Increment(ref _currentConsumerCount);
//创建取消令牌源用于后续停止消费
var cancellationTokenSource = new CancellationTokenSource();
IChannel? channel = null;
try
{
//如果未指定prefetchCount,则使用默认值
if (prefetchCount == 0)
{
prefetchCount = _rabbitMQService.prefetchCount;
}
//创建 RabbitMQ 通道
channel = await _rabbitMQService.CreateChannelAsync();
//设置QoS(服务质量),限制预取的消息数量
await channel.BasicQosAsync(
prefetchSize: 0,
prefetchCount: prefetchCount,
global: false);
//声明一个 fanout 类型的 Exchange(广播模式)
await channel.ExchangeDeclareAsync(
exchange: exchangeName,
type: ExchangeType.Fanout,
durable: true, //可持久化
autoDelete: false); //不自动删除
//自定义临时队列名称
var queueName = "PubSub-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");
//创建临时队列(由RabbitMQ自动生成名字)
//var queueResult = await channel.QueueDeclareAsync();
var queueResult = await channel.QueueDeclareAsync(
queue: queueName, //队列名称
durable: false, //队列是否持久化 false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列) true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
exclusive: true, //队列是否排他 true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列) false:队列可被多个消费者共享(默认值,适合常规队列)
autoDelete: true //队列是否自动删除 true:当最后一个消费者断开连接后,队列自动删除(适合临时队列) false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
);
//将队列绑定到Exchange(fanout类型忽略routingKey)
await channel.QueueBindAsync(
queue: queueName,
exchange: exchangeName,
routingKey: "");
//创建异步消费者对象
var consumer = new AsyncEventingBasicConsumer(channel);
//注册消息接收事件处理逻辑
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
//反序列化消息体为泛型对象T
var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));
//记录收到消息的日志
_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, message?.ToJson());
//调用用户定义的消息处理方法
await messageHandler(message);
//如果不是自动确认,则手动发送 Ack 确认消息已处理
if (!autoAck)
{
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
}
}
catch (JsonException jsonEx)
{
//消息反序列化失败,记录错误日志
_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);
//非自动确认模式下拒绝消息,不重入队列
if (!autoAck)
{
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
}
}
catch (Exception ex)
{
//消息处理过程中发生其他异常,记录错误日志
_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);
//非自动确认模式下拒绝消息,并尝试重新入队
if (!autoAck)
{
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);
}
}
};
//启动消费,开始监听消息
var consumerTag = await channel.BasicConsumeAsync(
queue: queueName,
autoAck: autoAck,
consumer: consumer);
//将消费者和通道保存起来以便后续取消操作
_activeConsumers.TryAdd(consumerTag, (channel, consumer));
//注册取消回调,当取消令牌被触发时调用StopConsuming
cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));
//记录启动成功日志
_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);
//返回取消令牌源,供外部控制消费终止
return cancellationTokenSource;
}
catch (Exception ex)
{
//出现异常时释放资源
channel?.Dispose();
//原子减少消费者计数
Interlocked.Decrement(ref _currentConsumerCount);
//释放信号量槽位
_consumerLimitSemaphore.Release();
//记录启动失败日志
_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);
//抛出异常
throw;
}
}
/// <summary>
/// 消费消息-路由模式(Routing)
/// </summary>
/// <typeparam name="T">消息反序列化的目标类型</typeparam>
/// <param name="exchangeName">要绑定的 Exchange 名称</param>
/// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>
/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
/// <param name="autoAck">是否自动确认消息</param>
/// <returns>CancellationTokenSource,用于取消消费操作</returns>
public async Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class
{
//校验exchange名称是否为空
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");
//校验路由键是否为空
if (routingKey == null)
throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");
//校验消息处理器是否为空
if (messageHandler == null)
throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");
//等待获取消费者槽位(防止并发消费者过多)
if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
{
//获取失败则抛出超时异常
throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
}
//原子增加当前消费者数量
Interlocked.Increment(ref _currentConsumerCount);
// 创建 CancellationTokenSource,用于后续控制取消消费
var cancellationTokenSource = new CancellationTokenSource();
IChannel? channel = null;
try
{
// 创建一个新的 RabbitMQ Channel
channel = await _rabbitMQService.CreateChannelAsync();
// 声明一个 Direct 类型的 Exchange(如果不存在则创建)
await channel.ExchangeDeclareAsync(
exchange: exchangeName,
type: ExchangeType.Direct,
durable: true, // Exchange 持久化
autoDelete: false); // 不自动删除
//自定义临时队列名称
var queueName = "Routing-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");
//创建临时队列(由RabbitMQ自动生成名字)
//var queueResult = await channel.QueueDeclareAsync();
var queueResult = await channel.QueueDeclareAsync(
queue: queueName, //队列名称
durable: false, //队列是否持久化 false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列) true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
exclusive: true, //队列是否排他 true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列) false:队列可被多个消费者共享(默认值,适合常规队列)
autoDelete: true //队列是否自动删除 true:当最后一个消费者断开连接后,队列自动删除(适合临时队列) false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
);
// 将队列绑定到指定的 Exchange,并使用给定的 routingKey
await channel.QueueBindAsync(queueName, exchangeName, routingKey);
// 创建异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册消息接收事件处理逻辑
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
// 反序列化消息体为泛型 T 对象
var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));
//记录收到消息的日志
_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());
// 调用用户定义的消息处理函数
await messageHandler(msg);
// 如果不是自动确认,则手动发送 Ack 确认消息已处理成功
if (!autoAck)
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
}
catch (JsonException jsonEx)
{
//消息反序列化失败,记录错误日志
_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);
//非自动确认模式下拒绝消息,不重入队列
if (!autoAck)
{
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
}
}
catch (Exception ex)
{
//消息处理过程中发生其他异常,记录错误日志
_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);
//非自动确认模式下拒绝消息,并尝试重新入队
if (!autoAck)
{
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);
}
}
};
// 开始消费队列中的消息
var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);
// 记录当前消费者信息以便后续取消或释放资源
_activeConsumers.TryAdd(consumerTag, (channel, consumer));
// 注册取消令牌,在取消时调用 StopConsuming 方法停止消费
cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));
//记录启动成功日志
_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);
// 返回 CancellationTokenSource,供外部控制取消
return cancellationTokenSource;
}
catch (Exception ex)
{
//出现异常时释放资源
channel?.Dispose();
//原子减少消费者计数
Interlocked.Decrement(ref _currentConsumerCount);
//释放信号量槽位
_consumerLimitSemaphore.Release();
//记录启动失败日志
_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);
//抛出异常
throw;
}
}
/// <summary>
/// 消费消息-主题模式(Topic)
/// </summary>
/// <typeparam name="T">消息反序列化的目标类型</typeparam>
/// <param name="exchangeName">要绑定的 Exchange 名称</param>
/// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>
/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
/// <param name="autoAck">是否自动确认消息</param>
/// <returns>CancellationTokenSource,用于取消消费操作</returns>
public async Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class
{
//校验exchange名称是否为空
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");
//校验匹配规则是否为空
if (topicPattern == null)
throw new ArgumentNullException(nameof(topicPattern), "topicPattern不能为空");
//校验消息处理器是否为空
if (messageHandler == null)
throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");
//等待获取消费者槽位(防止并发消费者过多)
if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
{
//获取失败则抛出超时异常
throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
}
//原子增加当前消费者数量
Interlocked.Increment(ref _currentConsumerCount);
// 创建 CancellationTokenSource,用于后续控制取消消费
var cancellationTokenSource = new CancellationTokenSource();
IChannel? channel = null;
try
{
// 创建一个新的 RabbitMQ Channel
channel = await _rabbitMQService.CreateChannelAsync();
// 声明一个 Topic 类型的 Exchange(如果不存在则创建)
await channel.ExchangeDeclareAsync(
exchange: exchangeName,
type: ExchangeType.Topic,
durable: true, // Exchange 持久化
autoDelete: false); // 不自动删除
//自定义临时队列名称
var queueName = "Topic-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");
//创建临时队列(由RabbitMQ自动生成名字)
//var queueResult = await channel.QueueDeclareAsync();
var queueResult = await channel.QueueDeclareAsync(
queue: queueName, //队列名称
durable: false, //队列是否持久化 false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列) true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
exclusive: true, //队列是否排他 true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列) false:队列可被多个消费者共享(默认值,适合常规队列)
autoDelete: true //队列是否自动删除 true:当最后一个消费者断开连接后,队列自动删除(适合临时队列) false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
);
// 将队列绑定到指定的 Exchange,并使用 Topic 模式匹配规则
await channel.QueueBindAsync(queueName, exchangeName, topicPattern);
// 创建异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册消息接收事件处理逻辑
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
// 反序列化消息体为泛型 T 对象
var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));
//记录收到消息的日志
_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());
// 调用用户定义的消息处理函数
await messageHandler(msg);
// 如果不是自动确认,则手动发送 Ack 确认消息已处理成功
if (!autoAck)
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
}
catch (JsonException jsonEx)
{
//消息反序列化失败,记录错误日志
_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);
//非自动确认模式下拒绝消息,不重入队列
if (!autoAck)
{
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
}
}
catch (Exception ex)
{
//消息处理过程中发生其他异常,记录错误日志
_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);
//非自动确认模式下拒绝消息,并尝试重新入队
if (!autoAck)
{
await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);
}
}
};
// 开始消费队列中的消息
var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);
// 记录当前消费者信息以便后续取消或释放资源
_activeConsumers.TryAdd(consumerTag, (channel, consumer));
// 注册取消令牌,在取消时调用 StopConsuming 方法停止消费
cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));
//记录启动成功日志
_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);
// 返回 CancellationTokenSource,供外部控制取消
return cancellationTokenSource;
}
catch (Exception ex)
{
//出现异常时释放资源
channel?.Dispose();
//原子减少消费者计数
Interlocked.Decrement(ref _currentConsumerCount);
//释放信号量槽位
_consumerLimitSemaphore.Release();
//记录启动失败日志
_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);
//抛出异常
throw;
}
}
/// <summary>
/// 消费消息-请求/响应(RPC)
/// </summary>
/// <typeparam name="TRequest">请求消息的类型</typeparam>
/// <typeparam name="TResponse">响应消息的类型</typeparam>
/// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>
/// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>
/// <param name="handler">处理请求并返回响应的异步回调函数</param>
/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
/// <returns>CancellationTokenSource,用于取消消费操作</returns>
public async Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class
{
// 参数校验
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));
if (string.IsNullOrWhiteSpace(routingKey))
throw new ArgumentException("路由键不能为空", nameof(routingKey));
if (handler == null)
throw new ArgumentNullException(nameof(handler), "消息处理器不能为空");
// 等待获取消费者槽位
if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30)))
{
throw new InvalidOperationException(
$"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");
}
try
{
Interlocked.Increment(ref _currentConsumerCount);
// 创建新的 RabbitMQ Channel
var channel = await _rabbitMQService.CreateChannelAsync();
try
{
// 设置预取数量(控制并发处理能力)
if (prefetchCount > 0)
{
await channel.BasicQosAsync(0, prefetchCount, false);
}
// 声明队列(与生产者保持一致)
var queueDeclareOk = await channel.QueueDeclareAsync(
queue: routingKey, //队列名称
durable: true, //队列是否持久化 false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列) true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)
exclusive: false, //队列是否排他 true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列) false:队列可被多个消费者共享(默认值,适合常规队列)
autoDelete: false, //队列是否自动删除 true:当最后一个消费者断开连接后,队列自动删除(适合临时队列) false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)
arguments: null
);
// 绑定队列到Exchange
await channel.QueueBindAsync(
queue: routingKey,
exchange: exchangeName,
routingKey: routingKey);
// 创建异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 消息处理逻辑
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
// 反序列化请求
var request = JsonSerializer.Deserialize<TRequest>(ea.Body.Span);
// 处理请求
var response = await handler(request);
// 准备响应属性
var replyProps = new BasicProperties();
replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
replyProps.ContentType = "application/json";
// 发送响应
await channel.BasicPublishAsync(
exchange: "", // 默认Exchange
routingKey: ea.BasicProperties.ReplyTo,
mandatory: false,
basicProperties: replyProps,
body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));
// 确认消息
await channel.BasicAckAsync(ea.DeliveryTag, false);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理RPC请求失败: {CorrelationId}",
ea.BasicProperties?.CorrelationId);
// 拒绝消息且不重新入队
await channel.BasicNackAsync(ea.DeliveryTag, false, false);
}
};
// 开始消费
var consumerTag = await channel.BasicConsumeAsync(
queue: routingKey,
autoAck: false, // 手动确认
consumer: consumer);
// 创建取消令牌
var cts = new CancellationTokenSource();
// 注册取消回调
cts.Token.Register(async () =>
{
try
{
await channel.BasicCancelAsync(consumerTag);
await channel.CloseAsync();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "取消消费者时发生错误");
}
finally
{
channel.Dispose();
Interlocked.Decrement(ref _currentConsumerCount);
_consumerLimitSemaphore.Release();
}
});
return cts;
}
catch
{
// 发生异常时确保通道被关闭
channel?.Dispose();
throw;
}
}
catch
{
// 发生异常时释放信号量
Interlocked.Decrement(ref _currentConsumerCount);
_consumerLimitSemaphore.Release();
throw;
}
}
/// <summary>
/// 停止指定消费者的消息消费
/// </summary>
/// <param name="consumerTag">消费者标签</param>
/// <returns>异步任务</returns>
public async Task StopConsuming(string consumerTag)
{
//从活跃集合中移除消费者
if (_activeConsumers.TryRemove(consumerTag, out var consumerInfo))
{
try
{
//取消消费者订阅
await consumerInfo.Channel.BasicCancelAsync(consumerTag);
//异步释放通道资源
await consumerInfo.Channel.DisposeAsync();
//记录停止成功日志
_logger.LogInformation("已停止消费者。消费者标签: {ConsumerTag}", consumerTag);
}
catch (OperationInterruptedException opEx)
{
//记录操作中断警告日志
_logger.LogWarning(opEx, "消费者取消操作被中断。消费者标签: {ConsumerTag}", consumerTag);
}
catch (Exception ex)
{
//记录停止失败错误日志
_logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);
throw;
}
finally
{
Interlocked.Decrement(ref _currentConsumerCount);
_consumerLimitSemaphore.Release();
_logger.LogInformation("当前消费者数: {CurrentCount}/{MaxCount}", _currentConsumerCount, _maxConsumerCount);
}
}
else
{
// 记录未找到消费者警告日志
_logger.LogWarning("未找到对应的消费者。消费者标签: {ConsumerTag}", consumerTag);
}
}
/// <summary>
/// 停止所有消费者的消息消费
/// </summary>
/// <returns>异步任务</returns>
public async Task StopAllConsuming()
{
// 遍历所有消费者标签并停止
foreach (var consumerTag in _activeConsumers.Keys.ToList())
{
try
{
await StopConsuming(consumerTag).ConfigureAwait(false);
}
catch (Exception ex)
{
// 记录单个消费者停止失败日志,继续停止其他消费者
_logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);
}
}
}
/// <summary>
/// 获取当前消费者状态
/// </summary>
public ConsumerStatus GetConsumerStatus()
{
// 创建并返回一个新的 ConsumerStatus 对象,用于封装当前消费者的运行状态
return new ConsumerStatus
{
// 设置当前消费者数量
CurrentCount = _currentConsumerCount,
// 设置最大消费者数量
MaxCount = _maxConsumerCount,
// 获取当前所有活跃消费者的标识符列表
ActiveConsumers = _activeConsumers.Keys.ToList()
};
}
}
}
案例如下
5.2.消费者接口
using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{
/// <summary>
/// 消费者服务接口
/// RabbitMQ
/// </summary>
public interface IMQConsumerService
{
/// <summary>
/// 消费消息-点对点(Point-to-Point)
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="queueName">要消费的队列名称</param>
/// <param name="messageHandler">消息处理委托</param>
/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
/// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>
/// <returns>取消令牌源,用于停止消费</returns>
/// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>
Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class;
/// <summary>
/// 消费消息-发布/订阅模式(Pub/Sub)
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="exchangeName">要订阅的Exchange名称</param>
/// <param name="messageHandler">消息处理委托</param>
/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>
/// <param name="autoAck">是否自动确认消息</param>
/// <returns>取消令牌源,用于停止消费</returns>
Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;
/// <summary>
/// 消费消息-路由模式(Routing)
/// </summary>
/// <typeparam name="T">消息反序列化的目标类型</typeparam>
/// <param name="exchangeName">要绑定的 Exchange 名称</param>
/// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>
/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
/// <param name="autoAck">是否自动确认消息</param>
/// <returns>CancellationTokenSource,用于取消消费操作</returns>
Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;
/// <summary>
/// 消费消息-主题模式(Topic)
/// </summary>
/// <typeparam name="T">消息反序列化的目标类型</typeparam>
/// <param name="exchangeName">要绑定的 Exchange 名称</param>
/// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>
/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>
/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
/// <param name="autoAck">是否自动确认消息</param>
/// <returns>CancellationTokenSource,用于取消消费操作</returns>
Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;
/// <summary>
/// 消费消息-请求/响应(RPC)
/// </summary>
/// <typeparam name="TRequest">请求消息的类型</typeparam>
/// <typeparam name="TResponse">响应消息的类型</typeparam>
/// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>
/// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>
/// <param name="handler">处理请求并返回响应的异步回调函数</param>
/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>
/// <returns>CancellationTokenSource,用于取消消费操作</returns>
Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class;
/// <summary>
/// 停止指定消费者的消息消费
/// </summary>
/// <param name="consumerTag">消费者标签</param>
/// <returns>异步任务</returns>
Task StopConsuming(string consumerTag);
/// <summary>
/// 停止所有消费者的消息消费
/// </summary>
/// <returns>异步任务</returns>
Task StopAllConsuming();
/// <summary>
/// 获取当前消费者状态
/// </summary>
/// <returns></returns>
ConsumerStatus GetConsumerStatus();
}
}
案例如下
6.注册
在 Program 或 Startup 中注册队列。
// 注册 RabbitMQ 连接服务为单例(Singleton)
// IRabbitMqClient 是一个接口,代表 RabbitMQ 客户端连接的抽象
// RabbitMqClient 是其具体实现类
// 单例生命周期意味着在整个应用程序生命周期中只创建一次该实例,所有请求共享同一个实例
builder.Services.AddSingleton<IRabbitMqClient, RabbitMqClient>();
// 注册 RabbitMQ 消息生产者服务为作用域(Scoped)
// IMQProducerService 是用于发送消息的接口
// MQProducerService 是其实现类
// Scoped 生命周期表示在同一个请求上下文中使用同一个实例(适用于 Web 应用场景)
builder.Services.AddScoped<IMQProducerService, MQProducerService>();
// 注册 RabbitMQ 消息消费者服务为单例(Singleton)
// IMQConsumerService 是用于消费消息(接收并处理消息)的接口
// MQConsumerService 是其实现类
// 使用 Singleton 是因为消费者通常需要长时间运行、持续监听队列,适合整个应用周期内保持一个实例
builder.Services.AddSingleton<IMQConsumerService, MQConsumerService>();
案例如下
7.简单使用案例
下面是 实现、接口和控制器的使用案例
7.1.实现
using Frame1_Service.IService.Product;
using Frame2_DataModel.Entity.Products;
using Frame3_DataRepository.RabbitMQRepository.Consumer;
using Frame3_DataRepository.RabbitMQRepository.Producer;
using Frame6_LibraryUtility;
using RabbitMQ.Client.Exceptions;
namespace Frame1_Service.Service.Product
{
public class RabbitMQTestSvr : BaseService, IRabbitMQTestSvr
{
/// <summary>
/// 生产者
/// </summary>
private readonly IMQProducerService _iRabbitMQProducer;
/// <summary>
/// 消费者
/// </summary>
private readonly IMQConsumerService _iRabbitMQConsumer;
/// <summary>
/// 构造
/// </summary>
/// <param name="iRabbitMQProducer"></param>
/// <param name="iRabbitMQConsumer"></param>
public RabbitMQTestSvr(IMQProducerService iRabbitMQProducer, IMQConsumerService iRabbitMQConsumer)
{
_iRabbitMQConsumer = iRabbitMQConsumer;
_iRabbitMQProducer = iRabbitMQProducer;
}
/// <summary>
/// 模拟消费逻辑
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
private async Task ProcessOrderAsync(ProductsEntity model)
{
Console.WriteLine("消费成功:" + model.ToJson());
}
/// <summary>
/// 生产者-点对点(Point-to-Point)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model)
{
var result = new ResultModel<bool>() { Data = false };
// 创建 Random 实例
Random random = new Random();
model.Id = Guid.NewGuid().ToString();
model.ProductName = "测试" + (random.Next(1, 999)).ToString();
model.Price = random.Next(1000, 9999);
model.Stock = random.Next(1, 99);
await _iRabbitMQProducer.PublishByPTPAsync<ProductsEntity>("ProducerTest", model);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 消费者-点对点(Point-to-Point)
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> ConsumerTest()
{
var result = new ResultModel<bool>() { Data = false };
await _iRabbitMQConsumer.StartConsumingByPTPAsync<ProductsEntity>("ProducerTest", ProcessOrderAsync);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 生产者-发布订阅(Pub/Sub)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model)
{
var result = new ResultModel<bool>() { Data = false };
// 创建 Random 实例
Random random = new Random();
model.Id = Guid.NewGuid().ToString();
model.ProductName = "测试" + (random.Next(1, 999)).ToString();
model.Price = random.Next(1000, 9999);
model.Stock = random.Next(1, 99);
await _iRabbitMQProducer.PublishByPubSubAsync<ProductsEntity>("PubSubTest", model);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 消费者-发布订阅(Pub/Sub)
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> ConsumerPubSub()
{
var result = new ResultModel<bool>() { Data = false };
await _iRabbitMQConsumer.StartConsumingByPubSubAsync<ProductsEntity>("PubSubTest", ProcessOrderAsync);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 生产者-路由模式(Routing)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model)
{
var result = new ResultModel<bool>() { Data = false };
// 创建 Random 实例
Random random = new Random();
model.Id = Guid.NewGuid().ToString();
model.ProductName = "测试" + (random.Next(1, 999)).ToString();
model.Price = random.Next(1000, 9999);
model.Stock = random.Next(1, 99);
await _iRabbitMQProducer.PublishByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", model);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 消费者-路由模式(Routing)
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> ConsumerRouting()
{
var result = new ResultModel<bool>() { Data = false };
await _iRabbitMQConsumer.StartConsumingByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", ProcessOrderAsync);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 生产者-主题模式(Topic)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model)
{
var result = new ResultModel<bool>() { Data = false };
// 创建 Random 实例
Random random = new Random();
model.Id = Guid.NewGuid().ToString();
model.ProductName = "测试" + (random.Next(1, 999)).ToString();
model.Price = random.Next(1000, 9999);
model.Stock = random.Next(1, 99);
await _iRabbitMQProducer.PublishByTopicAsync<ProductsEntity>("TopicTest", "Topic.test", model);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 消费者-主题模式(Topic)
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> ConsumerTopic()
{
var result = new ResultModel<bool>() { Data = false };
await _iRabbitMQConsumer.StartConsumingByTopicAsync<ProductsEntity>("TopicTest", "Topic.*", ProcessOrderAsync);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 生产者-请求响应模式(RPC)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model)
{
var result = new ResultModel<CalculateResponse>();
var request = new CalculateRequest { X = 5, Y = 7 };
// 创建 Random 实例
Random random = new Random();
model.Id = Guid.NewGuid().ToString();
model.ProductName = "测试" + (random.Next(1, 999)).ToString();
model.Price = random.Next(1000, 9999);
model.Stock = random.Next(1, 99);
var response = await _iRabbitMQProducer.PublishByPRCAsync<CalculateRequest, CalculateResponse>("RPCTest", "RPC", request);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = response;
return result;
}
/// <summary>
/// 消费者-请求响应模式(RPC)
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> ConsumerRPC()
{
var result = new ResultModel<bool> { Data = false };
try
{
//示例:模拟一个计算器服务(可替换为真实的 ICalculatorService)
Func<CalculateRequest, Task<CalculateResponse>> handler = async req =>
{
await Task.Delay(10); //模拟异步处理
return new CalculateResponse { Result = req.X + req.Y };
};
//启动消费者
var cts = await _iRabbitMQConsumer.StartConsumingByPRCAsync<CalculateRequest, CalculateResponse>(exchangeName: "RPCTest", routingKey: "RPC", handler);
result.Data = true;
result.Code = ResultCodeEnum.Success;
result.Msg = "RPC消费者已启动";
}
catch (OperationInterruptedException ex)
{
result.Msg = "消息队列服务不可用";
}
catch (Exception ex)
{
result.Msg = "消费者初始化失败";
}
return result;
}
/// <summary>
/// 死信队列重抛
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> Republish(string queueName)
{
var result = new ResultModel<bool>() { Data = false };
await _iRabbitMQProducer.RepublishDeadLetterMessagesAsync<ProductsEntity>(queueName);
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
/// <summary>
/// 停止消费者
/// </summary>
/// <returns></returns>
public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag)
{
var result = new ResultModel<bool>() { Data = false };
if (consumerTag.Equals("0"))
{
await _iRabbitMQConsumer.StopAllConsuming();
}
else
{
await _iRabbitMQConsumer.StopConsuming(consumerTag);
}
result.Code = ResultCodeEnum.Success;
result.Msg = "操作成功";
result.Data = true;
return result;
}
}
}
案例如下
7.2.接口
using Frame2_DataModel.Entity.Products;
using Frame6_LibraryUtility;
namespace Frame1_Service.IService.Product
{
public interface IRabbitMQTestSvr
{
/// <summary>
/// 生产者-点对点(Point-to-Point)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
Task<ResultModel<bool>> ProducerTest(ProductsEntity model);
/// <summary>
/// 消费者-点对点(Point-to-Point)
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> ConsumerTest();
/// <summary>
/// 生产者-发布订阅(Pub/Sub)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model);
/// <summary>
/// 消费者-发布订阅(Pub/Sub)
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> ConsumerPubSub();
/// <summary>
/// 生产者-路由模式(Routing)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
Task<ResultModel<bool>> ProducerRouting(ProductsEntity model);
/// <summary>
/// 消费者-路由模式(Routing)
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> ConsumerRouting();
/// <summary>
/// 生产者-主题模式(Topic)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
Task<ResultModel<bool>> ProducerTopic(ProductsEntity model);
/// <summary>
/// 消费者-主题模式(Topic)
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> ConsumerTopic();
/// <summary>
/// 生产者-请求响应模式(RPC)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model);
/// <summary>
/// 消费者-请求响应模式(RPC)
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> ConsumerRPC();
/// <summary>
/// 死信队列重抛
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> Republish(string queueName);
/// <summary>
/// 停止消费者
/// </summary>
/// <returns></returns>
Task<ResultModel<bool>> StopAllConsumer(string consumerTag);
}
}
案例如下
7.3.控制器
using Frame1_Service.IService.Product;
using Frame1_Service.Service.Product;
using Frame2_DataModel.Entity.Products;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.AspNetCore.Mvc;
namespace DemoAPI.Controllers
{
/// <summary>
/// 消息队列控制器 -RabbitMQ
/// </summary>
//[Authorize]// 保护整个控制器
[Route("api/[controller]/[action]")]//标记路由地址规格
[ApiController] // 标记该类为 API 控制器,启用一些默认的行为,如模型绑定、输入验证等
[ApiExplorerSettings(GroupName = nameof(ApiVersionInfo.V1))]//设置控制器的API版本
public class RabbitMQTestController : BaseController
{
private IRabbitMQTestSvr _iRabbitMQTestSvr;
/// <summary>
/// 构造
/// </summary>
/// <param name="iRabbitMQTestSvr"></param>
public RabbitMQTestController(IRabbitMQTestSvr iRabbitMQTestSvr)
{
_iRabbitMQTestSvr = iRabbitMQTestSvr;
}
/// <summary>
/// 生产者-点对点(Point-to-Point)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
[HttpPost]
public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTest(model);
/// <summary>
/// 消费者-点对点(Point-to-Point)
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> ConsumerTest() => await _iRabbitMQTestSvr.ConsumerTest();
/// <summary>
/// 生产者-发布订阅(Pub/Sub)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
[HttpPost]
public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerPubSub(model);
/// <summary>
/// 消费者-发布订阅(Pub/Sub)
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> ConsumerPubSub() => await _iRabbitMQTestSvr.ConsumerPubSub();
/// <summary>
/// 生产者-路由模式(Routing)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
[HttpPost]
public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRouting(model);
/// <summary>
/// 消费者-路由模式(Routing)
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> ConsumerRouting() => await _iRabbitMQTestSvr.ConsumerRouting();
/// <summary>
/// 生产者-主题模式(Topic)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
[HttpPost]
public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTopic(model);
/// <summary>
/// 消费者-主题模式(Topic)
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> ConsumerTopic() => await _iRabbitMQTestSvr.ConsumerTopic();
/// <summary>
/// 生产者-请求响应模式(RPC)
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
[HttpPost]
public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRPC(model);
/// <summary>
/// 消费者-请求响应模式(RPC)
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> ConsumerRPC() => await _iRabbitMQTestSvr.ConsumerRPC();
/// <summary>
/// 死信队列重抛
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> Republish(string queueName) => await _iRabbitMQTestSvr.Republish(queueName);
/// <summary>
/// 停止消费者
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag) => await _iRabbitMQTestSvr.StopAllConsumer(consumerTag);
}
}
案例如下