vivo 互聯網客戶端團隊- Ruan Wen
本文是Kotlin協程決議系列文章的開篇,主要介紹Kotlin協程的創建、協程調度與協程掛起相關的內容

一、協程引入
Kotlin 中引入 Coroutine(協程) 的概念,可以幫助撰寫異步代碼,
在使用和分析協程前,首先要了解一下:
協程是什么?
為什么需要協程?
協程最為人稱道的就是可以用看起來同步的方式寫出異步的代碼,極大提高了代碼的可讀性,在實際開發中最常見的異步操作莫過于網路請求,通常我們需要通過各種回呼的方式去處理網路請求,很容易就陷入到地獄回呼中,
WalletHttp.target(VCoinTradeSubmitResult.class).setTag(tag)
.setFullUrl(Constants.VCOIN_TRADE_SUBMIT_URL).setParams(params)
.callback(new HttpCallback<VCoinTradeSubmitResult>() {
@Override
public void onSuccess(VCoinTradeSubmitResult vCoinTradeSubmitResult) {
super.onSuccess(vCoinTradeSubmitResult);
if (mView == null) {
return;
}
//......
}
}).post();
上述示例是一個專案開發中常見的一個網路請求操作,通過介面回呼的方式去獲取網路請求結果,實際開發中也會經常遇到連續多個介面請求的情況,例如我們專案中的個人中心頁的邏輯就是先去異步獲取,
本地快取,獲取失敗的話就需要異步重繪一下賬號token,然后網路請求相關個人中心的其他資訊,這里簡單舉一個支付示例,進行支付時,可能要先去獲取賬號token,然后依賴該token再去做支付,
請求操作,根據支付回傳資料再去查詢支付結果,這種情況通過回呼就可能演變為“地獄回呼”,
//獲取賬號token
WalletHttp.target(Account.class).setTag(tag)
.setFullUrl(Constants.ACCOUNT_URL).setParams(params)
.callback(new HttpCallback<Account>() {
@Override
public void onSuccess(Account account) {
super.onSuccess(account);
//根據賬號token進行支付操作
WalletHttp.target(Pay.class).setFullUrl(Constants.PAY_URL).addToken(account.getToken()).callback(new HttpCallback<Pay>() {
@Override
public void onSuccess(Pay pay){
super.onSuccess(pay);
//根據支付操作回傳查詢支付結果
WalletHttp.target(PayResult.class).setFullUrl(Constants.RESULT_URL).addResultCode(pay.getResultCode()).callback(new HttpCallback<PayResult>() {
@Override
public void onSuccess(PayResult result){
super.onSuccess(result);
//......
}
}).post();
}
}).post();
}
}).post();
對于這種場景,kotlin協程“同步方式寫出異步代碼”的這個特性就可以很好的解決上述問題,若上述場景用kotlin 協程代碼實作呢,可能就為:
fun postItem(tag: String, params: Map<String, Any?>) = viewModelScope.launch {
// 獲取賬號資訊
val account = repository.queryAccount(tag, params)
// 進行支付操作
val pay = repository.paySubmit(tag,account.token)
//查詢支付結果
val result = repository.queryPayResult(tag,pay.resultCode)
//......
}
可以看出,協程代碼非常簡潔,以順序的方式書寫異步代碼,代碼可讀性極強,
如果想要將原先的網路回呼請求也改寫成這種同步模式呢,只需要對原先請求回呼叫協程提供的suspendCancellableCoroutine等方法進行封裝處理,即可讓早期的異步代碼也享受上述“同步代碼”的絲滑,
協程:
一種非搶占式或者協作式的計算機程式并發調度實作,程式可以主動掛起或者恢復執行,其核心點是函式或一段程式能夠被掛起,稍后再在掛起的位置恢復,通過主動讓出運行權來實作協作,程式自己處理掛起和恢復來實作程式執行流程的協作調度,
協程本質上是輕量級執行緒,
協程的特點有:
協程可以讓異步代碼同步化,其本質是輕量級執行緒,
可在單個執行緒運行多個協程,其支持掛起,不會使運行協程的執行緒阻塞,
可以降低異步程式的設計復雜度,
Kotlin協程實作層次:
基礎設施層:標準庫的協程API,主要對協程提供了概念和語意上最基本的支持;
業務框架層:協程的上層框架支持,基于標準庫實作的封裝,也是我們日常開發使用的協程擴展庫,

二、協程啟動
具體在使用協程前,首先要配置對Kotlin協程的依賴,
(1)專案根目錄build.gradle
buildscript {
...
ext.kotlin_coroutines = 'xxx'
...
}
(2)Module下build.gradle
dependencies {
...
//協程標準庫
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_coroutines"
//依賴協程核心庫,包含協程公共API部分
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"
//依賴android支持庫,協程Android平臺的具體實作方式
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"
...
}
2.1 Thread 啟動
在Java中,可以通過Thread開啟并發操作:
new Thread(new Runnable() {
@Override
public void run() {
//... do what you want
}
}).start();
在Kotlin中,使用執行緒更為便捷:
val myThread = thread {
//.......
}
這個Thread方法有個引數start默認為true,即創造出來的執行緒默認啟動,你可以自定義啟動時機:
val myThread = thread(start = false) {
//......
}
myThread.start()
2.2 協程啟動
動協程需要三部分:背景關系、啟動模式、協程體,
啟動方式一般有三種,其中最簡單的啟動協程的方式為:
GlobalScope.launch {
//......
}
GlobalScope.launch()屬于協程構建器Coroutine builders,Kotlin 中還有其他幾種 Builders,負責創建協程:
runBlocking:T
使用runBlocking頂層函式創建,會創建一個新的協程同時阻塞當前執行緒,直到協程結束,適用于main函式和單元測驗
launch
創建一個新的協程,不會阻塞當前執行緒,必須在協程作用域中才可以呼叫,它回傳的是一個該協程任務的參考,即Job物件,這是最常用的啟動協程的方式,
async
創建一個新的協程,不會阻塞當前執行緒,必須在協程作用域中才可以呼叫,并回傳Deffer物件,可通過呼叫Deffer.await()方法等待該子協程執行完成并獲取結果,常用于并發執行-同步等待和獲取回傳值的情況,
2.2.1 runBlocking
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

runBlocking是一個頂層函式,可以在任意地方獨立使用,它能創建一個新的協程同時阻塞當前執行緒,直到其內部所有邏輯以及子協程所有邏輯全部執行完成,常用于main函式和測驗中,
//main函式中應用
fun main() = runBlocking {
launch { // 創建一個新協程,runBlocking會阻塞執行緒,但內部運行的協程是非阻塞的
delay(1000L)
println("World!")
}
println("Hello,")
delay(2000L) // 延時2s,保證JVM存活
}
//測驗中應用
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking {
// ......
}
}
2.2.2 launch
launch是最常用的用于啟動協程的方式,會在不阻塞當前執行緒的情況下啟動一個協程,并回傳對該協程任務的參考,即Job物件,
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job

協程需要運行在協程背景關系環境中,在非協程環境中的launch有兩種:GlobalScope 與 CoroutineScope ,
- GlobalScope.launch()
在應用范圍內啟動一個新協程,不會阻塞呼叫執行緒,協程的生命周期與應用程式一致,
fun launchTest() {
print("start")
GlobalScope.launch {
delay(1000)//1秒無阻塞延遲
print("GlobalScope.launch")
}
print("end")
}
/** 列印結果
start
end
GlobalScope.launch
*/
這種啟動的協程存在組件被銷毀但協程還存在的情況,一般不推薦,其中GlobalScope本身就是一個作用域,launch屬于其子作用域,
- CoroutineScope.launch()
啟動一個新的協程而不阻塞當前執行緒,并回傳對協程的參考作為一個Job,
fun launchTest2() {
print("start")
val job = CoroutineScope(Dispatchers.IO).launch {
delay(1000)
print("CoroutineScope.launch")
}
print("end")
}
協程背景關系控制協程生命周期和執行緒調度,使得協程和該組件生命周期系結,組件銷毀時,協程一并銷毀,從而實作安全可靠地協程呼叫,這是在應用中最推薦的協程使用方式,
關于launch,根據業務需求需要創建一個或多個協程,則可能就需要在一個協程中啟動子協程,
fun launchTest3() {
print("start")
GlobalScope.launch {
delay(1000)
print("CoroutineScope.launch")
//在協程內創建子協程
launch {
delay(1500)
print("launch 子協程")
}
}
print("end")
}
/**** 列印結果
start
end
CoroutineScope.launch
launch 子協程
*/
2.2.3 async
async類似于launch,都是創建一個不會阻塞當前執行緒的新的協程,區別在于:async的回傳是Deferred物件,可通過Deffer.await()等待協程執行完成并獲取結果,而 launch 不行,常用于并發執行-同步等待和獲取回傳值的情況,
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>

