前言
Flow是kotlin协程库中的一个重要组成部分,它可以按顺序发送多个值,用于对异步数据流进行处理。所谓异步数据流就是连续的异步事件,如连续的网络请求、查询数据库等。
了解LiveData的同学可能知道,使用LiveData也可以大体实现这样的处理连续的异步事件的效果,比如说每发一次网络请求执行一次postValue()
方法,那么我们就能通过Observer
来监听到这个值的改变,接着去进行相应的操作,但与此同时这样会遇到许多相应的问题,这里先按下不表,后面会介绍为什么使用Flow而不使用LiveData来对连续的异步事件进行处理。
同时为了方便大家理解啥场景用Flow,下面会另外实现一个搜索输入流来实现实时搜索的Demo。
基础
基本用法
// 生产者:发送数据
val numbersFlow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
// 收集者:接收数据
lifecycleScope.launch {
numbersFlow.collect { value ->
Log.d("Flow", "收到:$value")
}
}
打印:
收到:1
收到:2
收到:3
这是一个最简单的Flow例子,可以看到使用生产者使用emit来发送数据,接着接收者使用collect来接收数据
概念与核心特点
上面讲了这些,好像还没具体说下Flow是个啥。
Flow是一种可以异步地发送多个值的冷流。所谓冷流就是说Flow是冷的,只有收集的时候(collect时)才会开始执行,也就是调用collect之后Flow再来生产数据再发送数据。与之对应的还有热流,典型的就是StateFlow,它不管有没有收集者,数据都会产生,下面再具体介绍。
Flow的核心特点:
特征 | 描述 |
---|---|
冷流 | Flow是冷的,直到被收集的时候(collect时)才会开始执行,有助于节省资源 |
支持协程 | Flow是基于协程构建的,支持协程,能在emit和collect中调用挂起函数 |
支持自动取消 | Flow可以结合协程作用域实现自动取消 Flow的收集过程是挂起的,需要运行在一个协程中。只要这个协程被取消了,Flow的收集就会自动终止 |
背压处理 | 背压控制:消费者来不及处理生产者发送的数据时的处理机制 内置背压控制,不会造成UI卡顿或内存泄露 |
链式操作 | 可以进行链式操作(map、filter等) |
上面说到Flow支持链式操作,介绍一下Flow的常用的操作符map、filter的用法,其他感兴趣的可自行Chatgpt:
fun processedFlow(): Flow<String> = flow {
for (i in 1..5) {
delay(500)
emit(i)
}
}.map {
// 中间操作:转换为字符串
"Item $it"
}.filter {
// 中间操作:过滤偶数
it.last().toString().toInt() % 2 != 0
}
收集到的结果:
Item 1
Item 3
Item 5
Android中使用
这里拿一个网络请求的Demo来举例,在ViewModel进行网络请求,在Activity中对请求结果进行收集:
// ViewModel
fun fetchUserData(): Flow<String> = flow {
delay(2000)// 模拟网络请求获取数据
emit("User Data")
delay(1500)// 模拟更新数据
emit("Updated Data")
}.flowOn(Dispatchers.IO) // 指定在IO线程执行
// Activity
lifecycleScope.launchWhenStarted {
viewModel.fetchUserData()
.onEach { data ->
println(data)
}
.catch { e ->
// 异常处理
Toast.makeText(context, "Error: ${e.message}", Toast.LENGTH_SHORT).show()
}
.collect()
}
打印:
User Data
Updated Data
与LiveData对比
上面的在Android中的使用Flow来对网络请求结果收集如果用LiveData来写的话:
- 不使用LiveData{}
// ViewModel
private val _userData = MutableLiveData<String>()
val userData: LiveData<String> = _userData
fun fetchUserDataManual() {
viewModelScope.launch {
delay(2000)
_userData.postValue("User Data")
delay(1500)
_userData.postValue("Updated Data")
}
}
// Activity
viewModel.userData.observe(this) { data ->
println(data)
}
- 使用LiveData{}
// ViewModel
fun fetchUserData(): LiveData<String> = liveData(Dispatchers.IO) {
try {
delay(2000)
emit("User Data")
delay(1500)
emit("Updated Data")
} catch (e: Exception) {
emit("Error: ${e.message}")
}
}
// Activity
viewModel.fetchUserData().observe(this) { data ->
println(data)
}
可以发现好像使用LiveData也可以实现连续发送网络请求接着进行处理,但使用LiveData时会存在以下问题与局限性:
- 感知生命周期但不支持挂起函数:无法在LiveData中使用suspend关键字
因为LiveData是基于观察者模式和生命周期感知构建的,不是基于suspend的挂起函数的异步机制,所以LiveData中不支持挂起函数调用,可以使用LiveData{}构建器来让LiveData支持调用挂起函数:
fun fetchData(): LiveData<String> = liveData(Dispatchers.IO) {
val result = fetchFromNetwork() // 可以调用suspend函数
emit(result)
}
- 不支持链式操作
Flow链式操作:
textFlow
.debounce(500)// 停止输入500ms才触发一次
.map { it.trim() }// 去掉空格
.distinctUntilChanged()// 只有当输入发生变化时才会接着往下执行
.onEach { vm.updateKeyword(it) }// 更新
.launchIn(lifecycleScope)// 启动Flow
可以看到将一连串的处理像链条一样串联起来,LiveData并没有这些链式操作符。
- 缺乏背压控制:无法响应高频数据流(如搜索输入流)
- 只能有限支持异步数据流
- 组合操作符少:没有map、filter等操作符
- 逻辑冗余:要手动处理线程、去重、防抖、错误捕获等问题
使用建议:
- 如果只需要简单的、生命周期安全的 UI 数据绑定,LiveData 是轻量选择。
- 如果需要复杂的异步流、响应式变换、防抖、取消处理等,Flow 更合适。
热流StateFlow、SharedFlow
常见的热流有StateFlow和SharedFlow。(LiveData也是一个热流)
知道了冷流是只有接收者接受数据时,发送者才会去产生数据再发送数据给接收者。并且每个接收者都会触发完整的数据流从头开始接收完整的数据源。
而热流就是不管有没有接受者来接收数据,发送者都会生产数据,多个接受者时共享同一份数据源的,同时接受者并不会接收完整的数据源,发送者数据生产到哪了接受者就接收到哪的数据。
说的通俗一点就是:
- 冷流就像刷视频,我们开始刷这个视频这个视频才会开始播放,并且是从头开始播放
- 热流就像直播,我们不看直播这个直播也在播放,点进直播间观看也并不是从头开始看,而是只能从当前的内容开始看
冷流Demo:
//每次collect都会重新发射数据
val coldFlow = flow {
println("开始生产数据")
emit(1)
emit(2)
}
// 观察者1
coldFlow.collect { println("观察者1: $it") } // 输出:1,2
// 观察者2
coldFlow.collect { println("观察者2: $it") } // 再次输出:1,2
适用场景:
- 网络请求、数据库查询等需要独立数据源的场景
- 每个订阅者需要从头消费完整的数据
热流Demo:
// 创建热流(SharedFlow)
val hotFlow = MutableSharedFlow<Int>()
// 启动协程持续发射数据(即使没有订阅者)
CoroutineScope(Dispatchers.Default).launch {
repeat(4) {
delay(1000)
// 发射 0 1 2 3 4
hotFlow.emit(it)
}
}
// 观察者1(延迟1秒订阅)
CoroutineScope(Dispatchers.Main).launch {
delay(1000)
hotFlow.collect { println("观察者1: $it") } // 只能收到 1,2,3,4
}
// 观察者2(延迟5秒订阅)
CoroutineScope(Dispatchers.Main).launch {
delay(5000)
hotFlow.collect { println("观察者2: $it") } // 收不到任何数据(发射已结束)
}
适用场景:
- 需要共享实时数据的场景(如IM消息、用户定位更新)
- 数据生产是连续且独立的
总结:
冷流(Flow、asFlow) | 热流(StateFlow、SharedFlow) | |
---|---|---|
数据产生发送时机 | 接受者收集数据时(collect) | 直接产生数据,不管有没有接受者收集 |
数据独立性 | 每个接受者收到的数据时独立的 | 所有接受者共享数据 |
数据历史 | 每个接受者从头开始获取完整数据 | 只能获取订阅后产生的数据 |
搜索输入流实现实时搜索
场景:有一个搜索框,没有搜索图标我们无法手动点击进行搜索,而是对文本进行监听实现实时自动搜索。
在这使用的Flow是callbackFlow,介绍一下它与Flow的区别:
- flow{}用于挂起函数式的顺序执行
- callbackFlow{}用于将异步的、回调式的数据源封装成Flow
简单来说,callbackFlow是需要依赖异步回调拿数据的场景,没办法直接emit(),比如说监听文本变化这种。而不是像flow那样,发送完网络请求直接emit()即可。
所以需要对文本进行监听的话,需要使用callbackFlow将TextWatcher转成流。
val et = vBinding.etSearch
val textFlow = callbackFlow {
val watcher = object : TextWatcher {
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}
override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {}
override fun afterTextChanged(s: Editable?) {
val str = s?.toString() ?: ""
// 将变化的字符串交给Flow
trySend(str)
}
}
et.addTextChangedListener(watcher)
awaitClose {
// 当Flow被取消时,移除监听器避免内存泄露
et.removeTextChangedListener(watcher)
}
}
textFlow
.debounce(500)
.map { it.trim() }
.distinctUntilChanged()
.onEach {
vm.updateKeyword(it)
}
.launchIn(lifecycleScope)
这里textFlow做的处理有:
- 用户通知输入500ms时才做处理,减少频繁触发
- distinctUntilChanged()表示输入内容不变时不触发搜索
- 收集trySend发送的字符串来进行搜索
- launchIn(lifecycleScope)在当前生命周期范围内启动协程收集Flow