观察者模式深度解析与实战案例
一、传统观察者模式痛点分析
强制接收所有通知:观察者被迫处理无关事件
事件信息不透明:状态变更缺乏上下文信息
类型安全缺失:需要手动进行类型判断和转换
扩展成本高:新增事件类型需要修改接口
二、增强型观察者模式解决方案
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
// ================== 事件体系 ==================
/**
* 事件标记接口(事件对象)
*/
interface Event {
Order getSource();
}
/**
* 订单创建事件
*/
//jdk16以上语法
record OrderCreatedEvent(Order order, Date createTime) implements Event {
@Override
public Order getSource() {
return order;
}
}
/**
* 订单支付事件
*/
//jdk16以上语法
record OrderPaidEvent(Order order, String paymentMethod) implements Event {
@Override
public Order getSource() {
return order;
}
}
/**
* 订单完成事件
*/
//jdk16以上语法
record OrderCompletedEvent(Order order, String operator) implements Event {
@Override
public Order getSource() {
return order;
}
}
// ================== 核心接口 ==================
@FunctionalInterface
interface EventListener<E extends Event> {
void onEvent(E event);
}
class EventPublisher {
// 线程安全的观察者注册表(事件类型 -> 监听器列表)
private final Map<Class<? extends Event>, List<EventListener<?>>> listeners = new ConcurrentHashMap<>();
// 对象锁解决并发修改问题
private final Object lock = new Object();
/**
* 注册事件监听器(类型安全)
* @param eventType 事件类型
* @param listener 事件监听器
*/
public <E extends Event> void subscribe(Class<E> eventType, EventListener<E> listener) {
synchronized (lock) {
listeners.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
.add(listener);
}
}
/**
* 取消注册(精确到具体监听器实例)
*/
public <E extends Event> void unsubscribe(Class<E> eventType, EventListener<E> listener) {
synchronized (lock) {
List<EventListener<?>> list = listeners.get(eventType);
if (list != null) {
list.removeIf(l -> l == listener);
}
}
}
/**
* 发布事件(支持事件溯源)
*/
public void publish(Event event) {
Class<? extends Event> eventType = event.getClass();
List<EventListener<?>> eventListeners = listeners.get(eventType);
if (eventListeners != null) {
for (EventListener<?> listener : eventListeners) {
// 类型安全的回调(无需强制转换)
@SuppressWarnings("unchecked")
EventListener<Event> specificListener = (EventListener<Event>) listener;
specificListener.onEvent(event);
}
}
}
}
// ================== 业务对象 ==================
class Order {
private final String orderId;
private OrderStatus status;
private final List<String> statusHistory = new ArrayList<>();
public Order(String orderId) {
this.orderId = orderId;
this.status = OrderStatus.CREATED;
statusHistory.add("Order created at " + new Date());
}
// 状态变更方法(封装状态转换逻辑)
public void pay(String paymentMethod) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Invalid order status for payment");
}
status = OrderStatus.PAID;
statusHistory.add("Paid with " + paymentMethod + " at " + new Date());
}
public void complete(String operator) {
if (status != OrderStatus.PAID) {
throw new IllegalStateException("Order must be paid before completion");
}
status = OrderStatus.COMPLETED;
statusHistory.add("Completed by " + operator + " at " + new Date());
}
public String getOrderId() { return orderId; }
public OrderStatus getStatus() { return status; }
public List<String> getStatusHistory() { return Collections.unmodifiableList(statusHistory); }
enum OrderStatus { CREATED, PAID, COMPLETED }
}
// ================== 监听器实现 ==================
class InventoryService implements EventListener<OrderPaidEvent> {
@Override
public void onEvent(OrderPaidEvent event) {
Order order = event.getSource();
System.out.printf("[库存系统] 订单 %s 已支付,准备出库(支付方式:%s)%n",
order.getOrderId(), event.paymentMethod());
}
}
class PaymentService implements EventListener<OrderCompletedEvent> {
@Override
public void onEvent(OrderCompletedEvent event) {
Order order = event.getSource();
System.out.printf("[支付系统] 订单 %s 已完成,准备结算(操作员:%s)%n",
order.getOrderId(), event.operator());
}
}
class AuditLogger implements EventListener<Event> {
@Override
public void onEvent(Event event) {
Order order = event.getSource();
System.out.printf("[审计日志] 订单 %s 状态变更:%s%n",
order.getOrderId(), event.getClass().getSimpleName());
}
}
// ================== 使用示例 ==================
public class EnhancedObserverPattern {
public static void main(String[] args) {
// 初始化事件总线
EventPublisher publisher = new EventPublisher();
// 注册监听器
InventoryService inventory = new InventoryService();
PaymentService payment = new PaymentService();
AuditLogger logger = new AuditLogger();
publisher.subscribe(OrderPaidEvent.class, inventory);
publisher.subscribe(OrderCompletedEvent.class, payment);
publisher.subscribe(Event.class, logger); // 监听所有事件
// 创建订单并触发事件
Order order = new Order("ORDER_20230720_001");
publisher.publish(new OrderCreatedEvent(order, new Date()));
// 支付订单
order.pay("支付宝");
publisher.publish(new OrderPaidEvent(order, "支付宝"));
// 完成订单
order.complete("操作员A");
publisher.publish(new OrderCompletedEvent(order, "操作员A"));
// 输出审计日志
System.out.println("\n=== 订单状态历史 ===");
order.getStatusHistory().forEach(System.out::println);
}
}
三、关键改进点解析
事件对象封装:
使用不同事件类型(OrderCreatedEvent, OrderPaidEvent 等)封装状态变更上下文
事件携带完整的业务上下文(支付方式、操作人员等)
类型安全监听:
// 泛型约束确保类型安全
publisher.subscribe(OrderPaidEvent.class, inventory);
编译器自动检查事件类型与监听器的匹配性
消除运行时的类型转换风险
精细事件过滤:
// 审计服务监听所有事件
publisher.subscribe(Event.class, logger);
支持监听具体事件类型或全部事件
通过事件继承体系实现多级监听
线程安全保障:
使用ConcurrentHashMap + CopyOnWriteArrayList保证线程安全
同步块控制关键修改操作
synchronized (lock) {
listeners.computeIfAbsent(...)
}
内存泄漏防护:
// 精确取消注册
publisher.unsubscribe(OrderPaidEvent.class, inventory);
提供明确的取消订阅接口
支持基于事件类型和监听器实例的精确注销
四、实战场景问题解决
订单状态追踪:
class Order {
private final List<String> statusHistory = new ArrayList<>();
}
完整记录状态变更历史
支持业务追溯和审计
分布式系统适配:
// 事件发布方法可扩展为消息队列发送
public void publish(Event event) {
// 本地通知
notifyLocal(event);
// 发送到消息队列
kafkaTemplate.send("order-events", event);
}
方便扩展为分布式事件驱动架构
性能优化:
// 使用CopyOnWriteArrayList保证遍历安全
List<EventListener<?>> list = new CopyOnWriteArrayList<>();
写时复制集合优化读多写少场景
事件处理异步化(可扩展)
领域事件驱动:
record OrderCreatedEvent(Order order, Date createTime) implements Event
符合领域驱动设计(DDD)理念
事件携带完整的业务语义
五、模式扩展建议
异步事件处理:
// 使用线程池处理事件
ExecutorService executor = Executors.newFixedThreadPool(4);
public void publish(Event event) {
executor.submit(() -> {
// 事件处理逻辑
});
}
提升系统吞吐量
防止阻塞主业务流程
事务事件管理:
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderPaidEvent(OrderPaidEvent event) {
// 数据库事务提交后处理
}
集成Spring事务事件机制
确保事件处理与事务一致性
事件溯源(Event Sourcing):
class EventStore {
public void save(Event event) {
// 持久化事件到数据库
}
}
完整保存所有状态变更事件
支持状态重建和业务追溯
六、设计模式对比
特性 | 传统观察者模式 | 增强型事件驱动模式 |
---|---|---|
事件区分 | 需要手动判断类型 | 天然支持多事件类型 |
上下文信息 | 有限的参数传递 | 完整的事件对象封装 |
线程安全 | 需要自行实现 | 内置并发控制机制 |
扩展性 | 修改接口影响现有实现 | 新增事件类型无需修改接口 |
领域模型契合度 | 面向技术实现 | 符合领域驱动设计理念 |
七、适用场景推荐
电商订单状态变更通知
物联网设备状态监控
微服务架构中的领域事件传播
GUI事件处理系统
实时数据监控仪表盘
分布式系统配置更新
比较典型的观察者模式:
鸿蒙通知服务系统
鸿蒙公共事件系统
这两个系统的docs里面都有对应的订阅以及取消订阅接口,通过事件的订阅,然后系统侧去分发事件和通知。
通过这种增强型观察者模式实现,我们不仅解决了传统实现中的类型安全和扩展性问题,还为系统演进为事件驱动架构(EDA)打下了坚实基础。在实际开发中,可以结合具体框架(如Spring Event)进行进一步优化,实现更强大的事件处理能力。
一句话总结
观察者模式是定义对象间一种一对多的依赖关系,使得每当一个对象状态发生变化时,其所有依赖者都会得到通知并被自动更新。