Android Studio: RxJava如何取消订阅

发布于:2025-02-16 ⋅ 阅读:(114) ⋅ 点赞:(0)

一、事件流如何导致内存泄漏

        上一篇提到过Observable.interval()是定期发射事件,Activity 关闭后它还在运行,导致内存泄漏。

public class MyActivity extends AppCompatActivity {
    private Subscription subscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        subscription = Observable.interval(1, TimeUnit.SECONDS) // 每秒发射一个事件
            .subscribe(aLong -> Log.d("RxJava", "收到事件:" + aLong));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // ❌ 忘记取消订阅,Activity 可能会泄漏
    }
}

  Observable.interval() 是 RxJava 提供的一个工厂方法,用于创建一个发射定时事件的 Observable。它的作用是每隔一定的时间间隔发射一个数字。类似于java中的定时任务。

 subscribe()Observable 的方法,它用于订阅事件流,并定义事件到达后的处理逻辑。通过 subscribe() 来订阅这个 Observable,并定义了处理逻辑:每次收到事件时,输出日志,显示事件的数字。

Logcat 中,你会看到类似以下的输出:

收到事件:0
收到事件:1
收到事件:2
收到事件:3

事件会不断发射,直到你主动取消订阅或者应用程序退出。

如何单独对订阅进行取消:

单独subscription 进行取消订阅,可以在 onDestroy() 方法中手动调用 unsubscribe()。

public class MyActivity extends AppCompatActivity {
    private Subscription subscription; // RxJava 1.x
    // private Disposable disposable; // RxJava 2.x 及以上用 Disposable

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        subscription = Observable.interval(1, TimeUnit.SECONDS) // 每秒发射一个事件
            .subscribe(aLong -> Log.d("RxJava", "收到事件:" + aLong));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (subscription != null && !subscription.isUnsubscribed()) {
            subscription.unsubscribe(); // 取消订阅,防止内存泄漏
        }
    }
}

二、订阅管理工具

        虽然可以手动对Observable一个个取消订阅,但是一个 Activity 可能会有多个 Observable,手动管理多个 Subscription 代码复杂,而 CompositeSubscription 可以批量管理。

订阅管理工具:

        下面的类确保可以在合适的时机取消订阅,避免内存泄漏。它相当于一个订阅管理工具,用于集中管理多个 Subscription,方便在 ActivityFragment 销毁时统一清理。

public abstract class ICompositeSubscription {

    CompositeSubscription mCompositeSubscription;

    public ICompositeSubscription(){
        mCompositeSubscription = new CompositeSubscription();
    }

    /**
     * 添加到订阅管理
     * @param subscription
     */

    public Subscription putSubscription(Subscription subscription){
        mCompositeSubscription.add(subscription);
        return subscription;
    }

    /**
     * 取消订阅
     */
    public void unSubscribe(){
        if(null == mCompositeSubscription){
            return;
        }
        mCompositeSubscription.clear();
    }
}

  CompositeSubscription 作用是管理多个 Subscription。它的内部是一个集合,可以存储多个 Subscription,并且可以统一清理。

public Subscription putSubscription(Subscription subscription){
    mCompositeSubscription.add(subscription);
    return subscription;
}
  • 传入一个 Subscription,并把它添加到 mCompositeSubscription 中进行管理。
  • 这样,所有订阅都可以集中管理,方便后续取消。
public void unSubscribe(){
    if(null == mCompositeSubscription){
        return;
    }
    mCompositeSubscription.clear();
}
  • clear() 方法会取消所有的订阅,防止 Observer 持续监听,导致内存泄漏。
  • 通常在 ActivityFragment 销毁时调用此方法,释放资源。

三、如何使用订阅管理工具

 subscriptionManager.unSubscribe(); 可以自动清理所有订阅,不会造成内存泄漏。

public class MyActivity extends AppCompatActivity {
    private ICompositeSubscription subscriptionManager = new ICompositeSubscription();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        Subscription subscription = Observable.interval(1, TimeUnit.SECONDS) // 每秒发射一个事件
            .subscribe(aLong -> Log.d("RxJava", "收到事件:" + aLong));

        // 添加到订阅管理
        subscriptionManager.putSubscription(subscription);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // ✅ 取消所有订阅,避免内存泄漏
        subscriptionManager.unSubscribe();
    }
}

上面只有一个订阅的情况下,ICompositeSubscription 并没有明显的优势,直接手动调用 unsubscribe() / dispose() 就可以了。

📌 ICompositeSubscription 适用的场景

它的作用主要体现在多个订阅的管理上,比如:

  • 多个 Observable 同时订阅
  • 某些订阅需要随时添加和移除
  • 避免手动管理多个 Subscription

假设现在有多个订阅:

public class MyActivity extends AppCompatActivity {
    private ICompositeSubscription subscriptionManager = new ICompositeSubscription();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        // 订阅 1:定时任务
        Subscription subscription1 = Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(aLong -> Log.d("RxJava", "收到事件 1:" + aLong));

        // 订阅 2:用户输入搜索事件
        Subscription subscription2 = RxBus.toObservable(SearchEvent.class)
            .subscribe(event -> Log.d("RxJava", "搜索事件:" + event.getQuery()));

        // 订阅 3:网络请求
        Subscription subscription3 = api.getUserInfo()
            .subscribe(user -> Log.d("RxJava", "用户信息:" + user.getName()));

        // 统一管理订阅
        subscriptionManager.putSubscription(subscription1);
        subscriptionManager.putSubscription(subscription2);
        subscriptionManager.putSubscription(subscription3);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // 统一取消所有订阅,防止内存泄漏
        subscriptionManager.unSubscribe();
    }
}

ICompositeSubscription 适用于管理多个订阅,如果只有一个订阅,直接手动管理就足够了!


网站公告

今日签到

点亮在社区的每一天
去签到