先回忆下flow吧!
flow是啥
Flow 是 Kotlin 协程框架中的一个异步数据流处理组件,专为响应式编程设计,适用于需要连续或异步返回多个值的场景,如网络请求、数据库查询、传感器数据等
- 1 异步流(Asynchronous Streams)
- Flow 允许以非阻塞方式处理一系列值或事件,适用于大量数据或涉及 IO 操作的场景
- 与 suspend 函数不同,Flow 可以返回多个值,而 suspend 函数仅能返回单个计算结果
- 2 冷流(Cold Flow)与热流(Hot Flow)
- 冷流:仅在收集器(collect)订阅后才会开始发射数据
- 热流(如 SharedFlow):创建后立即发射数据,无论是否有收集器订阅
- 冷流:仅在收集器(collect)订阅后才会开始发射数据
- 3 声明式 API
- 提供丰富的操作符(如 map、filter、reduce),支持链式调用
- 4 协程集成
- Flow 基于协程,支持结构化并发、取消操作和背压管理
flow的创建
- 使用 flow{} 构建器
val numberFlow = flow {
emit(1)
delay(100) // 模拟耗时操作
emit(2)
}
- 使用 flowOf 快速创建
val fixedFlow = flowOf(1, 2, 3)
- 集合转 Flow
val listFlow = listOf("A", "B", "C").asFlow()
高级创建方式
(1) 基于回调的 API 转换
将回调式 API(如网络请求)封装为 Flow
fun fetchDataFlow() = callbackFlow {
val callback = object : DataCallback {
override fun onData(value: String) {
trySend(value) // 发送数据到 Flow
}
override fun onComplete() {
close() // 关闭 Flow
}
}
registerCallback(callback)
awaitClose { unregisterCallback(callback) } // 确保资源释放
}
(2) 从挂起函数生成
通过 channelFlow 或 flow 结合挂起函数实现复杂逻辑
fun pollUpdates() = flow {
while (true) {
val updates = fetchUpdates() // 挂起函数
emit(updates)
delay(5000) // 间隔轮询
}
}
Flow 中处理背压(Backpressure)的核心策略
- 缓冲机制
通过 buffer() 设置缓冲区容量,允许生产者和消费者异步执行,缓解数据积压压力
flow {
repeat(100) { emit(it) }
}.buffer(50) // 设置50容量的缓冲区
.collect { /* 处理数据 */ }
2. 合并策略
使用 conflate() 跳过中间值,仅保留最新数据
flow {
emit(1); delay(10); emit(2); emit(3)
}.conflate()
.collect { println(it) } // 输出:1 → 3(跳过2)
3. 最新值优先
collectLatest 取消未完成的任务,立即处理最新发射值
flow {
emit("A"); delay(100); emit("B")
}.collectLatest { value ->
delay(200) // 处理"A"时被"B"中断
println(value) // 仅输出"B"
}
- 调度优化
利用 flowOn 切换协程上下文,分散计算负载
flow { /* 密集计算 */ }
.flowOn(Dispatchers.Default) // 在后台线程生产
.collect { /* 主线程消费 */ }
创建操作符
flow{}
基础构建器,通过 emit 发射数据,支持挂起操作:
flow { emit(1); delay(100); emit(2) }
flowOf
快速创建固定值序列的 Flow:
flowOf("A", "B", "C")
asFlow
将集合(如 List)转换为 Flow:
listOf(1, 2, 3).asFlow()
转换操作符
map
对每个元素进行转换:
flowOf(1, 2).map { it * 10 } // 输出 10, 20
filter
按条件过滤元素:
flowOf(1, 2, 3).filter { it % 2 == 0 } // 输出 2
transform
灵活转换,可多次发射值:
flowOf("Hi").transform { emit(it.uppercase()); emit(it.length) }
组合操作符
zip
合并两个 Flow 的对应元素:
flowOf(1, 2).zip(flowOf("A", "B")) { num, str -> "$num$str" } // 输出 1A, 2B
flatMapConcat
顺序展开嵌套 Flow:
flowOf(1, 2).flatMapConcat { flowOf(it, it * 2) } // 输出 1, 2, 2, 4
终端操作符
collect
触发流执行并处理数据:
flowOf(1).collect { println(it) }
first/last
获取首个或末尾元素:
flowOf(1, 2).first() // 返回 1
reduce
累积计算(如求和):
flowOf(1, 2, 3).reduce { acc, v -> acc + v } // 输出 6
背压处理操作符
buffer
设置缓冲区缓解生产消费速度差异:
flow { emit(1) }.buffer(10)
conflate
跳过中间值,保留最新数据:
flow { emit(1); emit(2) }.conflate() // 仅处理 2
其他实用操作符
take
限制收集的元素数量:
flowOf(1, 2, 3).take(2) // 输出 1, 2
onEach
在每次发射时执行副作用(如日志):
flowOf(1).onEach { println("发射: $it") }
开整
调用retrofit+okhttp
导包
// 网络请求
api("com.google.code.gson:gson:2.8.6")
api("com.squareup.retrofit2:retrofit:2.9.0")
api("com.squareup.retrofit2:converter-gson:2.9.0")
api("com.squareup.retrofit2:converter-scalars:2.0.0")
api("com.squareup.okhttp3:okhttp:3.14.9")
api("com.squareup.okhttp3:logging-interceptor:3.12.2")
api("com.squareup.okio:okio:1.17.4")
open class HttpCreater private constructor() {
val timeOut: Long = 60 //30秒超时
val baseUrlInterceptor: BaseUrlInterceptor = BaseUrlInterceptor()
var okhttpClient: OkHttpClient
var downOkHttpClient: OkHttpClient
//日志拦截
var loggingInterceptor = HttpLoggingInterceptor(object : HttpLoggingInterceptor.Logger {
override fun log(message: String) {
//打印retrofit日志
Log.i("log_http", "retrofitBack = $message")
}
})
init {
loggingInterceptor.level = HttpLoggingInterceptor.Level.BODY
//用于请求
okhttpClient = OkHttpClient.Builder()
.connectTimeout(timeOut, TimeUnit.SECONDS)
.readTimeout(timeOut, TimeUnit.SECONDS)
.writeTimeout(timeOut, TimeUnit.SECONDS)
.addInterceptor(baseUrlInterceptor)
.addInterceptor(loggingInterceptor)
.build()
//用于下载 因为请求的okhttpClient 有日志拦截器 会先拦截请求结果 所以专门创建个用于下载的
downOkHttpClient = OkHttpClient.Builder()
.connectTimeout(timeOut, TimeUnit.SECONDS)
.readTimeout(timeOut, TimeUnit.SECONDS)
.writeTimeout(timeOut, TimeUnit.SECONDS)
.build()
}
companion object {
open val instance by lazy (LazyThreadSafetyMode.SYNCHRONIZED){
HttpCreater()
}
}
/**
* 设置baseUrl 对应的token
*/
open fun setToken(baseUrl: String,token:String){
baseUrlInterceptor.setToken(baseUrl,token)
}
/**
* 获取请求的retrofit 调用的时候创建 传入baseUrl 因为我们项目连了好几个服务器
*/
open fun getRetrofit(baseUrl:String) : Retrofit{
val gson: Gson = GsonBuilder().setLenient().create();
return Retrofit.Builder()
.client(okhttpClient)
.baseUrl(baseUrl)
// .addConverterFactory(GsonConverterFactory.create(gson)) //配置转化库 Gson解析失败,不报错崩溃
.addConverterFactory(ScalarsConverterFactory.create()) //返回字符串
.build()
}
/**
* 下载的tokenretrofit
*/
open fun getDownRetrofit(baseUrl:String) : Retrofit{
return Retrofit.Builder()
.client(downOkHttpClient)
.baseUrl(baseUrl)
.addConverterFactory(ScalarsConverterFactory.create()) //配置转化库 Gson解析失败,不报错崩溃
.build()
}
}
请求拦截器 用于获取token后设置后,自动将token设置到请求头
class BaseUrlInterceptor:Interceptor {
val tokenMap = mutableMapOf<String,String>()
override fun intercept(chain: Interceptor.Chain): Response {
// 获取request
val request = chain.request()
val builder = request.newBuilder()
builder.addHeader("Content-Type", "application/json; charset=UTF-8")
builder.addHeader("Accept", "application/json;versions=1")
val httpUrl = request.url().url().host
Log.i("log_http","httpUrl>>${httpUrl}")
if(!tokenMap.get(httpUrl).isNullOrEmpty()){
builder.addHeader("Authorization", "Bearer ${tokenMap.get(httpUrl)}")
}
return chain.proceed(builder.build())
}
fun setToken(baseUrl: String, token: String) {
tokenMap.put(baseUrl,token)
}
}
添加请求接口
interface NetApi {
@GET("/article/list/{path}/json")
suspend fun getList(@Path("path") page:Int):String
}
NetRepository flow调用
class NetRepository private constructor(){
val service by lazy { HttpCreater.instance.getRetrofit("https://www.wanandroid.com").create(NetApi::class.java) }
companion object{
val instance by lazy { NetRepository() }
}
fun getList():Flow<String> = flow {
val result = service.getList(0)
Log.i("zq_demo","flow result>>${result}")
emit(result)
}
}
测试
测试代码
findViewById<TextView>(R.id.tv_hello).setOnClickListener {
Log.i("zqq_demo","tv_hello")
lifecycleScope.launch {
Log.i("zqq_demo","lifecycleScope")
netRepository.getList().onEach {
Log.i("zqq_demo","onEach>>${it}")
}.collect{
Log.i("zqq_demo","collect>>${it}")
}
}
}
结果
其他
1 我们可以使用配置文件配置baseurl
创建config.gradle.kts
添加测试代码
val myProperty: String by extra("Hello, World!")
使用 app下
添加
// 加载 config.gradle.kts
apply(from = "${rootDir}/config.gradle.kts")
// 使用 myProperty
println(extra["myProperty"]) // 输出: Hello, World!
运行
接下来创建baseUrl
config.gradle.kts:
val baseUrl: String by extra("https://www.wanandroid.com")
app下 build.gradle.kts
plugins {
alias(libs.plugins.android.application)
alias(libs.plugins.jetbrains.kotlin.android)
}
// 加载 config.gradle.kts
apply(from = "${rootDir}/config.gradle.kts")
android {
defaultConfig {
buildConfigField("String",name = "baseUrl", value = "\"${extra["baseUrl"]}\"")
}
//这个别忘记添加
buildFeatures {
buildConfig = true // 启用 BuildConfig 功能
}
}
然后打印
Log.i("zqq_demo","baseUrl>>${BuildConfig.baseUrl}")
这样上边NetRepository 就可以使用
class NetRepository private constructor(){
val service by lazy { HttpCreater.instance.getRetrofit(BuildConfig.baseUrl).create(NetApi::class.java) }
当然你可以定义版本号 applicationId 其他的key versionCode versionName 签名文件路径 等等配置都可以
使用旧的config.gradle
config.gradle如下
ext {
test = "hello"
android = [
hello = "hello"
]
}
项目的build.gradle.kts
plugins {
alias(libs.plugins.android.application) apply false
alias(libs.plugins.jetbrains.kotlin.android) apply false
alias(libs.plugins.android.library) apply false
}
apply(from = "${rootDir}/config.gradle")
app的build.gradle.kts
println("${(rootProject.extra["android"] as Map<String,Any>).get("hello")}")
defaultConfig {
applicationId = "com.zqq.demo"
minSdk = 24
targetSdk = 34
versionCode = 1
versionName = "1.0"
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
buildConfigField("String",name = "hello", value = "\"${rootProject.extra["test"]}\"")
}
buildFeatures {
buildConfig = true // 启用 BuildConfig 功能
}