Kotlin中的协程经过几个版本的升级已经非常成熟了,但是协程的概念目前没有一个明确且被普遍接受的定义。究其根源无论我们怎么去理解协程的概念,它最核心的点就是函数或者一段程序能够被挂起,稍后再挂起的位置恢复。所以在任何场景下探讨协程都能够落脚到挂起和恢复。本文通过源码对协程创建->挂起->恢复流程进行分析解读。希望能够帮助大家对Kotlin协程的理解起到帮助。
Kotlin中协程是复合协程,是为了方便开发者使用而进一步封装的API,当我们在分析的时候无从下手就是因为经过封装的协程在经过编译后才能看到它的庐山真面目。下面我们通过构造一个简单的协程并反编译成java代码查看。
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
startCoroutine()
}
private fun startCoroutine() {
val coroutine: suspend CoroutineScope.() -> Unit = {
cumpute1()
cumpute2()
}
GlobalScope.launch(block = coroutine)
}
suspend fun cumpute1() {
print("cupmpute1")
}
suspend fun cumpute2() {
print("cupmpute1")
}
}
coroutine
属性是 suspend CoroutineScope.() -> Unit
函数类型对象,反编译成java代码
final class MainActivity$startCoroutine$coroutine$1 extends SuspendLambda implements Function2 {
//状态机初始值0
int label;
public final Object invokeSuspend(Object $result) {
Object coroutine_suspend = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
...
//将label置为1 开始调用挂起函数cumpute1
this.label = 1;
if (MainActivity.this.cumpute1(this) == coroutine_suspend) {
//如果函数被挂起返回挂起标识
return coroutine_suspend;
}
break;
...
}
//将label置为2 开始调用挂起函数cumpute2
this.label = 2;
if (MainActivity.this.cumpute2(this) == coroutine_suspend) {
return coroutine_suspend;
} else {
return Unit.INSTANCE;
}
}
public final Continuation create(Object value,Continuation completion) {
...
//创建并返回一个Continuation对象
MainActivity$startCoroutine$coroutine$1 coroutine = new
MainActivity$startCoroutine$coroutine$1(completion);
return coroutine;
}
public final Object invoke(Object var1, Object var2) {
return ((MainActivity$startCoroutine$coroutine$1)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}
通过反编译代码可以看到声明的 coroutine
转换成了继承 SuspendLambda
的类,可以称之为协程体类。内部实现了两个方法
invokeSuspend()
内部是通过label状态来控制调用流程.create()
方法接收一个 Continuation
对象,然后创建并返回协程体类对象。
public final Object cumpute1(Continuation completion) {
...
return Unit.INSTANCE;
}
suspend
修饰的函数经过反编译后额外接收了一个 Continuation
类型参数,这也就是为什么普通函数内不能调用 suspend
修饰的函数的原因。附上 SuspendLambda
的类图
从类图上可以看出该类的承链 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
Continuation
是一个接口定义了 resumeWith(result:Result)
方法 和 CoroutineContext
上下文属性BaseContinuationImpl
是一个抽象类实现了 resumeWith( result : Result )
方法 ,并声明抽象方法 invokeSuspend()
和 create()
方法。ContinuationImpl
继承 BaseContinuationImpl
构造方法接收 Continuation
类型参数,内部实现 intercepted()
方法将原始协程体类对象拦截包装,添加调度器实现并返回新的协程体类对象 DispatchedContinuation
。接上面创建的协程代码,以 launch()
函数作为入口开始分析协程是如何挂起的。launch()
函数是 CoroutineScope
的一个扩展实现
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//合并CoroutineContext上下文
val newContext = newCoroutineContext(context)
//根据isLazy生成不同的Continuation对象
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//开始执行协程体
coroutine.start(start, coroutine, block)
return coroutine
}
launch() 函数接收三个参数
1.CoroutineContext
是协程上下文,内部通过链表实现存储,可以保存协程执行过程中的调度器,异常处理器等信息,可以理解为通过 key-value 的形式存储值的Map。
2.CoroutineStart
是一个枚举类有四种类型分别是:
3.block 协程体类对象
newCoroutineContext(context)
内部做了上下文合并操作,如果上下文中没有设置线程调度器则会设置一个默认的线程调度器,isLazy 默认情况下是 false,执行创建 StandalonCoroutine
对象,StandalonCoroutine
继承 AbstractCoroutine
类,AbstractCoroutine
类主要负责协程的恢复和结果的返回,相关类图:
通过类图可以看到该类也是实现了 Continuation
接口,resumeWith()
方法内提供了协程恢复的功能(稍后分析),继续看下 start()
方法,源码如下:
//coroutine.start(start, coroutine, block)
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
//触发CotoutineStart invoke()方法
start(block, receiver, this)
}
CoroutineStart
类 invoke()
方法
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
when (this) {
//默认情况下走DEFAULT
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
CoroutineStart.ATOMIC -> block.startCoroutine(completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
CoroutineStart.LAZY -> Unit
}
这里会根据 launch()
方法中设置的枚举类型来进行分类调用,默认是 CoroutineStart.DEFAULT
。startCoroutineCancellable( completion )
接收一个 Continuation
对象,这个 Continuation
对象就是 AbstractCoroutine
派生类的实例。该方法属于链式调用,可以一步一步来分析,首先是 createCoroutineUnintercepted( completion )
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
...
return if (this is BaseContinuationImpl)
//协程体类 create() 方法构造实例
create(receiver, probeCompletion)
else
...
}
这一步主要是创建一个协程体类的实例,也就是我们协程体代码编译后生成的继承 SuspendLambda
类的实例。内部通过判断是否是BaseContinuationImpl
类型,因为 SuspendLambda
类是继承 BaseContinuationImpl
的,所以条件成立,直接调用 create()
进行实例创建,这个地方就拿到了协程体类的实例。接下来看链式调用的下一步 intercepted()
方法 。
intercepted()
//调用了ContinuationImpl的intercepted方法
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
//如果不是ContinuationImpl类别 返回自身
(this as? ContinuationImpl)?.intercepted() ?: this
public fun intercepted(): Continuation<Any?> =
//如果未经过拦截器调用则使用上下文中设置的拦截器进行拦截
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
intercepted
是拦截之 后的协程体类对象,如果为空就通过上下文中指定的拦截器对原协程体类进行拦截并返回包装好的协程体类对象,上下文中存储的拦截器是在launch()
调用的时候就设置好的,如果我们不进行指定的话,内部会设置一个默认的拦截器 Disptchers.Default
。具体实现在 launch()
调用中的第一步 newCoroutineContext(context)
方法中可以看到:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//CoroutineContext内部重载了操作符 +
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//拦截器为空的时候 将Dispatchers.Default加入到CoroutineContext中
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
接下来将会通过 Dispathchers.Default
来探究拦截器的内部实现。我们先从 Default
的定义入手:
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
通过定义可以看出 Default
是 CoroutineDispatcher
类别,实例是通过 createDefaultDispatcher()
方法进行创建,看下 createDefaultDispatcher()
内部如何创建的?
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
这里出现了一个 userCoroutinesScheduler
的标志位,它属于系统变量,默认是开启状态,也就是说 createDefaultDispatcher()
将会返回 DefaultScheduler
。这里为了后续方便理解列一下 DefaultScheduler
的继承链 :DefaultScheduler -> ExperimentalCoroutineDispatcher -> ExecutorCoroutineDispatcher -> CoroutineDispatcher -> ContinuationInterceptor
拿到拦截器DefaultScheduler
后会继续调用 interceptContinuation()
它具体实现在CoroutineDispatcher
类中:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
流程到这里我们可以确定协程体对象在经过拦截器拦截后返回了 DispatchedContinuation
对象。DispatchedContinuation
对象构造的时候接收了两个参数 第一个是 调度器 DefaultScheduler
第二个是原始协程体类对象,现在到了链式调用的最后一步resumeCancellableWith()
是 Continuation
的一个扩展方法:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
//判断是否是DispatchedContinuation类别
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
通过 this
判断类别来调用不同的处理方法,如果 this
不是拦截包装后的协程体类对象,则会调用 resumeWith( result )
,如果 this 是经过拦截包装后的DispatchedContinuation
类别对象,那么这里就会调用 resumeCancellableWith( result ,onCancellation )
方法,重点关注下这个方法内部做了什么?
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//判断是否需要调度器分发处理
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//分发处理
dispatcher.dispatch(context, this)
} else {
...
}
}
resumeCancellableWith()
方法中通过 dispatcher.isDispatchNeeded( context )
判断内容是否需要分发处理,这个方法在大多数情况下返回 true,如果返回 false 会立即在当前线程中恢复,可能会形成一个事件循环,造成堆栈溢出。还有就是如果当需要减少不必要的分发提高性能的时候可以通过重写该方法例如:MainCoroutineDispatcher 。
dispatch()
方法在 DefaultScheduler
的父类 ExperimentalCoroutineDispatcher
中实现,内部是直接交由 CoroutineScheduler
去执行,CoroutineScheduler
实现 Executor
接口,内部封装线程池。为了方便理解看一下相关类图:
我们先看下 DispatchedContinuation
这个类,它实现了Runnable()
接口,可以直接通过线程池 execute()
方法进行执行。调度器的详细实现原理后续单独进行分析,本篇还是以分析挂起和恢复流程为主。既然是实现了Runnable()
接口,那我们就需要关注在 run()
方法里面做了什么?run()
方法的实现在 DispatchedContinuation
父类 DispatchedTask
中,源码如下
public final override fun run()
//检查状态
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
//取出原始协程体类
val continuation = delegate.continuation
//上下文
val context = continuation.context
//获取状态
val state = takeState()
withCoroutineContext(context, delegate.countOrElement) {
...
//协程体代码执行
continuation.resume(getSuccessfulResult(state))
}
}
}
}
continuation.resume()
内部调用了 resumeWith()
方法 , 具体的实现是在 BaseContinuationImpl
类中,看下实现源码
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
//循环
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//调用invokeSuspend()方法去执行协程体内代码
val outcome = invokeSuspend(param)
//如果返回挂起标识COROUTINE_SUSPENDED就直接return
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
//递归调用执行
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
内部有一个无限循环,假设我们协程体内没有挂起函数,那么将会循环执行 invokeSuspend()
方法直到结束,方法内部通过状态机依次执行。那么当遇到挂起函数的时候,也就是方法返回 COROUTINE_SUSPENDED
挂起标识,将直接 return 退出循环,同时协程体代码也会退出,因为退出的是协程体,并不会造成线程阻塞。那后面未执行的代码怎么办呢?因为之前状态机在方法执行前将 label 置为某一个状态,当挂起函数恢复执行的时候,会继续向下执行剩余代码。
接下来分析一下协程是如何恢复的。
协程的恢复分析我们以 withContext
操作为例,将例子中的 cumpute1
方法内部调用 withContext
,我们先看下 withContext
方法。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
//新的context和旧的context合并
val oldContext = uCont.context
val newContext = oldContext + context
//检测是否已经完成
newContext.checkCompletion()
...
//当两者指定的调度器不一致时
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
不难看出 block
是 withContext
需要挂起的协程体,我们再看 suspendCoroutineUninterceptedOrReturn
这个方法接收一个 lambda
表达式参数是 Continuation
对象,这个 uCont
就是用来恢复调用的,但是它从哪里来的呢?withConntext
只接收两个参数,我们从反编译后的代码来一探究竟。
public final Object cumpute1(Continuation $completion) {
Object var1 = BuildersKt.withContext(
(CoroutineContext)EmptyCoroutineContext.INSTANCE,
(Function2)(new Function2((Continuation)null) { ... }),
$completion);
return var1 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var1 : Unit.INSTANCE;
}
我们发现 withContext
方法反编译后接收了三个参数,最后一个参数 completion
就是传入的原协程体对象,suspendCoroutineUninterceptedOrReturn
看不到具体的实现,如果我们想恢复原协程体对象后续操作的话,推测这里 uCont
就是传入的原协程体类对象。
继续分析 withContext
,第一步显示新的 context 和旧的 context 合并,当两者使用的调度器不一致时,DispatchedCoroutine
是 AbstractCoroutine
的子类,当协程体内操作执行完成后会调用 AbstractCoroutine
的 resumeWith()
方法,内部调用afterResume()
方法具体实现交由 afterCompletion()
抽象方法进行处理,DispatchedCoroutine
内部是实现了 afterCompletion()
抽象方法,我们来看下内部源码是如何处理的
override fun afterCompletion(state: Any?) {
afterResume(state)
}
override fun afterResume(state: Any?) {
if (tryResume()) return
//外部协程的恢复调用
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
可以很清楚的看到这里是将原协程体继续执行调用。这里需要在注意一个点,之前我们分析协程体类内部执行的时候
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
当调用的函数返回 COROUTINE_SUSPENDED
的时候直接挂起,在 withContext
中 coroutine.getResult()
会调用 trySuspend()
方法利用 CAS 进行状态变更,如果变更成功就返回true,然后 getResult
函数返回 COROUTINE_SUSPENDED
,协程体就会挂起。
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
以上就是对协程的启动流程以及挂起和恢复过程的源码分析。
经过上面的流程分析,最后对Kotlin中协程的挂起和恢复的流程做个总结:
SuspendLambda
的类,具体继承链 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuationlanuch()
方法内部生成了 AbstractCoroutine
子类对象,用来处理结果和恢复协程startCoroutineCancellable(completion: Continuation<T>)
被调用,这个 completion
就是处理结果和恢复协程的AbstractCoroutine
子类对象createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
createCoroutineUnintercepted(completion)
负责生成协程体类对象intercepted()
将原协程体进行包装增加调度器实现返回 DispatchedContinuation
对象resumeCancellableWith()
任务执行 ,协程体类的 resumeWith()
被调用,循环体中 invokeSuspend()
连续被调用,遇到返回 COROUTINE_SUSPENDED
标识的函数就退出协程体执行循环,尚未执行完成的协程体会传入挂起函数中,等待挂起函数执行完毕后,通过原协程体类对象继续执行。1.《深入理解Kotlin协程》- 霍丙乾
2.https://www.kotlincn.net/docs/reference/coroutines-overview.html
3.https://blog.csdn.net/zou8944/article/details/106447727
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/IXJJvJZPXH6rzU22-tGm4Q
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。