ABP - 事件总线之分布式事件总线内置特性
1. 预定义事件
与本地事件总线一样,分布式事件总线也针对实体的创建、更新、删除会自动发布事件,预定义的事件有:
EntityCreatedEto<T>
是实体 T 创建后发布.EntityUpdatedEto<T>
是实体 T 更新后发布.EntityDeletedEto<T>
是实体 T 删除后发布.
除了这些预定的实体事件之外,我们如果需要在聚合根中发布事件,可以通过 AddDistributedEvent 方法添加事件,或者实现 IGeneratesDomainEvents 接口,这些都会在事务提交,数据持久化的时候被发布。原理上一篇文章中已经讲过了。
对于分布式事件总线预定义事件的订阅,我们可以实现 IDistributedEventHandler<T<Entity>>
接口,这里的 T 就是上面三种预定义事件中的一种,例如 EntityUpdatedEto 。
public class MyHandler : IDistributedEventHandler<EntityUpdatedEto<ProductEto>>,ITransientDependency
{
public async Task HandleEventAsync(EntityUpdatedEto<ProductEto> eventData)
{
var productId = eventData.Entity.Id;
//TODO
}
}
分布式事件总线的预定义事件提供了一个 AbpDistributedEntityEventOptions 配置选项,可以让我们配置针对哪些实体进行事件的发布,如以下配置:
Configure<AbpDistributedEntityEventOptions>(options =>
{
//这些选择器可以同时配置
//针对所有实体
options.AutoEventSelectors.AddAll();
//针对单个实体
options.AutoEventSelectors.Add<IdentityUser>();
//针对命名空间下的实体
options.AutoEventSelectors.AddNamespace("Volo.Abp.Identity");
//通过命名空间前缀进行筛选
options.AutoEventSelectors.Add(
type => type.Namespace.StartsWith("MyProject.")
);
});
在默认的情况下,一个实体触发了预定义事件,ABP 框架会使用一个标准类型 EntityEto 作为事件类,它只有两个属性:
- EntityType (string): 实体类的全名(包括命令空间).
- KeysAsString (string): 已更改实体的主键.如果它只有一个主键,这个属性将是主键值. 对于复合键,它包含所有用,(逗号)分隔的键。
因此,我们可以通过实现这个接口 IDistributedEventHandler<EntityUpdatedEto<EntityEto>>
处理所有实体的变更事件。不过,推荐的方式还是针对不同的实体提供一个事件传输对象,并且通过单独的处理程序进行处理,这样更有利于业务的分离,也方便于根据需要传递对象的其他信息。为实体提供对应的事件传输对象时,需要通过 AbpDistributedEntityEventOptions 选项设置一下实体和其事件传输对象,还需在 AutoMapper 中配置一下映射关系:
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.AutoEventSelectors.Add<Product>();
options.EtoMappings.Add<Product, ProductEto>();
});
[AutoMap(typeof(Product))]
public class ProductEto : EntityEto
{
public Guid Id { get; set; }
public string Name { get; set; }
}
2. 实体同步器 - 7.1 版本特性
在微服务架构下,一个服务订阅另一个服务的实体变更事件,在相应的实体改变时执行当前服务的一些业务逻辑,更新相关联的一些信息,这是很常见的需求。我们可以直接通过上面讲到的预定义事件通过分布式事件总线订阅实体变更事件,但是这样需要同时定于 create、update、delete 等多个事件,一旦需要跟踪的实体多了,就会有很多重复的逻辑代码。
而实体同步器,是 ABP 框架内置的,用于简化这些实现的一个特性。
背景:假设你在 产品服务 中有一个产品实体Product,而在订单服务中有一个OrderProduct实体,用来同步保存产品的部分信息,以便在处理订单相关业务时,不用跨服务调用。那么 Product 和 OrderProduct 之间的信息就得保持一致。
使用实体同步器的步骤如下:
- 产品服务中创建一个数据传输类:ProductETO
[EventName("product")]
public class ProductEto : EntityEto<Guid>
{
// 产品实体的相关属性
}
这个数据传输类,可以在共享的类库,也可以在不同的服务中分别创建。如果在不同服务中分别创建的话,EventName
特性就非常重要。因为这种情况下,不同服务中的 ProductEto 的命名空间会有所区别,对于事件的识别就只能通过 EventName
特性。
- 产品服务中配置实体事件映射
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.AutoEventSelectors.Add<Product>();
options.EtoMappings.Add<Product, ProductEto>();
});
这一步上面已经讲过,通过这样的配置对实体的变更事件进行发布。
- 订单服务中通过继承
EntitySynchronizer
提供一个同步器
public class ProductSynchronizer : EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(
IObjectMapper objectMapper,
IRepository<OrderProduct, Guid> repository
) : base(objectMapper, repository)
{
}
}
同时要注意需要在订单服务中配置从 ProductEto
到 OrderProduct
的实体映射关系。
这种情况下,同步器会订阅 ProductEto 相关的事件,包括产品服务中 Product 的增、删、改事件,并且自动映射为 OrderProduct,再通过相应仓储进行持久化。
一个初步的实体同步器应用就是这样子了。如果在实体发生变化时,不仅仅只是数据同步变更保存,还有一些其他逻辑的话,可以通过重写 HandleEventAsync(EntityCreatedEto<TSourceEto> eventData)
等方法,从而加入自己特定的业务逻辑。
public class ProductSynchronizer : EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(IObjectMapper objectMapper, IRepository<OrderProduct, Guid> repository) : base(objectMapper, repository)
{
}
public override Task HandleEventAsync(EntityCreatedEto<ProductEto> eventData)
{
return base.HandleEventAsync(eventData);
}
}
如果需要进行同步的实体比较复杂,使用了复合主键,那么就需要自行实现根据 Eto 类的信息定位本地实体的逻辑,如下:
public class ProductSynchronizer : EntitySynchronizer<OrderProduct, ProductEto>
{
public ProductSynchronizer(IObjectMapper objectMapper, IRepository<OrderProduct> repository) : base(objectMapper, repository)
{
}
protected async override Task<OrderProduct?> FindLocalEntityAsync(ProductEto eto)
{
// 通过 eto 从数据库中检索出相应的 OrderProduct
return new OrderProduct();
}
}
有些情况下,我们可能并不需要订阅该实体的所有事件,只希望对特定的事件进行订阅和响应,这时候可以通过以下方式忽略一些事件。
public class ProductSynchronizer
: EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(
IObjectMapper objectMapper,
IRepository<OrderProduct, Guid> repository
) : base(objectMapper, repository)
{
// 忽略实体删除的事件
IgnoreEntityDeletedEvent = true;
}
}
除此之外,实体同步器使用时,相应的实体和 ETO 类上都可以实现实现 IHasEntityVersion
接口,从而与实体版本控制系统兼容。这样,即使接收到的实体变更事件是无序的,也就是后修改的事件先接收到,也不会影响实体最终的保存结果,因为如果本地数据库中的实体版本比接收到的事件中的实体更新,则该事件将被忽略。实体版本控制系统相关的内容在后续的文章再详细解析。
3. 分布式事件监控 - 7.3 版本特性
当我们的应用程序从分布式事件总线接收到一个分布式事件时,除了触发事件处理器执行,ABP分布式事件总线系统还会发布DistributedEventReceived
本地事件。
DistributedEventReceived
类具有以下字段:
- **
Source
:**它代表分布式事件的来源。来源可以是Direct
、Inbox
、Outbox
。 - **
EventName
:**表示接收到的事件的名称。 - **
EventData
:**它表示与接收到的事件相关联的实际数据。由于它是object
类型,它可以保存任何类型的数据
在某些业务场景下,我们可能需要对这些事件通讯进行记录,而既然这是一个本地事件,则可以通过实现 ILocalEventHandler<DistributedEventReceived>
接口来获取到事件信息。
public class DistributedEventLogHandler : ILocalEventHandler<DistributedEventReceived>, ITransientDependency
{
private readonly ILogger<DistributedEventLogHandler> _logger;_
public DistributedEventLogHandler(ILogger<DistributedEventLogHandler> logger)
{
_logger = logger;
}
public async Task HandleEventAsync(DistributedEventReceived eventData)
{
// 其他逻辑
_logger.LogInfo(JsonConvert.SerializeObject(eventData));
}
}
同时,当我们发布一个分布式消息的时候,ABP 分布式事件总线系统也一样会触发相应的本地事件,事件类型是:DistributedEventSent
。我们一样可以订阅事件类型,从而对消息进行记录。处理方式和上面一样。
参考文档:
ABP 官方文档 - 分布式事件总线