概述
文章内容如有错误欢迎探讨指正!
在之前从源码角度解析协程挂起和恢复的基础上,这篇文章通过协程的三层包装以及续体 Continuation 概念,我们换个角度再来看一遍协程。
续体Continuation
我们看看维基百科上对 续体 的解释(以下摘抄自维基百科原文):
在计算机科学中,续体(英语:Continuation,也译作计算续体、续延、延续性),是对计算机程序的 控制状态 的一种抽象表示。续体实化了程序控制状态,可以理解为,续体是一种数据结构,它表示在进程执行中给定点上的计算过程,所创建的数据结构可以被编程语言访问,而不是被运行时环境所隐藏掉。这对实现编程语言的某些控制机制,比如例外处理、协程、生成器非常有用。
当前续体 从运行代码的角度看来,是可以从程序执行的当前点导出的续体。续体还被用来提及“头等续体”,它是一种构造,赋予编程语言保存在任何点上的执行状态的能力,并在程序中后来的点上可能多次返回到这一点。
头等续体 对一门语言而言是能完全控制指令执行次序的能力。它们可以用来跳转到产生对当前函数调用的那个函数,或者跳转到此前已经退出了的函数。可以认为头等续体保存了程序执行状态,注意到真正的头等续体不同于进程映像是很重要的,它不保存程序数据,只保存执行上下文。
这经常采用“续体三明治”譬喻来说明: 假想你正站在厨房冰箱之前,想要一个三明治。你就在这里将一个续体放入口袋里。接着在从冰箱里拿出火鸡肉和面包自己做了一个三明治,然后坐到桌案前。你启用了口袋里的续体,这时发现自己再次站到了冰箱之前,想要一个三明治。幸运的是,在桌案上就有一个三明治,而用于做三明治的材料都没有了。你可以吃三明治了。
在这个譬喻中,三明治是一部分程序数据,比如在分配堆上一个对象,并非去调用“制做三明治”例程并等它返回,这里调用“以当前续体制做三明治”例程,它创建一个三明治并在已脱离执行的续体所保存的地方继续执行。
这个三明治的譬喻,简直太形象了,茅厕顿开!
我们来看看 Kotlin 中的续体结构:
Kotlin 代码解读复制代码public interface Continuation<in T> {
public abstract val context: CoroutineContext
public abstract fun resumeWith(result: Result<T>): Unit
}
这个定义也符合维基百科上对续体的描述:不保存程序数据,只保存执行上下文。通过 resumeWith 可以 "返回到冰箱前"。
续体传递风格(CPS)
Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。回调函数 callback 被称为续体(Continuation),它决定了程序接下来的行为,整个程序的逻辑通过一个个 Continuation 拼接在一起。
维基百科上提到:以续体传递风格(CPS)书写的函数接受一个额外的实际参数:显式的续体,它是有一个实际参数的函数。当CPS函数已经计算出来其结果值的时候,它通过以这个值作为实际参数调用续体函数来“返回”它。
Kotlin 协程本质就是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用),它是无栈协程。
- Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换,在 suspend 函数的最后增加一个 Continuation 参数,等到 suspend 函数执行完,就通过这个续体参数来恢复执行,这就是咱们常说的用看起来同步的方式写出异步的代码。
- 另外为了避免栈空间过大的问题, Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型。每两个挂起点之间可以看为一个状态,每次进入状态机时都有一个当前的状态,然后执行该状态对应的代码;如果程序执行完毕则返回结果值,否则返回一个特殊值,表示从这个状态退出并等待下次进入。相当于创建了一个可复用的回调,每次都使用这同一个回调,根据不同状态来执行不同的代码。
对于 suspend 关键词修饰的挂起函数,编译器会为其增加一个 Continuation 续体类型的参数(相当于 CPS 中的回调),可以通过这个 Continuation 续体对象的 resume 方法返回结果值来恢复协程的执行。
Kotlin 代码解读复制代码private suspend fun test(i: Int): Int {}
// 编译后
private final Object test(int i, Continuation var2) {}
Function
Function 是 Kotlin 对函数类型的封装,对于函数类型,它会被编译成 FunctionX 系列的类:
Kotlin 代码解读复制代码// 0 个参数
public interface Function0<out R> : Function<R> {
public operator fun invoke(): R
}
// 1 个参数
public interface Function1<in P1, out R> : Function<R> {
public operator fun invoke(p1: P1): R
}
// X 个参数
Kotlin 提供了从 Function0 到 Function22 之间的接口,这意味着我们的 lambda 函数最多可以支持 22 个参数,另外 Function 接口有一个 invoke 操作符重载,因此我们可以直接通过 () 调用 lambda 函数,举个栗子:
Kotlin 代码解读复制代码val sum = { a: Int, b: Int ->
a + b
}
sum(10, 12)
// 等同于
sum.invoke(10, 12)
编译成 Java 代码后:
Kotlin 代码解读复制代码Function2 sum = (Function2)null.INSTANCE;
sum.invoke(10, 12);
sum.invoke(10, 12);
// lambda 编译后的类
final class KotlinTest$main$sum$1 extends Lambda implements Function2<Integer, Integer, Integer> {
public static final KotlinTest$main$sum$1 INSTANCE = new KotlinTest$main$sum$1();
KotlinTest$main$sum$1() {
super(2);
}
@Override // kotlin.jvm.functions.Function2
public /* bridge */ /* synthetic */ Integer invoke(Integer num, Integer num2) {
return invoke(num.intValue(), num2.intValue());
}
public final Integer invoke(int a, int b) {
return Integer.valueOf(a + b);
}
}
可以看到对于 lambda 函数,在编译后会生成一个实现 Function 接口的类,并在使用 lambda 函数时创建一个单例对象来调用,创建对象的过程是编译器自动生成的代码。
而对于协程里的 lambda 代码块,也会为其创建一个对象,它实现 FunctionX 接口,并继承 SuspendLambda 类,不一样的地方在于它会自动增加一个 Continuation 类型的参数。
协程三层封装里的CPS
上篇文章提到协程有三层封装:
以 scope.launch() 为例
,看一下这个过程中三层封装的 CPS 行为。
第一层封装的创建
创建出 StandaloneCoroutine 实例:
Kotlin 代码解读复制代码// launch
val coroutine = StandaloneCoroutine(newContext, active = true)
第二层封装的创建
this 表示协程体,它继承自 SuspendLambda 类,这里会通过 create 方法再创建出 SuspendLambda 实例,其中 completion 就是第一层封装 StandaloneCoroutine 对象:
Kotlin 代码解读复制代码public actual fun (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation
): Continuation<Unit> {
return if (this is BaseContinuationImpl)
create(completion)
else
// ...
}
所以在协程启动过程中针对一个协程体会创建两个 SuspendLambda 的子类对象:
- 调用 launch() 时创建第一个,传入 null 作为参数,作为一个普通的 Function 对象使用
- 调用 create() 时创建第二个,传入 completion 续体作为参数
第三层封装的创建
创建出 DispatchedContinuation 实例,其中 continuation 就是第二层封装 SuspendLambda 对象:
Kotlin 代码解读复制代码public final override fun interceptContinuation(continuation: Continuation<T>): Continuation =
DispatchedContinuation(this, continuation)
继承关系
Kotlin代码解读复制代码Continuation: 续体,恢复协程的执行 - BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义了 invokeSuspend 抽象方法 - ContinuationImpl: 增加 intercepted 拦截器,通过它创建 DispatchedContinuation 实例 - SuspendLambda: 封装协程体代码块 - 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑 - DispatchedContinuation: 第三层封装,用来做线程调度 - AbstractCoroutine: 第一层封装的基类 - StandaloneCoroutine: 第一层封装
三层封装结束后恢复上层封装的调用
上面每创建下一层封装时,都会把上一层的引用 continuation 作为参数传入,这就是续体传递风格(CPS),当下一层结束其逻辑时,需要调用上层的 continuation 来回到中断的地方,即 "返回到冰箱前去拿三明治"。
第三层封装结束时 resume 第二层封装:
Kotlin 代码解读复制代码// DispatchedTask
public final override fun run() {
// ...
// continuation 是 SuspendLambda 实例
continuation.resume(getSuccessfulResult(state))
}
第二层封装结束时 resume 第一层封装:
Kotlin 代码解读复制代码// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// ...
val outcome: Result = try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
// ...
// completion 是 StandaloneCoroutine 实例
completion.resumeWith(outcome)
return
}
挂起函数里的CPS
接下来看看协程体执行过程中,遇到挂起函数时的 CPS。这点在之前的文章其实就提到过了,suspend 方法在编译期会被加入一个 Continuation 参数,在协程调用它时,会把 this 传给这个参数,即续体传递:
Kotlin 代码解读复制代码public final Object invokeSuspend(@NotNull Object $result) {
// ...
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
idTmp = "id";
this.L$0 = idTmp;
this.label = 1;
if (DelayKt.delay(200L, this) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
// ...
}
// ...
}
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
// ...
return var10000;
}
}
可以看到 delay 方法编译后自动添加了 Continuation 参数。
这里就不得不提到 suspendCoroutineUninterceptedOrReturn
这个方法了,它用来获取当前调用处协程的 Continuation 续体对象。像 delay(), withContext(), suspendCoroutine(), suspendCancellableCoroutine() 等,其内部都是通过 suspendCoroutineUninterceptedOrReturn() 来获取到当前的续体对象,以便在挂起函数体执行完毕后,能通过这个续体对象恢复协程执行。Kotlin 编译器针对这个方法,在调用方法处新增了一个 Continuation 参数,并把调用处的 Continuation 续体对象传入,比如上面调用 delay 方法时传入的 this —— DelayKt.delay(200L, this)
。
至于 suspend 方法执行完后,是怎么回到中断处的,自然而然又是 Continuation 的 resumeWith 方法。比如说 delay 方法执行完后,调用了 continuation.resumeUndispatched()
方法来恢复:
Kotlin 代码解读复制代码// HandlerContext
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
// 执行结束后 resume
with(continuation) { resumeUndispatched(Unit) }
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}
协程CPS案例模拟
我们使用一个简单的案例,来模拟协程的 CPS。以一个简单逻辑为例:
-
定义一个请求网络的 suspend request() 方法,返回 response 结果;
-
使用结果。
Kotlin代码解读复制代码suspend fun request() = suspendCoroutine
{ cont -> runCatching { doRequest() }.onSuccess { cont.resume(it) }.onFailure { cont.resumeWithException(it) } } fun test() = runBlocking { val response = runCatching { request() }.getOrElse { println(it) } println(response) }
用下面的伪代码模拟 Continuation 的工作:
-
定义一个包含 resume 方法的 Continuation 续体接口;
-
给 request() 方法增加 Continuation 续体入参,请求完毕后通过 continuation 回调结果;
-
首先通过调用 Monitor 的 resumeWith 方法开启状态机流程,内部调用 request 方法,request 执行完后,Monitor 会收到回调,继续执行状态机逻辑。
Kotlin代码解读复制代码// 定义 Continuation 接口 interface Continuation<T> { fun resumeWith(t: Result<T>) } // 使用 Continuation 模拟 CPS fun request(continuation: Continuation<Any>) { val response = runCatching { doRequest() } // 恢复续体执行 continuation.resumeWith(response) } // 简易状态机 class Monitor : Continuation<Any> { private var label = 0 override fun resumeWith(t: Result<Any>) { when (label) { 0 -> { // 转换状态,调用 request 方法,传入当前协程续体 label = 1 request(this) return } 1 -> { t.exceptionOrNull()?.let { println(it) } println(t.getOrNull()) } } } } fun test() { Monitor().resumeWith(Result.success(Unit)) }
小结
结论:
- 无论是协程三层封装的调用和恢复,还是 suspend 函数的挂起和恢复,其本质都是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用)。
- Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换。
- 另外 Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型,消除 callback hell, 解决栈空间占用问题。
协程的启动,挂起和恢复有两个关键方法: invokeSuspend()
和 resumeWith(Result)
:
- invokeSuspend() 方法是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。
- 编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又继续调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行。这就是整个协程的挂起和恢复过程。
到这里,协程的挂起和恢复流程就讲解完了,下篇文章开始解析协程线程调度相关的原理,在此之前,提几个问题:
问题一:1 和 3 处的代码,一定会跑在同一个线程吗?
Kotlin 代码解读复制代码scope.launch(Dispatchers.Default) {
// 1
withContext(Dispatchers.IO) {
// 2
}
// 3
}
问题二:下面 1, 2, 3, 4 可能跑在同一个线程吗?1 和 4, 2 和 3 会一定跑在同一个线程吗?
Kotlin 代码解读复制代码scope.launch(Dispatchers.IO) {
// 1
withContext(Dispatchers.Default) {
// 2
delay(1000)
// 3
}
// 4
}
问题三:下面 1, 2, 3, 4 有哪些一定跑在同一个线程吗?
Kotlin 代码解读复制代码scope.launch(Dispatchers.IO) {
// 1
withContext(Dispatchers.IO) {
// 2
delay(1000)
// 3
}
// 4
}
问题四:下面代码的输出顺序是什么?
Kotlin 代码解读复制代码CoroutineScope(Dispatchers.Main).launch {
println(1)
CoroutineScope(Dispatchers.Main.immediate).launch {
println(2)
delay(500)
println(3)
}
println(4)
}
问题五:下面代码的输出顺序是什么?
Kotlin 代码解读复制代码CoroutineScope(Dispatchers.Main).launch {
println(1)
CoroutineScope(Dispatchers.Unconfined).launch {
println(2)
CoroutineScope(Dispatchers.Main.immediate).launch {
println(3)
delay(500)
println(4)
}
println(5)
}
println(6)
}
带着这些问题,下篇文章开始解析线程调度!
之前在掘金上对Kotlin协程的解析比较零散,小小地推荐一下《深入理解Kotlin协程》,从源码和实例出发,结合图解,系统地分析 Kotlin 协程启动,挂起,恢复,异常处理,线程切换,并发等流程,只用一顿饭钱!感兴趣的朋友可以了解下,互相交流,不喜勿喷。
评论记录:
回复评论: