CAP应用

发布于:2025-04-15 ⋅ 阅读:(15) ⋅ 点赞:(0)

1.工作原理

 CAP(Consistent,Available,Partition - tolerant)框架在.NET 中的工作原理。CAP 是一个用于.NET 的分布式事务解决方案,主要用于实现微服务架构中的数据一致性实现分布式事务的最终一致性,其工作原理如下:

发布 / 订阅模型

  • CAP 基于发布 / 订阅模式来处理消息。当一个应用程序需要执行一个可能涉及多个微服务的数据操作时,它会将相关的操作封装成一个消息,并将其发布到消息队列中。其他对该消息感兴趣的微服务则会订阅这个消息队列,当消息到达时,订阅者会接收并处理消息。

本地事务与消息持久化

  • 在发布消息的过程中,CAP 使用了本地事务来确保消息的可靠性。当一个服务执行一个业务操作(如数据库插入、更新等)时,它会将这个操作与消息的发布放在同一个本地事务中。如果业务操作成功,那么消息也会被成功持久化到数据库的Published表中;如果业务操作失败,那么整个事务回滚,消息也不会被发布。这样就保证了消息与业务数据的一致性。

消息发送与重试机制

  • 消息持久化后,CAP 会尝试将消息发送到消息队列(如 RabbitMQ、Kafka 等)。如果消息发送失败,CAP 会根据预设的重试策略进行重试。重试机制可以确保在消息队列暂时不可用或出现网络故障等情况下,消息不会丢失,最终能够成功发送到消息队列。

消费者接收与处理消息

  • 订阅了消息队列的消费者服务会从队列中接收消息,并根据消息的内容执行相应的业务逻辑。在处理消息时,消费者也会使用本地事务来确保消息处理的原子性。如果消息处理成功,消费者会将消息标记为已处理,并从Received表中删除相关记录;如果处理失败,消费者可以根据具体情况选择重试或进行其他处理方式。

分布式事务一致性保证

  • 通过上述的发布 / 订阅模型、本地事务与消息持久化、消息发送与重试机制以及消费者接收与处理消息等一系列操作,CAP 框架实现了在分布式系统中跨多个微服务的数据一致性。即使在出现网络分区、服务故障等情况下,CAP 也能够通过消息的持久化和重试等机制,保证数据在最终一致性的前提下进行可靠的传输和处理。

CAP 框架通过巧妙地结合本地事务、消息队列和重试机制等技术,为.NET 微服务架构提供了一种高效、可靠的分布式事务解决方案,帮助开发者在复杂的分布式环境中轻松实现数据的一致性和可靠性。

2.作用

实现分布式事务的最终一致性

  • 在分布式系统中,保证数据一致性颇具挑战,如电商场景里用户下单需同时更新库存、订单和支付信息,若服务失败易致数据不一致。CAP 不采用两阶段提交(2PC) ,而是用本地消息表 + MQ(如 RabbitMQ、Kafka 等消息队列 )的异步确保方式。具体过程为:
  • 业务操作与事件发布:服务完成操作后,发布含操作类型、数据标识等信息的事件到消息队列。
  • 事件存储与确认:发布的事件存于指定介质以防丢失,CAP 等待事件处理确认。
  • 事件处理与重试机制:其他服务接收事件并处理,成功则发送确认,失败则 CAP 按设定策略重试,直至成功或达最大重试次数。为避免重复处理致数据不一致,处理逻辑需保证幂等性。
  • 数据一致性检查与补偿:必要时,CAP 定期检查数据一致性,通过补偿机制修复不一致问题。
  • 充当高可用的 EventBus(事件总线)
  • 实现了 EventBus 的发布 / 订阅功能,借助本地消息表对消息持久化。当消息队列宕机或连接失败,消息也不会丢失。比如在复杂业务系统中,各微服务可通过 CAP 实现的 EventBus 进行通信。一个服务发布事件(如订单创建事件 ),其他订阅该事件的服务(如库存服务、物流服务 )就能收到通知并执行相应操作(如扣减库存、安排发货 ) 。
  • 实际使用时,需引用相关包(基本包、消息层包、数据库包 ) ,配置本地消息记录库,生产者通过 _capBus.PublishAsync 方法发布消息,消费者在 Controller 或服务层通过 [CapSubscribe] 特性标记方法来订阅处理消息 。 此外,在应用中要注意确保幂等性、合理配置重试策略、搭建监控与报警机制以及进行性能调优 。

3.Published Received 表作用

