我现在想用 RxBus
来发布和订阅事件,同时使用 ICompositeSubscription
来管理订阅。跟前一个博客的区别在于,事件流的产生方式不同,更加得全面。
目标
- 使用
RxBus
发布事件。 - 使用
ICompositeSubscription
来管理订阅。 - 在
Activity
中创建订阅,并确保在Activity
销毁时取消订阅,避免内存泄漏。
代码实现
1、RxBus:发布和订阅事件。
public class RxBus {
private final SerializedSubject<Object, Object> subject;
public RxBus() {
subject = new SerializedSubject<>(PublishSubject.create());
}
public void post(Object object) {
subject.onNext(object); // 发布事件
}
@NonNull
public <T> Observable<T> toObservable(Class<T> type) {
return subject.ofType(type); // 转换成指定类型的 Observable
}
public <T> Subscription toSubscription(Class<T> type, Action1<T> action1, Scheduler scheduler) {
return this
.toObservable(type)
.subscribeOn(scheduler) // 指定线程调度器
.subscribe(action1);
}
}
2、 ICompositeSubscription:管理订阅。
public class ICompositeSubscription {
private CompositeSubscription mCompositeSubscription;
public ICompositeSubscription() {
mCompositeSubscription = new CompositeSubscription();
}
public Subscription putSubscription(Subscription subscription) {
mCompositeSubscription.add(subscription); // 添加订阅
return subscription;
}
public void unSubscribe() {
if (mCompositeSubscription != null) {
mCompositeSubscription.clear(); // 取消所有订阅
}
}
}
3、Event 类:定义事件类。
public class Event {
private String message;
public Event(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
4、Activity 代码:在 Activity
中,使用 RxBus
发布和订阅事件,并使用 ICompositeSubscription
来管理订阅。
public class MyActivity extends AppCompatActivity {
private ICompositeSubscription subscriptionManager = new ICompositeSubscription();
private RxBus mRxBus;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mRxBus = new RxBus(); // 初始化 RxBus
// 设置订阅事件的规则
Subscription subscription = mRxBus.toObservable(Event.class) // 订阅 Event 类型的事件
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理
.subscribe(event -> {
Log.d("RxJava", "收到事件:" + event.getMessage());
});
// 管理订阅
subscriptionManager.putSubscription(subscription);
// 模拟发布事件
findViewById(R.id.btnSendEvent).setOnClickListener(view -> {
// 发布一个 Event 类型的事件
mRxBus.post(new Event("Hello from RxBus!"));
});
}
@Override
protected void onDestroy() {
super.onDestroy();
// 取消所有订阅,避免内存泄漏
subscriptionManager.unSubscribe();
}
}
代码解释
RxBus
类:- 用于发布和订阅事件。通过
post()
方法发布事件,使用toObservable()
将事件转换为Observable
,让订阅者可以订阅该事件。
- 用于发布和订阅事件。通过
ICompositeSubscription
类:- 用于管理多个订阅。在
onCreate()
中,订阅事件后将订阅对象加入到CompositeSubscription
中,确保在onDestroy()
时可以统一取消所有订阅,避免内存泄漏。
- 用于管理多个订阅。在
Event
类:- 简单的事件类,包含一个
message
字段和构造方法,用于传递事件数据。
- 简单的事件类,包含一个
MyActivity
:- 在
onCreate()
中初始化RxBus
和ICompositeSubscription
,并订阅Event
类型的事件。 btnSendEvent
按钮点击后,调用RxBus.post()
发布一个事件。- 在
onDestroy()
中调用subscriptionManager.unSubscribe()
来取消订阅。
- 在
重要概念
RxBus
的发布和订阅:RxBus
是一个事件总线,通过toObservable()
和post()
方法实现发布和订阅事件。- 订阅
RxBus
后,每当有事件发布时,订阅者会接收到这个事件,并做出响应。
ICompositeSubscription
的作用:ICompositeSubscription
管理多个订阅,当Activity
销毁时,调用unSubscribe()
取消所有订阅,防止内存泄漏。- 在
Activity
中添加每个新的订阅并通过putSubscription()
方法管理。