RxJS 核心方法实践与错误处理详解
- 围绕 RxJS 的核心方法进行实操
- 涵盖 observable 的构建、定时器的使用、错误处理机制(如
retry
和catchError
)、以及defer
和lastValueFrom
的使用场景
1 )RxJS 基础实践:Timer 的使用
我们首先在项目中使用 RxJS 的
Observable
和timer
方法来进行实际操作由于项目中已通过
nestjs
依赖引入了 RxJS,因此无需额外安装我们新建一个名为
index.mjs
的文件,并在其中导入以下内容:import { Observable, timer } from 'rxjs'; const exampleTimer = timer(2000); // 2秒后触发 exampleTimer.subscribe({ next: (value) => console.log('timer emitted', value), complete: () => console.log('timer complete') });
这段代码会在两秒后输出
timer emitted 0
,并在完成后打印timer complete
timer 操作符支持两种参数模式,不同模式下的值发射逻辑如下
参数形式 功能描述 发射值序列示例 timer(delay: number) 仅延迟执行:延迟 delay 毫秒后发射第一个值,随后完成。 [0](2秒后发射 0,立即触发 complete) timer(delay: number, period: number) 延迟+周期执行:延迟 delay 毫秒后发射第一个值,之后每 period 毫秒发射下一个值(无限序列,除非主动取消订阅)。 [0, 1, 2, 3…](2秒后发射 0,之后每1秒发射1、2、3…) 关键结论:无论是否设置周期,timer 发射的第一个值始终为 0,后续值按“当前发射次数-1”的规则递增(即第1次发射0,第2次发射1,以此类推)
执行流程解析:
- 创建 Observable:timer(2000) 创建一个 Observable,设置“延迟2秒后发射值”
- 订阅与等待:调用 subscribe 后,Observable 开始计时,2秒后执行发射逻辑
- 发射第一个值:由于未设置周期参数,timer 仅发射一次值 0,触发 next 回调,故 value 为 0
- 完成 Observable:发射值后立即触发 complete 回调,整个流程结束
若修改代码为带周期的 timer(如 timer(2000, 1000)),则输出结果将变为:
// 代码:timer(2000, 1000) // 输出顺序: // 2秒后:timeremitted 0 // 3秒后:timeremitted 1(2+1秒) // 4秒后:timeremitted 2(3+1秒) // ...(无限递增,直至取消订阅)
timer 操作符的设计初衷是提供“基于时间的序列发射能力”,其发射的 value 本质是“计时周期的索引”:
- 当仅用于延迟执行时(单参数),索引仅为 0(表示“第一个周期”)
- 当用于周期性任务时(双参数),索引从 0 开始递增,标识“第 n 个周期”
因此,用户代码中 value 为 0 是 timer 操作符的默认行为,与延迟时间无关,仅由其内部值生成逻辑决定
2 )错误处理:Retry 与 CatchError 的配合使用
接下来我们演示如何使用 RxJS 提供的 错误处理 API,包括 throwError
、retry
和 catchError
import { Observable, retry, catchError, throwError } from 'rxjs';
let count = 0;
// 我们定义一个 observable,主动抛出错误
const errorObservable = new Observable((subscriber) => {
console.log('retry: ', count ++);
subscriber.error(new Error('this is an error'));
});
errorObservable.pipe(
retry(3),
catchError((err) => {
console.log('caught error:', err.message);
return throwError(() => new Error('Error after retries'));
})
).subscribe({
error: (error) => console.log('subscribe error final:', error?.message ?? error)
});
输出结果
retry: 0
retry: 1
retry: 2
retry: 3
caught error: this is an error
subscribe error final: Error after retries
通过 pipe
方法结合 retry(3)
和 catchError
实现重试三次并捕获最终错误:
- 在测试过程中,“retry”被执行了4次,这是因为第一次执行也算作一次尝试,之后才进行三次重试
- 这一机制非常适用于网络请求或数据库连接等需要自动重试的场景
3 )延迟创建:使用 Defer 与 LastValueFrom 获取最终值
我们将使用
defer
和lastValueFrom
来演示延迟创建 observable 并获取其最后一个值import { Observable, defer, lastValueFrom } from 'rxjs'; const deferredObservable = defer(() => { console.log('observable created'); return new Observable((subscriber) => { subscriber.next('hello1'); subscriber.next('hello2'); subscriber.next('hello3'); subscriber.complete(); }); }); async function getDeferredValue() { const result = await lastValueFrom(deferredObservable); console.log('deferred value:', result); } getDeferredValue();
运行后,控制台将输出:
observable created deferred value: hello3
lastValueFrom
的作用是将 observable 转换为 promise,并返回其最后一个值它会自动订阅该 observable 并等待其完成
4 ) 结合 Timer 实现延迟输出
- 示例 让
hello3
在两秒后输出import { Observable, defer, lastValueFrom, timer } from 'rxjs'; const deferredObservable = defer(() => { console.log('observable created'); return new Observable((subscriber) => { subscriber.next('hello1'); subscriber.next('hello2'); timer(2000).subscribe(() => { subscriber.next('hello 3'); subscriber.complete(); }); }); }); async function getDeferredValue() { const result = await lastValueFrom(deferredObservable); console.log('deferred value:', result); } getDeferredValue();
- 运行后,
observable created
会立即输出,而deferred value: hello 3
将在两秒后打印
总结
- 通过以上示例,实现了 RxJS 的几个核心方法:
-
- Timer:用于定时触发 observable 的响应
-
- Retry 与 CatchError:实现错误自动重试及最终错误捕获
-
- Defer 与 LastValueFrom:延迟创建 observable 并获取其最终值
-
- 这些方法在实际开发中具有广泛的应用价值
- 例如在处理网络请求、数据库连接、异步任务调度等场景时
- 能够极大地提升代码的健壮性与可维护性
- 建议结合官方 RxJS 文档,进一步深入理解这些操作符的原理与使用技巧