初識協程
什么是協程
Kotlin 1.3 添加了協程 Coroutine 的概念,檔案中介紹協程是一種并發設計模式,可以在 Android 平臺上使用它來簡化異步執行的代碼,
協程具有如下特點:
-
異步代碼同步化:使用撰寫同步代碼的方式撰寫異步代碼,
-
輕量:您可以在單個執行緒上運行多個協程,因為協程支持掛起,不會使正在運行協程的執行緒阻塞,掛起比阻塞節省記憶體,且支持多個并行操作,
-
記憶體泄漏更少:使用結構化并發機制在一個作用域內執行多項操作,
-
內置取消支持:取消操作會自動在運行中的整個協程層次結構內傳播,
-
Jetpack集成:許多 Jetpack 庫都包含提供全面協程支持的擴展,某些庫還提供自己的協程作用域,可供您用于結構化并發,
協程的掛起和恢復
Kotlin 協程的掛起和恢復本質上是掛起函式的掛起和恢復,
suspend fun suspendFun() {}
掛起函式:suspend 關鍵字修飾的普通函式,如果在協程體內呼叫了掛起函式,那么呼叫處就被稱為 掛起點,掛起點如果出現 異步呼叫,那么當前協程就會被掛起,直到對應的 Continuation.resume() 函式被呼叫才會恢復執行,
掛起函式和普通函式的區別在于:
-
掛起函式只能在協程體內或其他掛起函式內呼叫;
-
掛起函式可以呼叫任何函式,普通函式只能呼叫普通函式,
suspend 除用于修飾函式外還可用于修飾 lambda 運算式,在原始碼分析的章節會詳細分析它們的區別,
基本用法
Gradle 引入
dependencies {
// Kotlin Coroutines
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
// 使用 `Dispatchers.Main` 需要添加如下依賴
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2'
}
啟動協程
kotlin 協程框架為我們提供了兩種便捷的方式啟動協程:
-
GlobalScop.launch -
GlobalScope.async
分別來使用兩種方式輸出 Hello World!:
fun main() {
GlobalScope.launch { // 使用 GlobalScope.launch 啟動協程
delay(1000L) // 非阻塞的等待 1 秒鐘(默認時間單位是毫秒)
println("World!") // 在延遲后列印輸出
}
print("Hello ") // 協程已在等待時主執行緒還在繼續
Thread.sleep(2000L) // 阻塞主執行緒 2 秒鐘來保證 JVM 存活
}
fun main() {
GlobalScope.async { // 使用 GlobalScope.async 啟動協程
delay(1000L)
println("World!")
}
print("Hello ")
Thread.sleep(2000L)
}
從上面的例子里看這兩種方式好像并沒有什么區別,其實區別在他們的回傳值上
-
GlobalScop.launch:回傳值Job -
GlobalScope.async:回傳值Deferred<T>
Deferred<T> 是 Job 的子類,并且可以通過呼叫 await 函式獲取協程的回傳值,上面 GlobalScope.async 的例子改造一下:
GlobalScope.launch {
val result = GlobalScope.async { // 使用 GlobalScope.async 啟動協程
delay(1000L)
"World!"
}
println("Hello ${result.await()}")
}
Thread.sleep(2000L)
//輸出:Hello World!
上面的示例把 async 嵌套在了 launch 函式體內部,這是因為 await 是一個掛起函式,而掛起函式不同于普通函式的就是它必須在協程體或其他掛起函式內部呼叫,
在協程體內 ({} 內) 可以隱藏 GlobalScope 直接使用 async、launch 啟動協程,所以上面的示例可以修改如下:
GlobalScope.launch {
val result = async { // 使用 GlobalScope.async 啟動協程
...
}
...
// launch {}
}
...
協程操作
通過了解協程的兩種啟動方式,我們知道 GlobalScop.launch、GlobalScop.async 的回傳值都是 Job 物件或其子類物件,那 Job 是什么呢? 又有哪些功能,
Job 是一個可取消的后臺任務,用于操作協程的執行并記錄執行程序中協程的狀態,所以一般來說 Job 實體也代表了協程,
Job 具有如下幾種狀態:
| State | [isActive] | [isCompleted] | [isCancelled] |
|---|---|---|---|
| New (可選初始狀態) | false | false | false |
| Active (默認初始狀態) | true | false | false |
| Completing (瞬態) | true | false | false |
| Cancelling (瞬態) | false | false | true |
| Cancelled (最終狀態) | false | true | true |
| Completed (最終狀態) | false | true | false |
通常情況下,創建 Job 時會自動啟動,狀態默認為 _Active_,但是如果創建時添加引數 CoroutineStart.Lazy 則狀態為 _NEW_,可以通過 start() 或 join() 等函式激活,
Job 狀態流程圖:
wait children
+-----+ start +--------+ complete +-------------+ finish +-----------+
| New | -----> | Active | ---------> | Completing | -------> | Completed |
+-----+ +--------+ +-------------+ +-----------+
| cancel / fail |
| +----------------+
| |
V V
+------------+ finish +-----------+
| Cancelling | --------------------------------> | Cancelled |
+------------+ +-----------+
Job 的可用方法:
-
cancel(CancellationException):取消Job對應的協程并發送協程取消錯誤 (CancellationException), -
invokeOnCompletion():注冊當此Job狀態更新為Completed時同步呼叫的處理程式, -
join():掛起Job對應的協程,當協程完成時,外層協程恢復, -
start():如果創建Job物件時使用的啟動模式為CoroutineStart.Lazy,通過它可以啟動協程, -
cancelAndJoin():取消Job并掛起當前協程,直到Job被取消,
當要取消正在運行的協程:
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延遲一段時間
println("main: I'm tired of waiting!")
job.cancel() // 取消該作業
job.join() // 等待作業執行結束
println("main: Now I can quit.")
// 輸出
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
上面示例中可以使用 cancelAndJoin 函式它合并了對 cancel 以及 join 函式的呼叫,
注意:如果在協程執行程序中沒有掛起點,那么協程是不可被取消的,
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // 一個執行計算的回圈,只是為了占用 CPU
// 每秒列印訊息兩次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段時間,并保證協程開始執行
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消一個作業并且等待它結束
println("main: Now I can quit.")
// 輸出
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.
簡單來說,如果協程體內沒有掛起點的話,已開始執行的協程是無法取消的,
下面來介紹,協程啟動時傳參的含義及作用:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
協程的啟動模式
CoroutineStart:協程啟動模式,協程內提供了四種啟動模式:
-
DEFAULT:協程創建后,立即開始調度,在調度前如果協程被取消,其將直接進入取消相應的狀態, -
ATOMIC:協程創建后,立即開始調度,協程執行到第一個掛起點之前不回應取消, -
LAZY:只有協程被需要時,包括主動呼叫協程的start()、join()、await()等函式時才會開始調度,如果調度前就被取消,那么該協程將直接進入例外結束狀態, -
UNDISPATCHED:協程創建后立即執行,直到遇到第一個真正掛起的點,
立即調度和立即執行的區別:立即調度表示協程的調度器會立即接收到調度指令,但具體執行的時機以及在那個執行緒上執行,還需要根據調度器的具體情況而定,也就是說立即調度到立即執行之間通常會有一段時間,因此,我們得出以下結論:
-
DEFAULT雖然是立即調度,但也有可能在執行前被取消, -
UNDISPATCHED是立即執行,因此協程一定會執行, -
ATOMIC雖然是立即調度,但其將調度和執行兩個步驟合二為一了,就像它的名字一樣,其保證調度和執行是原子操作,因此協程也一定會執行, -
UNDISPATCHED和ATOMIC雖然都會保證協程一定執行,但在第一個掛起點之前,前者運行在協程創建時所在的執行緒,后者則會調度到指定的調度器所在的執行緒上執行,
協程背景關系和調度器
CoroutineContext:協程背景關系,用于控制協程的行為,上文提到的 Job 和準備介紹的調度器都屬于 CoroutineContext,
協程默認提供了四種調度器:
-
Dispatchers.Default:默認調度器,如果沒有指定協程調度器和其他任何攔截器,那默認都使用它來構建協程,適合處理后臺計算,其是一個CPU密集型任務調度器, -
Dispatchers.IO:IO調度器,適合執行IO相關操作,其是一個IO密集型任務調度器, -
Dispatchers.Main:UI調度器,會將協程調度到主執行緒中執行, -
Dispatchers.Unconfined:非受限制調度器,不要求協程執行在特定執行緒上,協程的調度器如果是Unconfined,那么它在掛起點恢復執行時會在恢復所在的執行緒上直接執行,當然,如果嵌套創建以它為調度器的協程,那么這些協程會在啟動時被調度到協程框架內部的時間回圈上,以避免出現StackOverflow, -
Dispatchers.Unconfined:非受限調度器,會在呼叫它的執行緒啟動協程,但它僅僅只是運行到第一個掛起點,掛起后,它恢復執行緒中的協程,而這完全由被呼叫的掛起函式來決定,
runBlocking {
launch { // 運行在父協程的背景關系中,即 runBlocking 主協程
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——將作業在主執行緒中
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 將會獲取默認調度器
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
}
//輸出結果
Unconfined : I'm working in thread main @coroutine#3
Default : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4
main runBlocking : I'm working in thread main @coroutine#2
withContext
除了可以在 GlobalScope.launch {}、GlobalScope.async {} 創建協程時設定協程調度器,
與
async {...}.await()相比withContext的記憶體開銷更低,因此對于使用async之后立即呼叫await的情況,應當優先使用withContext,
withTimeout
Kotlin 協程提供了 withTimeout 函式設定超時取消,如果運行超時,取消后會拋出 TimeoutCancellationException 例外,拋出例外的情況下回影響到其他協程,這時候可以使用 withTimeoutOrNull 函式,它會在超時的情況下回傳 null 而不拋出例外,
runBlocking {
val result = withContext(coroutineContext) {
withTimeoutOrNull(500) {
delay(1000)
"hello"
}
}
println(result)
}
// 輸出結果
hello
yield
如果想要解決上面示例中的問題可以使用 yield 函式,它的作用在于檢查所在協程的狀態,如果已經取消,則拋出取消例外予以回應,此外它還會嘗試出讓執行緒的執行權,給其他協程提供執行機會,
在上面示例中添加 yield 函式:
if (System.currentTimeMillis() >= nextPrintTime) {
yield()
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
// 輸出結果
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
協程的作用域
協程作用域:協程作用域主要用于明確協程之間的父子關系,以及對于取消或者例外處理等方面的傳播行為,
協程作用域包括以下三種:
-
頂級作用域:沒有父協程的協程所在的作用域為頂級作用域, -
協同作用域:協程中啟動新的協程,新協程為所在協程的子協程,這種情況下子協程所在的作用域默認為協同作用域,此時子協程拋出的未捕獲例外將傳遞給父協程處理,父協程同時也會被取消, -
主從作用域:與協程作用域在協程的父子關系上一致,區別在于處于該作用域下的協程出現未捕獲的例外時不會將例外向上傳遞給父協程,
父子協程間的關系:
-
父協程被取消,則所有子協程均被取消,
-
父協程需要等待子協程執行完畢之后才會最終進入完成狀態,不管父協程自身的協程體是否已經執行完畢,
-
子協程會繼承父協程的協程背景關系元素,如果自身有相同
key的成員,則覆寫對應的key,覆寫的效果僅限自身范圍內有效,
宣告頂級作用域:GlobalScope.launch {}、runBlocking {}
宣告協同作用域:coroutineScope {}
宣告主從作用域:supervisorScope {}
coroutineScope {} 和 supervisorScope {} 是掛起函式所以它們只能在協程作用域中或掛起函式中呼叫,
coroutineScope {} 和 supervisorScope {} 的區別在于 SupervisorCoroutine 重寫了 childCancelled() 函式使例外不會向父協程傳遞,
協程并發
通過上文的介紹可以了解到協程其實就是執行在執行緒上的代碼片段,所以執行緒的并發處理都可以用在協程上,比如 synchorinzed、CAS 等,而協程本身也提供了兩種方式處理并發:
-
Mutex:互斥鎖; -
Semaphore:信號量,
Mutex
Mutex 類似于 synchorinzed,協程競爭時將協程包裝為 LockWaiter 使用雙向鏈表存盤,Mutex 還提供了 withLock 擴展函式,以簡化使用:
runBlocking<Unit> {
val mutex = Mutex()
var counter = 0
repeat(10000) {
GlobalScope.launch {
mutex.withLock {
counter ++
}
}
}
Thread.sleep(500) //暫停一會兒等待所有協程執行結束
println("The final count is $counter")
}
Semaphore
Semaphore 用以限制訪問特定資源的協程數量,
runBlocking<Unit> {
val semaphore = Semaphore(1)
var counter = 0
repeat(10000) {
GlobalScope.launch {
semaphore.withPermit {
counter ++
}
}
}
Thread.sleep(500) //暫停一會兒等待所有協程執行結束
println("The final count is $counter")
}
注意:只有在
permits = 1時才和Mutex功能相同,
原始碼分析
suspend
我們來看 suspend 修飾函式和修飾 lambda 的區別,
掛起函式:
suspend fun suspendFun() {
}
編譯成 java 代碼如下:
@Nullable
public final Object suspendFun(@NotNull Continuation $completion) {
return Unit.INSTANCE;
}
可以看到掛起函式其實隱藏著一個 Continuation 協程實體引數,而這個引數其實就來源于協程體或者其他掛起函式,因此掛起函式只能在協程體內或其他函式內呼叫了,
suspend 修飾 lambda 運算式:
suspend {}
// 反編譯結果如下
Function1 var2 = (Function1)(new Function1((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch(this.label) {
case 0:
return Unit.INSTANCE;
default:
}
}
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
public final Object invoke(Object var1) {
return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
}
});
suspend lambda 實際會被編譯成 SuspendLambda 的子類,suspendLambda 的繼承關系如下圖:

通過反編譯的代碼可以發現我們在協程體內撰寫的代碼最終是在 invokeSuspend 函式內執行的,而在 BaseContinuationImpl 內實作了 Continuation 協程介面的 resumeWidth 函式,并在其內呼叫了 invokeSuspend 函式,
suspend 關鍵字的介紹先到這里,接下來我們看協程是如何創建并運行的,
協程是如何被創建的
檔案地址
kotlin.coroutines.Continuation.kt,
Continuation.kt 檔案基本屬于協程的基礎核心了,搞懂了它也就相當于搞懂了協程的基礎原理,
-
協程介面的定義;
-
喚醒或啟動協程的函式;
-
四種創建協程的函式;
-
幫助獲取協程內的協程實體物件的函式,
首先是協程的介面宣告,非常簡單:
/**
* 協程介面,T 表示在最后一個掛起點恢復時的回傳值型別
*/
public interface Continuation<in T> {
/**
* 協程背景關系
*/
public val context: CoroutineContext
/**
* 這個函式的功能有很多,它可以啟動協程,也可以恢復掛點,還可以作為最后一次掛起點恢復時輸出協程的結果
*/
public fun resumeWith(result: Result<T>)
}
協程介面宣告之后 Continuation.kt 檔案提供了兩個呼叫 resumeWith 函式的函式:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
這兩個函式除了傳參一成功一失敗,它們的功能是一模一樣的,都是直接呼叫了 resumeWith 函式,相當于是 resumeWith 函式的封裝,
再然后就是四種創建協程的方式了:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
這四種方式可以說是相似度超高,createCoroutine 和 startCoroutine 最大的區別在于,通過 createCoroutine 創建的協程需要掉用 resume 函式啟動,而 startCoroutine 函式內部已經默認呼叫了 resume 函式,那我們先用第一種方式創建一個協程:
// 創建協程
val continuation = suspend {
println("In Coroutine")
}.createCoroutine(object : Continuation<Unit> {
override fun resumeWith(result: Result<Unit>) {
println(result)
}
override val context = EmptyCoroutineContext
})
// 啟動協程
continuation.resume(Unit)
呼叫 createCoroutine 函式創建協程時傳入了 Continuation 協程的匿名類物件,誒?好像有點不對,為什么創建協程的時候要傳一個協程實體進去,直接用不就成了,想知道為什么的話,那就需要看看 createCoroutine 到底做了什么操作了,
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
首先呼叫的是 createCoroutineUnintercepted 函式,它的原始碼可以在 kotlin.coroutines.intrinsics.IntrinsicsJvm.kt 內找到:
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
probeCoroutineCreated函式內直接將引數回傳了,并且通過斷點的方式,它的回傳值和completion傳參是一樣的,所以這里先忽略它,
通過斷點會發現 (this is BaseContinuationImpl) 判斷的回傳值是 true 這也就間接證明了上文中 suspend lambda 和 BaseContinuationImpl 的繼承關系,最后回傳的是 create(Continuation) 函式的回傳值,這里可以發現作為引數傳入的 Continuation 變數被 suspend lambda 包裹了一層,然后回傳,相當于 suspend lambda 成為了 Continuation 的代理,
到這里 createCoroutineUnintercepted(completion) 的含義就搞明白了:
將
object : Continuation<Unit> {}創建的協程實體傳入suspend lambda,由其代理協程執行操作,
緊接著又呼叫了 intercepted 函式,intercepted 函式宣告也在 IntrinsicsJvm.kt 檔案內:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
接著看 ContinuationImpl 的 intercepted 函式:
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
其中 context[ContinuationInterceptor]?.interceptContinuation(this) 這句代碼涉及到協程攔截器的概念,下文會詳細分析,這里可以先簡單介紹一下,協程攔截器和協程其實也是代理的關系,所以 intercepted() 可以理解為如果協程背景關系中添加了協程攔截器,那么就回傳協程攔截器,不然就回傳 suspend lambda 實體本身,而它們都實作了 Continuation 介面,
先做一個小結,通過上文的介紹基本就清楚了,createCoroutine、startCoroutine 函式其實不是用來創建協程的,協程實體就是它們的傳參,它們是為協程添加代理的,
createCoroutineUnintercepted(completion).intercepted()
通過上面的代碼,為協程添加了代理,分別是 suspend lambda 和協程攔截器,這時候通過協程實體呼叫 resumeWith 函式時會先執行兩層代理內實作的 resumeWith 函式邏輯,最終才會執行到協程的 resumeWith 函式輸出最終結果,
在 createCoroutine 函式內,在添加兩層代理之后又添加了一層代理,SafeContinuation,SafeContinuation 內部使用協程的三種狀態,并配合 CAS 操作,保證當前回傳的 SafeContinuation 實體物件僅能呼叫一次 resumeWith 函式,多次呼叫會報錯,
-
UNDECIDED:初始狀態 -
COROUTINE_SUSPENDED:掛起狀態 -
RESUMED:恢復狀態
協程是如何被掛起又是如何被恢復的
那為什么協程要這么做,很麻煩不是?要弄清楚這個問題先來看 BaseContinuationImpl 的 resumeWith 函式實作吧,
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
當呼叫 resume(Unit) 啟動協程時,由于代理的存在會呼叫到 BaseContinuationImpl 的 resumeWith() 函式,函式內會執行 invokeSuspend() 函式,也就說我們所說的協程體,
查看如下代碼的 invokeSuspend 函式:
suspend {5}
// 反編譯后的 invokeSuspend 函式
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
return Boxing.boxInt(5);
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
可以看到這里直接回傳了最終的結果 5,接著在 ContinuationImpl.resumeWith 函式內最終呼叫
completion.resumeWith(outcome)
輸出協程的最終結果,
這是協程執行同步代碼的程序,可以看到在整個程序中,ContinuationImpl 好像并沒有起到什么作用,那接著來看在協程體內執行異步代碼:
suspend {
suspendFunc()
}
suspend fun suspendFunc() = suspendCoroutine<Int> { continuation ->
thread {
Thread.sleep(1000)
continuation.resume(5)
}
}
// 反編譯后
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
var10000 = DeepKotlin3Kt.suspendFunc(this);
if (var10000 == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return var10000;
}
public static final Object suspendFunc(@NotNull Continuation $completion) {
boolean var1 = false;
boolean var2 = false;
boolean var3 = false;
SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
Continuation continuation = (Continuation)var4;
int var6 = false;
ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new DeepKotlin3Kt$suspendFunc02$2$1(continuation)), 31, (Object)null);
Object var10000 = var4.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}
resume 函式啟動協程,invokeSuspend 函式第一次執行時 this.label == 0 執行 case 0 代碼,this.label 變數賦值為 1, 然后判斷如果 if (var10000 == var2) 為 true 那么 invokeSuspend 函式回傳 var2,也就是 COROUTINE_SUSPENDED 標識,在 resumeWith 函式內,判斷如果 invokeSuspend 函式的回傳值為 COROUTINE_SUSPENDED 則 reture,這也就是協程的掛起程序,
當執行緒執行結束,呼叫 resume 函式恢復協程時再次執行到 invokeSuspend 函式,這時 this.label == 1,執行 case 1 代碼,直接回傳結果 5,那在 resumeWith 函式內,這時就不會執行 return 了,最侄訓呼叫協程的 resumeWith 函式輸出最終的結果,這也就是協程的恢復程序,
通過了解協程運行流程可以發現 ContinuationImpl 其實是協程掛起和恢復邏輯的真正執行者,也正是因為協程掛起和恢復邏輯的存在,所以我們可以像撰寫同步代碼一樣呼叫異步代碼:
suspend {
println("Coroutine start")
println("Coroutine: ${System.currentTimeMillis()}")
val resultFun = suspendThreadFun()
println("Coroutine: suspendThreadFun-$resultFun-${System.currentTimeMillis()}")
val result = suspendNoThreadFun()
println("Coroutine: suspendNoThreadFun-$result-${System.currentTimeMillis()}")
}.startCoroutine(object : Continuation<Unit> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
println("Coroutine End: $result")
}
})
suspend fun suspendThreadFun() = suspendCoroutine<Int> { continuation ->
thread {
Thread.sleep(1000)
continuation.resumeWith(Result.success(5))
}
}
suspend fun suspendNoThreadFun() = suspendCoroutine<Int> { continuation ->
continuation.resume(5)
}
//輸出:
Coroutine start
Coroutine: 1627014868152
Coroutine: suspendThreadFun-5-1627014869182
Coroutine: suspendNoThreadFun-5-1627014869186
Coroutine End: Success(kotlin.Unit)
創建協程作用域
在通過 createCoroutine 創建協程時,你會發現還可為它傳遞 receiver 引數,這個引數的作用是用于擴展協程體,一般稱其為 協程作用域,
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
可以看到 suspend lambda 運算式也出現了變化,我們知道 () -> T 是 Function0 的 lambda 運算式,R.() -> T 相當于 R 類的 () -> T 擴展,如果了解擴展函式的話就知道擴展函式會將所擴展的類作為其引數,那么 R.() -> T 也就是 Function1 的 lambda 運算式了,
當然由于
suspend關鍵字的作用,又增加了Continuation引數,所以最終看到的就是Function1和Function2,
因為擴展函式的作用,所以可以在協程體內通過 this (可隱藏)呼叫 receiver 的函式或者屬性,示例如下:
launchCoroutine(ProducerScope<Int>()) {
produce(1000)
}
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<T>) {
println("Coroutine End: $result")
}
})
}
class ProducerScope<T> {
fun produce(value: T) {
println(value)
}
}
GlobalScope.launch 原始碼分析
了解上文創建協程的邏輯之后再來分析 GlobalScope.launch 就非常簡單了,GlobalScope.launch 最侄訓執行到 CoroutineStart.invoke 函式:
AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
代碼基本跟上文分析的一致,
自定義協程背景關系
協程背景關系在協程中的作用非常大,有它在相當于協程有了裝備卡槽一樣,你可以將你想添加的背景關系物件合并到 CoroutineContext 引數上,然后在其他地方使用,
CoroutineContext 的資料結構有如下特點:
-
可以通過
[]以類似List的方式訪問任何一個協程背景關系物件,[]內是目標協程背景關系, -
協程背景關系可以通過
+的方式依次累加,當然+=也是可用的,
我們來自定義一個協程背景關系給協程添加一個名字:
public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
public companion object Key : CoroutineContext.Key<CoroutineName>
override fun toString(): String = "CoroutineName($name)"
}
應用到示例中:
var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("c0-01")
suspend {
println("Run Coroutine")
}.startCoroutine(object : Continuation<Unit> {
override fun resumeWith(result: Result<Unit>) {
println("${context[CoroutineName]?.name}")
}
override val context = coroutineContext
})
//輸出:
Run Coroutine
c0-01
其實協程已經為我們提供了
CoroutineName實作,
自定義協程攔截器
通過實作攔截器介面 ContinuationInterceptor 來定義攔截器,因為攔截器也是協程背景關系的一類實作,所以使用攔截器時將其添加到對應的協程背景關系中即可,
宣告一個日志攔截器:
class LogInterceptor : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = LogContinuation(continuation)
}
class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("before resumeWith: $result")
continuation.resumeWith(result)
println("after resumeWith")
}
}
攔截器的關鍵攔截函式是 interceptContinuation,可以根據需要回傳一個新的 Continuation 實體,
在協程生命周期內每次恢復呼叫都會觸發攔截器,恢復呼叫有如下兩種情況:
-
協程啟動時呼叫一次,通過恢復呼叫來開始執行協程體從開始到下一次掛起之間的邏輯,
-
掛起點處如果異步掛起,則在恢復時再呼叫一次,
由此可知,恢復呼叫的次數為 n+1 次,其中 n 是協程體內真正掛起執行異步邏輯的掛起點的個數,
改寫上面的例子:
// 異步掛起函式
suspend fun suspendFunc02() = suspendCoroutine<Int> { continuation ->
thread {
continuation.resumeWith(Result.success(5))
}
}
// 開啟協程 - 未添加日志攔截器
suspend {
suspendFunc02()
suspendFunc02()
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
...
result.onSuccess {
println("Coroutine End: ${context[CoroutineName]?.name}, $result")
}
}
})
// 輸出如下
Coroutine End: Success(5)
// 開啟協程 - 添加日志攔截器
suspend {
suspendFunc02()
suspendFunc02()
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = LogInterceptor()
override fun resumeWith(result: Result<Int>) {
...
result.onSuccess {
println("Coroutine End: ${context[CoroutineName]?.name}, $result")
}
}
})
// 輸出如下:
before resumeWith: Success(kotlin.Unit)
after resumeWith
before resumeWith: Success(5)
after resumeWith
before resumeWith: Success(5)
after resumeWith
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/292218.html
標籤:其他
