【Android】Flow学习及使用

发布于:2025-07-03 ⋅ 阅读:(18) ⋅ 点赞:(0)

前言

​ Flow是kotlin协程库中的一个重要组成部分,它可以按顺序发送多个值,用于对异步数据流进行处理。所谓异步数据流就是连续的异步事件,如连续的网络请求、查询数据库等。

​ 了解LiveData的同学可能知道,使用LiveData也可以大体实现这样的处理连续的异步事件的效果,比如说每发一次网络请求执行一次postValue()方法,那么我们就能通过Observer来监听到这个值的改变,接着去进行相应的操作,但与此同时这样会遇到许多相应的问题,这里先按下不表,后面会介绍为什么使用Flow而不使用LiveData来对连续的异步事件进行处理。

​ 同时为了方便大家理解啥场景用Flow,下面会另外实现一个搜索输入流来实现实时搜索的Demo。

基础

基本用法

// 生产者:发送数据
val numbersFlow = flow {
    emit(1)
  	delay(1000)
    emit(2)
  	delay(1000)
    emit(3)
}

// 收集者:接收数据
lifecycleScope.launch {
    numbersFlow.collect { value ->
        Log.d("Flow", "收到:$value")
    }
}

打印:

收到:1
收到:2
收到:3

这是一个最简单的Flow例子,可以看到使用生产者使用emit来发送数据,接着接收者使用collect来接收数据

概念与核心特点

上面讲了这些,好像还没具体说下Flow是个啥。

Flow是一种可以异步地发送多个值的冷流。所谓冷流就是说Flow是冷的,只有收集的时候(collect时)才会开始执行,也就是调用collect之后Flow再来生产数据再发送数据。与之对应的还有热流,典型的就是StateFlow,它不管有没有收集者,数据都会产生,下面再具体介绍。

Flow的核心特点:

特征 描述
冷流 Flow是冷的,直到被收集的时候(collect时)才会开始执行,有助于节省资源
支持协程 Flow是基于协程构建的,支持协程,能在emit和collect中调用挂起函数
支持自动取消 Flow可以结合协程作用域实现自动取消
Flow的收集过程是挂起的,需要运行在一个协程中。只要这个协程被取消了,Flow的收集就会自动终止
背压处理 背压控制:消费者来不及处理生产者发送的数据时的处理机制
内置背压控制,不会造成UI卡顿或内存泄露
链式操作 可以进行链式操作(map、filter等)

上面说到Flow支持链式操作,介绍一下Flow的常用的操作符map、filter的用法,其他感兴趣的可自行Chatgpt:

fun processedFlow(): Flow<String> = flow {
    for (i in 1..5) {
        delay(500)
        emit(i)
    }
}.map { 
    // 中间操作:转换为字符串
    "Item $it"
}.filter {
    // 中间操作:过滤偶数
    it.last().toString().toInt() % 2 != 0
}

收集到的结果:

Item 1
Item 3
Item 5

Android中使用

这里拿一个网络请求的Demo来举例,在ViewModel进行网络请求,在Activity中对请求结果进行收集:

// ViewModel
fun fetchUserData(): Flow<String> = flow {
    
    delay(2000)// 模拟网络请求获取数据
    emit("User Data")
     
    delay(1500)// 模拟更新数据
    emit("Updated Data")
}.flowOn(Dispatchers.IO) // 指定在IO线程执行

// Activity
lifecycleScope.launchWhenStarted {
    viewModel.fetchUserData()
        .onEach { data ->
            println(data)
        }
        .catch { e ->
            // 异常处理
            Toast.makeText(context, "Error: ${e.message}", Toast.LENGTH_SHORT).show()
        }
        .collect()
}

打印:

User Data
Updated Data

与LiveData对比

上面的在Android中的使用Flow来对网络请求结果收集如果用LiveData来写的话:

  • 不使用LiveData{}
// ViewModel
private val _userData = MutableLiveData<String>()
val userData: LiveData<String> = _userData

fun fetchUserDataManual() {
    viewModelScope.launch {
        delay(2000)
        _userData.postValue("User Data")

        delay(1500)
        _userData.postValue("Updated Data")
    }
}

// Activity
viewModel.userData.observe(this) { data ->
    println(data)
}
  • 使用LiveData{}
// ViewModel
fun fetchUserData(): LiveData<String> = liveData(Dispatchers.IO) {
    try {
        delay(2000)
        emit("User Data")

        delay(1500)
        emit("Updated Data")
    } catch (e: Exception) {
        emit("Error: ${e.message}")
    }
}

// Activity
viewModel.fetchUserData().observe(this) { data ->
    println(data)
}

可以发现好像使用LiveData也可以实现连续发送网络请求接着进行处理,但使用LiveData时会存在以下问题与局限性:

  • 感知生命周期但不支持挂起函数:无法在LiveData中使用suspend关键字

因为LiveData是基于观察者模式和生命周期感知构建的,不是基于suspend的挂起函数的异步机制,所以LiveData中不支持挂起函数调用,可以使用LiveData{}构建器来让LiveData支持调用挂起函数:

fun fetchData(): LiveData<String> = liveData(Dispatchers.IO) {
    val result = fetchFromNetwork() // 可以调用suspend函数
    emit(result)
}
  • 不支持链式操作

