Kotlin并发请求的一些知识记录

发布于:2025-05-16 ⋅ 阅读:(21) ⋅ 点赞:(0)
    private suspend fun fetchDataConcurrently(
        list: MutableList<MyType>,
        onRequestResult: (Int, List<MyType>?) -> Unit	//高阶函数回调
    ) {
        val deferredList = mutableListOf<Deferred<MyType?>>()
        // 设定任务超时时间为12秒,并使用 async 并发执行请求
        withTimeoutOrNull(12_000L) {
            Log.d(TAG, "request size:${list.size}")
            for ((index, item) in list.withIndex()) {
                val deferred = async {	//对每个item都发起一次异步请求, 这里是并发的请求
                	//通过callbackChannel来传递结果,参数UNLIMITED为无限缓冲,具体的在下面扩展有讲
                    val callbackChannel = Channel<MyType?>(Channel.UNLIMITED)
                    SDKInstance(item.id,
                            object : SDKCallback() {
                                override fun onSuccess(
                                    children: List<SDKType>,
                                ) {
                                    super.onSuccess(children)
                                    Log.d(TAG, "success name: ${item.name}")
                                    val item = MyType(item.name, item.id)
                                    item.list = children
                                    callbackChannel.trySend(item).isSuccess
                                }

                                override fun onError() {
                                    super.onError()
                                    callbackChannel.trySend(null).isFailure
                                    Log.d(TAG, "error name: ${item.name}")
                                }
                            })
                    callbackChannel.receive()
                }
                deferredList.add(deferred)
            }
        }
        // 等待所有请求完成
        val resultData = mutableListOf<MyType>()
        var requestSituation: Int = REQUEST_TIME_DEFAULT	//超时情况记录
        for (deferred in deferredList) {
            try {
                val result = deferred.await()	//这里的await就是等待异步任务完成
                result?.let {
                    resultData.add(it)
                    requestSituation = REQUEST_TIME_NORMAL
                }
            } catch (e: Exception) {
                // 处理任务异常
                Log.d(TAG, "error: ${e}, isTimeOut = $requestSituation")
                if (requestSituation != REQUEST_TIME_NORMAL) { //如果有数据返回成功就无需记录超时
                    requestSituation = REQUEST_TIME_TIMEOUT	//如果所有数据获取超时,需要反馈异常
                }
            }
        }
        Log.d(TAG, "response size: ${resultData.size}")
        if(requestSituation == REQUEST_TIME_TIMEOUT) {
            onRequestResult(REQ_ERROR, null)
        } else if (resultData.isEmpty()) {
            onRequestResult(REQ_NO_DATA, null)
        } else {
            if (list.size - resultData.size > Math.max((list.size - 1) / 2, 1) && resultData.size < 5) {
                onRequestResult(REQ_ERROR, null)
            } else {
                onRequestResult(REQ_SUCCESS, resultData)
                myData.applyPut { cache ->
                    cache[MyKey] = resultData	//这里使用了LruCache,以后再讲
                }
            }
        }
    }

扩展:

Channel在这段代码中的作用

  1. 桥接api与协程:将传统的回调式API(SDK的回调)转换为协程友好的异步操作
  2. 同步时序:确保在SDK回调后,协程能够继续执行
  3. 结果传递:将回调结果传递回主协程流程

潜在问题

  1. 使用无限缓冲可能不必要,因为channel开启在for循环中,一次只需要接收一个结果
  2. channel没有被显式关闭,可能导致资源泄漏
try{
//回调处理
...
}	finally {
	callbackChannel.close()	//确保关闭
}

Channel是什么?

它是Kotlin协程中的一个并发通信原语,用于在不同协程之间安全的传递数据。类似阻塞队列,但完全基于协程的非阻塞特性实现。
它是协程间通信的强大工具,特别适合将回调式API转换为挂起函数,使异步代码更线性易读。

Channel的基本特点

生产者-消费者模式:一个协程发送数据,另一个协程接收数据
线程安全:内部已处理好线程同步的问题
可挂起:当Channel满或空时,发生和接收操作会挂起协程而非阻塞线程

Channel在以上代码中的时序关系

  1. 创建channel:在每次async任务中创建一个channel

  2. SDK回调:当收到SDK回调,成功获取数据时,使用trySend发送数据,失败时使用trySend发送null

  3. 接收结果:通过callbackChannel.receive()等待SDK回调

    关键时序点:receive会挂起协程,直到SDK回调触发并发送数据到Channel

