1. Channel 和 Flow 基础概念
Channel(通道)
// Channel 是一个可以发送和接收数据的管道
val channel = Channel<String>()
// 发送数据
channel.send("Hello")
// 接收数据
val data = channel.receive()
Flow(流)
// Flow 是一个可以发射数据的流
val flow = flow {
emit("Hello")
emit("World")
}
2. callbackFlow 的工作原理
callbackFlow 创建了一个 Channel
fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {
// callbackFlow 内部创建了一个 Channel
// 这个 Channel 可以发送 SSEEvent 类型的数据
// 发送数据到 Channel
trySend(SSEEvent.Message("event", "data"))
// 等待 Channel 关闭
awaitClose {
// 清理资源
disconnect()
}
}
callbackFlow 的完整流程
fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {
// 1. callbackFlow 创建一个 Channel
// 2. 这个 Channel 可以发送 SSEEvent 类型的数据
// 3. callbackFlow 返回一个 Flow,这个 Flow 会从这个 Channel 接收数据
val request = Request.Builder()
.url(url)
.addHeader("Accept", "text/event-stream")
.build()
call = okHttpClient.newCall(request)
call?.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
val body = response.body ?: return
try {
var currentEvent = "message"
val dataBuffer = StringBuilder()
while (true) {
val line = body.source().readUtf8Line() ?: break
when {
line.startsWith("event:") -> currentEvent = line.substring(6).trim()
line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")
line.isEmpty() -> {
if (dataBuffer.isNotEmpty()) {
val data = dataBuffer.toString().trim()
// 4. 发送数据到 Channel
trySend(SSEEvent.Message(currentEvent, data))
dataBuffer.clear()
}
}
}
}
// 5. 发送关闭事件
trySend(SSEEvent.Closed)
} catch (e: Exception) {
// 6. 发送错误事件
trySend(SSEEvent.Error(e))
} finally {
response.close()
close() // 关闭 Channel
}
}
override fun onFailure(call: Call, e: IOException) {
// 7. 发送错误事件
trySend(SSEEvent.Error(e))
close(e) // 关闭 Channel
}
})
// 8. 等待 Channel 被关闭
awaitClose {
disconnect()
}
}
3. trySend 的作用
trySend 发送数据到 Channel
// trySend 尝试发送数据到 Channel
trySend(SSEEvent.Message(currentEvent, data))
// trySend 的特点:
// 1. 非阻塞:不会等待接收者
// 2. 返回 Boolean:发送成功返回 true,失败返回 false
// 3. 如果 Channel 已关闭,返回 false
与 send 的区别
// send:阻塞式发送,会等待接收者
channel.send(data) // 如果 Channel 满了,会阻塞
// trySend:非阻塞式发送,立即返回结果
val success = channel.trySend(data) // 立即返回 true/false
4. Flow 的 onEach 工作原理
onEach 是 Flow 的中间操作符
sseClient.connect(sseUrl, token)
.onEach { event -> // 对每个事件进行处理
when (event) {
is SSEClient.SSEEvent.Message -> {
// 处理消息事件
println("收到消息: ${event.data}")
}
is SSEClient.SSEEvent.Error -> {
// 处理错误事件
println("发生错误: ${event.throwable.message}")
}
SSEClient.SSEEvent.Closed -> {
// 处理关闭事件
println("连接已关闭")
}
}
}
.launchIn(lifecycleScope) // 启动 Flow
onEach 的执行流程
// 1. callbackFlow 创建 Channel
fun connect(): Flow<SSEEvent> = callbackFlow {
// 发送数据到 Channel
trySend(SSEEvent.Message("event1", "data1"))
trySend(SSEEvent.Message("event2", "data2"))
trySend(SSEEvent.Closed)
}
// 2. onEach 对每个数据进行处理
.onEach { event ->
// 每当 Channel 中有新数据时,这里就会被调用
when (event) {
is SSEClient.SSEEvent.Message -> {
// 处理消息
}
}
}
// 3. launchIn 启动 Flow
.launchIn(lifecycleScope)
5. 完整的数据流
数据流向图
HTTP 响应 → 解析 SSE 数据 → trySend → Channel → Flow → onEach → UI 更新
详细步骤
// 步骤 1: HTTP 响应数据
data: {"message": "Hello"}
// 步骤 2: 解析 SSE 数据
line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim())
// 步骤 3: 发送到 Channel
trySend(SSEEvent.Message("message", "{\"message\": \"Hello\"}"))
// 步骤 4: Channel 中的数据流向 Flow
callbackFlow { ... } // 返回 Flow
// 步骤 5: Flow 的 onEach 处理
.onEach { event ->
when (event) {
is SSEClient.SSEEvent.Message -> {
// 处理消息
updateUI(event.data)
}
}
}
// 步骤 6: 启动 Flow
.launchIn(lifecycleScope)
6. 实际运行示例
服务器发送的数据
data: {"message": "Hello"}
data: {"message": "World"}
data: {"message": "Test"}
代码执行流程
// 1. 服务器发送第一行数据
data: {"message": "Hello"}
// 2. 代码解析
line.startsWith("data:") -> dataBuffer.append("{\"message\": \"Hello\"}")
// 3. 遇到空行,发送数据
line.isEmpty() -> {
val data = dataBuffer.toString().trim()
trySend(SSEEvent.Message("message", data)) // 发送到 Channel
}
// 4. onEach 接收到数据
.onEach { event ->
when (event) {
is SSEClient.SSEEvent.Message -> {
// event.data = "{\"message\": \"Hello\"}"
updateUI(event.data)
}
}
}
// 5. UI 更新
updateUI("{\"message\": \"Hello\"}")
7. 关键概念总结
callbackFlow
- 创建一个 Channel
- 返回一个 Flow
- Flow 从 Channel 接收数据
trySend
- 向 Channel 发送数据
- 非阻塞操作
- 返回发送是否成功
onEach
- Flow 的中间操作符
- 对每个数据进行处理
- 不会改变数据流
launchIn
- 启动 Flow
- 在指定的协程作用域中运行
- 自动管理生命周期
8. 为什么这样设计?
优势
- 异步处理:HTTP 响应在后台处理,UI 不会阻塞
- 结构化数据:使用 sealed class 定义事件类型
- 生命周期管理:自动在 Activity 销毁时取消
- 错误处理:统一的错误处理机制
- 可扩展:易于添加新的事件类型
数据流的好处
// 清晰的数据流向
HTTP 数据 → SSE 解析 → 结构化事件 → UI 更新
// 而不是传统的回调方式
HTTP 数据 → 回调函数 → UI 更新
这样设计使得代码更加清晰、可维护,并且充分利用了 Kotlin 协程和 Flow 的优势!