教程:协程与通道
在本教程中,将会学到如何使用协程来执行网络请求,而不会阻塞底层线程或回调。具体内容:
- 为什么以及如何使用挂起函数执行网络请求
- 如何使用协程并发地发送请求
- 如何在不同的协程之间使用通道共享信息
1.1 准备工作
克隆以下项目到 IDEA:
git clone https://github.com/kotlin-hands-on/intro-coroutines
生成 GitHub 开发者令牌
为了使用 GitHub API,需要在 GitHub 上生成一个新的 token,不用选择任何作用域,直接点击页面最下方的 Generate token 按钮,然后复制生成的 token 即可。
运行代码
克隆完成后进入项目,找到 src/contributors/main.kt
文件并运行 main(),会弹以下窗口:
在前两个字段中输入 GitHub 用户名和刚刚创建的令牌(或密码),Variant 在下拉菜单中选择 BLOCKING 选项,点击 Load contributors 按钮开始加载贡献者的数据并显示在 Login 和 Contributions 下,同时 Start new loading 的位置会显示加载状态,比如加载成功后显示加载时长。
如果运行时抛出如下异常:
Exception in thread "AWT-EventQueue-0" java.lang.UnsupportedClassVersionError: ch/qos/logback/classic/spi/LogbackServiceProvider has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
说明使用的 JDK 版本较老,应在 IDEA 的 File -> Project Structure -> Project -> SDK 中选择新一点的 JDK 或 JRE。
说回示例本身。其实有多种实现该逻辑的方法,比如阻塞请求或回调。在下面列举各种实现方式时,注意将它们与协程进行比较,并了解如何使用通道在不同的协程之间共享信息。
1.2 阻塞请求
使用 Retrofit 向 GitHub 发送 HTTP 请求,分别获取指定组织下的存储库列表和每个存储库的贡献者列表:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
}
在 loadContributorsBlocking() 内调用上面两个函数:
fun loadContributorsBlocking(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgReposCall(req.org) // #1
.execute() // #2
.also { logRepos(req, it) } // #3
.body() ?: emptyList() // #4
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name) // #1
.execute() // #2
.also { logUsers(repo, it) } // #3
.bodyList() // #4
}.aggregate()
}
两个函数的使用方式类似,简单说下:
先调用 getOrgReposCall() 获取给定组织下的存储库列表并存入 repos,再对 repos 中的每个存储库请求其贡献者列表,并将结果列表合并为最终结果列表
在 #1 处,getOrgReposCall() 和 getRepoContributorsCall() 都会返回一个 Call 实例,不过此时请求还没有发送出去
在 #2 处,调用 Call.execute() 以执行请求,execute() 是阻塞底层线程的同步调用
在 #3 处,获得响应后,通过 logRepos() 或 logUsers() 来记录结果,如果 HTTP 响应包含错误,则将错误记录在此处
在 #4 处,获取到响应体,包含你所需的数据。在本教程中使用空列表代替 null,因此在 repos 的 #4 处使用了 ?: emptyList(),而在 return 的 #4 处,给结果类型
Response<List<T>>
定义了扩展函数,目的是为了避免重复调用 .body() ?: emptyList():fun <T> Response<List<T>>.bodyList(): List<T> { return body() ?: emptyList() }
再来看程序运行后,在 IDEA 内的输出:
1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos
2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors
2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors
...
每行输出的信息包括程序运行的毫秒数、加载请求由哪个线程调用、加载了多少存储库或贡献者。
从输出的结果能看出,所有结果都是从主线程记录的。当使用 BLOCKING 选项运行代码时,窗口会冻结,并且在加载完成之前不会对输入做出反应。这是因为所有请求都从与被调用者相同的线程,即主 UI 线程执行,由于主线程被阻塞,所以 UI 被冻结:
在 src/contributors/Contributors.kt
中找到负责选择加载贡献者方式的函数 loadContributors(),并查看 loadContributorsBlocking() 是如何被调用的:
// loadContributors():
when (getSelectedVariant()) {
BLOCKING -> { // Blocking UI thread
val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)
}
}
updateResults() 更新 UI,因此它必须是在主线程中被调用的,所以 loadContributorsBlocking() 也是在主线程中被调用,因此 UI 线程被阻塞并且 UI 被冻结。
任务 1
第一个任务用于帮助你熟悉任务域(Task domain)。当前,每个贡献者的名字会被重复多次,即每参与一个项目就出现一次。通过 aggregate() 合并用户,使每个贡献者只出现一次,并且结果列表应按照 User.contributions 属性,即贡献总数,降序排列。
打开 src/tasks/Aggregation.kt
文件并实现 List<User>.aggregate()
,用户应该根据它们的贡献数排序。对应的测试文件 test/tasks/AggregationKtTest.kt
展示了预期结果。
在 IDEA 中可以通过 ctrl + shift + T 在源代码和测试类之间自动跳转
解决方案如下:
fun List<User>.aggregate(): List<User> =
groupBy { it.login }
.map { (login, group) -> User(login, group.sumOf { it.contributions }) }
.sortedByDescending { it.contributions }
解释如下:
- 要按登录名进行分组,就给 groupBy 传入登录名,得到
Map<String, List<User>>
,即登录名与 User 列表的映射 - map 对上一步的结果进行转换,输入的就是
Map<String, List<User>>
,将每一个键值对构造成 User 对象,名字就用 Map 的 key,而贡献数量则取List<User>
贡献数量之和 - 上一步的结果是
List<User>
,按照贡献数量降序排列,就给 sortedByDescending() 传入表示贡献数量的字段 User.contributions 即可
1.3 回调
阻塞请求虽然能完成需求,但是它会阻塞线程并冻结 UI,避免这种情况的传统方法是使用回调。你可以将它提取到一个单独的回调(通常是一个 lambda)中,然后将该回调传给调用者,以便稍后调用。
此外,要使 UI 响应,可以将整个计算移动到单独的线程或者将 Retrofit API 转换为回调而不是阻塞调用。
使用后台线程
打开 src/tasks/Request2Background.kt
查看其实现,原本阻塞式的计算全部被移到另一个线程:
fun loadContributorsBackground(service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit) {
thread {
loadContributorsBlocking(service, req)
}
}
这样所有的加载都被移入另一个单独的线程中,主线程空闲,可以被其他任务占用:
注意 loadContributorsBackground() 签名中的参数 updateResults 会在所有加载完成后作为回调被调用:
loadContributorsBackground(service, req) { users ->
SwingUtilities.invokeLater { // 确保括号内的操作在 UI 线程中执行
// 更新 UI 数据
updateResults(users, startTime)
}
}
但是,在 UI 上选择 BACKGROUND 选项加载贡献者,你会发现 log 是正常工作的,但 UI 没有任何显示,修复该问题是任务 2 要做的工作。
任务 2
在 IDEA 中看代码时其实就能发现,loadContributorsBackground() 的 updateResults 参数是灰色的,说明没被调用,也就值没执行回调,加上就好:
fun loadContributorsBackground(service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit) {
thread {
updateResults(loadContributorsBlocking(service, req))
}
}
使用 Retrofit 回调 API
上一种解决方案将整个加载逻辑都移到了后台线程,这显然不是最佳的资源利用方式。因为它的加载请求还是按照顺序执行的,线程在等待加载结果时会被阻塞,此时线程可能被其他任务占用。
处理每一个仓库数据应该被分成两部分:加载和处理结果响应,二次加工部分应该提取到回调中。
每一个仓库的加载都可以在上一个仓库的结果返回(以及相应的回调被调用)之前开始:
Retrofit 回调 API 可以帮助实现这一点。Call.enqueue() 会启动 HTTP 请求并接收一个回调作为参数,在回调中需要指明在请求完成后要执行的操作。
在 src/tasks/Request3Callbacks.kt
中查看 loadContributorsCallbacks():
fun loadContributorsCallbacks(
service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit
) {
service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers -> // #2
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
}
// TODO: Why doesn't this code work? How to fix that?
updateResults(allUsers.aggregate())
}
- 方便起见,此代码段使用在同一个文件中声明的扩展函数 onResponse(),它采用 lambda 作为参数而不是对象表达式
- 处理响应的逻辑被抽取到回调中:相应的 lambda 在 #1 和 #2 两行开始
代码段的 TODO 注释已经表明,UI 上并没有显示任何内容,但是,对应的测试可以通过。任务 3 就是解决这个问题。
任务 3
解决上述问题需要重写 src/tasks/Request3Callbacks.kt
文件中的代码。以下介绍三种尝试解决方案:
检查 repos 中元素的索引,如果是最后一个元素,说明所有的 repo 的响应都已经被处理(当然这个想法是有漏洞的,后续会说),这时再执行回调的 updateResults 方法或许可行:
val allUsers = mutableListOf<User>() for ((index, repo) in repos.withIndex()) { // #1 service.getRepoContributorsCall(req.org, repo.name) .onResponse { responseUsers -> logUsers(repo, responseUsers) val users = responseUsers.bodyList() allUsers += users if (index == repos.lastIndex) { // #2 updateResults(allUsers.aggregate()) } } }
运行程序你会发现修改并没有生效。这是因为请求是异步执行的,虽然通过 getRepoContributorsCall() 发送请求是按照 repos 的 index 顺序发送的,但是响应返回到客户端是可以按任何顺序出现的。也就是说,index 是最后一个的 repo,它的响应有可能不是最后一个到达客户端的,因此这种修改是无效的。
既然检查索引值行不通,那检查 repos 总数应该是可行的:
val allUsers = Collections.synchronizedList(mutableListOf<User>()) val numberOfProcessed = AtomicInteger() for (repo in repos) { service.getRepoContributorsCall(req.org, repo.name) .onResponse { responseUsers -> logUsers(repo, responseUsers) val users = responseUsers.bodyList() allUsers += users if (numberOfProcessed.incrementAndGet() == repos.size) { updateResults(allUsers.aggregate()) } } }
这里使用同步列表 allUsers 和原子整型 numberOfProcessed,因为回调有可能来自不同的线程。
一个更好的解决方案是使用 CountDownLatch:
val countDownLatch = CountDownLatch(repos.size) for (repo in repos) { service.getRepoContributorsCall(req.org, repo.name) .onResponse { responseUsers -> // processing repository countDownLatch.countDown() } } countDownLatch.await() // countDownLatch 减到 0 之前会在这里等待 updateResults(allUsers.aggregate())
这样可以从主线程更新结果,比将逻辑委托给子线程更直接。
回顾这三种解决方案后,你会发现使用回调编写正确的代码并非易事且容易出错,尤其是在多个底层线程和同步发生时。
1.4 挂起函数
可以使用挂起函数实现相同的逻辑而不返回 Call<List<Repo>>
:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): List<Repo>
}
有关 getOrgRepos():
- getOrgRepos() 被定义为挂起函数。当你使用挂起函数执行请求时,底层线程不会被阻塞
- getOrgRepos() 返回直接结果而不是 Call(个人感觉这是 Retrofit 框架对 Kotlin 挂起函数的特殊支持),如果获取结果失败会抛出异常
或者,Retrofit 允许返回被 Response 包装的结果。在这个例子中,已经提供了结果的响应体,并且可以手动检查错误。因此本教程会使用返回 Response 的版本。
在 src/contributors/GitHubService.kt
中,在 GitHubService 接口中添加以下声明:
interface GitHubService {
// getOrgReposCall & getRepoContributorsCall declarations
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): Response<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): Response<List<User>>
}
任务 4
使用上述新添加的两个挂起函数 getOrgRepos() 和 getRepoContributors(),去更新加载贡献者的函数 loadContributorsSuspend()。该函数被声明为挂起函数以调用 getOrgRepos() 和 getRepoContributors()。
挂起函数只能在协程中使用,或者被另一个挂起函数调用。不能在其他位置使用。
任务步骤:
- 将
src/tasks/Request1Blocking.kt
内的 loadContributorsBlocking() 的实现拷贝到src/tasks/Request4Suspend.kt
的 loadContributorsSuspend() 内 - 修改代码使得返回 Call 的函数被挂起函数替代
- 运行程序选择 SUSPEND 并确保 UI 可以在执行 GitHub 请求时可以响应
解决方案如下:
// 因为要调用 getOrgRepos 和 getRepoContributors 两个挂起函数,因此本函数也必须是挂起函数
suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body() ?: emptyList()
return repos.flatMap { repo ->
service
.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
对于 Retrofit 而言,使用挂起函数会直接返回 Response,这样就无需再调用此前返回 Response 的 execute()。对于其他库而言,可能 API 不同,但是概念是一样的。
1.5 协程
挂起函数的代码看起来与阻塞版本相似。主要的不同是阻塞版本会阻塞线程,而协程是挂起的:
// 由阻塞到挂起,由线程到协程
block -> suspend
thread -> coroutine
协程通常被称为轻量级线程,因为你可以以在线程上运行代码相似的方式,在协程上运行代码。以前阻塞的操作(必须避免)先在可以通过挂起协程代替。
启动一个新的协程
去 src/contributors/Contributors.kt
中看 loadContributorsSuspend() 如何被使用,你会发现它是在 launch 内被调用的。launch 是一个以 lambda 作为参数的库函数:
launch {
val users = loadContributorsSuspend(req)
updateResults(users, startTime)
}
这里 launch 开始了一个新的计算,负责加载数据并显示结果。计算是可暂停的——当执行网络请求时,它被暂停并释放底层线程。当网络请求返回结果时,重新开始计算。
这种可暂停的(suspendable)计算称为协程
。协程在线程之上运行并且可以被挂起。当协程挂起时,相应的计算也会暂停,从线程中移除并存储在内存中。同时,该线程可以自由地被其他任务占用:
当计算准备好继续时,它返回到一个线程(不一定是同一个线程)。
在上面 loadContributorsSuspend() 的示例中,使用挂起机制等待结果。在发送请求后等待响应的期间,整个由 launch 启动的“加载贡献者”的协程会被暂停,只有在收到相应的响应后才会恢复:
等待接收响应时,线程可以自由地被其他任务占用。UI 会保持响应,尽管所有请求都发生在 UI 线程:
使用 SUSPEND 选项运行程序,日志确认所有请求都已发送到主 UI 线程:
2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors 3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors ... 11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors
日志可以告诉你相应的代码运行在哪个协程上,要启用该功能,在 IDEA 的 run -> Edit Configurations -> VM options 中添加 Dkotlinx.coroutines.debug。像上面的日志就是开启后的效果,所有请求都在 AWT-EventQueue-0 线程的 coroutine#1 协程上运行。
在线程等待期间不应该重用它来发送其他请求,因为代码是按顺序编写的。仅当收到先前的结果后才可发送新请求。
挂起函数公平对待线程,不会因为“等待”而阻塞它。但是,这还没有带来任何并发性。
1.6 并发
Kotlin 协程比线程占用的资源要少得多。每当开始一个新的异步计算时,都可以创建一个新的协程。
协程的主要构建器有:launch、async 和 runBlocking,不同的库也可以定义额外的协程构建器。
async 是初次见到,我们来简单说说。async 会启动一个新的协程并返回一个 Deferred 对象。Deferred 是一个代表延迟操作的概念,类似的还有 Future 或 Promise。它存储了一项计算任务,但推迟了获取最终结果的时刻,并承诺在将来的某个时刻提供结果。
async 与 launch 的主要区别在于,launch 被用于启动一个不需要确切返回结果的计算任务,它返回一个表示协程的 Job 对象。通过调用 Job.join() 方法,可以等待 launch 完成。
Deferred 是继承 Job 的泛型类型,调用 async 可以返回一个 Deferred<Int>
或 Deferred<CustomType>
,取决于 lambda 的返回(lambda 内最后一个表达式就是结果)。
为了获取协程的结果,可以在 Deferred 实例上调用 await()。在等待结果期间,调用 await() 的协程将会被挂起:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
}
suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}
runBlocking 是常规函数和挂起函数之间的桥梁,或者说是阻塞世界和非阻塞世界之间的桥梁。它被用作启动顶级主协程的适配器,主要用于 main() 和测试中。
如果有一个列表的延迟(deferred)对象,可以调用 awaitAll() 来等待它们的结果:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
当每个“贡献者”请求在一个新协程中启动时,所有请求都是异步启动的。可以在收到前一个请求的结果之前发送一个新请求:
总加载时间与 CALLBACKS 版本大致相同,但不需要任何回调。更重要的是,async 在代码中明确强调哪些部分是并发运行的。
任务 5
在 Request5Concurrent.kt
文件中,使用之前的 loadContributorsSuspend() 实现 loadContributorsConcurrent()。
解决方案:
suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
// repos 的获取是不需要并发的,因为都是要等结果全回来再做下一步,所以不用 async
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body() ?: emptyList()
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
}
解释如下:
使用 map 请求对每个仓库以 async 并发的方式去获取每个仓库下的贡献者数据。尽管创建了与仓库数目相同的多个协程,但这并不是问题,因为创建新协程不会占用大量资源
map 的返回值类型,是其接收的 lambda 表达式的返回值类型的 List,async 返回的是
Deferred<List<User>>
,因此 map 的返回类型是List<Deferred<List<User>>>
,再继续,deferreds.awaitAll() 得到的就是List<List<User>>
,因此需要 flatten().aggregate() 获取最终结果尚未采用多线程,协程是在主 UI 线程上运行的,这里能看到并发运行协程的好处
为了让运行贡献者的协程在公共线程池的其他线程上运行,需为 async 函数指定 Dispatchers.Default 作为上下文参数:
async(Dispatchers.Default) { }
- CoroutineDispatcher 决定相应的协程应在那个或哪些协程上运行。如果你不在参数中指定,async 会使用外部作用域的 dispatcher
- Dispatchers.Default 表示 JVM 上的共享线程池,该线程池提供了一个并行执行的方法。它拥有与 CPU 可用核心数一样多的线程,但是如果只有一个核心,它仍会有两个线程
修改代码让 loadContributorsConcurrent() 在公共线程池的不同线程上启动新协程:
async(Dispatchers.Default) { log("starting loading for ${repo.name}") service.getRepoContributors(req.org, repo.name) .also { logUsers(repo, it) } .bodyList() }
再次运行程序,可以看到每个协程都可以在线程池中的一个线程上启动,并在另一个线程上恢复。例如 coroutine#4 在 worker-2 上启动,在 worker-1 上继续:
1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka 1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt ... 2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors 2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors 2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors
接下来,在 src/contributors/Contributors.kt
中检查 CONCURRENT 选项的实现:
如果要仅在 UI 线程上运行程序,需指定参数为 Dispatchers.Main:
launch(Dispatchers.Main) { updateResults() }
- 如果你在主线程上启动协程时,主线程正忙,该协程会挂起并计划在此线程上执行。协程只有在线程空闲时才会继续
- 使用外部作用域的 dispatcher,而不是在每个端点上显式指定它,被认为是一种良好的做法。如果在定义 loadContributorsConcurrent() 时不传递 Dispatchers.Default 作为参数,那么你可以在任意上下文中调用此函数:在默认的 dispatcher 中、在主 UI 线程中,或者在自定义 dispatcher 中。
- 当从测试中调用 loadContributorsConcurrent() 时,你可以在 TestDispatcher 的上下文中调用它,这简化了测试
要在调用端指定调度程序,让 loadContributorsConcurrent() 在继承的上下文中启动协程时,应在项目中执行如下更改:
launch(Dispatchers.Default) { val users = loadContributorsConcurrent(service, req) withContext(Dispatchers.Main) { updateResults(users, startTime) } }
- updateResults() 应该在主 UI 线程中调用,因此你使用 Dispatchers.Main 的上下文调用它
- withContext() 使用指定的协程上下文调用给定代码,会挂起直到代码执行完,并返回结果。另一个更灵活的表达方式是开启协程并显式地等待(通过挂起)直到它完成:
launch(context) { ... }.join()
运行代码,确保协程在线程池中的线程上执行
1.7 结构化并发
- 协程作用域负责不同协程之间的结构和父子关系。新的协程通常需要在作用域内启动
- 协程上下文存储用于运行给定协程的附加技术信息,如协程自定义名称,或指定应该调度协程的线程的 dispatcher。
使用 launch、async、runBlocking 创建新的协程时,它们自己会创建相应的作用域。所有这些函数都会接收一个带有接收者的 lambda 作为参数,隐式接收者的类型是 CoroutineScope:
launch { /* this: CoroutineScope */ }
- 新协程只能在作用域内启动
- launch 和 async 被声明为 CoroutineScope 的扩展,所以调用它们时,必须传一个显式或隐式的接收者
- 由 runBlocking 启动的协程是唯一的例外,因为它是被作为顶级函数定义的。但是因为它会阻塞当前线程,所以它主要作为桥函数用于 main() 和测试
launch、async、runBlocking 内的新协程会自动启动:
import kotlinx.coroutines.*
fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
在 runBlocking 内调用 launch 时,它作为 CoroutineScope 类型的隐式接收者的扩展被调用。或者也可以显式写为 this.launch。
嵌套的协程(本例中是 launch 启动的协程)会被认为是外部协程(由 runBlocking 启动的协程)的孩子。这种“父子关系”在整个作用域内生效,子协程是从相应的父协程的作用域启动的。
通过 coroutineScope 函数,可以不必创建一个新的协程也能创建一个新的协程作用域。为了在挂起函数内部不访问外部作用域,以结构化方式启动一个新的协程,你可以创建一个新的协程作用域,该作用域会自动成为这个挂起函数被调用的外部作用域的孩子。loadContributorsConcurrent() 就是一个很好的例子。
你也可以通过 GlobalScope.async 或 GlobalScope.launch 从全局作用域启动一个新协程,这会创建一个顶级“独立”的协程。
协程结构背后的机制被称为结构化并发
,它在全局作用域内提供了如下好处:
- 作用域一般负责子协程,子协程的生命周期依附于作用域的生命周期
- 如果出现问题或用户改变主意并决定撤销操作,作用域可以自动取消子协程
- 作用域自动等待所有子协程的完成。因此,如果作用域对应于协程,则在其作用域中启动的所有协程都完成后,父协程才会完成
当使用 GlobalScope.async 时,没有结构将几个协程绑定到较小的作用域。从全局作用域开始的协程都是独立的,它们的生命周期仅受整个应用程序的生命周期限制。可以存储对从全局作用域开始的协程的引用,并等待其完成或显式取消它,但这不会像结构化并发那样自动发生。
取消加载贡献者
考虑 loadContributorsConcurrent() 的两个版本:第一个版本使用 coroutineScope 来启动所有子协程,而第二个版本使用 GlobalScope。比较取消父协程时,两个版本的行为如何。
在第一个版本中,使用了coroutineScope来启动所有子协程,这意味着所有子协程都被绑定到该作用域中,当该作用域被取消时,所有子协程也会被取消。因此,当您取消父协程时,所有子协程也会被取消。
将
Request5Concurrent.kt
中的 loadContributorsConcurrent() 复制到Request5NotCancellable.kt
中的 loadContributorsNotCancellable(),并删除创建新的coroutineScope 的代码现在异步调用无法解析,因此请使用 GlobalScope.async 启动它们:
suspend fun loadContributorsNotCancellable( service: GitHubService, req: RequestData ): List<User> { // #1 // ... GlobalScope.async { // #2 log("starting loading for ${repo.name}") // load repo contributors } // ... return deferreds.awaitAll().flatten().aggregate() // #3 }
- 该函数现在直接返回结果,而不是作为 lambda 表达式中的最后一个表达式(第 1 行和第 3 行)
- 所有的“贡献者”协程都是在 GlobalScope 内部启动的,而不是作为协程作用域的子协程(第 2 行)
为所有发送请求的协程添加 3 秒延迟,以便在启动协程后但发送请求之前有足够的时间取消加载:
suspend fun loadContributorsConcurrent( service: GitHubService, req: RequestData ): List<User> = coroutineScope { // ... async { log("starting loading for ${repo.name}") delay(3000) // load repo contributors } // ... }
运行程序并选择 CONCURRENT 选项加载贡献者
等待所有“贡献者”协程启动后,然后单击取消。日志将不会显示任何新结果,这意味着所有请求确实已被取消:
2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos 2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans ... 2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example /* click on 'cancel' */ /* no requests are sent */
重复第 5 步,但是这次选择
NOT_CANCELLABLE
选项:2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans ... 2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example /* click on 'cancel' */ /* but all the requests are still sent: */ 6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors ... 9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors
这种情况中没有协程被取消,所有的请求都发送出去了。
检查在“贡献者”程序中如何触发取消。当单击取消按钮时,主“loading”协程被显式取消,子协程将自动取消:
interface Contributors { fun loadContributors() { // ... when (getSelectedVariant()) { CONCURRENT -> { launch { val users = loadContributorsConcurrent(service, req) updateResults(users, startTime) }.setUpCancellation() // #1 } } } private fun Job.setUpCancellation() { val loadingJob = this // #2 // cancel the loading job if the 'cancel' button was clicked: val listener = ActionListener { loadingJob.cancel() // #3 updateLoadingStatus(CANCELED) } // add a listener to the 'cancel' button: addCancelListener(listener) // update the status and remove the listener // after the loading job is completed } }
launch 函数返回一个 Job 实例。Job 存储了对“loading 协程”的引用,该协程加载所有数据并更新结果。你可以在它上面调用 setUpCancellation 扩展函数(第 1 行),将 Job 实例作为接收器传递。
另一种表达方式是显式编写以下代码:
val job = launch { } job.setUpCancellation()
- 为了提高可读性,你可以在函数内使用新的 loadingJob 变量来引用 setUpCancellation 函数的接收器(第 2 行)
- 然后,您可以为“取消”按钮添加一个监听器,以便在单击该按钮时取消 loadingJob(第 3 行)
在结构化并发中,你只需要取消父协程,这将自动传播到所有子协程。
使用外部作用域的上下文
当你在给定作用域内启动新的协程时,更容易确保它们都使用相同的上下文。如果需要,也更容易替换上下文。
下面学习如何使用外部作用域的调度器。由 coroutineScope 或协程构建器创建的新作用域始终继承外部作用域的上下文。在这种情况下,外部作用域是调用挂起函数 loadContributorsConcurrent 的作用域:
launch(Dispatchers.Default) { // outer scope
val users = loadContributorsConcurrent(service, req)
// ...
}
所有嵌套协程都会自动使用继承的上下文启动。调度器是该上下文的一部分。这就是为什么所有由 async 启动的协程都使用默认调度器的上下文启动的原因:
suspend fun loadContributorsConcurrent(
service: GitHubService, req: RequestData
): List<User> = coroutineScope {
// this scope inherits the context from the outer scope
// ...
async { // nested coroutine started with the inherited context
// ...
}
// ...
}
使用结构化并发,您可以在创建顶级协程时一次性指定主要的上下文元素(例如调度器)。然后,所有嵌套的协程都将继承该上下文,并仅在必要时修改它。
在编写用于 UI 应用程序的协程代码时,例如 Android 应用程序,通常的做法是默认情况下使用 CoroutineDispatchers.Main 作为顶级协程的调度器,然后在需要在不同线程上运行代码时显式地指定不同的调度器。
1.8 显示进度
尽管某些仓库的信息加载非常快,但用户只能在所有数据加载完毕后才能看到结果列表。在此期间,加载器图标显示进度,但没有关于当前状态或已加载的贡献者的信息。
您可以更早地显示中间结果,并在为每个存储库加载数据后显示所有贡献者:
要实现此功能,在 src/tasks/Request6Progress.kt
中,您需要将更新 UI 的逻辑作为回调传递,以便在每个中间状态下调用它:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
// loading the data
// calling `updateResults()` on intermediate states
}
在 Contributors.kt
中的调用站点上,回调被传递以从主线程更新 PROGRESS 选项的结果:
launch(Dispatchers.Default) {
loadContributorsProgress(service, req) { users, completed ->
withContext(Dispatchers.Main) {
updateResults(users, startTime, completed)
}
}
}
- 在 loadContributorsProgress() 中,updateResults() 参数被声明为挂起函数。因此,在对应的 lambda 参数内部,需要使用挂起函数 withContext 来调用它
- updateResults 回调函数使用额外的 Boolean 参数作为参数,指定加载是否已完成并且结果是否为最终结果
任务 6
在这个例子中,我们将 Boolean 参数添加到 updateResults() 中,以指示加载是否已完成并且结果是否为最终结果。在函数内部,我们使用 showContributors() 更新 UI,并根据 isFinal 参数隐藏加载器。
- 使用一个不涉及并发的简单版本;稍后在下一节中再添加并发
- 贡献者的中间列表应该以“聚合”的方式显示,而不仅仅是加载每个存储库的用户列表
- 每当加载每个新存储库的数据时,应该增加每个用户的贡献总数
解决方案:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
val repos = service.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body() ?: emptyList()
var allUsers = emptyList<User>()
for ((index, repo) in repos.withIndex()) {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
// 以“聚合”状态存储已加载贡献者的中间列表
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, index == repos.lastIndex)
}
}
连续与并发(Consecutive vs concurrent):
- consecutive 意味着任务是按照顺序一个接一个地执行的,即一个任务完成后,才会开始下一个任务的执行。在这种情况下,任务的执行顺序是非常明确的,但是可能会出现等待时间,导致整体执行时间变长
- concurrent 意味着多个任务可以同时进行执行,即使一个任务还没有完成,其他任务也可以开始执行。在这种情况下,任务的执行顺序是不确定的,但是可以通过并发执行来提高整体执行效率
updateResults() 在每个请求完成后都会调用回调:
这段代码不包括并发。它是顺序的,因此不需要同步。
最好的选择是并发发送请求,在每个存储库的响应返回后更新中间结果:
要添加并发,可以使用通道(channels)来实现。
1.9 通道
使用共享可变状态编写代码非常困难且容易出错(如使用回调的解决方案)。更简单的方法是通过通信共享信息,而不是使用共同的可变状态。协程可以通过通道相互通信。
通道是允许数据在协程之间传递的通信原语。一个协程可以将某些信息发送到通道中,而另一个协程可以从中接收该信息:
发送(生产)信息的协程通常称为生产者,而接收(消费)信息的协程称为消费者。一个或多个协程可以将信息发送到同一个通道中,而一个或多个协程可以从中接收数据:
当许多协程从同一个通道接收信息时,每个元素只被消费者中的一个处理一次。一旦元素被处理,它就会立即从通道中移除。
你可以将通道视为类似于元素集合或更精确地说是队列,其中元素从一端添加并从另一端接收。但是,有一个重要的区别:与集合(即使是它们的同步版本)不同,通道可以暂停 send() 和 receive() 操作。这种情况发生在通道为空或满时。如果通道大小有上限,则通道可以是满的。
通道由三个不同的接口表示:SendChannel、ReceiveChannel 和 Channel,后者扩展了前两个。通常你会创建一个通道并将其作为 SendChannel 实例提供给生产者,以便只有它们可以向通道发送信息。你会将通道作为 ReceiveChannel 实例提供给消费者,以便只有他们可以从中接收信息。发送和接收方法都声明为suspend:
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
生产者可以关闭通道以表示不再有更多元素到来。
库中定义了几种类型的通道。它们在内部存储多少元素以及 send() 调用是否可以挂起方面有所不同。对于所有通道类型,receive() 调用的行为类似:如果通道不为空,则它会接收一个元素;否则,它会被挂起。
以下介绍几种类型的通道:
无限通道是与队列最相似的模拟:生产者可以向该通道发送元素,并且它将无限增长。send() 调用永远不会被挂起。如果程序耗尽内存,你将收到OutOfMemoryException 异常。无限通道和队列之间的区别在于,当消费者尝试从空通道接收时,它会被挂起,直到发送一些新元素为止。
缓冲通道的大小受指定数字的限制。生产者可以向该通道发送元素,直到达到大小限制。所有的元素都会被内部存储。当通道已满时,下一个 send 调用会被挂起,直到有更多的可用空间。
“Rendezvous”通道(也被翻译为交汇通道)是一个没有缓冲区的通道,与大小为零的缓冲通道相同。其中一个函数(send() 或 receive())总是挂起,直到调用另一个函数。
如果调用了 send(),但没有挂起的 receive() 调用可以处理该元素,则 send() 将被挂起。同样,如果调用 receive() 并且通道为空,或者换句话说,没有挂起的 send() 调用可以发送该元素,则 receive() 调用将被挂起。
“rendezvous”名称(“在约定的时间和地点会面”)指的是 send() 和 receive() 应该“准时会面”的事实。
发送到合并通道的新元素将覆盖先前发送的元素,因此接收方始终只能获取最新的元素。send() 调用永远不会被挂起。
创建通道时,请指定其类型或缓冲区大小(如果需要缓冲区):
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
默认情况下会创建 Rendezvous 通道。
在以下任务中,你将创建一个“Rendezvous”通道、两个生产者协程和一个消费者协程:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
任务 7
在 src/tasks/Request7Channels.kt
中,实现函数 loadContributorsChannels(),该函数并发请求所有 GitHub 贡献者,并同时显示中间进度。
使用之前的函数,loadContributorsConcurrent() 来自 Request5Concurrent.kt 和 loadContributorsProgress() 来自 Request6Progress.kt。
【提示】不同的协程可以同时接收来自不同存储库的贡献者列表,并将所有接收到的结果发送到同一个通道中:
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = TODO()
// ...
channel.send(users)
}
}
然后可以逐个接收并处理来自该通道的元素:
repeat(repos.size) {
val users = channel.receive()
// ...
}
由于 receive() 调用是顺序的,因此不需要额外的同步。
【解决方案】与 loadContributorsProgress() 一样,可以创建一个 allUsers 变量来存储“所有贡献者”列表的中间状态。每个从通道接收的新列表都将添加到所有用户的列表中。使用 updateResults 回调聚合结果并更新状态:
suspend fun loadContributorsChannels(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val channel = Channel<List<User>>()
// 注意一下,launch 不要写在 for 的上一层,还是有区别的
for (repo in repos) {
launch {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
channel.send(users)
}
}
var allUsers = emptyList<User>()
repeat(repos.size) {
val users = channel.receive()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, it == repos.lastIndex)
}
}
- 不同存储库的结果一旦准备就会被添加到通道中。一开始,当所有请求都已发送但没有接收到数据时,receive() 调用会被挂起。在这种情况下,“加载贡献者”协程将被挂起。
- 然后,当用户列表被发送到通道时,“加载贡献者”协程将恢复,receive() 调用返回此列表,结果将立即更新。
你现在可以运行程序并选择 CHANNELS 选项以加载贡献者并查看结果。
虽然协程和通道都不能完全消除并发带来的复杂性,但在需要了解正在发生的事情时,它们会使生活变得更加轻松。
1.10 测试协程
现在来测试所有解决方案,以检查使用并发协程是否比使用挂起函数更快,并检查使用通道是否比简单的“进度”更快。
在下面的任务中,你将比较解决方案的总运行时间。模拟一个 GitHub 服务,并使该服务在给定的超时后返回结果:
repos request - returns an answer within 1000 ms delay
repo-1 - 1000 ms delay
repo-2 - 1200 ms delay
repo-3 - 800 ms delay
使用挂起函数的顺序解决方案应该需要约 4000 毫秒(4000 = 1000 +(1000 + 1200 + 800))。并发解决方案应该需要约 2200 毫秒(2200 = 1000 + max(1000,1200,800))。
对于显示进度的解决方案,还可以使用时间戳检查中间结果。
相应的测试数据定义在 test/contributors/testData.kt
中,文件 Request4SuspendKtTest、Request7ChannelsKtTest 等包含直接使用模拟服务调用的简单测试。
但是这里有两个问题:
- 这些测试运行时间太长了。每个测试需要约 2 到 4 秒的时间,并且每次都需要等待结果。这不是很有效率
- 你不能依赖解决方案的确切运行时间,因为准备和运行代码仍需要额外的时间。你可以添加一个常数,但是该时间将因机器而异。模拟服务的延迟应该比这个常数更高,这样才能看到差异。如果常数为 0.5 秒,则将延迟设置为 0.1 秒将不足以达到目的
更好的方法是使用特殊的框架来测试运行相同代码多次的时间(这会进一步增加总时间),但是这很复杂并且需要一些学习和设置。
为了解决这些问题并确保具有提供的测试延迟的解决方案按预期行事,其中一个比另一个更快,请使用带有特殊测试分派程序的虚拟时间。该分派程序跟踪从开始时经过的虚拟时间,并立即以实际时间运行所有内容。当你在该分派程序上运行协程时,延迟将立即返回并推进虚拟时间。
使用这种机制的测试运行速度很快,但是你仍然可以检查虚拟时间的不同时刻发生了什么。总运行时间大大缩短:
要使用虚拟时间,需将 runBlocking 调用替换为 runTest。runTest 接受一个扩展 lambda 作为 TestScope 的参数。在此特殊作用域内调用挂起函数中的 delay 时,delay 将增加虚拟时间而不是实时延迟:
@Test
fun testDelayInSuspend() = runTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
foo()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun foo() {
delay(1000) // auto-advances without delay
println("foo") // executes eagerly when foo() is called
}
你可以使用 TestScope 的 currentTime 属性检查当前虚拟时间。
在这个例子中,实际运行时间仅为几毫秒,而虚拟时间等于延迟参数,即 1000 毫秒。
为了获得子协程中“虚拟”延迟的完整效果,请使用 TestDispatcher 启动所有子协程。否则,它将无法正常工作。该分派程序会自动从其他 TestScope 继承,除非你提供不同的分派程序:
@Test
fun testDelayInLaunch() = runTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
bar()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun bar() = coroutineScope {
launch {
delay(1000) // auto-advances without delay
println("bar") // executes eagerly when bar() is called
}
}
如果在上面的示例中使用 Dispatchers.Default 的上下文调用 launch,测试将失败。你将收到一个异常,指示该作业尚未完成。
只有在 loadContributorsConcurrent() 函数使用继承的上下文启动子协程且没有使用 Dispatchers.Default 调度程序修改该上下文时,才可以以这种方式测试它。
你可以在调用函数时指定上下文元素,例如调度程序,而不是在定义函数时指定它,这样可以提供更大的灵活性和更容易的测试。
支持虚拟时间的测试 API 是实验性的,未来可能会发生变化。
默认情况下,如果使用实验性测试 API,编译器会显示警告。要取消这些警告,请使用 @OptIn(ExperimentalCoroutinesApi::class) 对测试函数或包含测试的整个类进行注释。添加编译器参数,指示编译器你正在使用实验性 API:
compileTestKotlin {
kotlinOptions {
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
}
}
在与本教程相对应的项目中,编译器参数已经添加到 Gradle 脚本中了。
任务 8
请将 tests/tasks/
目录下的以下测试重构为使用虚拟时间而不是真实时间:
- Request4SuspendKtTest.kt
- Request5ConcurrentKtTest.kt
- Request6ProgressKtTest.kt
- Request7ChannelsKtTest.kt
比较重构前后的总运行时间可以帮助我们评估使用虚拟时间的好处。
【提示】
用
runTest
替换runBlocking
调用,并用currentTime
替换System.currentTimeMillis()
:@Test fun test() = runTest { val startTime = currentTime // action val totalTime = currentTime - startTime // testing result }
取消注释检查确切虚拟时间的断言
不要忘记添加
@UseExperimental(ExperimentalCoroutinesApi::class)
【解决方案】
对于并发和通道的解决方案:
fun testConcurrent() = runTest {
val startTime = currentTime
val result = loadContributorsConcurrent(MockGithubService, testRequestData)
Assert.assertEquals("Wrong result for 'loadContributorsConcurrent'", expectedConcurrentResults.users, result)
val totalTime = currentTime - startTime
Assert.assertEquals(
"The calls run concurrently, so the total virtual time should be 2200 ms: " +
"1000 for repos request plus max(1000, 1200, 800) = 1200 for concurrent contributors requests)",
expectedConcurrentResults.timeFromStart, totalTime
)
}
首先检查结果是否恰好在预期的虚拟时间可用,然后再检查结果本身:
fun testChannels() = runTest {
val startTime = currentTime
var index = 0
loadContributorsChannels(MockGithubService, testRequestData) { users, _ ->
val expected = concurrentProgressResults[index++]
val time = currentTime - startTime
Assert.assertEquals(
"Expected intermediate results after ${expected.timeFromStart} ms:",
expected.timeFromStart, time
)
Assert.assertEquals("Wrong intermediate results after $time:", expected.users, users)
}
}
在使用通道的最后一个版本中,第一个中间结果比进度版本更早可用,在使用虚拟时间的测试中可以看到差异。
剩下的“挂起”和“进度”任务测试类似,可以在项目的 solutions 分支上找到它们。