观察者模式详解实战

发布于:2025-04-11 ⋅ 阅读:(35) ⋅ 点赞:(0)

观察者模式深度解析与实战案例

一、传统观察者模式痛点分析

强制接收所有通知:观察者被迫处理无关事件
事件信息不透明:状态变更缺乏上下文信息
类型安全缺失:需要手动进行类型判断和转换
扩展成本高:新增事件类型需要修改接口

二、增强型观察者模式解决方案

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// ================== 事件体系 ==================
/**
 * 事件标记接口(事件对象)
 */
interface Event {
    Order getSource();
}

/**
 * 订单创建事件 
 */
 //jdk16以上语法
record OrderCreatedEvent(Order order, Date createTime) implements Event {
    @Override
    public Order getSource() {
        return order;
    }
}

/**
 * 订单支付事件
 */
 //jdk16以上语法
record OrderPaidEvent(Order order, String paymentMethod) implements Event {
    @Override
    public Order getSource() {
        return order;
    }
}

/**
 * 订单完成事件
 */
 //jdk16以上语法
record OrderCompletedEvent(Order order, String operator) implements Event {
    @Override
    public Order getSource() {
        return order;
    }
}

// ================== 核心接口 ==================
@FunctionalInterface
interface EventListener<E extends Event> {
    void onEvent(E event);
}

class EventPublisher {
    // 线程安全的观察者注册表(事件类型 -> 监听器列表)
    private final Map<Class<? extends Event>, List<EventListener<?>>> listeners = new ConcurrentHashMap<>();
    
    // 对象锁解决并发修改问题
    private final Object lock = new Object();

    /**
     * 注册事件监听器(类型安全)
     * @param eventType 事件类型
     * @param listener 事件监听器
     */
    public <E extends Event> void subscribe(Class<E> eventType, EventListener<E> listener) {
        synchronized (lock) {
            listeners.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                    .add(listener);
        }
    }

    /**
     * 取消注册(精确到具体监听器实例)
     */
    public <E extends Event> void unsubscribe(Class<E> eventType, EventListener<E> listener) {
        synchronized (lock) {
            List<EventListener<?>> list = listeners.get(eventType);
            if (list != null) {
                list.removeIf(l -> l == listener);
            }
        }
    }

    /**
     * 发布事件(支持事件溯源)
     */
    public void publish(Event event) {
        Class<? extends Event> eventType = event.getClass();
        List<EventListener<?>> eventListeners = listeners.get(eventType);
        
        if (eventListeners != null) {
            for (EventListener<?> listener : eventListeners) {
                // 类型安全的回调(无需强制转换)
                @SuppressWarnings("unchecked")
                EventListener<Event> specificListener = (EventListener<Event>) listener;
                specificListener.onEvent(event);
            }
        }
    }
}

// ================== 业务对象 ==================
class Order {
    private final String orderId;
    private OrderStatus status;
    private final List<String> statusHistory = new ArrayList<>();

    public Order(String orderId) {
        this.orderId = orderId;
        this.status = OrderStatus.CREATED;
        statusHistory.add("Order created at " + new Date());
    }

    // 状态变更方法(封装状态转换逻辑)
    public void pay(String paymentMethod) {
        if (status != OrderStatus.CREATED) {
            throw new IllegalStateException("Invalid order status for payment");
        }
        status = OrderStatus.PAID;
        statusHistory.add("Paid with " + paymentMethod + " at " + new Date());
    }

    public void complete(String operator) {
        if (status != OrderStatus.PAID) {
            throw new IllegalStateException("Order must be paid before completion");
        }
        status = OrderStatus.COMPLETED;
        statusHistory.add("Completed by " + operator + " at " + new Date());
    }

    public String getOrderId() { return orderId; }
    public OrderStatus getStatus() { return status; }
    public List<String> getStatusHistory() { return Collections.unmodifiableList(statusHistory); }

    enum OrderStatus { CREATED, PAID, COMPLETED }
}

// ================== 监听器实现 ==================
class InventoryService implements EventListener<OrderPaidEvent> {
    @Override
    public void onEvent(OrderPaidEvent event) {
        Order order = event.getSource();
        System.out.printf("[库存系统] 订单 %s 已支付,准备出库(支付方式:%s)%n",
                order.getOrderId(), event.paymentMethod());
    }
}

class PaymentService implements EventListener<OrderCompletedEvent> {
    @Override
    public void onEvent(OrderCompletedEvent event) {
        Order order = event.getSource();
        System.out.printf("[支付系统] 订单 %s 已完成,准备结算(操作员:%s)%n",
                order.getOrderId(), event.operator());
    }
}

class AuditLogger implements EventListener<Event> {
    @Override
    public void onEvent(Event event) {
        Order order = event.getSource();
        System.out.printf("[审计日志] 订单 %s 状态变更:%s%n",
                order.getOrderId(), event.getClass().getSimpleName());
    }
}

