在开发过程中,想要传递信号和数据,就得在不同模块之间实现通信。直接通过单例调用虽然简单,但会导致代码高度耦合,难以维护。消息中心提供了一种松耦合的通信方式:发布者不需要知道谁接收事件,接收者不需要知道事件来自哪里,由此减少模块间的直接依赖,便于扩展和维护。
架构
%% EventCenter 架构图
graph TD
subgraph 事件中心核心
A[EventCenter] --> B[ConcurrentDictionary eventKey:callback]
end
subgraph 调用接口
A --> E[添加监听 AddListener]
A --> F[移除监听 RemoveListener]
A --> G[同步触发 SyncBroadcast]
A --> H[异步触发 Broadcast]
end
subgraph 线程管理
G -->|立即执行| J[当前线程]
H -->|主线程队列延迟| K[主线程]
end
subgraph 异常处理
G --> L[Try-Catch块]
H --> L
L --> M[打印错误日志]
M --> N[继续触发剩余回调]
end
数据结构
我们需要一个数据结构来存储eventKey与callback的映射关系,可以使用字典。
如果使用Dictionary,当多个线程同时注册或触发事件,可能导致数据异常,因此我们可以使用ConcurrentDictionary,它是门为高并发场景设计的线程安全集合,内置原子操作,无需手动加锁。
ConcurrentDictionary 提供以下方法:
bool TryAdd(TKey key, TValue value);
bool TryRemove(TKey key, out TValue value);
bool TryGetValue(TKey key, out TValue value);
bool ContainsKey(TKey key);
思考一下,可以使用 ConcurrentDictionary<string,delegate>存储eventKey与callback的映射,但很快便发现,这样一个key只能对应一个回调,不能满足多个模块监听一个事件的应用场景。
于是我们尝试 ConcurrentDictionary<string,List<delegate>>,但是ConcurrentDictionary只能保证获取到 List<T> 的过程是安全的,修改 List<T> 仍然会存在线程不安全的问题。
假设有两个线程同时执行这段代码:
if (!eventDic.ContainsKey("Attack"))
{
eventDic["Attack"] = new List<Delegate>();
eventDic["Attack"].Add(callback);
}
时间 | 线程1 | 线程2 |
---|---|---|
t1 | 执行步骤1(判断"Attack"不存在) | - |
t2 | - | 执行步骤1(同样判断"Attack"不存在) |
t3 | 执行步骤2(创建新List) | - |
t4 | - | 执行步骤2(再次创建新List,覆盖线程1创建的List) |
t5 | 执行步骤3(向被覆盖的List添加handler1) | - |
t6 | - | 执行步骤3(向新List添加handler2) |
可以看到线程2覆盖了线程1创建的List,这可能会导致数据错误。
我们可以尝试使用ConcurrentDictionary<string,ConcurrentDictionary<Delegate,bool>>,嵌套的内层字典可以存储多个Delegate,并且修改操作都是线程安全的,对于内存字典的值,我们是用不上的,可以使用字节数最小的bool类型来占位。
广播
在广播时,我们遍历存储当前key所有委托的内层字典,并依次执行其回调函数。
因为我们不需要回调函数的返回值,所以我们把这些方法委托都从基类Delegate转换成无返回值类型的Action委托。
回调函数无参数:
public static void SyncBroadcast(string eventKey)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
foreach (var callback in callbackList.Keys)
{
(callback as Action)?.Invoke();
}
}
}
回调函数带参数,使用泛型实现:
一般来说,三个参数就可以覆盖绝大多数的使用场景,所以我们只实现0~3个参数的方法。
public static void SyncBroadcast<T>(string eventKey, T data)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
foreach (var callback in callbackList.Keys)
{
(callback as Action<T>)?.Invoke(data);
}
}
}
错误处理
如果某个回调抛出异常,会中断后续回调的执行,还需要增加 try-catch
包裹回调执行部分。
Broadcast与SyncBroadcast
对于SyncBroadcast,Invoke会在调用SyncBroadcast的线程执行。
但很多时候,我们需要在Unity主线程执行回调函数,可以使LoomManager.QueueOnMainThread方法把回调函数传给主线程执行。于是我们可以这样包装一下:
public static void Broadcast(string eventKey)
{
LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey));
}
代码
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using UnityEngine;
public static class EventCenter
{
private static string TAG = "[EventCenter]";
private static readonly ConcurrentDictionary<string, ConcurrentDictionary<Delegate, bool>>
eventDic = new ConcurrentDictionary<string, ConcurrentDictionary<Delegate, bool>>();
// 添加事件监听
public static void AddListener(string eventKey, Action callback)
{
if (!eventDic.ContainsKey(eventKey))
{
eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();
}
eventDic[eventKey].TryAdd(callback, false);
}
#region 有参
public static void AddListener<T>(string eventKey, Action<T> callback)
{
if (!eventDic.ContainsKey(eventKey))
{
eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();
}
eventDic[eventKey].TryAdd(callback, false);
}
public static void AddListener<T,U>(string eventKey, Action<T,U> callback)
{
if (!eventDic.ContainsKey(eventKey))
{
eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();
}
eventDic[eventKey].TryAdd(callback, false);
}
public static void AddListener<T, U, V>(string eventKey, Action<T, U, V> callback)
{
if (!eventDic.ContainsKey(eventKey))
{
eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();
}
eventDic[eventKey].TryAdd(callback, false);
}
#endregion
// 移除事件监听
public static void RemoveListener(string eventKey, Action callback)
{
if(eventDic.TryGetValue(eventKey,out ConcurrentDictionary<Delegate,bool> callbackList))
{
if (callbackList.ContainsKey(callback))
callbackList.TryRemove(callback, out bool result);
}
}
// 移除全部事件监听
public static void Clear() => eventDic.Clear();
#region 有参
public static void RemoveListener<T>(string eventKey, Action<T> callback)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
if (callbackList.ContainsKey(callback))
callbackList.TryRemove(callback, out bool result);
}
}
public static void RemoveListener<T, U>(string eventKey, Action<T, U> callback)
{
if(eventDic.TryGetValue(eventKey,out ConcurrentDictionary<Delegate,bool> callbackList))
{
if (callbackList.ContainsKey(callback))
callbackList.TryRemove(callback, out bool result);
}
}
public static void RemoveListener<T, U, V>(string eventKey, Action<T, U, V> callback)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
if (callbackList.ContainsKey(callback))
callbackList.TryRemove(callback, out bool result);
}
}
#endregion
// 立即触发事件
public static void SyncBroadcast(string eventKey)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
foreach (var callback in callbackList.Keys)
{
try
{
(callback as Action)?.Invoke();
}
catch(Exception ex)
{
Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");
}
}
}
}
#region 有参
public static void SyncBroadcast<T>(string eventKey, T data)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
foreach (var callback in callbackList.Keys)
{
try
{
(callback as Action<T>)?.Invoke(data);
}
catch (Exception ex)
{
Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");
}
}
}
}
public static void SyncBroadcast<T, U>(string eventKey, T dataT, U dataU)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
foreach (var callback in callbackList.Keys)
{
try
{
(callback as Action<T, U>)?.Invoke(dataT, dataU);
}
catch (Exception ex)
{
Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");
}
}
}
}
public static void SyncBroadcast<T, U, V>(string eventKey, T dataT, U dataU, V dataV)
{
if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList))
{
foreach (var callback in callbackList.Keys)
{
try
{
(callback as Action<T, U, V>)?.Invoke(dataT, dataU, dataV);
}
catch (Exception ex)
{
Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");
}
}
}
}
#endregion
// 在主线程触发事件
public static void Broadcast(string eventKey)
{
LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey));
}
#region 有参
public static void Broadcast<T>(string eventKey, T data)
{
LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey, data));
}
public static void Broadcast<T, U>(string eventKey, T dataT, U dataU)
{
LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey, dataT, dataU));
}
public static void Broadcast<T, U, V>(string eventKey, T dataT, U dataU, V dataV)
{
LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey, dataT, dataU, dataV));
}
#endregion
}