Channel的常见用法

  1. 创建Channel
//创建有缓冲的Channel
val channel = Channel<T>(capacity)

//capacity
//RENDEZVOUS(默认,无缓冲)
//UNLIMITED(无限缓冲,MAX_VALUE)
//CONFLATED(只保留最小值)
//具体数字(固定缓冲大小)
  1. 发送数据
//常规发送(可能挂起)
channel.send(data)

//尝试发送(不挂起)
channel.trySend(data).isSuccess
  1. 接收数据
//常规接收(可能挂起)
val data = channel.receive()

//尝试接收(不挂起)
val data = channel.tryReceive().getOrNull()
  1. 关闭Channel
channel.close()	//发送结束信号, 防止资源泄漏

关于这段代码的优化写法

private suspend fun fetchDataConcurrently(
    list: MutableList<MyType>,
    onRequestResult: (Int, List<MyType>?) -> Unit
) {
    val resultData = mutableListOf<MyType>()
    var requestSituation = REQUEST_TIME_DEFAULT

    try {
        withTimeout(12_000L) {
            val deferredResults = list.map { item ->
                async {
                    try {
                        val result = suspendCancellableCoroutine<MyType?> { continuation ->
                            val callback = object : SDKCallback() {
                                override fun onSuccess(
                                    children: List<SDKType>,
                                ) {
                                    val item= MyType(item.name, item.id).apply {
                                        list = children
                                    }
                                    continuation.resume(item)
                                }

                                override fun onError() {
                                    continuation.resume(null)
                                }
                            }

                            continuation.invokeOnCancellation {
                                // 如果协程被取消,可以在这里取消SDK请求
                                // 需要SDK支持取消操作
                            }

                            SDKInstance(item.id, callback)
                        }
                        result
                    } catch (e: Exception) {
                        null
                    }
                }
            }

            deferredResults.forEach { deferred ->
                deferred.await()?.let {
                    resultData.add(it)
                    requestSituation = REQUEST_TIME_NORMAL
                }
            }
        }
    } catch (e: TimeoutCancellationException) {
        if (requestSituation != REQUEST_TIME_NORMAL) {
            requestSituation = REQUEST_TIME_TIMEOUT
        }
        Log.w(TAG, "Request timeout: ${e.message}")
    } catch (e: Exception) {
        Log.e(TAG, "Unexpected error: ${e.message}", e)
    }

    // 结果处理逻辑保持不变
    when {
        requestSituation == REQUEST_TIME_TIMEOUT -> {
            onRequestResult(REQ_ERROR, null)
        }
        resultData.isEmpty() -> {
            onRequestResult(REQ_NO_DATA, null)
        }
        list.size - resultData.size > maxOf(list.size / 2, 1) && resultData.size < 5 -> {
            onRequestResult(REQ_ERROR, null)
        }
        else -> {
            onRequestResult(REQ_SUCCESS, resultData)
            myData.applyPut { cache ->
                cache[myKey] = resultData
            }
        }
    }
}

优化点说明

  • 替换Channel为suspendCancellableCoroutine:

    更直接地将回调API转换为挂起函数

    避免了Channel资源管理问题

  • 改进资源管理:

    使用invokeOnCancellation处理协程取消

    确保所有可能的异常都被捕获

  • 缓冲策略优化:

    完全移除了不必要的Channel缓冲

    使用更直接的协程控制流

  • 错误处理增强:

    明确区分超时和其他异常

    更好的日志记录

核心知识点

  1. 协程与回调的转换:

    suspendCancellableCoroutine将回调API转换为挂起函数
    
    协程取消处理机制
    
  2. 结构化并发:

    withTimeout创建有时间限制的作用域
    
    async/await并发模式
    
  3. 资源管理:

    协程取消时的清理工作
    
    异常处理边界
    
  4. 并发控制:

    多个请求的并行执行
    
    结果的聚合处理
    
  5. 状态管理:

    请求状态的跟踪(REQUEST_TIME_NORMAL/TIMEOUT)
    
    结果的成功/失败判定逻辑
    

网站公告

今日签到

点亮在社区的每一天
去签到