DDD架构实战 领域层 事件驱动

发布于:2025-06-07 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

核心实现:

这种实现方式的优势:

在实际项目中,你可能需要:


事件驱动往往是在一个微服务内部实现的

领域时间是DDD架构中比较常见的概念

在领域层内部的一个模型更改了状态或者发生了一些行为

向外发送一些通知

这些通知叫做领域时间

类似于MQ

这个通知不会在分布式系统内部传递

只会在单个微服务内部传递

和MQ一样 这样能解耦

我们可以通过发布事件的方式进行一种松耦合的通信

以下是DDD架构中领域层事件驱动的Java实现示例,展示如何通过领域事件实现松耦合的业务逻辑:

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * 领域事件接口
 * 所有领域事件需实现此接口
 */
interface DomainEvent {
    UUID getEventId();
    LocalDateTime getOccurredOn();
}

/**
 * 领域事件抽象基类
 * 提供事件ID和发生时间的基础实现
 */
abstract class BaseDomainEvent implements DomainEvent {
    private final UUID eventId;
    private final LocalDateTime occurredOn;

    protected BaseDomainEvent() {
        this.eventId = UUID.randomUUID();
        this.occurredOn = LocalDateTime.now();
    }

    @Override
    public UUID getEventId() {
        return eventId;
    }

    @Override
    public LocalDateTime getOccurredOn() {
        return occurredOn;
    }
}

/**
 * 订单已创建事件
 */
class OrderCreatedEvent extends BaseDomainEvent {
    private final String orderId;
    private final String customerId;

    public OrderCreatedEvent(String orderId, String customerId) {
        this.orderId = orderId;
        this.customerId = customerId;
    }

    public String getOrderId() {
        return orderId;
    }

    public String getCustomerId() {
        return customerId;
    }
}

/**
 * 订单已支付事件
 */
class OrderPaidEvent extends BaseDomainEvent {
    private final String orderId;
    private final LocalDateTime paymentTime;

    public OrderPaidEvent(String orderId, LocalDateTime paymentTime) {
        this.orderId = orderId;
        this.paymentTime = paymentTime;
    }

    public String getOrderId() {
        return orderId;
    }

    public LocalDateTime getPaymentTime() {
        return paymentTime;
    }
}

/**
 * 库存扣减事件
 */
class InventoryDeductedEvent extends BaseDomainEvent {
    private final String productId;
    private final int quantity;

    public InventoryDeductedEvent(String productId, int quantity) {
        this.productId = productId;
        this.quantity = quantity;
    }

    public String getProductId() {
        return productId;
    }

    public int getQuantity() {
        return quantity;
    }
}

/**
 * 领域事件发布者接口
 */
interface DomainEventPublisher {
    void publish(DomainEvent event);
}

/**
 * 领域事件订阅者接口
 */
interface DomainEventSubscriber<T extends DomainEvent> {
    void handleEvent(T event);
    Class<T> subscribedToEventType();
}

/**
 * 领域事件总线(单例实现)
 * 负责事件的注册、发布和分发
 */
class DomainEventBus implements DomainEventPublisher {
    private static final DomainEventBus INSTANCE = new DomainEventBus();
    private final List<DomainEventSubscriber<?>> subscribers = new ArrayList<>();

    private DomainEventBus() {}

    public static DomainEventBus getInstance() {
        return INSTANCE;
    }

    public <T extends DomainEvent> void register(DomainEventSubscriber<T> subscriber) {
        subscribers.add(subscriber);
    }

    @Override
    public void publish(DomainEvent event) {
        subscribers.stream()
            .filter(subscriber -> subscriber.subscribedToEventType().isInstance(event))
            .forEach(subscriber -> dispatch(event, subscriber));
    }

    @SuppressWarnings("unchecked")
    private <T extends DomainEvent> void dispatch(DomainEvent event, DomainEventSubscriber<?> subscriber) {
        // 同步处理事件
        ((DomainEventSubscriber<T>) subscriber).handleEvent((T) event);
        
        // 异步处理示例:
        // CompletableFuture.runAsync(() -> 
        //     ((DomainEventSubscriber<T>) subscriber).handleEvent((T) event)
        // );
    }
}

/**
 * 领域事件聚合根接口
 * 定义聚合根应具备的事件管理能力
 */
interface EventSourcedAggregateRoot {
    List<DomainEvent> getDomainEvents();
    void clearDomainEvents();
}

/**
 * 订单实体(聚合根)
 * 集成领域事件功能
 */
class Order implements EventSourcedAggregateRoot {
    private final String id;
    private final String customerId;
    private OrderStatus status;
    private final List<DomainEvent> domainEvents = new ArrayList<>();

    public Order(String id, String customerId) {
        this.id = id;
        this.customerId = customerId;
        this.status = OrderStatus.CREATED;
        
        // 创建订单时发布订单创建事件
        registerEvent(new OrderCreatedEvent(id, customerId));
    }

    public void pay() {
        if (status != OrderStatus.CREATED) {
            throw new IllegalStateException("订单状态错误,当前状态:" + status);
        }
        
        this.status = OrderStatus.PAID;
        
        // 支付时发布订单已支付事件
        registerEvent(new OrderPaidEvent(id, LocalDateTime.now()));
    }

    private void registerEvent(DomainEvent event) {
        domainEvents.add(event);
    }

    @Override
    public List<DomainEvent> getDomainEvents() {
        return new ArrayList<>(domainEvents);
    }

