【消息队列】RabbitMQ “消息队列模式” 以及NET8集成

发布于:2025-08-12 ⋅ 阅读:(21) ⋅ 点赞:(0)

在 .NET 8 中集成 RabbitMQ 消息队列,可以使用官方推荐的 RabbitMQ.Client 库或封装好的 MassTransit/EasyNetQ 等高级库。以下是 RabbitMQ 的基本集成代码 和 常见消息模式 的实现。

RabbitMQ 本身并没有直接支持延时消息的功能,但是可以通过一些机制来实现延时消息的效果。以下是两种常用的方法:

  • TTL(Time To Live)+ 死信交换机(Dead Letter Exchange, DLX)

可以为队列或消息设置 TTL,当消息的 TTL 到期后,如果没有被消费,就会变成死信。
设置了死信交换机(DLX)的队列中的死信会被转发到指定的 DLX 上,然后可以由绑定到这个 DLX 的队列进行处理,这样就实现了延时消息的功能。

  • 使用插件 rabbitmq-delayed-message-exchange

RabbitMQ 提供了一个官方插件 rabbitmq-delayed-message-exchange,它允许你创建一个特殊的交换机类型,该交换机能够接受带有延迟时间的消息,并在指定的时间后将消息投递给相应的队列。
这个插件需要安装并启用,并且要求 Erlang/OPT 版本在 18.0 及以上。

一、. RabbitMQ 基础集成(.NET 8)

安装 NuGet 包

dotnet add package RabbitMQ.Client

配置 RabbitMQ 连接

csharp
using RabbitMQ.Client;
 
public class RabbitMQService
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
 
    public RabbitMQService(string hostname = "localhost", string username = "guest", string password = "guest")
    {
        var factory = new ConnectionFactory
        {
            HostName = hostname,
            UserName = username,
            Password = password,
            DispatchConsumersAsync = true // 启用异步消费
        };
 
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }
 
    public void Dispose()
    {
        _channel?.Close();
        _connection?.Close();
    }
}
二. RabbitMQ 常见消息模式
(1)、 简单队列(Simple Queue)

场景:生产者发送消息到队列,消费者从队列接收消息(一对一)。

  • 生产者(Producer)
csharp
public void SendMessage(string queueName, string message)
{
    _channel.QueueDeclare(
        queue: queueName,
        durable: true, // 持久化队列
        exclusive: false,
        autoDelete: false
    );
 
    var body = Encoding.UTF8.GetBytes(message);
    _channel.BasicPublish(
        exchange: "",
        routingKey: queueName,
        basicProperties: null,
        body: body
    );
}
  • 消费者(Consumer)
csharp
public void ReceiveMessages(string queueName)
{
    _channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
 
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (model, ea) =>
    {
        var body = Encoding.UTF8.GetString(ea.Body.Span);
        Console.WriteLine($"Received: {body}");
        await Task.Yield(); // 模拟异步处理
        _channel.BasicAck(ea.DeliveryTag, false); // 手动ACK
    };
 
    _channel.BasicConsume(
        queue: queueName,
        autoAck: false, // 关闭自动ACK
        consumer: consumer
    );
}
(2)、 工作队列(Work Queue)

场景:多个消费者竞争消费同一个队列的消息(任务分发)。

  • 生产者
    同 简单队列 的 SendMessage 方法。

  • 消费者

csharp
// 启动多个消费者实例,RabbitMQ 会轮询分发消息
for (int i = 0; i < 3; i++) // 3个消费者
{
    Task.Run(() =>
    {
        using var service = new RabbitMQService();
        service.ReceiveMessages("task_queue");
        Console.WriteLine($"Consumer {i} started...");
        Thread.Sleep(Timeout.Infinite);
    });
}
(3)、 发布/订阅(Pub/Sub)【Fannout】

场景:一个生产者发送消息到交换机(Exchange),多个队列绑定到交换机,每个队列有自己的消费者(广播模式)。

  • 生产者(发送到交换机)
csharp
public void PublishToExchange(string exchangeName, string message)
{
    _channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // Fanout 广播
    var body = Encoding.UTF8.GetBytes(message);
    _channel.BasicPublish(exchangeName, "", null, body);
}
  • 消费者(绑定队列到交换机)
csharp
public void SubscribeToExchange(string exchangeName, string queueName)
{
    _channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
    _channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind(queueName, exchangeName, "");
 
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        var body = Encoding.UTF8.GetString(ea.Body.Span);
        Console.WriteLine($"Received: {body}");
        return Task.CompletedTask;
    };
 
    _channel.BasicConsume(queueName, autoAck: true, consumer);
}
(4) 、路由模式(Routing)

场景:根据 RoutingKey 定向投递消息到特定队列。

  • 生产者