注意:
-
await() 不能在協程之外呼叫,因為它需要掛起直到計算完成,而且只有協程可以以非阻塞的方式掛起,所以把它放到協程中,
-
如果Deferred不執行await()則async內部拋出的例外不會被logCat或try Catch捕獲,但是依然會導致作用域取消和例外崩潰; 但當執行await時例外資訊會重新拋出
-
如果將async函式中的啟動模式設定為CoroutineStart.LAZY懶加載模式時則只有呼叫Deferred物件的await時(或者執行async.satrt())才會開始執行異步任務,
三、協程補充知識
在敘述協程啟動內容,涉及到了Job、Deferred、啟動模式、作用域等概念,這里補充介紹一下上述概念,
3.1 Job
Job 是協程的句柄,賦予協程可取消,賦予協程以生命周期,賦予協程以結構化并發的能力,
Job是launch構建協程回傳的一個協程任務,完成時是沒有回傳值的,可以把Job看成協程物件本身,封裝了協程中需要執行的代碼邏輯,協程的操作方法都在Job身上,Job具有生命周期并且可以取消,它也是背景關系元素,繼承自CoroutineContext,
在日常 Android 開發程序中,協程配合 Lifecycle 可以做到自動取消,
Job生命周期
Job 的生命周期分為 6 種狀態,分為
-
New
-
Active
-
Completing
-
Cancelling
-
Cancelled
-
Completed
通常外界會持有 Job 介面作為參考被協程呼叫者所持有,Job 介面提供 isActive、isCompleted、isCancelled 3 個變數使外界可以感知 Job 內部的狀態,

val job = launch(start = CoroutineStart.LAZY) {
println("Active")
}
println("New")
job.join()
println("Completed")
/**列印結果**/
New
Active
Completed
/**********
* 1. 以 lazy 方式創建出來的協程 state 為 New
* 2. 對應的 job 呼叫 join 函式后,協程進入 Active 狀態,并開始執行協程對應的具體代碼
* 3. 當協程執行完畢后,由于沒有需要等待的子協程,協程直接進入 Completed 狀態
*/
關于Job,常用的方法有:
//活躍的,是否仍在執行
public val isActive: Boolean
//啟動協程,如果啟動了協程,則為true;如果協程已經啟動或完成,則為false
public fun start(): Boolean
//取消Job,可通過傳入Exception說明具體原因
public fun cancel(cause: CancellationException? = null)
//掛起協程直到此Job完成
public suspend fun join()
//取消任務并等待任務完成,結合了[cancel]和[join]的呼叫
public suspend fun Job.cancelAndJoin()
//給Job設定一個完成通知,當Job執行完成的時候會同步執行這個函式
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
Job父子層級
對于Job,還需要格外關注的是Job的父子層級關系,
-
一個Job可以包含多個子Job,
-
當父Job被取消后,所有的子Job也會被自動取消,
-
當子Job被取消或者出現例外后父Job也會被取消,
-
具有多個子 Job 的父Job 會等待所有子Job完成(或者取消)后,自己才會執行完成,
3.2 Deferred
Deferred繼承自Job,具有與Job相同的狀態機制,
它是async構建協程回傳的一個協程任務,可通過呼叫await()方法等待協程執行完成并獲取結果,其中Job沒有結果值,Deffer有結果值,
public interface Deferred<out T> : Job
3.3 作用域
協程作用域(CoroutineScope):協程定義的作用范圍,本質是一個介面,
確保所有的協程都會被追蹤,Kotlin 不允許在沒有使用CoroutineScope的情況下啟動新的協程,CoroutineScope可被看作是一個具有超能力的ExecutorService的輕量級版本,它能啟動新的協程,同時這個協程還具備suspend和resume的優勢,
每個協程生成器launch、async等都是CoroutineScope的擴展,并繼承了它的coroutineContext,自動傳播其所有元素和取消,
啟動協程需要作用域,但是作用域又是在協程創建程序中產生的,
public interface CoroutineScope {
/**
* 此域的背景關系,Context被作用域封裝,用于在作用域上擴展的協程構建器的實作,
*/
public val coroutineContext: CoroutineContext
}
官方提供的常用作用域:
- runBlocking:
頂層函式,可啟動協程,但會阻塞當前執行緒
- GlobalScope
全域協程作用域,通過GlobalScope創建的協程不會有父協程,可以把它稱為根協程,它啟動的協程的生命周期只受整個應用程式的生命周期的限制,且不能取消,在運行時會消耗一些記憶體資源,這可能會導致記憶體泄露,不適用于業務開發,
- coroutineScope
創建一個獨立的協程作用域,直到所有啟動的協程都完成后才結束自身,
它是一個掛起函式,需要運行在協程內或掛起函式內,當這個作用域中的任何一個子協程失敗時,這個作用域失敗,所有其他的子協程都被取消,
- supervisorScope
與coroutineScope類似,不同的是子協程的例外不會影響父協程,也不會影響其他子協程,(作用域本身的失敗(在block或取消中拋出例外)會導致作用域及其所有子協程失敗,但不會取消父協程,)
- MainScope
為UI組件創建主作用域,一個頂層函式,背景關系是SupervisorJob() + Dispatchers.Main,說明它是一個在主執行緒執行的協程作用域,通過cancel對協程進行取消,
fun scopeTest() {
GlobalScope.launch {//父協程
launch {//子協程
print("GlobalScope的子協程")
}
launch {//第二個子協程
print("GlobalScope的第二個子協程")
}
}
val mainScope = MainScope()
mainScope.launch {//啟動協程
//todo
}
}
Jetpack 的Lifecycle相關組件提供了已經系結UV宣告周期的作用域供我們直接使用:
- lifecycleScope:
Lifecycle Ktx庫提供的具有生命周期感知的協程作用域,與Lifecycle系結生命周期,生命周期被銷毀時,此作用域將被取消,會與當前的UI組件系結生命周期,界面銷毀時該協程作用域將被取消,不會造成協程泄漏,推薦使用,
- viewModelScope:
與lifecycleScope類似,與ViewModel系結生命周期,當ViewModel被清除時,這個作用域將被取消,推薦使用,
3.4 啟動模式
前述進行協程創建啟動時涉及到了啟動模式CoroutineStart,其是一個列舉類,為協程構建器定義啟動選項,在協程構建的start引數中使用,

DEFAULT模式
DEFAULT 是餓漢式啟動,launch 呼叫后,會立即進入待調度狀態,一旦調度器 OK 就可以開始執行,
suspend fun main() {
log(1)
val job = GlobalScope.launch{
log(2)
}
log(3)
Thread.sleep(5000) //防止程式退出
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
前述示例代碼采用默認的啟動模式和默認的調度器,,運行結果取決于當前執行緒與后臺執行緒的調度順序,
/**可能的運行結果一****/
[main]:1
[main]:3
[main]:2
/**可能的運行結果二****/
[main]:1
[main]:2
[main]:3
LAZY模式
LAZY 是懶漢式啟動,launch 后并不會有任何調度行為,協程體不會進入執行狀態,直到我們需要他的運行結果時進行執行,其launch 呼叫后會回傳一個 Job 實體,
對于這種情況,可以:
-
呼叫Job.start,主動觸發協程的調度執行
-
呼叫Job.join,隱式的觸發協程的調度執行
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.LAZY){
log(2)
}
log(3)
job.join()
log(4)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
對于join,一定要等待協程執行完畢,所以其運行結果一定為:
[main]:1
[main]:3
[DefaultDispatcher-worker-1]:2
[main]:4
如果把join()換為start(),則輸出結果不一定,
ATOMIC模式
ATOMIC 只有涉及 cancel 的時候才有意義,呼叫cancel的時機不同,結果也有差異,
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC){
log(2)
}
job.cancel()
log(3)
Thread.sleep(2000)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
前述代碼示例創建協程后立即cancel,由于是ATOMIC模式,因此協程一定會被調度,則log 1、2、3一定都會被列印輸出,如果將模式改為DEFAULT模式,則log 2有可能列印輸出,也可能不會,
其實cancel 呼叫一定會將該 job 的狀態置為 cancelling,只不過ATOMIC 模式的協程在啟動時無視了這一狀態,
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
log(2)
delay(1000)
log(3)
}
job.cancel()
log(4)
job.join()
Thread.sleep(2000)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
/**列印輸出結果可能如下****/
[main]:1
[DefaultDispatcher-worker-1]:2
[main]:4
前述代碼中,2和3中加了一個delay,delay會使得協程體的執行被掛起,1000ms 之后再次調度后面的部分,對于 ATOMIC 模式,它一定會被啟動,實際上在遇到第一個掛起點之前,它的執行是不會停止的,而 delay 是一個 suspend 函式,這時我們的協程迎來了自己的第一個掛起點,恰好 delay 是支持 cancel 的,因此后面的 3 將不會被列印,
UNDISPATCHED模式
協程在這種模式下會直接開始在當前執行緒下執行,直到第一個掛起點,
與ATOMIC的不同之處在于 UNDISPATCHED 不經過任何調度器即開始執行協程體,遇到掛起點之后的執行就取決于掛起點本身的邏輯以及背景關系當中的調度器了,
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
log(2)
delay(100)
log(3)
}
log(4)
job.join()
log(5)
Thread.sleep(2000)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
協程啟動后會立即在當前執行緒執行,因此 1、2 會連續在同一執行緒中執行,delay 是掛起點,因此 3 會等 100ms 后再次調度,這時候 4 執行,join 要求等待協程執行完,因此等 3 輸出后再執行 5,
結果如下:
[main]:1
[main]:2
[main]:4
[DefaultDispatcher-worker-1]:3
[DefaultDispatcher-worker-1]:5
3.5 withContext
withContext {}不會創建新的協程,在指定協程上運行掛起代碼塊,放在該塊內的任何代碼都始終通過IO調度器執行,并掛起該協程直至代碼塊運行完成,
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T