在.NET CAP(一个用于解决微服务或分布式系统中分布式事务问题的开源项目 )中:

Published 表作用

  • 消息暂存:当生产者服务执行本地事务(如创建订单 )时,会将相关事件消息(如订单创建事件 )插入到该表,此时消息状态为未发布。这是为了确保消息和本地业务事务的原子性,即要么两者都成功执行,要么都失败回滚,避免消息丢失。

  • 发布状态记录:CAP 会扫描该表,将未发布消息发送到消息队列(如 RabbitMQ、Kafka ) 。消息成功发送后,会更新该表中消息的状态为已发布,用于记录消息的发布过程和状态。

  • 保障数据一致性:通过该表对消息的存储和管理,为实现分布式事务的最终一致性提供支持。比如在分布式系统中,不同服务通过消息进行交互,该表可确保消息按正确顺序、在合适时机被发送到消息队列,进而被消费者服务接收处理,保证各服务间数据状态的一致。

  • 故障恢复依据:当系统出现故障(如消息队列短暂不可用 ) ,重启后可依据该表中未成功发布的消息记录,进行重试发送操作,恢复消息传递流程,保障系统可靠运行。

Received 表作用

  • 消息接收记录:消费者从消息队列接收消息后,该表用于记录已接收的消息。记录内容包括消息的唯一标识、消息体、接收时间、消息来源等相关信息,方便对消息的消费情况进行跟踪和审计。

  • 处理重复消费:通过记录已处理消息的相关信息(如消息 ID ) ,帮助消费者判断接收的消息是否已被处理过,以实现幂等消费。当消费者接收到消息时,会检查该表中是否已有相同标识的消息记录,若存在则可避免重复处理,确保即使在网络抖动、消息重发等情况下,也不会因重复消费导致数据不一致等问题。

  • 监控与排查:开发人员可通过查看该表,了解消息的接收和处理情况,排查消息消费过程中出现的问题。比如,若发现某类消息一直未被正确处理,可根据表中记录进一步分析是消息格式问题、消费者服务异常还是其他原因,便于定位和解决问题,保障分布式系统的正常运行。

4.CapSubscribe的作用

[CapSubscribe]是.NET CAP 框架中的一个特性(Attribute)1。

作用

标识事件处理方法:用于标记那些需要处理特定事件的方法,告诉 CAP 框架当有相关事件发布时,需要调用这些被标记的方法来进行处理1。

实现消息订阅:使服务能够订阅由其他服务发布的事件消息,从而实现微服务之间基于事件的异步通信和交互,以保证各服务间的数据同步和业务流程的协同1。

使用方法

在 Controller 中使用:如果处理事件的方法在 Controller 中,直接在 Action 方法上添加[CapSubscribe]特性即可。例如:
 

csharp

[Route("api/[controller]")][ApiController]public class MyController : ControllerBase{

    [CapSubscribe("xxx.services.show.time")]

    public IActionResult HandleShowTime(DateTime time)

    {

        // 处理接收到的消息,例如记录日志、更新数据库等操作

        Console.WriteLine($"Received time: {time}");

        return Ok();

    }}

在非 Controller 类中使用:若订阅方法不在 Controller 中,订阅的类需要继承ICapSubscribe接口。例如:

csharp

public class MySubscriberService : ICapSubscribe{

    [CapSubscribe("xxx.services.show.time")]

    public void HandleShowTime(DateTime time)

    {

        // 处理接收到的消息

        Console.WriteLine($"Received time: {time}");

    }}

然后在Startup.cs中的ConfigureServices方法中注入该服务类:

csharp

services.AddTransient<MySubscriberService>();

指定参数:

Name参数:用于指定订阅的消息名称,对应通过_capBus.PublishAsync("name", ...)发布消息时指定的名称。在不同的消息队列中,它对应不同的项,如在 RabbitMQ 中对应路由键(routing key),在 Kafka 中对应主题(topic)等。

Group参数:可选参数,用于将订阅者放置在一个单独的消费者组中。具有相同名称但不同组的订阅者都会接收消息;而同一组内具有相同名称的订阅者只有一个会接收消息。

GroupConcurrent参数:可选参数,用于设置订阅者并发执行的并行度。如果不指定Group参数,CAP 会自动使用Name的值创建一个组。需要注意的是,该设置仅适用于新消息,重试的消息不受并发限制。

