目录
参考
设计模式:可复用面向对象软件的基础(典藏版) - 埃里克·伽玛 - 微信读书
项目地址
NitasDemo/10DesignPatterns/DesignPatterns/ObserverPattern at main · Nita121388/NitasDemo
一、概述
观察者模式是一种行为设计模式
,用于定义对象之间的一对多依赖关系。当一个对象(被观察者)的状态发生改变时,所有依赖于它的对象(观察者)都会自动得到通知并更新。
1.1 动机
将一个系统分割成一系列相互协作的类有一个常见的副作用:需要维护相关对象间的一致性。我们不希望为了维持一致性而使各类紧密耦合,因为这样降低了其可复用性。
1.2 核心思想
- 解耦:将被观察者与观察者解耦,使它们之间通过接口或事件机制交互。
- 自动通知:当被观察者状态改变时,自动通知所有观察者。
1.3 别名
依赖模式:强调对象间的依赖关系。
发布–订阅模式:发布者将消息发送给多个订阅者。
模型-视图模式:模型变化时,视图自动更新。
源-监听器模式:事件源触发事件后通知所有监听器。
从属者模式:从属者依赖于其他对象的状态变化。
这些别名都体现了观察者模式的核心思想:定义对象间的一对多依赖关系,实现状态变化的自动通知。
二、角色与实现原理
2.1 角色
- Subject(目标/主题/被观察者)
- 维护一个观察者列表,允许观察者订阅或取消订阅。
- 当状态改变时,通知所有观察者。
- Observer(观察者)
- 观察者将对观察目标的改变做出反应。观察者一般定义为接口,该接口声明了更新数据的方法update(),因此又称为抽象观察者。
- ConcreteSubject(具体被观察者)
- 实现Subject接口,维护自身状态,并在状态改变时通知观察者。
- ConcreteObserver(具体观察者)
- 实现Observer接口,根据被观察者的状态改变做出相应反应。
2.2 实现原理
- 被观察者维护观察者列表
- 被观察者类中包含一个观察者列表,用于存储所有订阅的观察者对象。
- 被观察者类可以注册与注销观察者,提供方法允许观察者对象注册到被观察者列表中,或从列表中注销。
- 通知机制
- 当被观察者状态改变时,遍历观察者列表,调用每个观察者的更新方法。
2.3 类图
三、经典接口实现
Gang of Four(GoF)是指四位著名软件设计模式专家(Erich Gamma、Richard Helm、Ralph Johnson 和 John Vlissides)在1994年出版的《设计模式》。
GoF模式本质*:通过接口规范化观察者模式中的角色职责,强调设计契约优先,适用于需要长期维护、高可扩展性的复杂系统架构设计。*
核心思想:通过显式接口定义观察者和主题的关系。
3.1 示例
3.1.1 观察者接口
// 观察者接口
public interface IObserver
{
void Update(string message);
}
3.1.2 目标接口
// 目标接口
public interface ISubject
{
void Attach(IObserver observer);
void Detach(IObserver observer);
void Notify();
}
3.1.3 具体被观察者
// 具体目标
public class ConcreteSubject : ISubject
{
// 观察者列表
private List<IObserver> _observers = new();
// 目标状态
private string _state;
// 注册观察者: 将观察者对象注册到目标对象中
public void Attach(IObserver observer) => _observers.Add(observer);
// 注销观察者: 移除一个观察者
public void Detach(IObserver observer) => _observers.Remove(observer);
// 通知观察者: 改变目标对象的状态,触发通知
public void Notify()
{
foreach (var observer in _observers)
observer.Update(_state);
}
// 设置目标状态: 改变目标对象的状态
public void SetState(string state)
{
_state = state;
Notify();
}
}
3.1.4 具体观察者
// 具体观察者
public class ConcreteObserver : IObserver
{
// 接收通知并处理
public void Update(string message) => Console.WriteLine($"Received: {message}");
}
3.1.5 Client
using System;
class Program
{
static void Main(string[] args)
{
// 创建具体目标对象
ConcreteSubject subject = new ConcreteSubject();
// 创建多个具体观察者对象
ConcreteObserver observer1 = new ConcreteObserver();
ConcreteObserver observer2 = new ConcreteObserver();
ConcreteObserver observer3 = new ConcreteObserver();
// 将观察者对象注册到目标对象中
subject.Attach(observer1);
subject.Attach(observer2);
subject.Attach(observer3);
// 改变目标对象的状态,触发通知
Console.WriteLine("第一次状态更新:");
subject.SetState("Hello, Observers!");
// 移除一个观察者
subject.Detach(observer2);
// 再次改变目标对象的状态,触发通知
Console.WriteLine("\n第二次状态更新:");
subject.SetState("State has changed!");
}
}
结果:
第一次状态更新:
Received: Hello, Observers!
Received: Hello, Observers!
Received: Hello, Observers!
第二次状态更新:
Received: State has changed!
Received: State has changed!
3.1.6 UML时序图
3.2 特点
- 符合设计模式原生定义,代码结构清晰。
- 强类型约束,编译时检查接口实现。
- 对语言无特殊要求,通用性强。
- 显式依赖关系,逻辑透明。
- 适用于简单场景,无需框架支持。
四、其他实现方式
4.1 委托与事件(.NET 原生实现)
- 机制:利用语言或框架提供的事件监听机制,被观察者触发事件,观察者通过监听器接收事件。
4.1.1 示例
public class EventSubject
{
public event EventHandler<string> StateChanged;
private string _state;
public void SetState(string state)
{
_state = state;
StateChanged?.Invoke(this, _state);
}
}
public class EventObserver
{
public void Subscribe(EventSubject subject)
{
subject.StateChanged += HandleStateChange;
}
private void HandleStateChange(object sender, string message)
{
Console.WriteLine($"Event received: {message}");
}
public void Unsubscribe(EventSubject subject)
{
subject.StateChanged -= HandleStateChange;
}
}
class Program
{
static void Main(string[] args)
{
// 创建 EventSubject 和 EventObserver 对象
EventSubject subject = new EventSubject();
EventObserver observer = new EventObserver();
// 订阅事件
observer.Subscribe(subject);
Console.WriteLine("Observer has subscribed to the subject.");
// 改变状态,触发事件
subject.SetState("State 1");
subject.SetState("State 2");
// 取消订阅
observer.Unsubscribe(subject);
Console.WriteLine("Observer has unsubscribed from the subject.");
// 再次改变状态,观察是否还会触发事件
subject.SetState("State 3"); // 不会触发事件,因为已取消订阅
}
}
结果
Observer has subscribed to the subject.
Event received: State 1
Event received: State 2
Observer has unsubscribed from the subject.
4.1.2 UML类图
4.1.3 特点
- 代码更加简洁,轻量级,利用语言的内置特性,减少了手动管理观察者列表的复杂性。
- 内置线程安全的事件触发机制(?.Invoke)
- 支持多播(多个观察者订阅同一事件)
- 对于一些复杂的业务逻辑,可能无法完全满足需求,因为事件机制通常是基于固定的事件类型和参数进行设计的,不够灵活。
- 而且如果事件的定义不合理,可能会导致系统的可扩展性和维护性变差。
- 无法跨模块解耦(需直接访问事件)。
4.1.4 适用场景
- GUI 事件处理(如按钮点击)。
- 单模块内的局部解耦。
- 适合简单通知逻辑且不涉及复杂数据流的场景。
4.2 IObservable 和 IObserver 接口
核心思想:使用.NET框架内置的观察者模式标准化接口。
4.2.1 接口概述
接口 | 角色 | 职责 |
---|---|---|
`IObservable<T>` | 被观察对象 | 数据/事件的生产者 |
`IObserver<T>` | 观察者 | 数据/事件的消费者 |
4.2.1.1 被观察者接口 : IObservable
namespace System
{
/// <summary>
/// 定义了一个基于推送的事件通知提供者/ 被观察者
/// </summary>
/// <typeparam name="T">提供通知信息的对象类型。</typeparam>
public interface IObservable<out T>
{
/// <summary>
/// 通知提供者有一个观察者将要接收通知。
/// </summary>
/// observer">将要接收通知的对象。
/// <returns>一个接口引用,允许观察者在提供者完成发送通知之前停止接收通知。</returns>
IDisposable Subscribe(IObserver<T> observer);
}
}
IObservable<T>
是一个接口,属于 C# 中的事件驱动编程模型,是响应式编程(Reactive Programming)的核心接口之一。
它定义了一个基于推送的事件通知机制,允许观察者(IObserver<T>
)订阅
通知源(IObservable<T>
),并在通知源产生数据或事件时接收通知。
- 泛型参数
T
:表示通知中携带的数据类型。 - Subscribe方法:是
IObservable<T>
的核心方法。- 它接收一个实现了
IObserver<T>
接口的对象作为参数,表示观察者。 - 当调用
Subscribe
方法时,观察者会注册到通知源,从而能够接收通知。 - 方法返回一个
IDisposable
对象,观察者可以通过调用其Dispose
方法来取消订阅,停止接收通知。
- 它接收一个实现了
4.2.1.2 观察者接口 : IObserver
namespace System
{
/// <summary>
/// 提供一种接收基于推送的通知的机制。//观察者
/// </summary>
/// <typeparam name="T">提供通知信息的对象类型。</typeparam>
public interface IObserver<in T>
{
/// <summary>
/// 向观察者提供新的数据。
/// </summary>
/// <param name="value">当前的通知信息。</param>
void OnNext(T value);
/// <summary>
/// 通知观察者提供者遇到了错误条件。
/// </summary>
/// <param name="error">一个提供有关错误的额外信息的对象。</param>
void OnError(Exception error);
/// <summary>
/// 通知观察者提供者已经完成发送基于推送的通知。
/// </summary>
void OnCompleted();
}
}
IObserver<T>
它定义了一个观察者的角色,用于接收来自通知源(IObservable<T>
)的推送通知。
- 泛型参数
T
:表示通知中携带的数据类型。 OnNext
方法:当通知源有新的数据可用时,调用此方法向观察者传递数据。参数value
是当前的通知信息。OnError
方法:当通知源在发送通知过程中遇到错误时,调用此方法通知观察者。参数error
是一个Exception
对象,提供有关错误的详细信息。OnCompleted
方法:当通知源完成所有通知的发送后,调用此方法通知观察者。这表示通知源不会再发送任何新的通知。
4.2.2 示例
4.2.2.1 具体被观察者Subject:实现 IObservable
4.2.2.1.1 订阅管理 (Subscribe
方法)
public IDisposable Subscribe(IObserver<string> observer) {
lock (_lock) {
if (!_observers.Contains(observer)) {
_observers.Add(observer);
}
}
return new Unsubscriber(_observers, observer, _lock);
}
功能:允许观察者订阅主题。
线程安全:通过
lock
确保多线程下订阅操作的原子性。防止重复订阅:检查观察者是否已存在。
返回
Unsubscriber
:通过IDisposable
实现优雅的取消订阅机制。关于
IDisposable
,可以查看我的另一篇文章C#中的非托管资源释放机制详解|Finalizer与Dispose模式-CSDN博客。
4.2.2.1.2 取消订阅 (Unsubscriber
类)
private class Unsubscriber : IDisposable {
// ... 略去字段和构造函数 ...
public void Dispose() {
lock (_lock) {
if (_observer != null && _observers.Contains(_observer)) {
_observers.Remove(_observer);
_observer = null;
}
}
}
}
- 功能:调用
Dispose()
时从观察者列表中移除目标观察者。 - 资源释放:移除后置空引用,避免内存泄漏。
- 线程安全:通过
lock
确保取消订阅的原子性。
4.2.2.1.3 状态通知 (NotifyObservers
方法)
public void NotifyObservers(string state) {
lock (_lock) {
foreach (var observer in _observers) {
observer.OnNext(state);
}
}
}
- 功能:遍历所有观察者,调用其
OnNext
方法推送新状态。 - 线程安全:遍历期间锁定列表,防止并发修改。
4.2.2.1.4 完成与错误通知 (OnCompleted
和 OnError
)
public void OnCompleted() {
lock (_lock) {
foreach (var observer in _observers) {
observer.OnCompleted();
}
_observers.Clear();
}
}
public void OnError(Exception error) {
lock (_lock) {
foreach (var observer in _observers) {
observer.OnError(error);
}
_observers.Clear();
}
}
- 完成通知:调用所有观察者的
OnCompleted()
,清空列表(终止后续通知)。 - 错误通知:调用所有观察者的
OnError()
,清空列表。 - 线程安全:全程加锁。
4.2.2.1.5 线程安全设计
- 锁对象
_lock
:所有对观察者列表的操作(增、删、遍历)均通过lock (_lock)
确保原子性。 - 场景覆盖:
- 多线程同时订阅/取消订阅。
- 通知过程中触发新的订阅/取消订阅。
4.2.2.1.6 全部代码
具体目标
using System; using System.Collections.Generic; using System.Threading; // Subject 类实现了 IObservable<string> 接口,用于管理观察者并通知状态变化 public class Subject : IObservable<string> { // 用于存储所有订阅的观察者 private List> _observers = new(); // 用于线程安全的锁对象 private readonly object _lock = new(); // 订阅方法,允许观察者订阅状态变化 public IDisposable Subscribe(IObserver observer) { lock (_lock) // 确保线程安全 { if (!_observers.Contains(observer)) // 防止重复订阅 { _observers.Add(observer); } } // 返回一个 Unsubscriber 对象,用于取消订阅 return new Unsubscriber(_observers, observer, _lock); } // Unsubscriber 类实现了 IDisposable 接口,用于取消观察者的订阅 private class Unsubscriber : IDisposable { private List> _observers; private IObserver _observer; private readonly object _lock; // 构造函数,初始化观察者列表、当前观察者和锁对象 public Unsubscriber(List> observers, IObserver observer, object lockObj) { _observers = observers; _observer = observer; _lock = lockObj; } // Dispose 方法用于取消订阅 public void Dispose() { lock (_lock) // 确保线程安全 { if (_observer != null && _observers.Contains(_observer)) { _observers.Remove(_observer); // 从观察者列表中移除当前观察者 _observer = null; // 清空当前观察者引用 } } } } // SetState 方法用于设置状态并通知所有观察者 public void NotifyObservers(string state) { lock (_lock) // 确保线程安全 { foreach (var observer in _observers) { observer.OnNext(state); // 调用观察者的 OnNext 方法通知状态变化 } } } // OnCompleted 方法用于通知所有观察者完成事件 public void OnCompleted() { lock (_lock) // 确保线程安全 { foreach (var observer in _observers) { observer.OnCompleted(); // 调用观察者的 OnCompleted 方法通知完成事件 } _observers.Clear(); // 清空观察者列表 } } // OnError 方法用于通知所有观察者发生错误 public void OnError(Exception error) { lock (_lock) // 确保线程安全 { foreach (var observer in _observers) { observer.OnError(error); // 调用观察者的 OnError 方法通知错误事件 } _observers.Clear(); // 清空观察者列表 } } }
具体观察者
ConcreteObserver
类实现了IObserver<string>
接口,用于接收被观察者的状态变化通知。OnNext
方法:接收状态变化通知,并输出状态信息。OnError
方法:接收错误通知,并输出错误信息。OnCompleted
方法:接收完成通知,并输出完成信息。
// ConcreteObserver 类实现了 IObserver<string> 接口,用于接收状态变化通知 public class ConcreteObserver : IObserver<string> { // 观察者的名称,用于区分不同的观察者 private readonly string _name; // 构造函数,初始化观察者名称 public ConcreteObserver(string name) { _name = name; } // OnNext 方法用于接收状态变化通知 public void OnNext(string value) { Console.WriteLine($"{_name} received: {value}"); // 输出接收到的状态信息 } // OnError 方法用于接收错误通知 public void OnError(Exception error) { Console.WriteLine($"{_name} received an error: {error.Message}"); // 输出错误信息 } // OnCompleted 方法用于接收完成通知 public void OnCompleted() { Console.WriteLine($"{_name} received completion notification."); // 输出完成通知 } }
客户端使用实例(Client)
using System; namespace IObservableTDemo { class Program { static void Main(string[] args) { // 1. 创建被观察者和观察者 Subject subject = new Subject(); ConcreteObserver observer1 = new ConcreteObserver("observer 1"); ConcreteObserver observer2 = new ConcreteObserver("observer 2"); // 2. 订阅观察者 IDisposable subscription1 = subject.Subscribe(observer1); IDisposable subscription2 = subject.Subscribe(observer2); // 状态通知 subject.NotifyObservers("Hello, World!"); // 取消订阅 observer2 subscription2.Dispose(); // 再次设置状态,观察者1会收到通知,观察者2不会收到 subject.NotifyObservers("Hello again!"); // 模拟错误 此时会清空观察者列表 subject.OnError(new Exception("Something went wrong!")); // 再次设置状态,观察者1和观察者2都不会收到通知 subject.NotifyObservers("Hello again!"); // 再次订阅观察者 IDisposable subscription3 = subject.Subscribe(observer1); // 再次设置状态,观察者1收到通知 subject.NotifyObservers("Hello again!"); // 完成通知 subject.OnCompleted(); //再次设置状态,都不会收到通知 subject.NotifyObservers("Hello again!"); // 等待用户输入后退出 Console.WriteLine("Press any key to exit..."); Console.ReadKey(); } } }
结果
Observer 1 received: Hello, World! Observer 2 received: Hello, World! Observer 1 received: Hello again! Observer 1 received an error: Something went wrong! Observer 3 received: Hello again! Observer 3 received completion notification. Press any key to exit...
4.2.3 UML类图
4.2.4 扩展内容
4.2.4.1 异步通知
可以通过Task
或async/await
来实现异步通知。
异步接口
- 每个方法都返回Task以支持异步操作
- 完全异步的观察者接口
public interface IAsyncObserver<in T> { Task OnNextAsync(T value); Task OnErrorAsync(Exception exception); Task OnCompletedAsync(); }
顺序异步通知机制
- 严格按顺序通知观察者
- 每个观察者处理完成后再通知下一个
- 保留通知顺序性
foreach (var observer in observersCopy) { try { await observer.OnNextAsync(value); } // ... }
完整代码
using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Threading; #region Client Code var subject = new AsyncSubject<string>(); var observer1 = new AsyncObserver("observer 1"); var observer2 = new AsyncObserver("observer 2"); // 订阅观察者 using var subscription1 = subject.Subscribe(observer1); using var subscription2 = subject.Subscribe(observer2); // 异步通知 await subject.NotifyAsync("First Message"); // 取消订阅 observer2 subscription2.Dispose(); // 再次通知 await subject.NotifyAsync("Second Message"); // 错误通知 await subject.NotifyErrorAsync(new Exception("Test Error")); // 完成通知 await subject.OnCompletedAsync(); Console.WriteLine("Press any key to exit..."); Console.ReadKey(); #endregion #region Interfaces public interface IAsyncObserver<in T> { Task OnNextAsync(T value); Task OnErrorAsync(Exception exception); Task OnCompletedAsync(); } public interface IAsyncObservable<out T> { IDisposable Subscribe(IAsyncObserver observer); } #endregion #region Async Subject public class AsyncSubject<T> : IAsyncObservable<T> { private readonly List> _observers = new(); private readonly object _lock = new(); public IDisposable Subscribe(IAsyncObserver observer) { lock (_lock) { if (!_observers.Contains(observer)) { _observers.Add(observer); } } return new Unsubscriber(() => { lock (_lock) { _observers.Remove(observer); } }); } public async Task NotifyAsync(T value) { IAsyncObserver<T>[] observersCopy; lock (_lock) { observersCopy = _observers.ToArray(); } foreach (var observer in observersCopy) { try { await observer.OnNextAsync(value); } catch (Exception ex) { Console.WriteLine($"Notification failed: {ex.Message}"); } } } public async Task NotifyErrorAsync(Exception error) { IAsyncObserver<T>[] observersCopy; lock (_lock) { observersCopy = _observers.ToArray(); _observers.Clear(); } foreach (var observer in observersCopy) { try { await observer.OnErrorAsync(error); } catch (Exception ex) { Console.WriteLine($"Error notification failed: {ex.Message}"); } } } public async Task OnCompletedAsync() { IAsyncObserver<T>[] observersCopy; lock (_lock) { observersCopy = _observers.ToArray(); _observers.Clear(); } foreach (var observer in observersCopy) { try { await observer.OnCompletedAsync(); } catch (Exception ex) { Console.WriteLine($"Completion notification failed: {ex.Message}"); } } } private class Unsubscriber : IDisposable { private readonly Action _unsubscribeAction; public Unsubscriber(Action unsubscribeAction) { _unsubscribeAction = unsubscribeAction; } public void Dispose() => _unsubscribeAction?.Invoke(); } } #endregion #region Async Observer public class AsyncObserver : IAsyncObserver<string> { private readonly string _name; public AsyncObserver(string name) => _name = name; public async Task OnNextAsync(string value) { await Task.Delay(100); // 模拟异步处理 Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} received: {value}"); } public async Task OnErrorAsync(Exception exception) { await Task.Delay(100); // 模拟异步处理 Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} error: {exception.Message}"); } public async Task OnCompletedAsync() { await Task.Delay(100); // 模拟异步处理 Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} completed"); } } #endregion
结果
[22:14:48.269] Observer 1 received: First Message [22:14:48.449] Observer 2 received: First Message [22:14:48.554] Observer 1 received: Second Message [22:14:48.662] Observer 1 error: Test Error Press any key to exit...
4.2.4.2 事件过滤
可以通过在通知方法中添加过滤逻辑来实现事件过滤。
FilteredObservable
的构造函数接收一个过滤函数(Func<T, bool>
),用于决定哪些消息需要通知给观察者。- 在
Notify
方法中,只有满足过滤条件的消息才会被发送。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
#region Client Code
var subject = new FilteredObservable<string>(s => s.StartsWith("[IMPORTANT]"));
var observer1 = new Observer("Observer 1");
var observer2 = new Observer("Observer 2");
using var subscription1 = subject.Subscribe(observer1);
using var subscription2 = subject.Subscribe(observer2);
// 这些消息将被过滤
subject.Notify("Normal Message 1");
subject.Notify("Normal Message 2");
// 这些消息将被传递
subject.Notify("[IMPORTANT] Message 1");
subject.Notify("[IMPORTANT] Message 2");
// 取消订阅 observer2
subscription2.Dispose();
// 再次通知
subject.Notify("[IMPORTANT] Message 3"); // 只有 observer1 收到
// 错误通知(不过滤)
subject.NotifyError(new Exception("Critical Error"));
// 完成通知(不过滤)
subject.OnCompleted();
#endregion
#region Subject
public class FilteredObservable<T> : IObservable<T>
{
private readonly List<IObserver<T>> _observers = new();
private readonly Func<T, bool> _filter;
private readonly object _lock = new();
public FilteredObservable(Func<T, bool> filter)
{
_filter = filter ?? throw new ArgumentNullException(nameof(filter));
}
public IDisposable Subscribe(IObserver<T> observer)
{
lock (_lock)
{
if (!_observers.Contains(observer))
_observers.Add(observer);
}
return new Unsubscriber(() =>
{
lock (_lock)
{
_observers.Remove(observer);
}
});
}
public async Task Notify(T value)
{
if (!_filter(value)) return;
IObserver<T>[] observersCopy;
lock (_lock)
{
observersCopy = _observers.ToArray();
}
foreach (var observer in observersCopy)
{
try
{
observer.OnNext(value);
}
catch (Exception ex)
{
Console.WriteLine($"Notification failed: {ex.Message}");
}
}
}
public async Task NotifyError(Exception error)
{
IObserver<T>[] observersCopy;
lock (_lock)
{
observersCopy = _observers.ToArray();
_observers.Clear();
}
foreach (var observer in observersCopy)
{
try
{
observer.OnError(error);
}
catch (Exception ex)
{
Console.WriteLine($"Error notification failed: {ex.Message}");
}
}
}
public async Task OnCompleted()
{
IObserver<T>[] observersCopy;
lock (_lock)
{
observersCopy = _observers.ToArray();
_observers.Clear();
}
foreach (var observer in observersCopy)
{
try
{
observer.OnCompleted();
}
catch (Exception ex)
{
Console.WriteLine($"Completion notification failed: {ex.Message}");
}
}
}
private class Unsubscriber : IDisposable
{
private readonly Action _unsubscribeAction;
public Unsubscriber(Action unsubscribeAction) => _unsubscribeAction = unsubscribeAction;
public void Dispose() => _unsubscribeAction?.Invoke();
}
}
#endregion
#region Observer
public class Observer : IObserver<string>
{
private readonly string _name;
public Observer(string name)
{
_name = name;
}
public void OnCompleted()
{
Console.WriteLine($"{_name} completed.");
}
public void OnError(Exception error)
{
Console.WriteLine($"{_name} error: {error.Message}");
}
public void OnNext(string value)
{
Console.WriteLine($"{_name} received: {value}");
}
}
#endregion
Observer 1 received: [IMPORTANT] Message 1
Observer 2 received: [IMPORTANT] Message 1
Observer 1 received: [IMPORTANT] Message 2
Observer 2 received: [IMPORTANT] Message 2
Observer 1 received: [IMPORTANT] Message 3
Observer 1 error: Critical Error
4.2.5 特点
- 依赖框架:依赖于.NET框架,不适合跨平台或非.NET环境。
- 学习曲线:需要一定的.NET框架知识才能熟练使用。
- 与LINQ集成:可以使用LINQ查询语法对事件流进行操作,简化代码。
- 性能优化:通过高效的订阅机制和事件分发,提升了性能。
- 扩展性强:支持事件过滤、组合、转换等高级功能。
- 线程安全:框架提供了线程安全的机制,减少了线程冲突的风险。
- 标准化:基于.NET框架的标准接口,具有统一的规范。
4.2.6 适用场景
- 复杂事件处理:适用于事件流复杂、需要高级操作(如过滤、组合、转换)的场景。
- 多线程环境:在多线程或异步编程中,可以有效避免线程安全问题。
- 数据流处理:适合处理数据流,如传感器数据、实时消息等。
- 与.NET生态系统集成:与.NET的其他功能(如LINQ、Task并行库)无缝集成。
4.3 System.Reactive
System.Reactive 是基于IObservable<T>
和IObserver<T>
的扩展库,用于处理事件流和异步数据流。
它将事件和数据流抽象为可观察序列,并通过 LINQ 风格的操作符实现订阅、过滤、转换和合并。
核心思想:使用响应式扩展库处理复杂事件流。Rx可以这样定义:Rx = Observables + LINQ + Schedulers。
4.3.1 安装
通过NuGet包管理器安装System.Reactive包
4.3.2 示例
using System.Reactive.Linq;
using System.Reactive.Subjects;
// 创建可观察序列
var subject = new Subject<string>();
var observable = subject.AsObservable();
// 订阅观察者
var subscription = observable
.Where(msg => msg.StartsWith("IMPORTANT"))
.Subscribe(msg => Console.WriteLine($"Rx received: {msg}"));
// 推送消息
subject.OnNext("IMPORTANT: System update");
subject.OnNext("Normal message"); // 被过滤
// 取消订阅
subscription.Dispose();
输出:
Rx received: IMPORTANT: System update
4.3.3 特点
- 强大的事件流处理(过滤、映射、合并等)
- 支持LINQ查询操作
- 异步事件处理支持
- 自动管理资源释放(通过IDisposable)
- 需引入第三方库(System.Reactive)
- 学习曲线较陡
4.3.4 适用场景
- 适合复杂事件流处理
- 实时数据更新
- 多线程和异步编程场景
- 功能强大且与.NET生态系统无缝集成
- 优先选择在需要复杂事件流处理的场景
五、使用场景
以下是观察者模式不同实现方式的对比总结:
实现方式 | 适合场景 | 选择建议 |
---|---|---|
经典实现 | 适用于简单的事件通知场景,如 GUI 编程中组件间的交互。 | 当事件逻辑简单、不需要复杂的数据流处理时,适合使用经典实现。 |
委托与事件实现 | 适用于.NET中的事件处理,尤其是需要在多个组件或类之间传递事件的场景。 | 如果使用的是.NET框架,并且需要在类之间传递事件,委托与事件是首选。 |
`IObservable<T>`/`IObserver<T>`实现 | 适用于需要灵活处理数据流的场景,如异步数据处理、多线程环境下的事件推送。 | 当需要更灵活地控制数据流,或者需要支持多个观察者订阅时,`IObservable<T>`/`IObserver<T>`是一个不错的选择。 |
`System.Reactive`实现 | 适用于复杂的数据流处理,尤其是需要对事件进行转换、过滤、组合等操作的场景。 | 如果涉及复杂的数据流处理,或者需要响应式编程的支持,`System.Reactive`是最佳选择。 |
六、扩展
6.1 初始化和基础架构搭建
6.1.1 初始化和基础架构搭建
6.1.1.1 一个观察者观察多个目标
场景:有时观察者需要依赖多个目标对象。
问题:如果观察者无法区分通知的来源,导致无法针对不同目标对象做出准确响应。
示例:假设一个用户界面中有一个观察者用于监控多个数据源(如温度、湿度和空气质量传感器)。
当任何一个数据源更新时,观察者都会收到通知,但无法区分是哪个数据源发生了变化,从而无法针对性地更新界面元素。
改动:对目标对象的
Update接口
进行扩展,确保观察者能够准确识别通知的来源
using System;
using System.Collections.Generic;
public class Subject
{
private List<IObserver> observers = new List<IObserver>();
// 注册观察者
public void Attach(IObserver observer)
{
observers.Add(observer);
}
// 注销观察者
public void Detach(IObserver observer)
{
observers.Remove(observer);
}
// 通知所有观察者
public void NotifyObservers(string message)
{
foreach (var observer in observers)
{
observer.Update(this, message); // 将自己作为参数传递给观察者
}
}
}
6.1.1.2 目标多而观察者少
问题
在传统观察者模式中,目标对象(
Subject
)直接保存观察者的引用。这种方式简单直观,但当目标对象数量多而观察者数量少时,会产生明显弊端:- 存储开销问题
每个目标对象都要分配存储空间保存观察者引用,即使某些目标对象没有观察者。
这会导致大量不必要的存储开销,尤其在目标对象数量庞大时,开销更加显著。
生命周期管理问题
如果目标对象的生命周期较短,而观察者集合被强引用保留,可能会导致内存泄漏。
因为即使目标对象被垃圾回收,观察者集合仍然占用内存,无法被清理。
解决方案
- 使用
HashSet<IObserver>
为了解决存储开销问题,可以考虑使用HashSet<IObserver>
来存储观察者。HashSet
提供了高效的动态添加和移除操作,能够更好地支持观察者在运行时的动态变化。
- 优点:
- 动态管理观察者:
HashSet
提供高效的动态添加和移除操作,支持观察者在运行时的动态变化。 - 避免重复存储:
HashSet
自动去重,避免了重复存储相同的观察者。 - 提高存储效率:通过哈希表实现快速查找和插入操作,减少了存储和检索观察者的开销。
- 动态管理观察者:
- 缺点:
- 内存泄漏风险:如果目标对象被垃圾回收,但观察者集合(
HashSet<IObserver>
)仍然存在引用,那么这些观察者可能不会被正确清理,从而导致内存泄漏。 - 生命周期管理复杂:需要手动管理目标对象和观察者集合的生命周期,确保在目标对象被销毁时,观察者集合也被正确清理。
- 内存泄漏风险:如果目标对象被垃圾回收,但观察者集合(
- 使用
ConditionalWeakTable
ConditionalWeakTable
是一种特殊的哈希表,它允许键(目标对象)在没有其他强引用时被垃圾回收,而不会影响值(观察者集合)的存在。
通过ConditionalWeakTable
,只有真正有观察者的目标对象才会占用存储空间,同时避免了内存泄漏问题。
以下是使用ConditionalWeakTable
实现观察者模式的示例代码:
using System.Runtime.CompilerServices;
using System.Collections.Generic;
// 目标和观察者接口省略...
public class Subject : ISubject
{
private static readonly object _syncLock = new object();
private static readonly ConditionalWeakTable<Subject, HashSet<IObserver>> observerMap =
new ConditionalWeakTable<Subject, HashSet<IObserver>>();
public void Attach(IObserver observer)
{
lock (_syncLock)
{
if (!observerMap.TryGetValue(this, out var observers))
{
observers = new HashSet();
observerMap.Add(this, observers);
}
observers.Add(observer);
}
}
public void Detach(IObserver observer)
{
lock (_syncLock)
{
if (observerMap.TryGetValue(this, out var observers))
{
observers.Remove(observer);
if (observers.Count == 0)
{
observerMap.Remove(this);
}
}
}
}
public void NotifyObservers(string message)
{
HashSet<IObserver> observersCopy;
lock (_syncLock)
{
if (!observerMap.TryGetValue(this, out var observers))
{
return;
}
observersCopy = new HashSet(observers);
}
foreach (var observer in observersCopy)
{
observer.Update(this, message);
}
}
}
优点
- 避免内存泄漏:
ConditionalWeakTable
允许目标对象在没有其他强引用时被垃圾回收,从而避免了内存泄漏问题。 - 动态管理:目标对象和观察者之间的关系是动态的,
ConditionalWeakTable
提供了一种灵活的方式来管理这种关系。 - 优化存储效率:只有真正有观察者的目标对象才会占用存储空间,减少了不必要的存储开销。
- 避免内存泄漏:
缺点
- 性能开销:
ConditionalWeakTable
的查找和管理操作比直接使用HashSet<IObserver>
更复杂,可能会引入额外的性能开销。 - 复杂性增加:代码的复杂性增加,需要理解
ConditionalWeakTable
的工作机制。
- 性能开销:
实际应用中的权衡
观察者模式在实际应用中需要根据具体需求选择合适的实现方式。
如果目标对象数量较多且生命周期较短,推荐使用
ConditionalWeakTable
,以避免内存泄漏并优化存储效率。如果目标对象生命周期较长且观察者管理较为简单,则可以直接使用
HashSet<IObserver>
,以简化实现和提高性能。
6.1.1.3 目标对象之间存在复杂依赖关系
问题
当目标对象存在复杂依赖关系时,直接通知观察者可能引发以下问题:
- 多次更新:观察者可能因多个目标对象的改变收到重复通知,导致冗余操作。
- 更新顺序问题:目标对象的改变顺序可能导致观察者在状态未完全更新时收到通知,获取不一致的状态。
- 维护成本高:复杂的依赖关系增加了代码复杂性和维护难度。
解决方案
引入一个独立的更改管理器(ChangeManager) 来封装和管理复杂的更新逻辑。
更改管理器的作用
- 维护映射关系:管理目标对象与观察者的映射,降低耦合度。
- 定义更新策略:在所有相关目标对象状态更新完毕后统一通知观察者,避免冗余和不一致问题。
- 优化更新逻辑:根据依赖关系优化更新流程,确保观察者只接收一次更新。
更改管理器的两种实现
是一个典型的中介者模式实例,通常以单例模式全局可见,从而确保整个系统中只有一个协调中心。两种特殊的更改管理器实现:
SimpleChangeManager
实现方案
- 使用字典维护目标对象与观察者的映射
- 使用脏标记集合跟踪需要更新的目标对象
- Commit时统一通知观察者并去重
示例
using System;
using System.Collections.Generic;
using System.Linq;
#region Client Code
// 使用简单更改管理器
ChangeManager.Instance = SimpleChangeManager.Instance;
var subject = new ConcreteSubject();
var observer = new ConcreteObserver();
ChangeManager.Instance.Register(subject, observer);
subject.State = 42;
subject.Notify();
ChangeManager.Instance.Commit();
// 使用DAG更改管理器
ChangeManager.Instance = DAGChangeManager.Instance;
var subjectA = new ConcreteSubject();
var subjectB = new ConcreteSubject();
var dagObserver = new ConcreteObserver();
ChangeManager.Instance.Register(subjectA, dagObserver);
ChangeManager.Instance.Register(subjectB, dagObserver);
((DAGChangeManager)ChangeManager.Instance).AddDependency(subjectA, subjectB);
subjectA.State = 10;
subjectB.State = 20;
subjectA.Notify();
subjectB.Notify();
ChangeManager.Instance.Commit();
#endregion
#region IObserver
// 观察者接口
public interface IObserver
{
void Update(ISubject subject);
}
#endregion
#region IObservable
// 目标对象接口
public interface ISubject
{
void Notify();
}
#endregion
#region ChangeManager
// 更改管理器抽象类
public abstract class ChangeManager
{
public static ChangeManager Instance { get; set; }
public abstract void Register(ISubject subject, IObserver observer);
public abstract void Unregister(ISubject subject, IObserver observer);
public abstract void Notify(ISubject subject);
public abstract void Commit();
}
#endregion
#region SimpleChangeManager
// 简单更改管理器实现
public sealed class SimpleChangeManager : ChangeManager
{
private readonly Dictionary> _observers = new();
private readonly HashSet<ISubject> _dirtySubjects = new();
static SimpleChangeManager()
{
}
private static SimpleChangeManager _instance;
public static SimpleChangeManager Instance
{
get
{
if (_instance == null)
{
_instance = new SimpleChangeManager();
}
return _instance;
}
}
public override void Register(ISubject subject, IObserver observer)
{
if (!_observers.ContainsKey(subject))
{
_observers[subject] = new HashSet();
}
_observers[subject].Add(observer);
}
public override void Unregister(ISubject subject, IObserver observer)
{
if (_observers.TryGetValue(subject, out var observers))
{
observers.Remove(observer);
}
}
public override void Notify(ISubject subject)
{
lock (_dirtySubjects)
{
_dirtySubjects.Add(subject);
}
}
public override void Commit()
{
HashSet<IObserver> notified = new();
List<ISubject> toProcess;
lock (_dirtySubjects)
{
toProcess = _dirtySubjects.ToList();
_dirtySubjects.Clear();
}
foreach (var subject in toProcess)
{
if (_observers.TryGetValue(subject, out var observers))
{
foreach (var observer in observers.Where(observer => notified.Add(observer)))
{
observer.Update(subject);
}
}
}
}
}
#endregion
#region ConcreteSubject
// 示例使用
public class ConcreteSubject : ISubject
{
public int State { get; set; }
public void Notify()
{
ChangeManager.Instance.Notify(this);
}
}
#endregion
#region ConcreteObserver
public class ConcreteObserver : IObserver
{
public void Update(ISubject subject)
{
if (subject is ConcreteSubject concreteSubject)
{
Console.WriteLine($"Received update: {concreteSubject.State}");
}
}
}
#endregion
```
```c#
Received update: 42
特点:总是更新每个目标对象的所有观察者。实现简单,易于理解。可能会导致冗余更新,效率较低。
适用场景:当目标对象之间没有复杂的依赖关系,或者更新逻辑简单时,这种实现方式比较合适。
DAGChangeManager
实现方案
继承简单管理器基础功能
添加依赖关系管理
使用拓扑排序确保更新顺序
1. 使用深度优先搜索(DFS)。
2. 先处理依赖项,再处理当前主题。
3. 最终得到的排序结果是一个线性顺序,满足所有依赖关系示例
#region DAGChangeManager
// 基于DAG的复杂更改管理器
public sealed class DAGChangeManager : ChangeManager
{
private readonly Dictionary> _observers = new();
private readonly Dictionary<ISubject, HashSet<ISubject>> _dependencies = new();
private readonly HashSet<ISubject> _dirtySubjects = new();
static DAGChangeManager()
{
}
private static DAGChangeManager _instance;
public static DAGChangeManager Instance
{
get
{
if (_instance == null)
{
_instance = new DAGChangeManager();
}
return _instance;
}
}
// 添加依赖关系(dependent 依赖于 dependency)
public void AddDependency(ISubject dependent, ISubject dependency)
{
if (!_dependencies.ContainsKey(dependent))
{
_dependencies[dependent] = new HashSet();
}
_dependencies[dependent].Add(dependency);
}
public override void Register(ISubject subject, IObserver observer)
{
if (!_observers.ContainsKey(subject))
{
_observers[subject] = new HashSet();
}
_observers[subject].Add(observer);
}
public override void Unregister(ISubject subject, IObserver observer)
{
if (_observers.TryGetValue(subject, out var observers))
{
observers.Remove(observer);
}
}
public override void Notify(ISubject subject)
{
lock (_dirtySubjects)
{
_dirtySubjects.Add(subject);
}
}
public override void Commit()
{
List<ISubject> processingOrder;
lock (_dirtySubjects)
{
processingOrder = TopologicalSort(_dirtySubjects);
_dirtySubjects.Clear();
}
HashSet<IObserver> notified = new();
foreach (var subject in processingOrder)
{
if (_observers.TryGetValue(subject, out var observers))
{
foreach (var observer in observers.Where(observer => notified.Add(observer)))
{
observer.Update(subject);
}
}
}
}
private List<ISubject> TopologicalSort(HashSet<ISubject> subjects)
{
var sorted = new List<ISubject>();
var visited = new HashSet<ISubject>();
foreach (var subject in subjects.OrderBy(s => s.GetHashCode()))
{
Visit(subject, visited, sorted);
}
return sorted;
}
private void Visit(ISubject subject, HashSet<ISubject> visited, List<ISubject> sorted)
{
if (!visited.Add(subject)) return;
if (_dependencies.TryGetValue(subject, out var dependencies))
{
foreach (var dependency in dependencies)
{
Visit(dependency, visited, sorted);
}
}
sorted.Add(subject);
}
}
#endregion
```
```c#
Received update: 20
- **特点**:处理目标对象及其观察者之间依赖关系构成的**无环有向图(DAG,Directed Acyclic Graph)**。
优点:可以避免冗余更新,确保观察者只接收一次更新。
缺点:实现复杂度较高,需要维护依赖关系图。
适用场景:当观察者可能依赖多个目标对象,且目标对象之间存在复杂的依赖关系时,这种实现方式更好。
总结
更改管理器(ChangeManager)是一种优化机制,用于封装复杂更新逻辑,简化目标对象与观察者之间的依赖关系。它通过以下方式实现优化:
- 职责分离:将映射关系和更新逻辑封装到独立对象中。
- 统一通知:在所有目标对象状态更新完毕后,一次性通知观察者。
- 优化策略:避免冗余更新。
更改管理器可以是简单的SimpleChangeManager或复杂的DAGChangeManager,具体取决于系统需求。它通常以单例模式全局可见,作为系统的协调中心。
6.2 注册机制
6.2.1 问题
在观察者模式中,传统的事件通知机制可能存在以下问题:
- 通知效率低:目标对象可能向所有观察者发送通知,即使某些观察者并不关心某些事件。
- 耦合度高:观察者与目标对象之间的依赖关系较强,难以灵活调整。
- 缺乏灵活性:观察者无法动态选择关注的事件类型,难以适应复杂的应用场景。
这些问题可能导致系统性能下降,代码难以维护和扩展。
6.2.2 解决方案
通过显式注册机制,观察者可以明确指定其感兴趣的事件类型,目标对象仅向已注册的观察者发送相关通知。
6.2.2.1 实现思路
- 引入“方面(Aspect)”概念:将目标对象的状态变化分解为多个独立的方面,每个方面代表一种特定类型的变更。
- 观察者选择性注册:观察者可以根据需要注册对特定方面的兴趣,从而只接收关注的事件通知。
- 目标对象优化通知:目标对象仅向已注册特定方面的观察者发送通知,避免不必要的消息传递。
6.2.2.2 示例
定义方面(Aspect)枚举
方面(Aspect)枚举:定义了目标对象可能的状态变化类型,例如状态变化、数据更新和错误发生。
// 定义方面(Aspect)枚举,表示目标对象可能的状态变化类型 public enum Aspect { StateChange, DataUpdate, ErrorOccurred }
目标对象类 Subject
- 使用字典存储每个方面对应的观察者列表。
- 提供注册和取消注册的方法,允许观察者显式指定感兴趣的方面。
- 提供通知方法,仅向注册了特定方面的观察者发送通知。
// 目标对象类 public class Subject { // 用于存储观察者订阅的方面 private Dictionary>> observers = new Dictionary>>(); public Subject() { // 初始化方面列表 foreach (Aspect aspect in Enum.GetValues(typeof(Aspect))) { observers[aspect] = new List>(); } } // 注册观察者 public void RegisterObserver(Aspect aspect, Action observer) { observers[aspect].Add(observer); Console.WriteLine($"Observer registered for aspect: {aspect}"); } // 取消注册观察者 public void UnregisterObserver(Aspect aspect, Action observer) { observers[aspect].Remove(observer); Console.WriteLine($"Observer unregistered from aspect: {aspect}"); } // 通知观察者 public void NotifyObservers(Aspect aspect, string message) { if (observers[aspect].Count > 0) { Console.WriteLine($"Notifying observers for aspect: {aspect}"); foreach (var observer in observers[aspect]) { observer(message); } } else { Console.WriteLine($"No observers registered for aspect: {aspect}"); } } // 模拟目标对象状态变化 public void ChangeState() { Console.WriteLine("Subject state changed."); NotifyObservers(Aspect.StateChange, "State has changed."); } public void UpdateData() { Console.WriteLine("Subject data updated."); NotifyObservers(Aspect.DataUpdate, "Data has been updated."); } public void ErrorOccurred() { Console.WriteLine("Error occurred in the subject."); NotifyObservers(Aspect.ErrorOccurred, "An error has occurred."); } }
观察者类 Observer
- 包含多个回调方法,分别对应不同的方面。
- 观察者可以根据需要注册对特定方面的兴趣。
// 观察者类 public class Observer { private string name; public Observer(string name) { this.name = name; } public void OnStateChange(string message) { Console.WriteLine($"{name} received state change notification: {message}"); } public void OnDataUpdate(string message) { Console.WriteLine($"{name} received data update notification: {message}"); } public void OnError(string message) { Console.WriteLine($"{name} received error notification: {message}"); } }
Client
- 创建目标对象和观察者。
- 观察者显式注册对特定方面的兴趣。
- 模拟目标对象的状态变化,观察通知机制的运行。
// 测试程序 public class Program { public static void Main() { // 创建目标对象 Subject subject = new Subject(); // 创建观察者 Observer observer1 = new Observer("Observer1"); Observer observer2 = new Observer("Observer2"); // 观察者1注册对所有方面的兴趣 subject.RegisterObserver(Aspect.StateChange, observer1.OnStateChange); subject.RegisterObserver(Aspect.DataUpdate, observer1.OnDataUpdate); subject.RegisterObserver(Aspect.ErrorOccurred, observer1.OnError); // 观察者2仅注册对错误方面的兴趣 subject.RegisterObserver(Aspect.ErrorOccurred, observer2.OnError); // 模拟目标对象状态变化 subject.ChangeState(); subject.UpdateData(); subject.ErrorOccurred(); // 观察者2取消对错误方面的兴趣 subject.UnregisterObserver(Aspect.ErrorOccurred, observer2.OnError); // 再次触发错误事件,观察者2不再接收通知 subject.ErrorOccurred(); } }
结果
Observer registered for aspect: StateChange Observer registered for aspect: DataUpdate Observer registered for aspect: ErrorOccurred Observer registered for aspect: ErrorOccurred Subject state changed. Notifying observers for aspect: StateChange Observer1 received state change notification: State has changed. Subject data updated. Notifying observers for aspect: DataUpdate Observer1 received data update notification: Data has been updated. Error occurred in the subject. Notifying observers for aspect: ErrorOccurred Observer1 received error notification: An error has occurred. Observer2 received error notification: An error has occurred. Observer unregistered from aspect: ErrorOccurred Error occurred in the subject. Notifying observers for aspect: ErrorOccurred Observer1 received error notification: An error has occurred.
6.2.2.3 优点
- 提高通知效率:目标对象仅发送观察者关心的事件。
- 降低耦合度:观察者与目标对象之间保持松散耦合。
- 灵活性强:观察者可根据需求动态调整关注的事件类型。
6.2.2.4 适用场景
这种方法通过事件驱动机制实现了高效的通信,适用于需要灵活配置和优化性能的场景,例如:
- 多用户系统中对不同事件的关注。
- 复杂系统的状态监控和事件响应。
- 需要动态调整事件监听的应用。
6.3 触发机制
6.3.1 触发之前
在通知观察者之前,目标对象的状态必须是完整且正确的,否则观察者可能会基于错误的状态进行操作,从而引发问题。
6.3.2 更新的触发者
目标对象与观察者通过通知机制保持同步,存在以下两种触发方式:
- 自动触发:目标对象状态变化后自动执行
Notify
。优势在于无需客户手动操作,但可能因连续操作频繁触发更新,影响效率。 - 手动触发:客户在状态变化完成后适时调用
Notify
。优点在于可避免不必要的中间更新,但增加了客户操作负担,且存在因遗漏调用而导致错误的风险。
6.3.3 示例
6.3.3.1 抽象目标对象
observers
:存储观察者的集合。mainState
和secondaryState
:主题的状态信息。Attach
和Detach
:用于添加和移除观察者。Notify
:抽象方法,由具体主题类实现,用于通知观察者。PrepareUpdate
:用于准备状态更新,但不触发通知。ShowState
:用于打印当前状态。
#region abstract Subject
// 抽象目标对象
abstract class Subject
{
protected HashSet<IObserver> observers = new HashSet<IObserver>();
protected int mainState;
protected int secondaryState;
public void Attach(IObserver observer) => observers.Add(observer);
public void Detach(IObserver observer) => observers.Remove(observer);
public abstract void Notify();
protected void PrepareUpdate(int main, int secondary)
{
mainState = main;
secondaryState = secondary;
}
public void ShowState()
{
Console.WriteLine($"State: [{mainState}, {secondaryState}]");
}
}
#endregion
6.3.3.2 观察者接口
#region IObserver
// 观察者接口
interface IObserver
{
void Update();
}
#endregion
6.3.3.3 具体的的观察者
#region ConcreteObservers
// 具体观察者
class StateObserver : IObserver
{
private readonly Subject subject;
public StateObserver(Subject subject)
{
this.subject = subject;
}
public void Update()
{
Console.Write("Observer received update: ");
subject.ShowState();
}
}
#endregion
6.3.3.4 自动触发的目标对象
状态发生改变时,自动触发通知
#region AutoTriggerSubject
// 自动触发实现
class AutoTriggerSubject : Subject
{
public void SetMainState(int value)
{
mainState = value;
Notify(); // 自动触发
}
public override void Notify()
{
Console.WriteLine("[AutoTrigger] Notifying observers...");
foreach (var observer in observers)
{
observer.Update();
}
}
}
#endregion
6.3.3.5 手动触发的目标对象
#region ManualTriggerSubject
// 手动触发实现
class ManualTriggerSubject : Subject
{
public void CompleteUpdate(int main, int secondary)
{
PrepareUpdate(main, secondary);
// 不自动触发
}
public override void Notify()
{
Console.WriteLine("[ManualTrigger] Notifying observers...");
foreach (var observer in observers)
{
observer.Update();
}
}
}
#endregion
6.3.3.6 使用示例
#region Client Code
// 自动触发演示
Console.WriteLine("=== Automatic Trigger Demo ===");
var autoSubject = new AutoTriggerSubject();
var obs1 = new StateObserver(autoSubject);
autoSubject.Attach(obs1);
autoSubject.SetMainState(10); // 触发通知
autoSubject.SetMainState(20); // 再次触发
// 手动触发演示
Console.WriteLine("\n=== Manual Trigger Demo ===");
var manualSubject = new ManualTriggerSubject();
var obs2 = new StateObserver(manualSubject);
manualSubject.Attach(obs2);
manualSubject.CompleteUpdate(1, 100);
manualSubject.CompleteUpdate(2, 200);
manualSubject.CompleteUpdate(3, 300);
manualSubject.Notify(); // 单次触发
#endregion
结果
=== Automatic Trigger Demo ===
[AutoTrigger] Notifying observers...
Observer received update: State: [10, 0]
[AutoTrigger] Notifying observers...
Observer received update: State: [20, 0]
=== Manual Trigger Demo ===
[ManualTrigger] Notifying observers...
Observer received update: State: [3, 300]
6.4 信息传递机制
观察者模式中目标将这些信息作为Update操作的一个参数传递出去。这些信息的量可能很小,也可能很大。其信息量传递大小的两个极端便是:推模型(Push Model)和拉模型(Pull Model)。
推模型:一种主动推送信息的机制,主题对象在状态发生变化时,主动将包含具体变更信息的参数推送给所有观察者,观察者通过更新方法来接收这些信息。这种方式传递的是结构化数据,适用于需要接收完整、结构化数据且数据格式相对稳定的场景。
拉模型:一种按需获取信息的机制,主题对象在状态发生变化时,仅发送一个简单的通知给观察者,观察者需要主动调用主题对象的查询接口来获取所需的信息。这种方式更加灵活,适用于观察者需要不同数据子集或数据格式可能频繁变化的场景。
6.4.1 解决方案与实现方式
6.4.1.1 推模型(Push Model)
实现原理
目标对象主动推送包含具体变更信息的参数至观察者的更新方法,采用事件驱动机制传递结构化数据。
典型实现示例(气象监测系统)
IObserver 接口
#region IObserver 接口 // 观察者接口 public interface IObserver { void Update(WeatherData data); } #endregion
具体观察者 :Display 类
#region 具体观察者 :Display 类 // 具体观察者 public class Display : IObserver { public void Update(WeatherData data) { Console.WriteLine($"当前温度:{data.Temperature}℃"); } } #endregion
Subject 类: WeatherStation 类
SetMeasurements(float temp)
:设置新的温度数据,并触发通知。NotifyObservers(WeatherData data)
:遍历观察者列表,调用每个观察者的Update
方法。
#region Subject 类: WeatherStation 类 // 目标对象 public class WeatherStation { private List<IObserver> _observers = new List<IObserver>(); private float _temperature; public void SetMeasurements(float temp) { _temperature = temp; NotifyObservers(new WeatherData(temp)); } private void NotifyObservers(WeatherData data) { foreach (var observer in _observers) { observer.Update(data); } } public void Attach(IObserver observer) => _observers.Add(observer); public void Detach(IObserver observer) => _observers.Remove(observer); } // 数据传输对象 public class WeatherData { public float Temperature { get; } public WeatherData(float temperature) { Temperature = temperature; } } #endregion
Client
#region Client Code // 创建一个天气站对象 WeatherStation weatherStation = new WeatherStation(); // 创建两个观察者对象 Display display1 = new Display(); Display display2 = new Display(); // 将观察者添加到天气站的观察者列表 weatherStation.Attach(display1); weatherStation.Attach(display2); // 模拟天气站更新数据 Console.WriteLine("天气站更新温度为 25.5℃:"); weatherStation.SetMeasurements(25.5f); // 移除一个观察者 weatherStation.Detach(display1); // 再次更新数据 Console.WriteLine("\n天气站更新温度为 28.0℃:"); weatherStation.SetMeasurements(28.0f); Console.ReadLine(); #endregion
结果
天气站更新温度为 25.5℃: 当前温度:25.5℃ 当前温度:25.5℃ 天气站更新温度为 28.0℃: 当前温度:28℃
适用场景
- 观察者需要接收完整、结构化的数据
- 数据格式相对稳定且预先可知
- 需要最小化观察者的查询操作
- 实时性要求高于通信成本
6.4.1.2 拉模型(Pull Model)
实现原理
目标对象仅发送简单通知,观察者主动调用目标对象的查询接口获取所需信息,实现按需获取
典型实现示例
假设一个股票市场监控系统,目标对象是股票市场,观察者是不同的投资者。当股票价格发生变化时,股票市场仅通知投资者价格发生了变化,而投资者需要主动查询当前的股票价格。
目标和观察者接口
#region Interface Code // 观察者接口 public interface IObserver { void Update(); } // 目标接口 public interface ISubject { void Attach(IObserver observer); void Detach(IObserver observer); void Notify(); } #endregion
具体目标
#region Concrete Subject : StockMarket // 具体目标类 public class StockMarket : ISubject { private List<IObserver> observers = new List<IObserver>(); private decimal stockPrice; public decimal StockPrice { get { return stockPrice; } set { stockPrice = value; Notify(); } } public void Attach(IObserver observer) { observers.Add(observer); } public void Detach(IObserver observer) { observers.Remove(observer); } public void Notify() { foreach (var observer in observers) { observer.Update(); } } } #endregion
具体观察者
通过
Update
方法接收通知,并从目标对象中获取当前股票价格。#region Concrete Observer : Investor // 具体观察者类 public class Investor : IObserver { private string name; private ISubject stockMarket; public Investor(string name, ISubject stockMarket) { this.name = name; this.stockMarket = stockMarket; } public void Update() { decimal currentPrice = ((StockMarket)stockMarket).StockPrice; Console.WriteLine($"{name} received notification. Current stock price: {currentPrice:C}"); } } #endregion
使用示例
#region Client Code // 创建股票市场对象 StockMarket stockMarket = new StockMarket(); // 创建投资者 Investor investor1 = new Investor("Alice", stockMarket); Investor investor2 = new Investor("Bob", stockMarket); // 将投资者添加到股票市场的观察者列表 stockMarket.Attach(investor1); stockMarket.Attach(investor2); // 模拟股票价格变化 Console.WriteLine("Stock price changes to $100."); stockMarket.StockPrice = 100; Console.WriteLine("Stock price changes to $120."); stockMarket.StockPrice = 120; // 移除一个投资者 stockMarket.Detach(investor1); Console.WriteLine("Stock price changes to $150."); stockMarket.StockPrice = 150; Console.ReadLine(); #endregion
结果
Stock price changes to $100. Alice received notification. Current stock price: ¥100.00 Bob received notification. Current stock price: ¥100.00 Stock price changes to $120. Alice received notification. Current stock price: ¥120.00 Bob received notification. Current stock price: ¥120.00 Stock price changes to $150. Bob received notification. Current stock price: ¥150.00
适用场景
- 适用于观察者需要不同数据子集的情况。
- 数据格式可能频繁变化,观察者可以根据需要动态获取数据。
- 通信成本不是主要瓶颈,因为观察者主动查询数据。
6.4.1.3 设计对比与权衡
维度 | 推模型 | 拉模型 |
---|---|---|
耦合度 | 高(需预知观察者需求) | 低(观察者自主决定获取内容) |
通信效率 | 高(单次传输完整数据) | 低(需多次请求-响应) |
接口稳定性 | 要求高(参数结构需固定) | 要求低(仅需保持查询接口) |
可扩展性 | 较差(新增观察者类型需修改接口) | 较好(新观察者可自主获取数据) |
通过理解两种模型的本质特征,可以根据具体业务需求、系统约束和演进方向,制定出最适合当前上下文的信息传递策略。
在分布式系统和微服务架构中,这种设计权衡往往直接影响系统的可维护性和扩展能力。
6.4.1.4 最佳实践建议
混合模式设计
结合两种模型的优势,推送基础变更通知,允许观察者拉取补充信息。
观察者接口
#region Interface Code public interface IObserver { void Update(int state); // 接收基础状态 } #endregion
具体目标
#region Subject public class Subject { private List<IObserver> observers = new List<IObserver>(); private int state; // 被观察者的状态 public int State { get { return state; } set { state = value; NotifyObservers(); // 状态改变时通知观察者 } } // 注册观察者 public void Attach(IObserver observer) { observers.Add(observer); } // 移除观察者 public void Detach(IObserver observer) { observers.Remove(observer); } // 通知观察者 public void NotifyObservers() { foreach (IObserver observer in observers) { observer.Update(state); // 推送基础状态 } } } #endregion
具体观察者:ConcreteObserver
#region ConcreteObserver public class ConcreteObserver : IObserver { private Subject subject; // 持有被观察者的引用 public ConcreteObserver(Subject subject) { this.subject = subject; subject.Attach(this); // 注册到被观察者 } public void Update(int state) { Console.WriteLine($"Received base state: {state}"); // 根据需要拉取补充信息 if (state > 10) { int additionalInfo = subject.State; // 拉取补充信息 Console.WriteLine($"Additional info: {additionalInfo}"); } } } #endregion
使用示例:Client
#region Client Code Subject subject = new Subject(); // 创建观察者并注册到被观察者 ConcreteObserver observer1 = new ConcreteObserver(subject); ConcreteObserver observer2 = new ConcreteObserver(subject); // 改变被观察者的状态 Console.WriteLine("Setting state to 5:"); subject.State = 5; // 输出基础状态 Console.WriteLine("\nSetting state to 15:"); subject.State = 15; // 输出基础状态和补充信息 #endregion
结果
Setting state to 5: Received base state: 5 Received base state: 5 Setting state to 15: Received base state: 15 Additional info: 15 Received base state: 15 Additional info: 15
通信优化策略
实现思路
- 批量处理拉取请求:通过在
Subject
中维护一个队列,将观察者的拉取请求批量处理。 - 建立数据缓存机制:在
Subject
中缓存状态信息,避免重复计算或重复拉取。 - 使用差异更新(delta update):仅推送状态变化的部分,而不是完整状态。
观察者接口 IObserver
#region Interface Code public interface IObserver { void Update(int state); // 接收基础状态 } #endregion
目标对象 Subject
#region Subject public class Subject { private List<IObserver> observers = new List<IObserver>(); private int state; // 被观察者的状态 private int previousState; // 上一次的状态,用于差异更新 public int State { get { return state; } set { if (state != value) // 检查状态是否变化 { previousState = state; // 保存上一次状态 state = value; NotifyObservers(); // 状态改变时通知观察者 } } } // 注册观察者 public void Attach(IObserver observer) { observers.Add(observer); } // 移除观察者 public void Detach(IObserver observer) { observers.Remove(observer); } // 通知观察者 public void NotifyObservers() { foreach (IObserver observer in observers) { observer.Update(state); // 推送基础状态 } } // 提供拉取补充信息的接口 public int GetAdditionalInfo() { // 模拟缓存机制:如果状态未变,直接返回缓存值 if (state == previousState) { Console.WriteLine("Using cached additional info."); return previousState; } // 模拟拉取补充信息 Console.WriteLine("Fetching additional info..."); return state; } } #endregion
具体观察者:ConcreteObserver
#region ConcreteObserver public class ConcreteObserver : IObserver { private Subject subject; // 持有被观察者的引用 public ConcreteObserver(Subject subject) { this.subject = subject; subject.Attach(this); // 注册到被观察者 } public void Update(int state) { Console.WriteLine($"Received base state: {state}"); // 根据需要拉取补充信息 if (state > 10) { int additionalInfo = subject.GetAdditionalInfo(); // 拉取补充信息 Console.WriteLine($"Additional info: {additionalInfo}"); } } } #endregion
使用示例 Client
#region Client Code Subject subject = new Subject(); // 创建观察者并注册到被观察者 ConcreteObserver observer1 = new ConcreteObserver(subject); ConcreteObserver observer2 = new ConcreteObserver(subject); // 改变被观察者的状态 Console.WriteLine("Setting state to 5:"); subject.State = 5; // 输出基础状态 Console.WriteLine("\nSetting state to 15:"); subject.State = 15; // 输出基础状态和补充信息 Console.WriteLine("\nSetting state to 15 again (cached info will be used):"); subject.State = 15; // 使用缓存信息 #endregion
结果
Setting state to 5: Received base state: 5 Received base state: 5 Setting state to 15: Received base state: 15 Fetching additional info... Additional info: 15 Received base state: 15 Fetching additional info... Additional info: 15 Setting state to 15 again (cached info will be used):
- 批量处理拉取请求:通过在
6.5 资源管理和错误处理
避免已删除目标的悬挂引用
删除目标时,需确保其观察者中不遗留对该目标的无效引用。
否则,当观察者尝试访问已销毁的目标时,可能会引发错误或异常,导致程序崩溃或行为不可预测。
为了避免这种情况,可以在目标对象被销毁之前,主动通知所有观察者解除对其的订阅。
示例
目标接口和观察者接口
#region Interface // 定义观察者接口 public interface IObserver { void Update(string message); void Unsubscribe(); } // 定义目标接口 public interface ISubject { void Attach(IObserver observer); void Detach(IObserver observer); void Notify(string message); } #endregion
具体目标类 Subject
在目标对象被销毁之前,通知所有观察者解除订阅
#region 定义具体目标类 : Subject // 定义具体目标(主题)类 public class Subject : ISubject { private List _observers = new List(); public void Attach(IObserver observer) { _observers.Add(observer); } public void Detach(IObserver observer) { _observers.Remove(observer); } public void Notify(string message) { foreach (var observer in _observers) { observer.Update(message); } } // 在目标对象被销毁之前,通知所有观察者解除订阅 public void Dispose() { foreach (var observer in _observers) { observer.Unsubscribe(); } _observers.Clear(); } } #endregion
具体观察者类 ConcreteObserver
#region 具体观察者类 : ConcreteObserver // 定义具体观察者类 public class ConcreteObserver : IObserver { private string _name; private Subject _subject; public ConcreteObserver(string name, Subject subject) { _name = name; _subject = subject; } public void Update(string message) { Console.WriteLine($"{_name} received message: {message}"); } public void Unsubscribe() { if (_subject != null) { _subject.Detach(this); _subject = null; } } } #endregion
使用实例 Client Code
在销毁目标对象之前,通知所有观察者解除订阅.
#region Client Code Subject subject = new Subject(); IObserver observer1 = new ConcreteObserver("observer 1", subject); IObserver observer2 = new ConcreteObserver("observer 2", subject); subject.Attach(observer1); subject.Attach(observer2); subject.Notify("Hello Observers!"); // 在销毁目标对象之前,通知所有观察者解除订阅 subject.Dispose(); // 尝试再次通知(应该不会有任何效果,因为观察者已被移除) subject.Notify("This should not be received."); #endregion
结果
Observer 1 received message: Hello Observers! Observer 2 received message: Hello Observers!