withContext會使用新指定的背景關系的dispatcher,將block的執行轉移到指定的執行緒中,
它會回傳結果, 可以和當前協程的父協程存在互動關系, 主要作用為了來回切換調度器,
coroutineScope{
launch(Dispatchers.Main) { // 在 UI 執行緒開始
val image = withContext(Dispatchers.IO) { // 切換到 IO 執行緒,并在執行完成后切回 UI 執行緒
getImage(imageId) // 將會運行在 IO 執行緒
}
avatarIv.setImageBitmap(image) // 回到 UI 執行緒更新 UI
}
}
四、協程調度

4.1 協程背景關系
在協程啟動部分提到,啟動協程需要三個部分,其中一個部分就是背景關系,其介面型別是CoroutineContext,通常所見的背景關系型別是CombinedContext或者EmptyCoroutineContext,一個表示背景關系組合,另一個表示空,
協程背景關系是Kotlin協程的基本結構單元,主要承載著資源獲取,配置管理等作業,是執行環境的通用資料資源的統一管理者,除此之外,也包括攜帶引數,攔截協程執行等,是實作正確的執行緒行為、生命周期、例外以及除錯的關鍵,
協程使用以下幾種元素集定義協程行為,他們均繼承自CoroutineContext:
【Job】:協程的句柄,對協程的控制和管理生命周期,
【CoroutineName】:協程的名稱,用于除錯
【CoroutineDispatcher】:調度器,確定協程在指定的執行緒執行
【CoroutineExceptionHandler】:協程例外處理器,處理未捕獲的例外