    @Override
    public void clearDomainEvents() {
        domainEvents.clear();
    }

    // Getters
    public String getId() {
        return id;
    }

    public String getCustomerId() {
        return customerId;
    }

    public OrderStatus getStatus() {
        return status;
    }
}

/**
 * 库存服务(领域服务)
 * 处理库存相关业务逻辑
 */
class InventoryService implements DomainEventSubscriber<OrderPaidEvent> {
    private final InventoryRepository inventoryRepository;

    public InventoryService(InventoryRepository inventoryRepository) {
        this.inventoryRepository = inventoryRepository;
    }

    @Override
    public void handleEvent(OrderPaidEvent event) {
        // 订单支付后扣减库存
        // 实际应用中需要根据订单ID查询订单详情获取商品信息
        // 这里简化处理
        System.out.println("处理订单支付事件:订单ID=" + event.getOrderId());
        
        // 扣减库存逻辑
        // inventoryRepository.deductStock(productId, quantity);
        
        // 发布库存扣减事件
        DomainEventBus.getInstance().publish(
            new InventoryDeductedEvent("P001", 1)
        );
    }

    @Override
    public Class<OrderPaidEvent> subscribedToEventType() {
        return OrderPaidEvent.class;
    }
}

/**
 * 库存仓储接口
 */
interface InventoryRepository {
    void deductStock(String productId, int quantity);
}

/**
 * 库存仓储实现(示例)
 */
class InMemoryInventoryRepository implements InventoryRepository {
    @Override
    public void deductStock(String productId, int quantity) {
        System.out.println("扣减库存:商品ID=" + productId + ",数量=" + quantity);
        // 实际实现会更新数据库中的库存记录
    }
}

/**
 * 订单服务(应用服务)
 * 协调领域对象和事件处理
 */
class OrderApplicationService {
    private final OrderRepository orderRepository;
    private final DomainEventPublisher eventPublisher;

    public OrderApplicationService(OrderRepository orderRepository, 
                                  DomainEventPublisher eventPublisher) {
        this.orderRepository = orderRepository;
        this.eventPublisher = eventPublisher;
    }

    public void createOrder(String customerId, List<OrderItem> items) {
        // 创建订单
        String orderId = UUID.randomUUID().toString();
        Order order = new Order(orderId, customerId);
        
        // 持久化订单
        orderRepository.save(order);
        
        // 发布领域事件
        publishEvents(order);
    }

    public void payOrder(String orderId) {
        // 获取订单
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new IllegalArgumentException("订单不存在"));
        
        // 执行支付业务逻辑
        order.pay();
        
        // 持久化更新
        orderRepository.save(order);
        
        // 发布领域事件
        publishEvents(order);
    }

    private void publishEvents(Order order) {
        order.getDomainEvents().forEach(eventPublisher::publish);
        order.clearDomainEvents();
    }
}

/**
 * 订单仓储接口
 */
interface OrderRepository {
    void save(Order order);
    java.util.Optional<Order> findById(String orderId);
}

/**
 * 主程序演示
 */
public class Main {
    public static void main(String[] args) {
        // 初始化组件
        InventoryRepository inventoryRepository = new InMemoryInventoryRepository();
        InventoryService inventoryService = new InventoryService(inventoryRepository);
        OrderRepository orderRepository = new InMemoryOrderRepository();
        DomainEventPublisher eventPublisher = DomainEventBus.getInstance();
        
        // 注册事件订阅者
        DomainEventBus.getInstance().register(inventoryService);
        
        // 初始化订单应用服务
        OrderApplicationService orderService = new OrderApplicationService(
            orderRepository, eventPublisher
        );
        
        // 创建订单
        orderService.createOrder("C001", Collections.emptyList());
        
        // 支付订单(触发事件)
        orderService.payOrder("ORD12345");
    }
}    

这个示例展示了DDD领域层事件驱动的

核心实现:

  1. 领域事件模型
    • 定义了DomainEvent接口和基础实现类BaseDomainEvent
    • 创建了具体领域事件:OrderCreatedEventOrderPaidEventInventoryDeductedEvent
  1. 事件总线机制
    • DomainEventBus作为单例事件总线,负责事件的注册和分发
    • 支持同步/异步事件处理模式
  1. 事件发布与订阅
    • 聚合根(Order)内部管理和发布事件
    • 领域服务(InventoryService)通过实现DomainEventSubscriber接口订阅特定事件
  1. 事件驱动的业务流程
    • 订单支付(Order.pay())触发OrderPaidEvent
    • 库存服务订阅该事件并执行库存扣减
    • 库存扣减后发布InventoryDeductedEvent供其他服务订阅
  1. 应用服务协调
    • OrderApplicationService负责事务管理和事件发布
    • 确保业务操作完成后发布所有领域事件

这种实现方式的优势:

  • 业务逻辑解耦:订单和库存服务无需直接依赖
  • 可扩展性:新增业务逻辑只需添加新的事件订阅者
  • 一致性保障:通过事件机制保证最终一致性
  • 可测试性:领域事件可作为测试断言的一部分

在实际项目中,你可能需要:

  1. 添加事件持久化机制,确保事件不丢失
  2. 实现事件重试和补偿机制
  3. 考虑使用消息队列(如RabbitMQ/Kafka)实现跨服务事件
  4. 添加事件版本控制和序列化机制