在使用[CapSubscribe]前,需在项目中引入 CAP 相关的 NuGet 包,并在Startup.cs中进行 CAP 的配置,包括选择消息队列(如x.UseRabbitMQ("ConnectionStrings"))和存储介质(如x.UseSqlServer("数据库连接字符串"))

5.案例说明

以下是一个完整的示例,展示了如何使用 CAP、SQL Server 和 RabbitMQ 来实现下单服务器和订单处理服务器之间的协作,完成订单处理流程。

项目结构和整体流程概述

  1. 下单服务器(OrderPlacingServer):一个.NET Core Web API 项目,负责接收用户的下单请求,执行本地订单数据插入操作,并通过 CAP 发布订单消息到消息队列。

  2. 订单处理服务器(OrderProcessingServer):一个.NET Core Console 应用项目,订阅来自消息队列的订单消息,处理订单逻辑并更新订单状态。

  3. 共享项目(OrderShared):用于存放共享的实体类和数据库上下文,两个服务器项目都会引用该项目。

整体流程如下:

  1. 用户在下单服务器发起下单请求。

  2. 下单服务器执行本地事务插入订单数据到 SQL Server 数据库,并通过 CAP 将订单消息发布到 RabbitMQ 队列。

  3. 订单处理服务器从 RabbitMQ 队列接收订单消息,执行订单处理逻辑,更新订单状态并保存到 SQL Server 数据库。

详细代码及解释

1. OrderShared 项目

csharp

// Order.cs
using System;
using Microsoft.EntityFrameworkCore;

namespace OrderShared
{
    public class Order
    {
        public int OrderId { get; set; }
        public string CustomerName { get; set; }
        public decimal TotalAmount { get; set; }
        public string Status { get; set; } = "Placed"; // 初始状态为已下单
    }

    public class OrderDbContext : DbContext
    {
        public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options)
        {
        }

        public DbSet<Order> Orders { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            // 可以在这里配置实体关系等
            base.OnModelCreating(modelBuilder);
        }
    }
}

解释

  • Order类定义了订单的实体结构,包含订单 ID、客户名称、总金额和订单状态等属性。

  • OrderDbContext是继承自DbContext的数据库上下文类,用于与 SQL Server 数据库进行交互,定义了Orders属性来操作Order实体的集合。

2. OrderPlacingServer 项目(.NET Core Web API)

csharp

// Startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using DotNetCore.CAP;
using OrderShared;

namespace OrderPlacingServer
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            // 配置SQL Server数据库连接
            services.AddDbContext<OrderDbContext>(options =>
                options.UseSqlServer(Configuration.GetConnectionString("OrderDbConnection")));

            // 配置CAP
            services.AddCap(x =>
            {
                // 使用SQL Server作为存储
                x.UseEntityFramework<OrderDbContext>();
                // 使用RabbitMQ作为消息队列
                x.UseRabbitMQ(Configuration.GetConnectionString("RabbitMQConnection"));

                // 设置重试次数和间隔
                x.FailedRetryCount = 3;
                x.FailedRetryInterval = 5000;
            });

            services.AddControllers();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            // 注册CAP
            app.UseCap();
        }
    }
}

// OrdersController.cs
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using DotNetCore.CAP;
using OrderShared;

namespace OrderPlacingServer.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly ICapPublisher _capPublisher;

        public OrdersController(ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }

        [HttpPost]
        public async Task<IActionResult> PlaceOrder(Order order)
        {
            using (var scope = HttpContext.RequestServices.CreateScope())
            {
                var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();

                // 开启本地事务
                using (var transaction = dbContext.Database.BeginTransaction(_capPublisher))
                {
                    try
                    {
                        // 插入订单数据到本地数据库
                        dbContext.Orders.Add(order);
                        await dbContext.SaveChangesAsync();

                        // 发布订单消息到CAP
                        await _capPublisher.PublishAsync("order.process", order);

                        // 提交事务
                        transaction.Commit();

                        return Ok("Order placed successfully");
                    }
                    catch (Exception ex)
                    {
                        // 回滚事务
                        transaction.Rollback();
                        return BadRequest($"Failed to place order: {ex.Message}");
                    }
                }
            }
        }
    }
}

解释

  • Startup.cs:配置了 SQL Server 数据库连接,使用OrderDbContext来操作数据库。配置 CAP,指定使用 SQL Server 作为消息存储,RabbitMQ 作为消息队列,并设置了重试策略。在Configure方法中注册了 CAP 中间件。

  • OrdersController.csPlaceOrder方法接收订单数据,通过ICapPublisher发布订单消息到名为order.process的主题。在本地事务中,先插入订单数据到数据库,然后发布消息,如果操作成功则提交事务,失败则回滚事务。