Flow链式操作:

textFlow
	.debounce(500)// 停止输入500ms才触发一次
	.map { it.trim() }// 去掉空格
	.distinctUntilChanged()// 只有当输入发生变化时才会接着往下执行
	.onEach { vm.updateKeyword(it) }// 更新
	.launchIn(lifecycleScope)// 启动Flow

可以看到将一连串的处理像链条一样串联起来,LiveData并没有这些链式操作符。

  • 缺乏背压控制:无法响应高频数据流(如搜索输入流)
  • 只能有限支持异步数据流
  • 组合操作符少:没有map、filter等操作符
  • 逻辑冗余:要手动处理线程、去重、防抖、错误捕获等问题

使用建议:

  • 如果只需要简单的、生命周期安全的 UI 数据绑定,LiveData 是轻量选择。
  • 如果需要复杂的异步流、响应式变换、防抖、取消处理等,Flow 更合适。

热流StateFlow、SharedFlow

常见的热流有StateFlow和SharedFlow。(LiveData也是一个热流)

知道了冷流是只有接收者接受数据时,发送者才会去产生数据再发送数据给接收者。并且每个接收者都会触发完整的数据流从头开始接收完整的数据源。

而热流就是不管有没有接受者来接收数据,发送者都会生产数据,多个接受者时共享同一份数据源的,同时接受者并不会接收完整的数据源,发送者数据生产到哪了接受者就接收到哪的数据。

说的通俗一点就是:

  • 冷流就像刷视频,我们开始刷这个视频这个视频才会开始播放,并且是从头开始播放
  • 热流就像直播,我们不看直播这个直播也在播放,点进直播间观看也并不是从头开始看,而是只能从当前的内容开始看

冷流Demo:

//每次collect都会重新发射数据 
val coldFlow = flow { 
    println("开始生产数据")
    emit(1) 
    emit(2) 
} 

// 观察者1
coldFlow.collect { println("观察者1: $it") } // 输出:1,2 
// 观察者2
coldFlow.collect { println("观察者2: $it") } // 再次输出:1,2

适用场景:

  • 网络请求、数据库查询等需要独立数据源的场景
  • 每个订阅者需要从头消费完整的数据

热流Demo:

// 创建热流(SharedFlow) 
val hotFlow = MutableSharedFlow<Int>()

// 启动协程持续发射数据(即使没有订阅者)
CoroutineScope(Dispatchers.Default).launch { 
    repeat(4) { 
        delay(1000) 
         // 发射 0 1 2 3 4
        hotFlow.emit(it)
    } 
}

// 观察者1(延迟1秒订阅) 
CoroutineScope(Dispatchers.Main).launch {
    delay(1000) 
    hotFlow.collect { println("观察者1: $it") } // 只能收到 1,2,3,4
} 

// 观察者2(延迟5秒订阅) 
CoroutineScope(Dispatchers.Main).launch { 
    delay(5000) 
    hotFlow.collect { println("观察者2: $it") } // 收不到任何数据(发射已结束) 
}

适用场景:

  • 需要共享实时数据的场景(如IM消息、用户定位更新)
  • 数据生产是连续且独立的

总结:

冷流(Flow、asFlow) 热流(StateFlow、SharedFlow)
数据产生发送时机 接受者收集数据时(collect) 直接产生数据,不管有没有接受者收集
数据独立性 每个接受者收到的数据时独立的 所有接受者共享数据
数据历史 每个接受者从头开始获取完整数据 只能获取订阅后产生的数据

搜索输入流实现实时搜索

场景:有一个搜索框,没有搜索图标我们无法手动点击进行搜索,而是对文本进行监听实现实时自动搜索。

在这使用的Flow是callbackFlow,介绍一下它与Flow的区别:

  • flow{}用于挂起函数式的顺序执行
  • callbackFlow{}用于将异步的、回调式的数据源封装成Flow

简单来说,callbackFlow是需要依赖异步回调拿数据的场景,没办法直接emit(),比如说监听文本变化这种。而不是像flow那样,发送完网络请求直接emit()即可。

所以需要对文本进行监听的话,需要使用callbackFlow将TextWatcher转成流。

		val et = vBinding.etSearch
    val textFlow = callbackFlow {
      val watcher = object : TextWatcher {
        override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}
        override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {}
        override fun afterTextChanged(s: Editable?) {
          val str = s?.toString() ?: ""
          // 将变化的字符串交给Flow
          trySend(str)
        }
      }
      et.addTextChangedListener(watcher)
      awaitClose {
        // 当Flow被取消时,移除监听器避免内存泄露
        et.removeTextChangedListener(watcher)
      }
    }

    textFlow
      .debounce(500)
      .map { it.trim() }
      .distinctUntilChanged()
      .onEach {
        vm.updateKeyword(it)
      }
      .launchIn(lifecycleScope)

这里textFlow做的处理有:

  • 用户通知输入500ms时才做处理,减少频繁触发
  • distinctUntilChanged()表示输入内容不变时不触发搜索
  • 收集trySend发送的字符串来进行搜索
  • launchIn(lifecycleScope)在当前生命周期范围内启动协程收集Flow