在 .NET 8 中集成 RabbitMQ 消息队列,可以使用官方推荐的 RabbitMQ.Client
库或封装好的 MassTransit/EasyNetQ
等高级库。以下是 RabbitMQ 的基本集成代码 和 常见消息模式 的实现。
RabbitMQ 本身并没有直接支持延时消息的功能,但是可以通过一些机制来实现延时消息的效果。以下是两种常用的方法:
- TTL(Time To Live)+ 死信交换机(Dead Letter Exchange, DLX)
可以为队列或消息设置 TTL,当消息的 TTL 到期后,如果没有被消费,就会变成死信。
设置了死信交换机(DLX)的队列中的死信会被转发到指定的 DLX 上,然后可以由绑定到这个 DLX 的队列进行处理,这样就实现了延时消息的功能。
- 使用插件 rabbitmq-delayed-message-exchange
RabbitMQ 提供了一个官方插件 rabbitmq-delayed-message-exchange,它允许你创建一个特殊的交换机类型,该交换机能够接受带有延迟时间的消息,并在指定的时间后将消息投递给相应的队列。
这个插件需要安装并启用,并且要求 Erlang/OPT 版本在 18.0 及以上。
一、. RabbitMQ 基础集成(.NET 8)
安装 NuGet 包
dotnet add package RabbitMQ.Client
配置 RabbitMQ 连接
csharp
using RabbitMQ.Client;
public class RabbitMQService
{
private readonly IConnection _connection;
private readonly IModel _channel;
public RabbitMQService(string hostname = "localhost", string username = "guest", string password = "guest")
{
var factory = new ConnectionFactory
{
HostName = hostname,
UserName = username,
Password = password,
DispatchConsumersAsync = true // 启用异步消费
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
public void Dispose()
{
_channel?.Close();
_connection?.Close();
}
}
二. RabbitMQ 常见消息模式
(1)、 简单队列(Simple Queue)
场景:生产者发送消息到队列,消费者从队列接收消息(一对一)。
- 生产者(Producer)
csharp
public void SendMessage(string queueName, string message)
{
_channel.QueueDeclare(
queue: queueName,
durable: true, // 持久化队列
exclusive: false,
autoDelete: false
);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(
exchange: "",
routingKey: queueName,
basicProperties: null,
body: body
);
}
- 消费者(Consumer)
csharp
public void ReceiveMessages(string queueName)
{
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($"Received: {body}");
await Task.Yield(); // 模拟异步处理
_channel.BasicAck(ea.DeliveryTag, false); // 手动ACK
};
_channel.BasicConsume(
queue: queueName,
autoAck: false, // 关闭自动ACK
consumer: consumer
);
}
(2)、 工作队列(Work Queue)
场景:多个消费者竞争消费同一个队列的消息(任务分发)。
生产者
同 简单队列 的 SendMessage 方法。消费者
csharp
// 启动多个消费者实例,RabbitMQ 会轮询分发消息
for (int i = 0; i < 3; i++) // 3个消费者
{
Task.Run(() =>
{
using var service = new RabbitMQService();
service.ReceiveMessages("task_queue");
Console.WriteLine($"Consumer {i} started...");
Thread.Sleep(Timeout.Infinite);
});
}
(3)、 发布/订阅(Pub/Sub)【Fannout】
场景:一个生产者发送消息到交换机(Exchange),多个队列绑定到交换机,每个队列有自己的消费者(广播模式)。
- 生产者(发送到交换机)
csharp
public void PublishToExchange(string exchangeName, string message)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // Fanout 广播
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchangeName, "", null, body);
}
- 消费者(绑定队列到交换机)
csharp
public void SubscribeToExchange(string exchangeName, string queueName)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
_channel.QueueBind(queueName, exchangeName, "");
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($"Received: {body}");
return Task.CompletedTask;
};
_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(4) 、路由模式(Routing)
场景:根据 RoutingKey 定向投递消息到特定队列。
- 生产者
csharp
public void SendWithRouting(string exchangeName, string routingKey, string message)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchangeName, routingKey, null, body);
}
- 消费者
csharp
public void ReceiveWithRouting(string exchangeName, string queueName, string routingKey)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
_channel.QueueBind(queueName, exchangeName, routingKey);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");
return Task.CompletedTask;
};
_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(5) 、主题模式(Topic)
场景:使用通配符(*、#)匹配 RoutingKey,实现灵活路由。
*(星号)匹配单个单词
#(井号)匹配多个单词
- 生产者
csharp
public void SendWithTopic(string exchangeName, string topic, string message)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchangeName, topic, null, body);
}
- 消费者
csharp
public void ReceiveWithTopic(string exchangeName, string queueName, string topicPattern)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
_channel.QueueBind(queueName, exchangeName, topicPattern);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");
return Task.CompletedTask;
};
_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(6)、 RPC(远程过程调用)
场景:客户端发送请求并等待服务端响应(同步通信)。
- 客户端(RPC Client)
public string Call(string message, string queueName = "rpc_queue")
{
var correlationId = Guid.NewGuid().ToString();
var replyQueueName = _channel.QueueDeclare().QueueName;
var properties = _channel.CreateBasicProperties();
properties.ReplyTo = replyQueueName;
properties.CorrelationId = correlationId;
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish("", queueName, properties, body);
var tcs = new TaskCompletionSource<string>();
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
if (ea.BasicProperties.CorrelationId == correlationId)
{
var response = Encoding.UTF8.GetString(ea.Body.Span);
tcs.SetResult(response);
}
};
_channel.BasicConsume(replyQueueName, autoAck: true, consumer);
return tcs.Task.Result; // 同步等待响应(生产环境建议用异步)
}
- 服务端(RPC Server)
csharp
public void StartRpcServer(string queueName = "rpc_queue")
{
_channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($"Received RPC call: {message}");
var response = $"Response to: {message}";
var responseBytes = Encoding.UTF8.GetBytes(response);
var properties = ea.BasicProperties;
var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = properties.CorrelationId;
_channel.BasicPublish(
"",
properties.ReplyTo,
replyProps,
responseBytes
);
_channel.BasicAck(ea.DeliveryTag, false);
await Task.CompletedTask;
};
_channel.BasicConsume(queueName, autoAck: false, consumer);
}
三. 完整示例(.NET 8 Worker Service)
- 生产者项目
// Program.cs
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
var rabbitMQ = new RabbitMQService();
rabbitMQ.SendMessage("hello_queue", "Hello, RabbitMQ!");
app.Run();
- 消费者项目(Worker Service)
csharp
// Program.cs
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<Worker>();
})
.Build();
await host.RunAsync();
// Worker.cs
public class Worker : BackgroundService
{
private readonly RabbitMQService _rabbitMQ;
public Worker()
{
_rabbitMQ = new RabbitMQService();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_rabbitMQ.ReceiveMessages("hello_queue");
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}
}
}
四. 总结
| 模式 | 适用场景 | 关键点 |
| 简单队列 | 一对一消息传递 | QueueDeclare + BasicPublish |
| 工作队列 | 任务分发(竞争消费) | 多个消费者监听同一队列 |
| 发布/订阅 | 广播消息 | ExchangeType.Fanout |
| 路由模式 | 定向路由 | ExchangeType.Direct + RoutingKey |
| 主题模式 | 灵活匹配路由 | ExchangeType.Topic + */# |
| RPC | 同步请求-响应 | ReplyTo + CorrelationId |
推荐实践
连接管理:使用 IHostedService 或单例模式管理 IConnection 和 IModel。
异常处理:监听 Connection.ConnectionShutdown 事件并重连。
性能优化:启用 DispatchConsumersAsync = true 支持异步消费。
高级封装:考虑使用 MassTransit 或 EasyNetQ 简化开发。
通过以上模式,可以灵活应对 异步任务处理、事件驱动架构、微服务通信 等场景。