3. OrderProcessingServer 项目(.NET Core Console 应用)

csharp

// Program.cs
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using DotNetCore.CAP;
using OrderShared;

namespace OrderProcessingServer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
               .ConfigureServices((hostContext, services) =>
                {
                    // 配置SQL Server数据库连接
                    services.AddDbContext<OrderDbContext>(options =>
                        options.UseSqlServer(hostContext.Configuration.GetConnectionString("OrderDbConnection")));

                    // 配置CAP
                    services.AddCap(x =>
                    {
                        x.UseEntityFramework<OrderDbContext>();
                        x.UseRabbitMQ(hostContext.Configuration.GetConnectionString("RabbitMQConnection"));
                    });

                    // 注册订单消费者服务
                    services.AddHostedService<OrderConsumerHostedService>();
                })
               .Build();

            await host.RunAsync();
        }
    }

    // OrderConsumer.cs
    using System.Threading.Tasks;
    using DotNetCore.CAP;
    using OrderShared;

    public class OrderConsumer : ICapSubscribe
    {
        private readonly OrderDbContext _dbContext;

        public OrderConsumer(OrderDbContext dbContext)
        {
            _dbContext = dbContext;
        }

        [CapSubscribe("order.process")]
        public async Task ProcessOrder(Order order)
        {
            try
            {
                // 处理订单逻辑,例如更新订单状态为已处理
                order.Status = "Processed";
                _dbContext.Orders.Update(order);
                await _dbContext.SaveChangesAsync();

                System.Console.WriteLine($"Processed order: {order.OrderId}");
            }
            catch (Exception ex)
            {
                System.Console.WriteLine($"Failed to process order: {ex.Message}");
                throw; // 让CAP进行重试
            }
        }
    }

    // OrderConsumerHostedService.cs
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using DotNetCore.CAP;

    public class OrderConsumerHostedService : BackgroundService
    {
        private readonly ICapPublisher _capPublisher;
        private readonly ILogger<OrderConsumerHostedService> _logger;

        public OrderConsumerHostedService(ICapPublisher capPublisher, ILogger<OrderConsumerHostedService> logger)
        {
            _capPublisher = capPublisher;
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // 这里可以做一些定时任务相关的操作,目前只是简单等待
                await Task.Delay(1000, stoppingToken);
            }
        }
    }
}

解释

  • Program.cs:创建了一个.NET Core 主机,配置了 SQL Server 数据库连接和 CAP。注册了OrderConsumerHostedService,用于在后台运行应用并处理消息。

  • OrderConsumer.csOrderConsumer类实现了ICapSubscribe接口,ProcessOrder方法被[CapSubscribe("order.process")]标记,用于处理接收到的订单消息。在方法中更新订单状态并保存到数据库,如果处理失败则抛出异常让 CAP 进行重试。

  • OrderConsumerHostedService.csOrderConsumerHostedService继承自BackgroundService,在ExecuteAsync方法中可以实现一些定时任务逻辑,目前只是简单地等待,确保应用持续运行以接收和处理消息。

配置和注意事项

  1. 配置文件:在两个项目的appsettings.json中配置 SQL Server 数据库连接字符串和 RabbitMQ 连接字符串。

json

{
    "ConnectionStrings": {
        "OrderDbConnection": "Data Source=YOUR_SERVER_NAME;Initial Catalog=YOUR_DATABASE_NAME;User ID=YOUR_USERNAME;Password=YOUR_PASSWORD",
        "RabbitMQConnection": "host=YOUR_RABBITMQ_HOST;port=YOUR_RABBITMQ_PORT;user=YOUR_USERNAME;password=YOUR_PASSWORD"
    }
}

  1. 幂等性:订单处理方法ProcessOrder应保证幂等性,避免重复处理导致数据不一致。

  2. 异常处理:合理处理异常,确保消息能够正确重试或进行其他处理。

  3. 数据一致性:下单服务器的本地事务和消息发布应保证原子性,确保订单数据和消息状态的一致性。

  4. 消息队列健康:确保 RabbitMQ 服务正常运行,网络连接稳定,避免消息丢失或积压。

通过以上代码和配置,可以实现下单服务器和订单处理服务器之间基于 CAP、SQL Server 和 RabbitMQ 的订单处理流程。


网站公告

今日签到

点亮在社区的每一天
去签到