這里回顧一下launch和async兩個函式簽名,
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
兩個函式第一個引數都是CoroutineContext型別,
所有協程構建函式都是以CoroutineScope的擴展函式的形式被定義的,而CoroutineScope的介面唯一成員就是CoroutineContext型別,
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
簡而言之,協程背景關系是協程必備組成部分,管理了協程的執行緒系結、生命周期、例外處理和除錯,
4.1.1 協程背景關系結構
看一下CoroutineContext的介面方法:
public interface CoroutineContext {
//運算子[]多載,可以通過CoroutineContext[Key]這種形式來獲取與Key關聯的Element
public operator fun <E : Element> get(key: Key<E>): E?
//提供遍歷CoroutineContext中每一個Element的能力,并對每一個Element做operation操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
//運算子+多載,可以CoroutineContext + CoroutineContext這種形式把兩個CoroutineContext合并成一個
public operator fun plus(context: CoroutineContext): CoroutineContext = .......
//回傳一個新的CoroutineContext,這個CoroutineContext洗掉了Key對應的Element
public fun minusKey(key: Key<*>): CoroutineContext
//Key定義,空實作,僅僅做一個標識
public interface Key<E : Element>
///Element定義,每個Element都是一個CoroutineContext
public interface Element : CoroutineContext {
//每個Element都有一個Key實體
public val key: Key<*>
......
}
}
Element:協程背景關系的一個元素,本身就是一個單例背景關系,里面有一個key,是這個元素的索引,
可知,Element本身也實作了CoroutineContext介面,
這里我們再看一下官方解釋:
/**
Persistent context for the coroutine. It is an indexed set of [Element] instances.
An indexed set is a mix between a set and a map.
Every element in this set has a unique [Key].*/
從官方解釋可知,CoroutineContext是一個Element的集合,這種集合被稱為indexed set,介于set 和 map 之間的一種結構,set 意味著其中的元素有唯一性,map 意味著每個元素都對應一個鍵,
如果將協程背景關系內部的一系列背景關系稱為子背景關系,背景關系為每個子背景關系分配了一個Key,它是一個帶有型別資訊的介面,
這個介面通常被實作為companion object,
//Job
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job>
}
//攔截器
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
}
//協程名
public data class CoroutineName(
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
/**
* Key for [CoroutineName] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineName>
}
//例外處理器
public interface CoroutineExceptionHandler : CoroutineContext.Element {
/**
* Key for [CoroutineExceptionHandler] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
}
原始碼中定義的子背景關系,都會在內部宣告一個靜態的Key,類內部的靜態變數意味著被所有類實體共享,即全域唯一的 Key 實體可以對應多個子背景關系實體,
在一個類似 map 的結構中,每個鍵必須是唯一的,因為對相同的鍵 put 兩次值,新值會代替舊值,通過上述方式,通過鍵的唯一性保證了背景關系中的所有子背景關系實體都是唯一的,
我們按照這個格式仿寫一下然后反編譯,
class MyElement :AbstractCoroutineContextElement(MyElement) {
companion object Key : CoroutineContext.Key<MyElement>
}
//反編譯的java檔案
public final class MyElement extends AbstractCoroutineContextElement {
@NotNull
public static final MyElement.Key Key = new MyElement.Key((DefaultConstructorMarker)null);
public MyElement() {
super((kotlin.coroutines.CoroutineContext.Key)Key);
}
public static final class Key implements kotlin.coroutines.CoroutineContext.Key {
private Key() {
}
// $FF: synthetic method
public Key(DefaultConstructorMarker $constructor_marker) {
this();
}
}
}
對比kt和Java檔案,可以看到Key就是一個靜態變數,且其實作類未做處理,作用與HashMap中的Key類似,
Key是靜態變數,全域唯一,為Element提供唯一性保障,
前述內容總結如下:
協程背景關系是一個元素的集合,單個元素本身也是一個背景關系,其定義是遞回的,自己包含若干個自己,
協程背景關系這個集合有點像 set 結構,其中的元素都是唯一的,不重復的,其通過給每一個元素配有一個靜態的鍵實體,構成一組鍵值對的方式實作,這使其類似 map 結構,這種介于 set 和 map 之間的結構稱為indexed set,
CoroutineContext.get()獲取元素
關于CoroutineContext,我們先看一下其是如何取元素的,
這里看一下Element、CombinedContext、EmptyCoroutineContext的內部實作,其中CombinedContext就是CoroutineContext集合結構的實作,EmptyCoroutineContext就表示一個空的CoroutineContext,它里面是空實作,
@SinceKotlin("1.3")
internal class CombinedContext(
//左背景關系
private val left: CoroutineContext,
//右元素
private val element: Element
) : CoroutineContext, Serializable {
override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
//如果輸入 key 和右元素的 key 相同,則回傳右元素
cur.element[key]?.let { return it }
// 若右元素不匹配,則向左繼續查找
val next = cur.left
if (next is CombinedContext) {
cur = next
} else { // 若左背景關系不是混合背景關系,則結束遞回
return next[key]
}
}
}
......
}
public interface Element : CoroutineContext {
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
// 如果給定鍵和元素本身鍵相同,則回傳當前元素,否則回傳空
if (this.key == key) this as E else null
......
}
public object EmptyCoroutineContext : CoroutineContext, Serializable {
//回傳空
public override fun <E : Element> get(key: Key<E>): E? = null
}
通過Key檢索Element,回傳值只能是Element或null,鏈表節點中的元素值,其中CombinedContext利用while回圈實作了類似遞回的效果,其中較早被遍歷到的元素自然具有較高的優先級,
//使用示例
println(coroutineContext[CoroutineName])
println(Dispatchers.Main[CoroutineName])
CoroutineContext.minusKey()洗掉元素
同理看一下Element、CombinedContext、EmptyCoroutineContext的內部實作,
internal class CombinedContext(
//左背景關系
private val left: CoroutineContext,
//右元素
private val element: Element
) : CoroutineContext, Serializable {
public override fun minusKey(key: Key<*>): CoroutineContext {
//如果element就是要洗掉的元素,回傳left,否則說明要洗掉的元素在left中,繼續從left中洗掉對應的元素
element[key]?.let { return left }
//在左背景關系中去掉對應元素
val newLeft = left.minusKey(key)
return when {
//如果left中不存在要洗掉的元素,那么當前CombinedContext就不存在要洗掉的元素,直接回傳當前CombinedContext實體
newLeft === left -> this
//如果left中存在要洗掉的元素,洗掉了這個元素后,left變為了空,那么直接回傳當前CombinedContext的element就行
newLeft === EmptyCoroutineContext -> element
//如果left中存在要洗掉的元素,洗掉了這個元素后,left不為空,那么組合一個新的CombinedContext回傳
else -> CombinedContext(newLeft, element)
}
}
......
}
public object EmptyCoroutineContext : CoroutineContext, Serializable {
public override fun minusKey(key: Key<*>): CoroutineContext = this
......
}
public interface Element : CoroutineContext {
//如果key和自己的key匹配,那么自己就是要洗掉的Element,回傳EmptyCoroutineContext(表示洗掉了自己),否則說明自己不需要被洗掉,回傳自己
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
......
}
如果把CombinedContext和Element結合來看,那么CombinedContext的整體結構如下:

其結構類似鏈表,left就是指向下一個結點的指標,get、minusKey操作大體邏輯都是先訪問當前element,不滿足,再訪問left的element,順序都是從right到left,
CoroutineContext.fold()元素遍歷
internal class CombinedContext(
//左背景關系
private val left: CoroutineContext,
//右元素
private val element: Element
) : CoroutineContext, Serializable {
//先對left做fold操作,把left做完fold操作的的回傳結果和element做operation操作
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(left.fold(initial, operation), element)
......
}
public object EmptyCoroutineContext : CoroutineContext, Serializable {
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
......
}
public interface Element : CoroutineContext {
//對傳入的initial和自己做operation操作
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
......
}
fold也是遞回的形式操作,fold的操作大體邏輯是:先訪問left,直到遞回到最后的element,然后再從left到right的回傳,從而訪問了所有的element,
CoroutineContext.plus()添加元素
關于CoroutineContext的元素添加方法,直接看其plus()實作,也是唯一沒有被重寫的方法,
public operator fun plus(context: CoroutineContext): CoroutineContext =
//如果要相加的CoroutineContext為空,那么不做任何處理,直接回傳
if (context === EmptyCoroutineContext) this else
//如果要相加的CoroutineContext不為空,那么對它進行fold操作,可以把acc理解成+號左邊的CoroutineContext,element理解成+號右邊的CoroutineContext的某一個element
context.fold(this) { acc, element ->
//首先從左邊CoroutineContext中洗掉右邊的這個element
val removed = acc.minusKey(element.key)
//如果removed為空,說明左邊CoroutineContext洗掉了和element相同的元素后為空,那么回傳右邊的element即可
if (removed === EmptyCoroutineContext) element else {
//如果removed不為空,說明左邊CoroutineContext洗掉了和element相同的元素后還有其他元素,那么構造一個新的CombinedContext回傳
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
plus方法大部分情況下回傳一個CombinedContext,即我們把兩個CoroutineContext相加后,回傳一個CombinedContext,在組合成CombinedContext時,+號右邊的CoroutineContext中的元素會覆寫+號左邊的CoroutineContext中的含有相同key的元素,
這個覆寫操作就在fold方法的引數operation代碼塊中完成,通過minusKey方法洗掉掉重復元素,
plus方法中可以看到里面有個對ContinuationInterceptor的處理,目的是讓ContinuationInterceptor在每次相加后都能變成CoroutineContext中的最后一個元素,
ContinuationInterceptor繼承自Element,稱為協程背景關系攔截器,作用是在協程執行前攔截它,從而在協程執行前做出一些其他的操作,通過把ContinuationInterceptor放在最后面,協程在查找背景關系的element時,總能最快找到攔截器,避免了遞回查找,從而讓攔截行為前置執行,
4.1.2 CoroutineName
public data class CoroutineName(
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
CoroutineName是用戶用來指定的協程名稱的,用于方便除錯和定位問題,
GlobalScope.launch(CoroutineName("GlobalScope")) {
launch(CoroutineName("CoroutineA")) {//指定協程名稱
val coroutineName = coroutineContext[CoroutineName]//獲取協程名稱
print(coroutineName)
}
}
/** 列印結果
CoroutineName(CoroutineA)
*/
協程內部可以通過coroutineContext這個全域屬性直接獲取當前協程的背景關系,
4.1.3 背景關系組合
如果要傳遞多個背景關系元素,CoroutineContext可以使用"+"運算子進行合并,由于CoroutineContext是由一組元素組成的,所以加號右側的元素會覆寫加號左側的元素,進而組成新創建的CoroutineContext,
GlobalScope.launch {
//通過+號運算添加多個背景關系元素
var context = CoroutineName("協程1") + Dispatchers.Main
print("context == $context")
context += Dispatchers.IO //添加重復Dispatchers元素,Dispatchers.IO 會替換 ispatchers.Main
print("context == $context")
val contextResult = context.minusKey(context[CoroutineName]!!.key)//移除CoroutineName元素
print("contextResult == $contextResult")
}
/**列印結果
context == [CoroutineName(協程1), Dispatchers.Main]
context == [CoroutineName(協程1), Dispatchers.IO]
contextResult == Dispatchers.IO
*/
如果有重復的元素(key一致)則右邊的會代替左邊的元素,相關原理參看協程背景關系結構章節,
4.1.4 CoroutineScope 構建
CoroutineScope實際上是一個CoroutineContext的封裝,當我們需要啟動一個協程時,會在CoroutineScope的實體上呼叫構建函式,如async和launch,
在構建函式中,一共出現了3個CoroutineContext,

查看協程構建函式async和launch的原始碼,其第一行都是如下代碼:
val newContext = newCoroutineContext(context)
進一步查看:
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context //CoroutineContext拼接組合
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
構建器內部進行了一個CoroutineContext拼接操作,plus左值是CoroutineScope內部的CoroutineContext,右值是作為構建函式引數的CoroutineContext,
抽象類AbstractCoroutineScope實作了CoroutineScope和Job介面,大部分CoroutineScope的實作都繼承自AbstractCoroutineScope,意味著他們同時也是一個Job,
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
public final override val context: CoroutineContext = parentContext + this
//重寫了父類的coroutineContext屬性
public override val coroutineContext: CoroutineContext get() = context
}
從上述分析可知:coroutine context = parent context + coroutine job
4.1.5 典型用例
全限定Context
launch( Dispatchers.Main + Job() + CoroutineName("HelloCoroutine") + CoroutineExceptionHandler { _, _ -> /* ... */ }) {
/* ... */
}
全限定Context,即全部顯式指定具體值的Elements,不論你用哪一個CoroutineScope構建該協程,它都具有一致的表現,不會受到CoroutineScope任何影響,
CoroutineScope Context
基于Activity生命周期實作一個CoroutineScope
abstract class ScopedAppActivity:
AppCompatActivity(),
CoroutineScope
{
protected lateinit var job: Job
override val coroutineContext: CoroutineContext
get() = job + Dispatchers.Main // 注意這里使用+拼接CoroutineContext
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
job.cancel()
}
}
Dispatcher:使用Dispatcher.Main,以在UI執行緒進行繪制
Job:在onCreate時構建,在onDestroy時銷毀,所有基于該CoroutineContext創建的協程,都會在Activity銷毀時取消,從而避免Activity泄露的問題
臨時指定引數
CoroutineContext的引數主要有兩個來源:從scope中繼承+引數指定,我們可以用withContext便捷地指定某個引數啟動子協程,例如我們想要在協程內部執行一個無法被取消的子協程:
withContext(NonCancellable) {
/* ... */
}
讀取協程背景關系引數
通過頂級掛起只讀屬性coroutineContext獲取協程背景關系引數,它位于 kotlin-stdlib / kotlin.coroutines / coroutineContext
println("Running in ${coroutineContext[CoroutineName]}")
Nested Context內嵌背景關系
內嵌背景關系切換:在協程A內部構建協程B時,B會自動繼承A的Dispatcher,
可以在呼叫async時加入Dispatcher引數,切換到作業執行緒
// 錯誤的做法,在主執行緒中直接呼叫async,若耗時過長則阻塞UI
GlobalScope.launch(Dispatchers.Main) {
val deferred = async {
/* ... */
}
/* ... */
}
// 正確的做法,在作業執行緒執行協程任務
GlobalScope.launch(Dispatchers.Main) {
val deferred = async(Dispatchers.Default) {
/* ... */
}
/* ... */
}
4.2 協程攔截器
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//......
}
-
無論在CoroutineContext后面 放了多少個攔截器,Key 為 ContinuationInterceptor 的攔截器只能有一個,
-
Continuation 在呼叫其 Continuation#resumeWith() 方法,會執行其 suspend 修飾的函式的代碼塊,如果我們提前攔截到,可以做點其他事情,比如說切換執行緒,這是 ContinuationInterceptor 的主要作用,
協程的本質就是回呼,這個回呼就是被攔截的Continuation,OkHttp用攔截器做快取,打日志,模擬請求等,協程攔截器同理,
我們通過Dispatchers 來指定協程發生的執行緒,Dispatchers 實作了 ContinuationInterceptor介面,
這里我們自定義一個攔截器放到協程背景關系,看一下會發生什么,
class MyContinuationInterceptor: ContinuationInterceptor{
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}
class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
override val context = continuation.context
override fun resumeWith(result: Result<T>) {
log("<MyContinuation> $result" )
continuation.resumeWith(result)
}
}
suspend fun main(args: Array<String>) { // start main coroutine
GlobalScope.launch(MyContinuationInterceptor()) {
log(1)
val job = async {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.await()
log("5. $result")
}.join()
log(6)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
/******列印結果******/
[main]:<MyContinuation> Success(kotlin.Unit) //11
[main]:1
[main]:<MyContinuation> Success(kotlin.Unit) //22
[main]:2
[main]:4
[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(kotlin.Unit) //33
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(Hello)
[kotlinx.coroutines.DefaultExecutor]:5. Hello
[kotlinx.coroutines.DefaultExecutor]:6
- 所有協程啟動時,都有一次Continuation.resumeWith 的操作,協程有機會調度到其他執行緒的關鍵之處就在于此,
- delay是掛起點,1s之后需要繼續調度執行該協程,因此就有了33處日志,
前述分析CoroutineContext的plus方法涉及到了ContinuationInterceptor,plus每次都會將ContinuationInterceptor添加到拼接鏈的尾部,這里再詳細解釋一下原因,
public operator fun plus(context: CoroutineContext): CoroutineContext =
//如果要相加的CoroutineContext為空,那么不做任何處理,直接回傳
if (context === EmptyCoroutineContext) this else
//如果要相加的CoroutineContext不為空,那么對它進行fold操作,可以把acc理解成+號左邊的CoroutineContext,element理解成+號右邊的CoroutineContext的某一個element
context.fold(this) { acc, element ->
//首先從左邊CoroutineContext中洗掉右邊的這個element
val removed = acc.minusKey(element.key)
//如果removed為空,說明左邊CoroutineContext洗掉了和element相同的元素后為空,那么回傳右邊的element即可
if (removed === EmptyCoroutineContext) element else {
//如果removed不為空,說明左邊CoroutineContext洗掉了和element相同的元素后還有其他元素,那么構造一個新的CombinedContext回傳
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}

原因一:CombinedContext的結構決定,
其有兩個元素,left是一個前驅集合,element為一個純粹CoroutineContext,它的get方法每次都是從element開始進行查找對應Key的CoroutineContext物件;沒有匹配到才會去left集合中進行遞回查找,為了加快查找ContinuationInterceptor型別的實體,才將它加入到拼接鏈的尾部,對應的就是element,
原因二:ContinuationInterceptor使用很頻繁
每次創建協程都會去嘗試查找當前協程的CoroutineContext中是否存在ContinuationInterceptor,這里我們用launch來驗證一下
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.()
->
Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
如果使用的launch使用的是默認引數,此時Coroutine就是StandaloneCoroutine,然后呼叫start方法啟動協程,
start(block, receiver, this)
}
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
如果我們使用默認引數,看一下默認引數對應執行的block.startCoroutineCancellable(completion)
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
-
首先通過createCoroutineUnintercepted來創建一個協程
-
然后再呼叫intercepted方法進行攔截操作
-
最后呼叫resumeCancellable,即Continuation的resumeWith方法,啟動協程,所以每次啟動協程都會自動回調一次resumeWith方法
這里看一下intercepted
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
看其在ContinuationImpl的intercepted方法實作
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
-
首先獲取到ContinuationInterceptor實體
-
然后呼叫它的interceptContinuation方法回傳一個處理過的Continuation(多次呼叫intercepted,對應的interceptContinuation只會呼叫一次)
至此可知,ContinuationInterceptor的攔截是通過interceptContinuation方法進行
下面再看一個ContinuationInterceptor的典型示例
val interceptor = object : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
println("intercept todo something. change run to thread")
return object : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("create new thread")
thread {
continuation.resumeWith(result)
}
}
}
}
}
println(Thread.currentThread().name)
lifecycleScope.launch(interceptor) {
println("launch start. current thread: ${Thread.currentThread().name}")
withContext(Dispatchers.Main) {
println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
}
launch {
println("new continuation todo something. current thread: ${Thread.currentThread().name}")
}
println("launch end. current thread: ${Thread.currentThread().name}")
}
/******列印結果******/
main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8
-
首先程式運行在main執行緒,啟動協程時將自定義的interceptor加入到背景關系中,協程啟動時進行攔截,將在main執行緒運行的程式切換到新的thread執行緒
-
withContext沒有攔截成功,具體原因在下面的調度器再詳細解釋,簡單來說就是我們自定義的interceptor被替換了,
-
launch start與launch end所處的執行緒不一樣,因為withContext結束之后,它內部還會進行一次執行緒恢復,將自身所處的main執行緒切換到之前的執行緒,協程每一個掛起后恢復都是通過回呼resumeWith進行的,然而外部launch協程我們進行了攔截,在它回傳的Continuation的resumeWith回呼中總是會創建新的thread,
4.3 調度器
CoroutineDispatcher調度器指定指定執行協程的目標載體,它確定了相關的協程在哪個執行緒或哪些執行緒上執行,可以將協程限制在一個特定的執行緒執行,或將它分派到一個執行緒池,亦或是讓它不受限地運行,
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//將可運行塊的執行分派到給定背景關系中的另一個執行緒上
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//回傳一個continuation,它封裝了提供的[continuation],攔截了所有的恢復
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//......
}
協程需要調度的位置就是掛起點的位置,只有當掛起點正在掛起的時候才會進行調度,實作調度需要使用協程的攔截器,
調度的本質就是解決掛起點恢復之后的協程邏輯在哪里運行的問題,調度器也屬于協程背景關系一類,它繼承自攔截器,
-
【val Default】: CoroutineDispatcher
-
【val Main】: MainCoroutineDispatcher
-
【val Unconfined】: CoroutineDispatcher