csharp
public void SendWithRouting(string exchangeName, string routingKey, string message)
{
    _channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
    var body = Encoding.UTF8.GetBytes(message);
    _channel.BasicPublish(exchangeName, routingKey, null, body);
}
  • 消费者
csharp
public void ReceiveWithRouting(string exchangeName, string queueName, string routingKey)
{
    _channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
    _channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind(queueName, exchangeName, routingKey);
 
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");
        return Task.CompletedTask;
    };
 
    _channel.BasicConsume(queueName, autoAck: true, consumer);
}
(5) 、主题模式(Topic)

场景:使用通配符(*、#)匹配 RoutingKey,实现灵活路由。
*(星号)匹配单个单词
#(井号)匹配多个单词

  • 生产者
csharp
public void SendWithTopic(string exchangeName, string topic, string message)
{
    _channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
    var body = Encoding.UTF8.GetBytes(message);
    _channel.BasicPublish(exchangeName, topic, null, body);
}
  • 消费者
csharp
public void ReceiveWithTopic(string exchangeName, string queueName, string topicPattern)
{
    _channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
    _channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind(queueName, exchangeName, topicPattern);
 
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");
        return Task.CompletedTask;
    };
 
    _channel.BasicConsume(queueName, autoAck: true, consumer);
}
(6)、 RPC(远程过程调用)

场景:客户端发送请求并等待服务端响应(同步通信)。

  • 客户端(RPC Client)
public string Call(string message, string queueName = "rpc_queue")
{
    var correlationId = Guid.NewGuid().ToString();
    var replyQueueName = _channel.QueueDeclare().QueueName;
    var properties = _channel.CreateBasicProperties();
    properties.ReplyTo = replyQueueName;
    properties.CorrelationId = correlationId;
 
    var body = Encoding.UTF8.GetBytes(message);
    _channel.BasicPublish("", queueName, properties, body);
 
    var tcs = new TaskCompletionSource<string>();
    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        if (ea.BasicProperties.CorrelationId == correlationId)
        {
            var response = Encoding.UTF8.GetString(ea.Body.Span);
            tcs.SetResult(response);
        }
    };
 
    _channel.BasicConsume(replyQueueName, autoAck: true, consumer);
    return tcs.Task.Result; // 同步等待响应(生产环境建议用异步)
}
  • 服务端(RPC Server)
csharp
public void StartRpcServer(string queueName = "rpc_queue")
{
    _channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false);
    var consumer = new AsyncEventingBasicConsumer(_channel);
 
    consumer.Received += async (model, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.Span);
        Console.WriteLine($"Received RPC call: {message}");
 
        var response = $"Response to: {message}";
        var responseBytes = Encoding.UTF8.GetBytes(response);
 
        var properties = ea.BasicProperties;
        var replyProps = _channel.CreateBasicProperties();
        replyProps.CorrelationId = properties.CorrelationId;
 
        _channel.BasicPublish(
            "",
            properties.ReplyTo,
            replyProps,
            responseBytes
        );
 
        _channel.BasicAck(ea.DeliveryTag, false);
        await Task.CompletedTask;
    };
 
    _channel.BasicConsume(queueName, autoAck: false, consumer);
}

三. 完整示例(.NET 8 Worker Service)

  • 生产者项目
// Program.cs
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
 
var rabbitMQ = new RabbitMQService();
rabbitMQ.SendMessage("hello_queue", "Hello, RabbitMQ!");
 
app.Run();
  • 消费者项目(Worker Service)
csharp
// Program.cs
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddHostedService<Worker>();
    })
    .Build();
 
await host.RunAsync();
 
// Worker.cs
public class Worker : BackgroundService
{
    private readonly RabbitMQService _rabbitMQ;
 
    public Worker()
    {
        _rabbitMQ = new RabbitMQService();
    }
 
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _rabbitMQ.ReceiveMessages("hello_queue");
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(1000, stoppingToken);
        }
    }
}
四. 总结

| 模式 | 适用场景 | 关键点 |
| 简单队列 | 一对一消息传递 | QueueDeclare + BasicPublish |
| 工作队列 | 任务分发(竞争消费) | 多个消费者监听同一队列 |
| 发布/订阅 | 广播消息 | ExchangeType.Fanout |
| 路由模式 | 定向路由 | ExchangeType.Direct + RoutingKey |
| 主题模式 | 灵活匹配路由 | ExchangeType.Topic + */# |
| RPC | 同步请求-响应 | ReplyTo + CorrelationId |

推荐实践
连接管理:使用 IHostedService 或单例模式管理 IConnection 和 IModel。
异常处理:监听 Connection.ConnectionShutdown 事件并重连。
性能优化:启用 DispatchConsumersAsync = true 支持异步消费。
高级封装:考虑使用 MassTransit 或 EasyNetQ 简化开发。
通过以上模式,可以灵活应对 异步任务处理、事件驱动架构、微服务通信 等场景。