目录
事件驱动往往是在一个微服务内部实现的
领域时间是DDD架构中比较常见的概念
在领域层内部的一个模型更改了状态或者发生了一些行为
向外发送一些通知
这些通知叫做领域时间
类似于MQ
这个通知不会在分布式系统内部传递
只会在单个微服务内部传递
和MQ一样 这样能解耦
我们可以通过发布事件的方式进行一种松耦合的通信
以下是DDD架构中领域层事件驱动的Java实现示例,展示如何通过领域事件实现松耦合的业务逻辑:
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* 领域事件接口
* 所有领域事件需实现此接口
*/
interface DomainEvent {
UUID getEventId();
LocalDateTime getOccurredOn();
}
/**
* 领域事件抽象基类
* 提供事件ID和发生时间的基础实现
*/
abstract class BaseDomainEvent implements DomainEvent {
private final UUID eventId;
private final LocalDateTime occurredOn;
protected BaseDomainEvent() {
this.eventId = UUID.randomUUID();
this.occurredOn = LocalDateTime.now();
}
@Override
public UUID getEventId() {
return eventId;
}
@Override
public LocalDateTime getOccurredOn() {
return occurredOn;
}
}
/**
* 订单已创建事件
*/
class OrderCreatedEvent extends BaseDomainEvent {
private final String orderId;
private final String customerId;
public OrderCreatedEvent(String orderId, String customerId) {
this.orderId = orderId;
this.customerId = customerId;
}
public String getOrderId() {
return orderId;
}
public String getCustomerId() {
return customerId;
}
}
/**
* 订单已支付事件
*/
class OrderPaidEvent extends BaseDomainEvent {
private final String orderId;
private final LocalDateTime paymentTime;
public OrderPaidEvent(String orderId, LocalDateTime paymentTime) {
this.orderId = orderId;
this.paymentTime = paymentTime;
}
public String getOrderId() {
return orderId;
}
public LocalDateTime getPaymentTime() {
return paymentTime;
}
}
/**
* 库存扣减事件
*/
class InventoryDeductedEvent extends BaseDomainEvent {
private final String productId;
private final int quantity;
public InventoryDeductedEvent(String productId, int quantity) {
this.productId = productId;
this.quantity = quantity;
}
public String getProductId() {
return productId;
}
public int getQuantity() {
return quantity;
}
}
/**
* 领域事件发布者接口
*/
interface DomainEventPublisher {
void publish(DomainEvent event);
}
/**
* 领域事件订阅者接口
*/
interface DomainEventSubscriber<T extends DomainEvent> {
void handleEvent(T event);
Class<T> subscribedToEventType();
}
/**
* 领域事件总线(单例实现)
* 负责事件的注册、发布和分发
*/
class DomainEventBus implements DomainEventPublisher {
private static final DomainEventBus INSTANCE = new DomainEventBus();
private final List<DomainEventSubscriber<?>> subscribers = new ArrayList<>();
private DomainEventBus() {}
public static DomainEventBus getInstance() {
return INSTANCE;
}
public <T extends DomainEvent> void register(DomainEventSubscriber<T> subscriber) {
subscribers.add(subscriber);
}
@Override
public void publish(DomainEvent event) {
subscribers.stream()
.filter(subscriber -> subscriber.subscribedToEventType().isInstance(event))
.forEach(subscriber -> dispatch(event, subscriber));
}
@SuppressWarnings("unchecked")
private <T extends DomainEvent> void dispatch(DomainEvent event, DomainEventSubscriber<?> subscriber) {
// 同步处理事件
((DomainEventSubscriber<T>) subscriber).handleEvent((T) event);
// 异步处理示例:
// CompletableFuture.runAsync(() ->
// ((DomainEventSubscriber<T>) subscriber).handleEvent((T) event)
// );
}
}
/**
* 领域事件聚合根接口
* 定义聚合根应具备的事件管理能力
*/
interface EventSourcedAggregateRoot {
List<DomainEvent> getDomainEvents();
void clearDomainEvents();
}
/**
* 订单实体(聚合根)
* 集成领域事件功能
*/
class Order implements EventSourcedAggregateRoot {
private final String id;
private final String customerId;
private OrderStatus status;
private final List<DomainEvent> domainEvents = new ArrayList<>();
public Order(String id, String customerId) {
this.id = id;
this.customerId = customerId;
this.status = OrderStatus.CREATED;
// 创建订单时发布订单创建事件
registerEvent(new OrderCreatedEvent(id, customerId));
}
public void pay() {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("订单状态错误,当前状态:" + status);
}
this.status = OrderStatus.PAID;
// 支付时发布订单已支付事件
registerEvent(new OrderPaidEvent(id, LocalDateTime.now()));
}
private void registerEvent(DomainEvent event) {
domainEvents.add(event);
}
@Override
public List<DomainEvent> getDomainEvents() {
return new ArrayList<>(domainEvents);
}
@Override
public void clearDomainEvents() {
domainEvents.clear();
}
// Getters
public String getId() {
return id;
}
public String getCustomerId() {
return customerId;
}
public OrderStatus getStatus() {
return status;
}
}
/**
* 库存服务(领域服务)
* 处理库存相关业务逻辑
*/
class InventoryService implements DomainEventSubscriber<OrderPaidEvent> {
private final InventoryRepository inventoryRepository;
public InventoryService(InventoryRepository inventoryRepository) {
this.inventoryRepository = inventoryRepository;
}
@Override
public void handleEvent(OrderPaidEvent event) {
// 订单支付后扣减库存
// 实际应用中需要根据订单ID查询订单详情获取商品信息
// 这里简化处理
System.out.println("处理订单支付事件:订单ID=" + event.getOrderId());
// 扣减库存逻辑
// inventoryRepository.deductStock(productId, quantity);
// 发布库存扣减事件
DomainEventBus.getInstance().publish(
new InventoryDeductedEvent("P001", 1)
);
}
@Override
public Class<OrderPaidEvent> subscribedToEventType() {
return OrderPaidEvent.class;
}
}
/**
* 库存仓储接口
*/
interface InventoryRepository {
void deductStock(String productId, int quantity);
}
/**
* 库存仓储实现(示例)
*/
class InMemoryInventoryRepository implements InventoryRepository {
@Override
public void deductStock(String productId, int quantity) {
System.out.println("扣减库存:商品ID=" + productId + ",数量=" + quantity);
// 实际实现会更新数据库中的库存记录
}
}
/**
* 订单服务(应用服务)
* 协调领域对象和事件处理
*/
class OrderApplicationService {
private final OrderRepository orderRepository;
private final DomainEventPublisher eventPublisher;
public OrderApplicationService(OrderRepository orderRepository,
DomainEventPublisher eventPublisher) {
this.orderRepository = orderRepository;
this.eventPublisher = eventPublisher;
}
public void createOrder(String customerId, List<OrderItem> items) {
// 创建订单
String orderId = UUID.randomUUID().toString();
Order order = new Order(orderId, customerId);
// 持久化订单
orderRepository.save(order);
// 发布领域事件
publishEvents(order);
}
public void payOrder(String orderId) {
// 获取订单
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new IllegalArgumentException("订单不存在"));
// 执行支付业务逻辑
order.pay();
// 持久化更新
orderRepository.save(order);
// 发布领域事件
publishEvents(order);
}
private void publishEvents(Order order) {
order.getDomainEvents().forEach(eventPublisher::publish);
order.clearDomainEvents();
}
}
/**
* 订单仓储接口
*/
interface OrderRepository {
void save(Order order);
java.util.Optional<Order> findById(String orderId);
}
/**
* 主程序演示
*/
public class Main {
public static void main(String[] args) {
// 初始化组件
InventoryRepository inventoryRepository = new InMemoryInventoryRepository();
InventoryService inventoryService = new InventoryService(inventoryRepository);
OrderRepository orderRepository = new InMemoryOrderRepository();
DomainEventPublisher eventPublisher = DomainEventBus.getInstance();
// 注册事件订阅者
DomainEventBus.getInstance().register(inventoryService);
// 初始化订单应用服务
OrderApplicationService orderService = new OrderApplicationService(
orderRepository, eventPublisher
);
// 创建订单
orderService.createOrder("C001", Collections.emptyList());
// 支付订单(触发事件)
orderService.payOrder("ORD12345");
}
}
这个示例展示了DDD领域层事件驱动的
核心实现:
- 领域事件模型:
-
- 定义了
DomainEvent
接口和基础实现类BaseDomainEvent
- 创建了具体领域事件:
OrderCreatedEvent
、OrderPaidEvent
、InventoryDeductedEvent
- 定义了
- 事件总线机制:
-
DomainEventBus
作为单例事件总线,负责事件的注册和分发- 支持同步/异步事件处理模式
- 事件发布与订阅:
-
- 聚合根(Order)内部管理和发布事件
- 领域服务(InventoryService)通过实现
DomainEventSubscriber
接口订阅特定事件
- 事件驱动的业务流程:
-
- 订单支付(Order.pay())触发
OrderPaidEvent
- 库存服务订阅该事件并执行库存扣减
- 库存扣减后发布
InventoryDeductedEvent
供其他服务订阅
- 订单支付(Order.pay())触发
- 应用服务协调:
-
OrderApplicationService
负责事务管理和事件发布- 确保业务操作完成后发布所有领域事件
这种实现方式的优势:
- 业务逻辑解耦:订单和库存服务无需直接依赖
- 可扩展性:新增业务逻辑只需添加新的事件订阅者
- 一致性保障:通过事件机制保证最终一致性
- 可测试性:领域事件可作为测试断言的一部分
在实际项目中,你可能需要:
- 添加事件持久化机制,确保事件不丢失
- 实现事件重试和补偿机制
- 考虑使用消息队列(如RabbitMQ/Kafka)实现跨服务事件
- 添加事件版本控制和序列化机制