// ================== 使用示例 ==================
public class EnhancedObserverPattern {
    public static void main(String[] args) {
        // 初始化事件总线
        EventPublisher publisher = new EventPublisher();
        
        // 注册监听器
        InventoryService inventory = new InventoryService();
        PaymentService payment = new PaymentService();
        AuditLogger logger = new AuditLogger();

        publisher.subscribe(OrderPaidEvent.class, inventory);
        publisher.subscribe(OrderCompletedEvent.class, payment);
        publisher.subscribe(Event.class, logger);  // 监听所有事件

        // 创建订单并触发事件
        Order order = new Order("ORDER_20230720_001");
        publisher.publish(new OrderCreatedEvent(order, new Date()));

        // 支付订单
        order.pay("支付宝");
        publisher.publish(new OrderPaidEvent(order, "支付宝"));

        // 完成订单
        order.complete("操作员A");
        publisher.publish(new OrderCompletedEvent(order, "操作员A"));

        // 输出审计日志
        System.out.println("\n=== 订单状态历史 ===");
        order.getStatusHistory().forEach(System.out::println);
    }
}

三、关键改进点解析

事件对象封装:
使用不同事件类型(OrderCreatedEvent, OrderPaidEvent 等)封装状态变更上下文
事件携带完整的业务上下文(支付方式、操作人员等)

类型安全监听:

// 泛型约束确保类型安全
publisher.subscribe(OrderPaidEvent.class, inventory);

编译器自动检查事件类型与监听器的匹配性
消除运行时的类型转换风险

精细事件过滤:

// 审计服务监听所有事件
publisher.subscribe(Event.class, logger);

支持监听具体事件类型或全部事件
通过事件继承体系实现多级监听

线程安全保障:
使用ConcurrentHashMap + CopyOnWriteArrayList保证线程安全
同步块控制关键修改操作

synchronized (lock) {
    listeners.computeIfAbsent(...)
}

内存泄漏防护:

// 精确取消注册
publisher.unsubscribe(OrderPaidEvent.class, inventory);

提供明确的取消订阅接口
支持基于事件类型和监听器实例的精确注销

四、实战场景问题解决

订单状态追踪:

class Order {
    private final List<String> statusHistory = new ArrayList<>();
}

完整记录状态变更历史
支持业务追溯和审计
分布式系统适配:

// 事件发布方法可扩展为消息队列发送
public void publish(Event event) {
    // 本地通知
    notifyLocal(event);
    // 发送到消息队列
    kafkaTemplate.send("order-events", event);
}

方便扩展为分布式事件驱动架构

性能优化:

// 使用CopyOnWriteArrayList保证遍历安全
List<EventListener<?>> list = new CopyOnWriteArrayList<>();

写时复制集合优化读多写少场景
事件处理异步化(可扩展)

领域事件驱动:

record OrderCreatedEvent(Order order, Date createTime) implements Event

符合领域驱动设计(DDD)理念
事件携带完整的业务语义

五、模式扩展建议

异步事件处理:

// 使用线程池处理事件
ExecutorService executor = Executors.newFixedThreadPool(4);

public void publish(Event event) {
    executor.submit(() -> {
        // 事件处理逻辑
    });
}

提升系统吞吐量
防止阻塞主业务流程

事务事件管理:

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderPaidEvent(OrderPaidEvent event) {
    // 数据库事务提交后处理
}

集成Spring事务事件机制
确保事件处理与事务一致性
事件溯源(Event Sourcing):

class EventStore {
    public void save(Event event) {
        // 持久化事件到数据库
    }
}

完整保存所有状态变更事件
支持状态重建和业务追溯

六、设计模式对比

特性 传统观察者模式 增强型事件驱动模式
事件区分 需要手动判断类型 天然支持多事件类型
上下文信息 有限的参数传递 完整的事件对象封装
线程安全 需要自行实现 内置并发控制机制
扩展性 修改接口影响现有实现 新增事件类型无需修改接口
领域模型契合度 面向技术实现 符合领域驱动设计理念

七、适用场景推荐

电商订单状态变更通知
物联网设备状态监控
微服务架构中的领域事件传播
GUI事件处理系统
实时数据监控仪表盘
分布式系统配置更新

比较典型的观察者模式:
鸿蒙通知服务系统
鸿蒙公共事件系统
这两个系统的docs里面都有对应的订阅以及取消订阅接口,通过事件的订阅,然后系统侧去分发事件和通知。

通过这种增强型观察者模式实现,我们不仅解决了传统实现中的类型安全和扩展性问题,还为系统演进为事件驱动架构(EDA)打下了坚实基础。在实际开发中,可以结合具体框架(如Spring Event)进行进一步优化,实现更强大的事件处理能力。

一句话总结

观察者模式是定义对象间一种一对多的依赖关系,使得每当一个对象状态发生变化时,其所有依赖者都会得到通知并被自动更新。