IO僅在 Jvm 上有定義,它基于 Default 調度器背后的執行緒池,并實作了獨立的佇列和限制,因此協程調度器從 Default 切換到 IO 并不會觸發執行緒切換
關于調度器介紹到這里,還沒有詳細解釋前述協程攔截器中的withContext為什么攔截失敗,這里針對這個詳細看一下原始碼實作,
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
其回傳型別為MainCoroutineDispatcher,繼承自CoroutineDispatcher,
public abstract class MainCoroutineDispatcher : CoroutineDispatcher()
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
......
}
CoroutineDispatch實作了ContinuationInterceptor,根據前述解釋的CoroutineContext結構,可知我們自定義的攔截器沒有生效是因為被替換了,
CoroutineDispatch中的isDispatchNeeded就是判斷是否需要分發,然后dispatch就是執行分發,
ContinuationInterceptor重要的方法就是interceptContinuation,在CoroutineDispatcher中直接回傳了DispatchedContinuation物件,它是一個Continuation型別,看一下其resumeWith實作,
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
//判斷是否需要分發
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
//不需要分發,直接使用原先的continuation物件的resumewith
continuation.resumeWith(result)
}
}
}
}
那么分發的判斷邏輯是怎么實作的?這要根據具體的dispatcher來看,
如果我們拿的是Dispatchers.Main,其dispatcher為HandlerContext,
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
......
其中HandlerContext繼承于HandlerDispatcher,而HandlerDispatcher繼承于MainCoroutineDispatcher
Dispatcher的基本實作原理大致為:
-
首先在協程進行啟動的時候通過攔截器的方式進行攔截,對應的方法是interceptContinuation
-
然后回傳一個具有切換執行緒功能的Continuation
-
在每次進行resumeWith的時候,內部再通過isDispatchNeeded進行判斷當前協程的運行是否需要切換執行緒,
-
如果需要則呼叫dispatch進行執行緒的切換,保證協程的正確運行,如果要自定義協程執行緒的切換,可以通過繼承CoroutineDispatcher來實作,
這里再簡單看一下WithContext,我們都知道其不僅可以接受CoroutineDispatcher來幫助我們切換執行緒,同時在執行完畢之后還會幫助我們將之前切換掉的執行緒進恢復,保證協程運行的連貫性,那這是怎么實作的呢?
withContext的執行緒恢復原理是它內部生成了一個DispatchedCoroutine,保存切換執行緒時的CoroutineContext與切換之前的Continuation,最后在onCompletionInternal進行恢復,我們簡單翻一翻其原始碼實作,
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 創建新的CoroutineContext
val oldContext = uCont.context
val newContext = oldContext + context
......
//使用新的Dispatcher,覆寫外層
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
//在complete時會會回呼
override fun afterCompletion(state: Any?) {
// Call afterResume from afterCompletion and not vice-versa, because stack-size is more
// important for afterResume implementation
afterResume(state)
}
override fun afterResume(state: Any?) {
////uCont就是父協程,context仍是老版context,因此可以切換回原來的執行緒上
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
......
}
-
對于withContext,傳入的context會覆寫外層的攔截器并生成一個newContext,因此可以實作執行緒切換,
-
DispatchedCoroutine作為complete傳入協程體的創建函式中,因此協程體執行完成后會回呼到afterCompletion中,
-
DispatchedCoroutine中傳入的uCont是父協程,它的攔截器仍是外層的攔截器,因此會切換回原來的執行緒中,
4.3.1 典型用例
例如:點擊一個按鈕,進行異步操作后再回呼重繪UI
getUserBtn.setOnClickListener {
getUser { user ->
handler.post {
userNameView.text = user.name
}
}
}
typealias Callback = (User) -> Unit
fun getUser(callback: Callback){
...
}
由于 getUser 函式需要切到其他執行緒執行,因此回呼通常也會在這個非 UI 的執行緒中呼叫,所以為了確保 UI 正確被重繪,我們需要用 handler.post 切換到 UI 執行緒,
如果要用協程實作呢?
suspend fun getUserCoroutine() = suspendCoroutine<User> {
continuation ->
getUser {
continuation.resume(it)
}
}
getUserBtn.setOnClickListener {
GlobalScope.launch(Dispatchers.Main) {
userNameView.text = getUserCoroutine().name
}
}
suspendCoroutine 這個方法并不是幫我們啟動協程的,它運行在協程當中并且幫我們獲取到當前協程的 Continuation 實體,也就是拿到回呼,方便后面我們呼叫它的 resume 或者 resumeWithException 來回傳結果或者拋出例外,
4.3.2 執行緒系結
調度器的目的就是切執行緒,我們只要提供執行緒,調度器就應該很方便的創建出來,
suspend fun main() {
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
GlobalScope.launch(myDispatcher) {
log(1)
}.join()
log(2)
}
由于這個執行緒池是我們自己創建的,因此我們需要在合適的時候關閉它,
除了上述的方法,kotlin協程還給出了更簡單的api,如下:
GlobalScope.launch(newSingleThreadContext("Dispather")) {
//......
}.join()
前述我們是通過執行緒的方式,同理可以通過執行緒池轉為調度器實作,
Executors.newScheduledThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
GlobalScope.launch(dispatcher) {
//......
}.join
五、協程掛起
在前述協程時,經常會出現suspend關鍵字和掛起的說法,其含義和用法是什么?一起深入看一下,
5.1 概述
suspend翻譯過來就是中斷、掛起,用在函式宣告前,起到掛起協程的標識,本質作用是代碼呼叫時為方法添加一個Continuation型別的引數,保證協程中Continuation的上下傳遞,
掛起函式只能在協程或另一個掛起函式中被呼叫,如果你在非協程中使用到了掛起函式,會報錯,

