1.工作原理
CAP(Consistent,Available,Partition - tolerant)框架在.NET 中的工作原理。CAP 是一个用于.NET 的分布式事务解决方案,主要用于实现微服务架构中的数据一致性(实现分布式事务的最终一致性),其工作原理如下:
发布 / 订阅模型
CAP 基于发布 / 订阅模式来处理消息。当一个应用程序需要执行一个可能涉及多个微服务的数据操作时,它会将相关的操作封装成一个消息,并将其发布到消息队列中。其他对该消息感兴趣的微服务则会订阅这个消息队列,当消息到达时,订阅者会接收并处理消息。
本地事务与消息持久化
在发布消息的过程中,CAP 使用了本地事务来确保消息的可靠性。当一个服务执行一个业务操作(如数据库插入、更新等)时,它会将这个操作与消息的发布放在同一个本地事务中。如果业务操作成功,那么消息也会被成功持久化到数据库的Published表中;如果业务操作失败,那么整个事务回滚,消息也不会被发布。这样就保证了消息与业务数据的一致性。
消息发送与重试机制
消息持久化后,CAP 会尝试将消息发送到消息队列(如 RabbitMQ、Kafka 等)。如果消息发送失败,CAP 会根据预设的重试策略进行重试。重试机制可以确保在消息队列暂时不可用或出现网络故障等情况下,消息不会丢失,最终能够成功发送到消息队列。
消费者接收与处理消息
订阅了消息队列的消费者服务会从队列中接收消息,并根据消息的内容执行相应的业务逻辑。在处理消息时,消费者也会使用本地事务来确保消息处理的原子性。如果消息处理成功,消费者会将消息标记为已处理,并从Received表中删除相关记录;如果处理失败,消费者可以根据具体情况选择重试或进行其他处理方式。
分布式事务一致性保证
通过上述的发布 / 订阅模型、本地事务与消息持久化、消息发送与重试机制以及消费者接收与处理消息等一系列操作,CAP 框架实现了在分布式系统中跨多个微服务的数据一致性。即使在出现网络分区、服务故障等情况下,CAP 也能够通过消息的持久化和重试等机制,保证数据在最终一致性的前提下进行可靠的传输和处理。
CAP 框架通过巧妙地结合本地事务、消息队列和重试机制等技术,为.NET 微服务架构提供了一种高效、可靠的分布式事务解决方案,帮助开发者在复杂的分布式环境中轻松实现数据的一致性和可靠性。
2.作用
实现分布式事务的最终一致性
- 在分布式系统中,保证数据一致性颇具挑战,如电商场景里用户下单需同时更新库存、订单和支付信息,若服务失败易致数据不一致。CAP 不采用两阶段提交(2PC) ,而是用本地消息表 + MQ(如 RabbitMQ、Kafka 等消息队列 )的异步确保方式。具体过程为:
- 业务操作与事件发布:服务完成操作后,发布含操作类型、数据标识等信息的事件到消息队列。
- 事件存储与确认:发布的事件存于指定介质以防丢失,CAP 等待事件处理确认。
- 事件处理与重试机制:其他服务接收事件并处理,成功则发送确认,失败则 CAP 按设定策略重试,直至成功或达最大重试次数。为避免重复处理致数据不一致,处理逻辑需保证幂等性。
- 数据一致性检查与补偿:必要时,CAP 定期检查数据一致性,通过补偿机制修复不一致问题。
- 充当高可用的 EventBus(事件总线)
- 实现了 EventBus 的发布 / 订阅功能,借助本地消息表对消息持久化。当消息队列宕机或连接失败,消息也不会丢失。比如在复杂业务系统中,各微服务可通过 CAP 实现的 EventBus 进行通信。一个服务发布事件(如订单创建事件 ),其他订阅该事件的服务(如库存服务、物流服务 )就能收到通知并执行相应操作(如扣减库存、安排发货 ) 。
- 实际使用时,需引用相关包(基本包、消息层包、数据库包 ) ,配置本地消息记录库,生产者通过 _capBus.PublishAsync 方法发布消息,消费者在 Controller 或服务层通过 [CapSubscribe] 特性标记方法来订阅处理消息 。 此外,在应用中要注意确保幂等性、合理配置重试策略、搭建监控与报警机制以及进行性能调优 。
3.Published 和Received 表作用
在.NET CAP(一个用于解决微服务或分布式系统中分布式事务问题的开源项目 )中:
Published 表作用
消息暂存:当生产者服务执行本地事务(如创建订单 )时,会将相关事件消息(如订单创建事件 )插入到该表,此时消息状态为未发布。这是为了确保消息和本地业务事务的原子性,即要么两者都成功执行,要么都失败回滚,避免消息丢失。
发布状态记录:CAP 会扫描该表,将未发布消息发送到消息队列(如 RabbitMQ、Kafka ) 。消息成功发送后,会更新该表中消息的状态为已发布,用于记录消息的发布过程和状态。
保障数据一致性:通过该表对消息的存储和管理,为实现分布式事务的最终一致性提供支持。比如在分布式系统中,不同服务通过消息进行交互,该表可确保消息按正确顺序、在合适时机被发送到消息队列,进而被消费者服务接收处理,保证各服务间数据状态的一致。
故障恢复依据:当系统出现故障(如消息队列短暂不可用 ) ,重启后可依据该表中未成功发布的消息记录,进行重试发送操作,恢复消息传递流程,保障系统可靠运行。
Received 表作用
消息接收记录:消费者从消息队列接收消息后,该表用于记录已接收的消息。记录内容包括消息的唯一标识、消息体、接收时间、消息来源等相关信息,方便对消息的消费情况进行跟踪和审计。
处理重复消费:通过记录已处理消息的相关信息(如消息 ID ) ,帮助消费者判断接收的消息是否已被处理过,以实现幂等消费。当消费者接收到消息时,会检查该表中是否已有相同标识的消息记录,若存在则可避免重复处理,确保即使在网络抖动、消息重发等情况下,也不会因重复消费导致数据不一致等问题。
监控与排查:开发人员可通过查看该表,了解消息的接收和处理情况,排查消息消费过程中出现的问题。比如,若发现某类消息一直未被正确处理,可根据表中记录进一步分析是消息格式问题、消费者服务异常还是其他原因,便于定位和解决问题,保障分布式系统的正常运行。
4.CapSubscribe的作用
[CapSubscribe]是.NET CAP 框架中的一个特性(Attribute)1。
作用
标识事件处理方法:用于标记那些需要处理特定事件的方法,告诉 CAP 框架当有相关事件发布时,需要调用这些被标记的方法来进行处理1。
实现消息订阅:使服务能够订阅由其他服务发布的事件消息,从而实现微服务之间基于事件的异步通信和交互,以保证各服务间的数据同步和业务流程的协同1。
使用方法
在 Controller 中使用:如果处理事件的方法在 Controller 中,直接在 Action 方法上添加[CapSubscribe]特性即可。例如:
csharp
[Route("api/[controller]")][ApiController]public class MyController : ControllerBase{
[CapSubscribe("xxx.services.show.time")]
public IActionResult HandleShowTime(DateTime time)
{
// 处理接收到的消息,例如记录日志、更新数据库等操作
Console.WriteLine($"Received time: {time}");
return Ok();
}}
在非 Controller 类中使用:若订阅方法不在 Controller 中,订阅的类需要继承ICapSubscribe接口。例如:
csharp
public class MySubscriberService : ICapSubscribe{
[CapSubscribe("xxx.services.show.time")]
public void HandleShowTime(DateTime time)
{
// 处理接收到的消息
Console.WriteLine($"Received time: {time}");
}}
然后在Startup.cs中的ConfigureServices方法中注入该服务类:
csharp
services.AddTransient<MySubscriberService>();
指定参数:
Name参数:用于指定订阅的消息名称,对应通过_capBus.PublishAsync("name", ...)发布消息时指定的名称。在不同的消息队列中,它对应不同的项,如在 RabbitMQ 中对应路由键(routing key),在 Kafka 中对应主题(topic)等。
Group参数:可选参数,用于将订阅者放置在一个单独的消费者组中。具有相同名称但不同组的订阅者都会接收消息;而同一组内具有相同名称的订阅者只有一个会接收消息。
GroupConcurrent参数:可选参数,用于设置订阅者并发执行的并行度。如果不指定Group参数,CAP 会自动使用Name的值创建一个组。需要注意的是,该设置仅适用于新消息,重试的消息不受并发限制。
在使用[CapSubscribe]前,需在项目中引入 CAP 相关的 NuGet 包,并在Startup.cs中进行 CAP 的配置,包括选择消息队列(如x.UseRabbitMQ("ConnectionStrings"))和存储介质(如x.UseSqlServer("数据库连接字符串"))
5.案例说明
以下是一个完整的示例,展示了如何使用 CAP、SQL Server 和 RabbitMQ 来实现下单服务器和订单处理服务器之间的协作,完成订单处理流程。
项目结构和整体流程概述
下单服务器(OrderPlacingServer):一个.NET Core Web API 项目,负责接收用户的下单请求,执行本地订单数据插入操作,并通过 CAP 发布订单消息到消息队列。
订单处理服务器(OrderProcessingServer):一个.NET Core Console 应用项目,订阅来自消息队列的订单消息,处理订单逻辑并更新订单状态。
共享项目(OrderShared):用于存放共享的实体类和数据库上下文,两个服务器项目都会引用该项目。
整体流程如下:
用户在下单服务器发起下单请求。
下单服务器执行本地事务插入订单数据到 SQL Server 数据库,并通过 CAP 将订单消息发布到 RabbitMQ 队列。
订单处理服务器从 RabbitMQ 队列接收订单消息,执行订单处理逻辑,更新订单状态并保存到 SQL Server 数据库。
详细代码及解释
1. OrderShared 项目
csharp
// Order.cs
using System;
using Microsoft.EntityFrameworkCore;
namespace OrderShared
{
public class Order
{
public int OrderId { get; set; }
public string CustomerName { get; set; }
public decimal TotalAmount { get; set; }
public string Status { get; set; } = "Placed"; // 初始状态为已下单
}
public class OrderDbContext : DbContext
{
public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options)
{
}
public DbSet<Order> Orders { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// 可以在这里配置实体关系等
base.OnModelCreating(modelBuilder);
}
}
}
解释:
Order
类定义了订单的实体结构,包含订单 ID、客户名称、总金额和订单状态等属性。OrderDbContext
是继承自DbContext
的数据库上下文类,用于与 SQL Server 数据库进行交互,定义了Orders
属性来操作Order
实体的集合。
2. OrderPlacingServer 项目(.NET Core Web API)
csharp
// Startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using DotNetCore.CAP;
using OrderShared;
namespace OrderPlacingServer
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
// 配置SQL Server数据库连接
services.AddDbContext<OrderDbContext>(options =>
options.UseSqlServer(Configuration.GetConnectionString("OrderDbConnection")));
// 配置CAP
services.AddCap(x =>
{
// 使用SQL Server作为存储
x.UseEntityFramework<OrderDbContext>();
// 使用RabbitMQ作为消息队列
x.UseRabbitMQ(Configuration.GetConnectionString("RabbitMQConnection"));
// 设置重试次数和间隔
x.FailedRetryCount = 3;
x.FailedRetryInterval = 5000;
});
services.AddControllers();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
// 注册CAP
app.UseCap();
}
}
}
// OrdersController.cs
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using DotNetCore.CAP;
using OrderShared;
namespace OrderPlacingServer.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{
private readonly ICapPublisher _capPublisher;
public OrdersController(ICapPublisher capPublisher)
{
_capPublisher = capPublisher;
}
[HttpPost]
public async Task<IActionResult> PlaceOrder(Order order)
{
using (var scope = HttpContext.RequestServices.CreateScope())
{
var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
// 开启本地事务
using (var transaction = dbContext.Database.BeginTransaction(_capPublisher))
{
try
{
// 插入订单数据到本地数据库
dbContext.Orders.Add(order);
await dbContext.SaveChangesAsync();
// 发布订单消息到CAP
await _capPublisher.PublishAsync("order.process", order);
// 提交事务
transaction.Commit();
return Ok("Order placed successfully");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
return BadRequest($"Failed to place order: {ex.Message}");
}
}
}
}
}
}
解释:
Startup.cs
:配置了 SQL Server 数据库连接,使用OrderDbContext
来操作数据库。配置 CAP,指定使用 SQL Server 作为消息存储,RabbitMQ 作为消息队列,并设置了重试策略。在Configure
方法中注册了 CAP 中间件。OrdersController.cs
:PlaceOrder
方法接收订单数据,通过ICapPublisher
发布订单消息到名为order.process
的主题。在本地事务中,先插入订单数据到数据库,然后发布消息,如果操作成功则提交事务,失败则回滚事务。
3. OrderProcessingServer 项目(.NET Core Console 应用)
csharp
// Program.cs
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using DotNetCore.CAP;
using OrderShared;
namespace OrderProcessingServer
{
class Program
{
static async Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
// 配置SQL Server数据库连接
services.AddDbContext<OrderDbContext>(options =>
options.UseSqlServer(hostContext.Configuration.GetConnectionString("OrderDbConnection")));
// 配置CAP
services.AddCap(x =>
{
x.UseEntityFramework<OrderDbContext>();
x.UseRabbitMQ(hostContext.Configuration.GetConnectionString("RabbitMQConnection"));
});
// 注册订单消费者服务
services.AddHostedService<OrderConsumerHostedService>();
})
.Build();
await host.RunAsync();
}
}
// OrderConsumer.cs
using System.Threading.Tasks;
using DotNetCore.CAP;
using OrderShared;
public class OrderConsumer : ICapSubscribe
{
private readonly OrderDbContext _dbContext;
public OrderConsumer(OrderDbContext dbContext)
{
_dbContext = dbContext;
}
[CapSubscribe("order.process")]
public async Task ProcessOrder(Order order)
{
try
{
// 处理订单逻辑,例如更新订单状态为已处理
order.Status = "Processed";
_dbContext.Orders.Update(order);
await _dbContext.SaveChangesAsync();
System.Console.WriteLine($"Processed order: {order.OrderId}");
}
catch (Exception ex)
{
System.Console.WriteLine($"Failed to process order: {ex.Message}");
throw; // 让CAP进行重试
}
}
}
// OrderConsumerHostedService.cs
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using DotNetCore.CAP;
public class OrderConsumerHostedService : BackgroundService
{
private readonly ICapPublisher _capPublisher;
private readonly ILogger<OrderConsumerHostedService> _logger;
public OrderConsumerHostedService(ICapPublisher capPublisher, ILogger<OrderConsumerHostedService> logger)
{
_capPublisher = capPublisher;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// 这里可以做一些定时任务相关的操作,目前只是简单等待
await Task.Delay(1000, stoppingToken);
}
}
}
}
解释:
Program.cs
:创建了一个.NET Core 主机,配置了 SQL Server 数据库连接和 CAP。注册了OrderConsumerHostedService
,用于在后台运行应用并处理消息。OrderConsumer.cs
:OrderConsumer
类实现了ICapSubscribe
接口,ProcessOrder
方法被[CapSubscribe("order.process")]
标记,用于处理接收到的订单消息。在方法中更新订单状态并保存到数据库,如果处理失败则抛出异常让 CAP 进行重试。OrderConsumerHostedService.cs
:OrderConsumerHostedService
继承自BackgroundService
,在ExecuteAsync
方法中可以实现一些定时任务逻辑,目前只是简单地等待,确保应用持续运行以接收和处理消息。
配置和注意事项
配置文件:在两个项目的
appsettings.json
中配置 SQL Server 数据库连接字符串和 RabbitMQ 连接字符串。
json
{
"ConnectionStrings": {
"OrderDbConnection": "Data Source=YOUR_SERVER_NAME;Initial Catalog=YOUR_DATABASE_NAME;User ID=YOUR_USERNAME;Password=YOUR_PASSWORD",
"RabbitMQConnection": "host=YOUR_RABBITMQ_HOST;port=YOUR_RABBITMQ_PORT;user=YOUR_USERNAME;password=YOUR_PASSWORD"
}
}
幂等性:订单处理方法
ProcessOrder
应保证幂等性,避免重复处理导致数据不一致。异常处理:合理处理异常,确保消息能够正确重试或进行其他处理。
数据一致性:下单服务器的本地事务和消息发布应保证原子性,确保订单数据和消息状态的一致性。
消息队列健康:确保 RabbitMQ 服务正常运行,网络连接稳定,避免消息丢失或积压。
通过以上代码和配置,可以实现下单服务器和订单处理服务器之间基于 CAP、SQL Server 和 RabbitMQ 的订单处理流程。