簡介
這篇文章將從原始碼的角度,分析攜程的執行流程,我們創建一個攜程,系統是怎么進行調度的,什么時候執行的,是否需要創建新執行緒等等,帶著這些疑問,一起往下看吧,
例子先行
fun main(): Unit = runBlocking {
launch {
println("${treadName()}======1")
}
GlobalScope.launch {
println("${treadName()}======3")
}
launch {
println("${treadName()}======2")
}
println("${treadName()}======4")
Thread.sleep(2000)
}
輸出如下:
DefaultDispatcher-worker-1======3
main======4
main======1
main======2
Process finished with exit code 0
根據列印,如果根據單執行緒執行流程來看,是不是感覺上面的日志列印順序有點不好理解,下面我們就逐步來進行分解,
-
runBlocking攜程體
這里將其它代碼省略到了,我這里都是按照一條簡單的執行流程進行講解,public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { val eventLoop: EventLoop? val newContext: CoroutineContext ... if (contextInterceptor == null) { eventLoop = ThreadLocalEventLoop.eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } ... val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() }看一下
eventLoop的初始化,會 在當前執行緒(主執行緒)創建BlockingEventLoop物件,internal val eventLoop: EventLoop get() = ref.get() ?: createEventLoop().also { ref.set(it) } internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())看一下
newContext初始化,這里會對攜程背景關系進行組合,回傳新的背景關系,最后回傳的是一個BlockingEventLoop物件,public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }開始對攜程進行調度
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)看一下執行這句代碼之前,各變數的值
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-w4u0tSiL-1631697299327)(//upload-images.jianshu.io/upload_images/3341206-0d228d7431b736a4.png?imageMogr2/auto-orient/strip|imageView2/2/w/566/format/webp)]
而上面的代碼最終呼叫的是
CoroutineStart.DEFAULT的invoke方法,public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily }我們使用的是
DEFAULT啟動模式,然后會執行resumeCancellableWith方法,inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } }dispatcher是BlockingEventLoop物件,沒有重寫isDispatchNeeded,默認回傳true,然后呼叫dispatch繼續進行分發,BlockingEventLoop繼承了EventLoopImplBase并呼叫其dispatch方法,把任務加入到佇列中,public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)回到最開始,在
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)執行完,還執行了coroutine.joinBlocking()看一下實作,fun joinBlocking(): T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true) { @Suppress("DEPRECATION") if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break parkNanos(this, parkNanos) } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } // now return result val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T }執行
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE,取出任務進行執行,也就是runBlocking攜程體, -
launch {}執行流程public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }因為
launch是直接在runBlocking(父攜程體)里新的創建的子攜程體,所以執行流程上和之前將的差不多,只不過不會像runBlocking再去創建BlockingEventLoop物件,而是直接用runBlocking(父攜程體)的,然后把任務加到里面,所以通過這種方式其實就是單執行緒對任務的調度而已,所以在runBlocking(父攜程體)內通過launch啟動再多的攜程體,其實都是在同一執行緒,按照任務佇列的順序執行的,
根據上面日志輸出,并沒有先執行兩個
launch攜程體,這是為什么呢,根據上面的講解,應用知道,runBlocking(父攜程體)是第一被添加的佇列的任務,其次是launch,所以是這樣的順序,那可以讓launch立即執行嗎?答案是可以的,這就要說攜程的啟動模式了,
-
CoroutineStart 是協程的啟動模式,存在以下4種模式:- DEFAULT 立即調度,可以在執行前被取消
- LAZY 需要時才啟動,需要start、join等函式觸發才可進行調度
- ATOMIC 立即調度,協程肯定會執行,執行前不可以被取消
- UNDISPATCHED 立即在當前執行緒執行,直到遇到第一個掛起點(可能切執行緒)
我們使用
UNDISPATCHED就可以使攜程體馬上在當前執行緒執行,看一下是怎么實現的,看一下實作:
使用這種啟動模式執行UNDISPATCHED -> block.startCoroutineUndispatched(completion)方法,
internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
startDirect(completion) { actualCompletion ->
withCoroutineContext(completion.context, null) {
startCoroutineUninterceptedOrReturn(actualCompletion)
}
}
}
大家可以自己點擊去看一下,大概就是會立即執行攜程體,而不是將任務放入佇列,
但是
GlobalScope.launch卻不是按照這樣的邏輯,這是因為GlobalScope.launch啟動的全域攜程,是一個獨立的攜程體了,并不是runBlocking(父攜程體)子攜程,看一下通過GlobalScope.launch有什么不同,
-
GlobalScope.launch執行流程- 啟動全域攜程
GlobalScope.launch ```kotlin `newCoroutineContext(context)`回傳`Dispatchers.Default`物件,而DefaultScheduler繼承了ExperimentalCoroutineDispatcher類,看一下`ExperimentalCoroutineDispatcher`中的`dispatch`代碼:override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
…
coroutineScheduler.dispatch(block)
…看一下`coroutineScheduler`初始化 ```kotlin private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)CoroutineScheduler實作了Executor介面,里面還有兩個全域佇列和執行緒池相關的引數,@JvmField val globalCpuQueue = GlobalQueue() @JvmField val globalBlockingQueue = GlobalQueue()繼續呼叫
CoroutineScheduler中的dispatch方法fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) } }-
val task = createTask(block, taskContext)包裝成TaskImpl物件, -
val currentWorker = currentWorker()當前是主執行緒,運行程式時由行程創建,肯定不是Worker物件,Worker是一個繼承了Thread的類 ,并且在初始化時都指定為守護執行緒,Worker存在5種狀態: CPU_ACQUIRED 獲取到cpu權限 BLOCKING 正在執行IO阻塞任務 PARKING 已處理完所有任務,執行緒掛起 DORMANT 初始態 TERMINATED 終止態
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)由于currentWorker是null,直接回傳task物件,addToGlobalQueue(notAdded)根據任務是否是阻塞任務,將task添加到全域任務佇列中,這里被添加到globalCpuQueue中,- 執行
signalCpuWork()來喚醒一個執行緒或者啟動一個新的執行緒,
fun signalCpuWork() {
if (tryUnpark()) return
if (tryCreateWorker()) return
tryUnpark()
}
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
val created = createdWorkers(state)// 創建的的執行緒總數
val blocking = blockingTasks(state)// 處理阻塞任務的執行緒數量
val cpuWorkers = (created - blocking).coerceAtLeast(0)//得到非阻塞任務的執行緒數量
if (cpuWorkers < corePoolSize) {// 小于核心執行緒數量,進行執行緒的創建
val newCpuWorkers = createNewWorker()
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()// 當前非阻塞型執行緒數量為1,同時核心執行緒數量大于1時,再進行一個執行緒的創建,
if (newCpuWorkers > 0) return true
}
return false
}
// 創建執行緒
private fun createNewWorker(): Int {
synchronized(workers) {
...
val created = createdWorkers(state)// 創建的的執行緒總數
val blocking = blockingTasks(state)// 阻塞的執行緒數量
val cpuWorkers = (created - blocking).coerceAtLeast(0) // 得到非阻塞執行緒數量
if (cpuWorkers >= corePoolSize) return 0//超過最大核心執行緒數,不能進行新執行緒創建
if (created >= maxPoolSize) return 0// 超過最大執行緒數限制,不能進行新執行緒創建
...
val worker = Worker(newIndex)
workers[newIndex] = worker
require(newIndex == incrementCreatedWorkers())
worker.start()// 執行緒啟動
return cpuWorkers + 1
}
}
那么這里面的任務又是怎么調度的呢,當全域任務被執行的時候,看一下Worker中的run方法:
override fun run() = runWorker()
執行runWorker方法,該方法會從佇列中找到執行任務,然后開始執行,詳細代碼,可以自行翻閱,
所以
GlobalScope.launch使用的就是執行緒池,沒有所謂的性能好,
Dispatchers調度器
Dispatchers是協程中提供的執行緒調度器,用來切換執行緒,指定協程所運行的執行緒,,上面用的是默認調度器Dispatchers.Default,
Dispatchers中提供了4種型別調度器:
Default 默認調度器:適合CPU密集型任務調度器 比如邏輯計算;
Main UI調度器
Unconfined 無限制調度器:對協程執行的執行緒不做限制,協程恢復時可以在任意執行緒;
IO調度器:適合IO密集型任務調度器 比如讀寫檔案,網路請求等,
最后
由于水平有限,**有錯誤的地方在所難免,未免誤導他人,歡迎大佬指正!**碼字不易,感謝大家的點贊關注!🙏有一起學習的小伙伴可以關注下我的公眾號——【??程式猿養成中心??】每周會定期做關于Android的技術分享,
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/300502.html
標籤:其他