阻塞:
函式A必須在函式B之前完成執行,執行緒被鎖定以便函式A能夠完成其執行
掛起:
函式A雖然已經啟動,但可以暫停,讓函式B執行,然后只在稍后恢復,執行緒沒有被函式A鎖定,
“掛起”是指協程從它當前執行緒脫離,切換到另一個執行緒運行,當執行緒運行到suspend函式時,會暫時掛起這個函式及后續代碼的執行,簡而言之,掛起函式是一個可以啟動、暫停和恢復的函式,
協程運行的時候每遇到被suspend修飾的方法時,都可能會掛起當前協程,不是必會掛起,例如如下方法就不會被掛起,
private suspend fun a() {
println("aa")
}
這是因為這種方法不會回傳COROUTINE_SUSPENDED型別,這在后面詳細解釋,
5.2 suspend本質
Kotlin 使用堆疊幀來管理要運行哪個函式以及所有區域變數,
協程在常規函式基礎上添加了suspend和resume兩項操作用于處理長時間運行的任務,
【suspend】:掛起或暫停,用于掛起執行當前協程,并保存所有區域變數
【resume】:恢復,用于讓已掛起的協程從掛起處恢復繼續執行
掛起(暫停)協程時,會復制并保存當前的堆疊幀以供稍后使用,將資訊保存到Continuation物件中,
恢復協程時,會將堆疊幀從其保存位置復制回來,對應的Continuation通過呼叫resumeWith函式才會恢復協程的執行,然后函式再次開始運行,同時回傳Result型別的成功或者例外的結果,
public interface Continuation<in T> {
//對應這個Continuation的協程背景關系
public val context: CoroutineContext
//恢復相應協程的執行,傳遞一個成功或失敗的結果作為最后一個掛起點的回傳值,
public fun resumeWith(result: Result<T>)
}
//將[value]作為最后一個掛起點的回傳值,恢復相應協程的執行,
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//恢復相應協程的執行,以便在最后一個掛起點之后重新拋出[例外],
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
-
Continuation 類有一個 resumeWith 函式可以接收 Result 型別的引數,
-
在結果成功獲取時,呼叫resumeWith(Result.success(value))或者呼叫拓展函式resume(value);出現例外時,呼叫resumeWith(Result.failure(exception))或者呼叫拓展函式resumeWithException(exception),這就是 Continuation 的恢復呼叫,
@FormUrlEncoded
@POST("/api/common/countryList")
suspend fun fetchCountryList(@FieldMap params: Map<String, String?>): CountryResponse
前述掛起函式決議后反編譯如下:
@FormUrlEncoded
@POST("/api/common/countryList")
@Nullable
Object fetchCountryList(@FieldMap @NotNull Map var1, @NotNull Continuation var2);
-
掛起函式反編譯后,發現多了一個Continuation引數,有編譯器傳遞,說明呼叫掛起函式需要Continuation,
-
只有掛起函式或者協程中才有Continuation,所以掛起函式只能在協程或者其他掛起函式中執行,
5.2.1 Continuation
這里看一下該Continuation的傳遞來源,
這個函式只能在協程或者掛起函式中執行,說明Continuation很有可能是從協程中傳入來的,查看協程構建原始碼,
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
通過launch啟動一個協程時,其通過coroutine的start方法啟動協程:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
然后start方法里面呼叫了CoroutineStart的invoke,這個時候我們發現了Continuation:
//CoroutineStart的invoke方法出現了Continuation
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
最侄訓呼到Continuation的resumeWith()恢復函式中,
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
我們再深入kotlin原始碼看一下其內部實作,
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)//1
}
}
}
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
val context = completion.context
// label == 0 when coroutine is not started yet (initially) or label == 1 when it was
return if (context === EmptyCoroutineContext)
object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
1 -> {
label = 2
result.getOrThrow() // this is the result if the block had suspended
}
else -> error("This coroutine had already completed")
}
}
else
object : ContinuationImpl(completion as Continuation<Any?>, context) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
1 -> {
label = 2
result.getOrThrow() // this is the result if the block had suspended
}
else -> error("This coroutine had already completed")
}
}
}
-
createCoroutineUnintercepted(receiver, completion)方法在Kotlin原始碼中是通過suspend關鍵字修飾的擴展方法,
-
suspend關鍵字修飾(suspend R.() -> T)物件實際被編譯成為一個Function2<r, continuation, Any?>介面物件,而關鍵字suspend實際編譯成了Continuation介面,
所以:
-
協程體本身就是Continuation,即必須在協程內呼叫suspend掛起函式,
-
suspend關鍵字并不具備暫停、掛起代碼塊或者函式方法功能,
5.2.2 狀態機CPS
協程實際掛起是如何實作的?
這里首先通過一個示例來演示一下狀態機,
suspend fun main() {
log(1)
log(returnSuspended())
log(2)
delay(1000)
log(3)
log(returnImmediately())
log(4)
}
suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
continuation ->
thread {
Thread.sleep(1000)
continuation.resume("Return suspended.")
}
COROUTINE_SUSPENDED
}
suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{
log(5)
"Return immediately."
}
這里我們定義了兩個掛起函式,一個會真正掛起,一個會直接回傳結果,其運行結果為:
[main]:1
[Thread-2]:Return suspended.
[Thread-2]:2
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:5
[kotlinx.coroutines.DefaultExecutor]:Return immediately.
[kotlinx.coroutines.DefaultExecutor]:4
前述代碼的實際實作情況大致如下:
public class ContinuationImpl implements Continuation<Object> {
private int label = 0;
private final Continuation<Unit> completion;
public ContinuationImpl(Continuation<Unit> completion) {
this.completion = completion;
}
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object o) {
try {
Object result = o;
switch (label) {
case 0: {
LogKt.log(1);
result = SuspendFunctionsKt.returnSuspended( this);
label++;
if (isSuspended(result)) return;
}
case 1: {
LogKt.log(result);
LogKt.log(2);
result = DelayKt.delay(1000, this);
label++;
if (isSuspended(result)) return;
}
case 2: {
LogKt.log(3);
result = SuspendFunctionsKt.returnImmediately( this);
label++;
if (isSuspended(result)) return;
}
case 3:{
LogKt.log(result);
LogKt.log(4);
}
}
completion.resumeWith(Unit.INSTANCE);
} catch (Exception e) {
completion.resumeWith(e);
}
}
private boolean isSuspended(Object result) {
return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
-
首先定義了一個ContinuationImpl,即一個Continuation的實作,
-
可以在 Kotlin 的標準庫當中找到一個名叫 ContinuationImpl 的類,其 resumeWith 最終呼叫到了 invokeSuspend,而這個 invokeSuspend 實際上就是我們的協程體,通常也就是一個 Lambda 運算式,
-
通過 launch啟動協程,傳入的那個 Lambda 運算式,實際上會被編譯成一個 SuspendLambda 的子類,而它又是 ContinuationImpl 的子類,
public class RunSuspend implements Continuation<Unit> {
private Object result;
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object result) {
synchronized (this){
this.result = result;
notifyAll(); // 協程已經結束,通知下面的 wait() 方法停止阻塞
}
}
public void await() throws Throwable {
synchronized (this){
while (true){
Object result = this.result;
if(result == null) wait(); // 呼叫了 Object.wait(),阻塞當前執行緒,在 notify 或者 notifyAll 呼叫時回傳
else if(result instanceof Throwable){
throw (Throwable) result;
} else return;
}
}
}
}
接著,定義了一個RunSuspend,用來接收結果,
public static void main(String... args) throws Throwable {
RunSuspend runSuspend = new RunSuspend();
ContinuationImpl table = new ContinuationImpl(runSuspend);
table.resumeWith(Unit.INSTANCE);
runSuspend.await();
}
作為 completion 傳入的 RunSuspend 實體的 resumeWith 實際上是在 ContinuationImpl 的 resumeWtih 的最后才會被呼叫,因此它的 await() 一旦進入阻塞態,直到 ContinuationImpl 的整體狀態流轉完畢才會停止阻塞,此時行程也就運行完畢正常退出了,
這段代碼的運行結果為:
/******列印結果******/
[main]:1
[Thread-2]:Return suspended.
[Thread-2]:2
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:5
[kotlinx.coroutines.DefaultExecutor]:Return immediately.
[kotlinx.coroutines.DefaultExecutor]:4
-
協程體的執行就是一個狀態機,每一次遇到掛起函式,都是一次狀態轉移,就像我們前面例子中的 label 不斷的自增來實作狀態流轉一樣
-
狀態機即代碼中每一個掛起點和初始掛起點對應的Continuation都會轉化為一種狀態,協程恢復只是跳轉到下一種狀態,
-
掛起函式將執行程序分為多個 Continuation 片段,并且利用狀態機的方式保證各個片段是順序執行的,所以異步邏輯也可以用順序的代碼來實作,
5.3 協程運行原理
前述相關示例更多是為了驗證分析協程的一些特性,這里從協程的創建、啟動、恢復、執行緒調度,協程切換等詳細決議協程的實作,
5.3.1 協程創建與啟動
首先創建一個協程并啟動,最常見的莫過于CoroutineScope.launch{},其原始碼實作為:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
我們如果不指定start 引數,所以它會使用默認的 CoroutineStart.DEFAULT,最終 coroutine 會得到一個 StandaloneCoroutine,其實作自AbstractCoroutine,實作了Continuation,
前述分析suspend本質時已知,其最侄訓呼叫到createCoroutineUnintercepted,主要是創建了一個新的可掛起計算,通過呼叫resume(Unit)啟動協程,回傳值為Continuation,Continuation提供了resumeWith恢復協程的介面,用以實作協程恢復,Continuation封裝了協程的代碼運行邏輯和恢復介面,
將協程代碼進行反編譯,再看一下其位元組碼和java實作,例如
suspend fun test() {
CoroutineScope(Dispatchers.IO).launch {
delay(11)
}
}
查看其位元組碼實作時,可知其編譯生成內部類,
協程的計算邏輯封裝在invokeSuspend方法中,而SuspendLambda的繼承關系為 ,
SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
其中BaseContinuationImpl 部分關鍵原始碼如下:
internal abstract class BaseContinuationImpl(...) {
// 實作 Continuation 的 resumeWith,并且是 final 的,不可被重寫
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協程相關類來實作
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
前述的協程示例代碼反編譯為:
public static final Object test(@NotNull Continuation $completion) {
Job var10000 = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
//掛起標識
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
//設定掛起后恢復,進入的狀態
this.label = 1;
if (DelayKt.delay(11L, this) == var2) {
return var2;
}
break;
case 1:
// 是否需要拋出例外
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
所以,協程的啟動流程為:resume(Unit)->resumeWith()->invokeSuspend(),
協程的掛起通過suspend掛起函式實作,協程的恢復通過Continuation.resumeWith實作,
5.3.2 協程執行緒調度
協程的執行緒調度是通過攔截器實作的,前面提到了協程啟動呼叫到了startCoroutineCancellable,關于協程調度在前述的協程調度器部分已詳細介紹了,這里再簡單過一下,
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
看一下其intercepted()的具體實作:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
internal abstract class ContinuationImpl(
......
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// context[ContinuationInterceptor] 就是協程的 CoroutineDispatcher
......
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
......
}
intercepted()最侄訓使用協程的CoroutineDispatcher的interceptContinuation方法包裝原來的 Continuation,攔截所有的協程運行操作,
DispatchedContinuation攔截了協程的啟動和恢復,分別是resumeCancellableWith和重寫的resumeWith(Result),
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//判斷是否需要執行緒調度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//將協程運算分發到另一個執行緒
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
//直接在當前執行緒執行協程運算
resumeUndispatchedWith(result)
}
}
}
}
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
//判斷是否需要執行緒調度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
//將協程的運算分發到另一個執行緒
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
//直接在當前執行緒執行協程運算
continuation.resumeWith(result)
}
}
}
}
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask(){
public final override fun run() {
//封裝了 continuation.resume 邏輯
}
......
}
5.3.3 協程掛起與恢復
編譯器會生成繼承自SuspendLambda的子類,協程的真正運算邏輯都在invokeSuspend中,這里我們先再次回到startCoroutineCancellable函式中,
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
看一下其中的resumeCancellableWith方法,
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
這是Continuation的擴展方法,最后都會呼叫到Continuation的resumeWith,這里的Continuation就是前述所說的SuspendLambda,它繼承了 BaseContinuationImpl
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
//執行invokeSuspend內的代碼塊
val outcome = invokeSuspend(param)
//如果代碼塊內執行了掛起方法,協程掛起,resumeWith執行結束,再次呼叫resumeWith時協程掛起點之后的代碼才能繼續執行
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 如果完成的completion也是BaseContinuationImpl,就會進入回圈
current = completion
param = outcome
} else {
// 執行completion resumeWith方法
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
.....
}
下面看一下invokeSuspend的實作邏輯,
fun main(args: Array<String>) {
val coroutineDispatcher = newSingleThreadContext("ctx")
// 啟動協程 1
GlobalScope.launch(coroutineDispatcher) {
println("the first coroutine")
async (Dispatchers.IO) {
println("the second coroutine 11111")
delay(100)
println("the second coroutine 222222")
}.await()
println("the first coroutine end end end")
}
// 保證 main 執行緒存活,確保上面兩個協程運行完成
Thread.sleep(500)
}
前述示例編譯成SuspendLambda子類的invokeSuspend方法為:
public final Object invokeSuspend(@NotNull Object $result) {
//掛起函式回傳標識SUSPEND_FLAG
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var3;
boolean var4;
//label默認初始值為0
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineScope $this$launch = (CoroutineScope)this.L$0;
var3 = "the first coroutine";
var4 = false;
System.out.println(var3);
//新建并啟動 async 協程
Deferred var10000 = BuildersKt.async$default($this$launch, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
//掛起標識
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var2;
boolean var3;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "the second coroutine 11111";
var3 = false;
System.out.println(var2);
this.label = 1;
//判斷是否執行delay掛起函式
if (DelayKt.delay(100L, this) == var4) {
//掛起,跳出該方法
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
// 恢復協程后再執行一次 resumeWith(),然后無例外的話執行最后的 println()
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "the second coroutine 222222";
var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
//設定掛起后恢復時,進入的狀態
this.label = 1;
//呼叫await()掛起函式
if (var10000.await(this) == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var3 = "the first coroutine end end end";
var4 = false;
System.out.println(var3);
return Unit.INSTANCE;
}
如果async執行緒未執行完成,await()回傳為IntrinsicsKt.getCOROUTINE_SUSPENDED(),就會 return,launch 協程的invokeSuspend方法執行完成,協程所在執行緒繼續往下運行,此時 launch 協程處于掛起狀態,
所以協程的掛起在代碼層面來說就是跳出協程執行的方法體,或者說跳出協程當前狀態機下的對應狀態,然后等待下一個狀態來臨時在進行執行,
關于協程掛起有三點注意事項:
-
啟動其他協程并不會掛起當前協程,所以launch和async啟動執行緒時,除非新協程運行在當前執行緒,則當前協程只能在新協程運行完成后繼續執行,否則當前協程都會馬上繼續運行,
-
協程掛起并不會阻塞執行緒,因為協程掛起時相當于執行完協程的方法,執行緒繼續執行其他之后的邏輯,
-
掛起函式并一定都會掛起協程,例如await()掛起函式如果回傳值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED(),則協程繼續執行掛起點之后邏輯,
看完invokeSuspend,我們再次回到startCoroutineCancellable函式中,其呼叫的createCoroutineUnintercepted 方法中創建的 SuspendLambda 實體是 BaseContinuationImpl 的子類物件,其 completion 引數為下:
launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine
async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine
上面這幾個類都是 AbstractCoroutine 的子類,而根據 completion 的型別會執行不同的邏輯:
BaseContinuationImpl: 執行協程邏輯
其它: 呼叫 resumeWith 方法,處理協程的狀態,協程掛起后的恢復即與它有關
前述的示例中async啟動的協程,也會呼叫其invokeSuspend方法執行async協程,假設async 回傳的結果已經可用時,即非 COROUTINE_SUSPENDED 值,此時 completion 是 DeferredCoroutine 物件,因此就會呼叫 DeferredCoroutine.resumeWith 方法,然后回傳,父協程的恢復邏輯便是在這里,
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
在 makeCompletingOnce 方法中,會根據 state 去處理協程狀態,這里最侄訓走到ResumeAwaitOnCompletion.invoke 來恢復父協程,必要的話還會把 async 的結果給它,
private class ResumeAwaitOnCompletion<T>(
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
// Resume with with the corresponding exception to preserve it
continuation.resumeWithException(state.cause)
} else {
// resume 被掛起的協程
@Suppress("UNCHECKED_CAST")
continuation.resume(state.unboxState() as T)
}
}
}
這里的 continuation 就是 launch 協程體,也就是 SuspendLambda 物件,于是 invoke 方法會再一次呼叫到 BaseContinuationImpl.resumeWith 方法,接著呼叫 SuspendLambda.invokeSuspend, 然后根據 label 取值繼續執行接下來的邏輯!
launch 協程恢復的程序,從 async 協程的SuspendLambda的子類的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 節點里面通過呼叫resume(result)恢復協程,
await()掛起函式恢復協程的原理:
-
將 launch 協程封裝為 ResumeAwaitOnCompletion 作為 handler 節點添加到 aynsc 協程的 state.list
-
然后在 async 協程完成時會通知 handler 節點呼叫 launch 協程的 resume(result) 方法將結果傳給 launch 協程,并恢復 launch 協程繼續執行 await 掛起點之后的邏輯,

5.3.4 協程三層封裝
通過前述的一系列分析可知,協程有三層封裝:
常用的launch和async回傳的Job、Deferred,里面封裝了協程狀態,提供了取消協程介面,而它們的實體都是繼承自AbstractCoroutine,它是協程的第一層包裝,
第二層包裝是編譯器生成的SuspendLambda的子類,封裝了協程的真正運算邏輯,繼承自BaseContinuationImpl,其中completion屬性就是協程的第一層包裝,
第三層包裝是前面分析協程的執行緒調度時提到的DispatchedContinuation,封裝了執行緒調度邏輯,包含了協程的第二層包裝,
協程其實就是一段可以掛起和恢復執行的運算邏輯,而協程的掛起通過掛起函式實作,掛起函式用狀態機的方式用掛起點將協程的運算邏輯拆分成不同的片段,每次運行協程執行不同的邏輯片段,
所以協程有兩個很大的好處:
簡化異步編程,支持異步回傳;
掛起不阻塞執行緒,提供執行緒利用率
六、總結
本文通過為什么使用協程,協程如何創建啟動,協程的調度原理和協程的掛起原理幾個方面對協程進行了初步剖析,下面一起回顧一下全文重點內容,對全文內容進行一個總結
協程引入:
協程可以讓異步代碼同步化,降低程式涉及的復雜度
協程本質是輕量級執行緒,單個執行緒可以運行多個協程,協程的運行不會導致執行緒阻塞
協程啟動:
協程啟動需要三部分:背景關系、啟動模式、協程體,創建協程的方式有runBlocking、launch和async,推薦使用CoroutineScope.launch的方式創建協程,使用async的方式創建并發執行,同步等待獲取回傳值的情況,
Job是launch構建協程回傳的一個協程任務,完成時沒有回傳值,可看成協程物件本身,其提供相關方法可用于觀察協程執行情況,Deferred繼承自Job,是async構建協程回傳的一個協程任務,可通過呼叫await()方法等待執行完成獲取結果,
啟動協程需要作用域,作用域在協程創建程序中產生,常見的協程作用域有GlobalScope、coroutineScope等,協程配合Jetpack Lifecycle相關組件提供的lifecycleScope等作用域進行使用,例外絲滑好用,
協程的啟動模式有DEFAULT、ATOMIC、UNDISPATCHED、LAZY四種,注意不同啟動模式的區別,
如果要在父協程中進行子協程切換操作,可以使用withContext,
協程調度:
協程背景關系是一個元素的集合,其定義是遞回的,自己包含若干個自己,其結構介于set 和 map 之間,
協程實作的本質是回呼,這個回呼即Continuation,協程攔截器的實作就是攔截Continuation,可在此處進行快取、日志列印等攔截處理
調度器即確認相關協程在哪個執行緒上執行,調度的本質是解決掛起恢復后協程邏輯在哪里運行的問題,其繼承自攔截器,
調度器的是實作原理即在協程啟動時通過攔截器進行攔截,回傳一個Continuation,再在協程恢復進行resumeWith操作時,進行執行緒切換判斷和執行緒切換,
協程掛起:
掛起函式是一個可啟動、暫停和恢復的函式,被suspend修飾的函式在協程運行時不是一定會被掛起的,
掛起函式的掛起實作原理就是狀態機的狀態轉移,協程體的執行就是一個狀態機,每遇到一次掛起函式就是一次狀態轉移,而協程的恢復不過是從一種狀態跳轉到下一種狀態,掛起函式將整個執行程序劃分為多個Continuation片段,利用狀態機的方式保證各個片段時順序執行的,從而實作了用順序的代碼實作異步邏輯,
參考資料:
- 【1】破解Kotlin協程
- 【2】Kotlin Jetpack 實戰 | 09.圖解協程原理
- 【3】一文看透 Kotlin 協程本質
- 【4】抽絲剝繭Kotlin - 協程
- 【5】Kotlin協程實作原理
- 【6】kotlin 協程-Android實戰
- 【7】kotlin 協程 官方指導檔案
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/502080.html
標籤:Android
上一篇:如何給玩偶建模并讓它跳個舞?
