主頁 > 移動端開發 > Kotlin:Flow 全面詳細指南,附帶原始碼決議。

Kotlin:Flow 全面詳細指南,附帶原始碼決議。

2021-11-15 11:29:16 移動端開發

文章目錄

    • Flow
    • Flow簡介
    • 如何回傳多個值?
      • For example 1 : list
      • For example 2 : Sequences
      • For example 3 : 異步計算并回傳?引入flow
    • Flow使用
      • Flow 簡單使用
      • Flow 構建
      • Flow 冷流
      • Flow 取消
      • Flow 相關運算子
        • 中間流運算子
          • map
          • filter
        • 變換運算子
        • size限制運算子
        • 終端運算子
          • 轉換為各種集合, toList 和 toSet,
          • first , 確保獲取且進獲取第一個值
          • 使用 reduce 和 fold 將流合并到一個值,
          • onEach
        • 運算子的順序
      • Flow 調度器切換
      • Flow 背壓處理
        • buffer
        • conflate
        • collectLatest
      • Flow 例外處理
        • try...catch
        • catch 相關運算子
      • Flow 啟動
    • Flow原理決議

Flow

來了來了它終于來了,這篇本應在好幾個個月前就需要發布的文章😂,一拖再拖🙄,畢竟對于flow還是的有敬畏之心的,不好好研究一下真心不敢亂寫,有什么問題,歡迎指出,歡迎私信技術交流😋,那么現在就正式進入Flow的世界吧!

Flow簡介

Flow是什么?

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

簡單來說:掛起函式可以異步處理并且回傳單個值,但是如果要回傳多個異步計算的值呢?這就是flow的用處了,下面從如何回傳多個值開始一步一步深入了解flow

如何回傳多個值?

For example 1 : list

我們可以有一個簡單的函式,它回傳一個包含三個數字的串列,然后使用 forEach 將它們全部列印出來:

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

結果

1
2
3

For example 2 : Sequences

如果我們需要阻塞代碼來計算數字(每次計算需要 100 毫秒),那么我們可以使用序列來表示數字:

fun simple(): Sequence<Int> = sequence { // sequence 構建
    for (i in 1..3) {
        //模擬計算耗時
        Thread.sleep(100)
        println("send$i")
        // yield 下一個值
        yield(i) 
    }
}

fun main() {
    simple().forEach { value -> println("receiver$value") }
    println("end")
}

結果

send1
receiver1
send2
receiver2
send3
receiver3
end

For example 3 : 異步計算并回傳?引入flow

有什么辦法可以異步計算多個值并且回傳的嗎?當然可以,我們可以使用掛起函式,使計算程序在異步執行緒執行,最終以list的形式回傳,

舉個🌰

suspend fun simple(): List<Int> {
	//模擬耗時操作
    delay(1000)
    return listOf(1, 2, 3)
}

fun main()= runBlocking {
    val job = launch {
        simple().forEach { value -> println(value) }
    }
    launch {
        println("other operate")
    }
    job.join()
}

結果:從結果來看,耗時操作并沒有影響主執行緒的運行😎

other operate
1
2
3

但是,這樣就夠了嗎?no no no !🙅?♀?

使用list意味著我們只能一次性的回傳所有的值,所以為了表示流的計算,引入了flow,就像可以使用Sequence型別用于同步計算值一樣,

Flow使用

Flow 簡單使用

上面介紹了flow要解決什么問題,那么我們就開始使用起來吧,

先看一個簡單的🌰

fun simple(): Flow<Int> = flow { // flow 構建
    for (i in 1..3) {
        //模擬異步耗時計算
        delay(100)
        //發射值
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // launch 一個協程 同時延時100毫秒列印 校驗主執行緒是否阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            //主執行緒在這個時間段可以干別的事情
            delay(100)
        }
    }
    // collect flow value
    simple().collect { println(it) }
}

結果:通過執行緒列印 I'm not blocked證明異步計算不會阻塞主執行緒,計算成功之后會resume到collect里面繼續執行,

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

通過上述代碼,需要注意一下幾點:🙆?♀?

  • 使用flow代碼塊構建出來的型別為Flow
  • flow代碼塊里面允許寫掛起函式,比如上面的,delay emit
  • 使用emit進行值的發射,使用collect進行值的收集

Flow 構建

除了上面使用的flow{}進行構建之外,還可以使用其他的方式進行構建,

  1. 使用flowOf可以定義一組固定的值

    fun simple(): Flow<Int> = flowOf(1, 2, 3)
    
  2. 可以使用 asFlow() 擴展函式將各種集合和序列轉換為流,

    // 將list轉換為flow
    listOf(1,2,3).asFlow().collect { value -> println(value) }
    

Flow 冷流

Flow是冷流,構建器代碼在呼叫collect之前是不會進行呼叫的,對于多個呼叫者,都會重新走一遍構建器的代碼,

廢話不多說,上🌰

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

結果

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

每次收集流時都會開始,這就是為什么我們再次呼叫 collect 時會看到“Flow started”的原因,

Flow 取消

如何取消一個Flow呢?

Kotlin官方并沒有提供flow取消的函式,啊 這???😕聽到這個是不是還滿疑惑,且聽我細細道來,

Flow需要在協程里面使用,因為collect是掛起函式,另外基于冷流的特性,不呼叫collect構建器的代碼壓根不會走,所以只能是協程,那 我取消協程不就行了嗎?😮,好像之前有看到過有開發者提出過,是否要給flow單獨加一個取消的函式,被Jetbrains無情的拒絕了,哈哈哈哈很搞笑,下面參考Kotlin官方的一段話,

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay).

adheres 堅持

Flow 堅持協程的一般協作取消, 像往常一樣,當流在可取消的掛起函式(如延遲)中被掛起時,可以取消流收集,

這個adheres好像就像是在回復廣大的開發者,你取消協程就行了😂😂😂,

好了,下面看取消的🌰

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val job = launch {
        simple().collect { println(it) }
    }
    delay(250)
    job.cancel(CancellationException("timeout 250"))
    println("done")
}

結果:看我們只需要取消對應的協程即可,對應的flow也會被取消收集,

emit 1
1
emit 2
2
done

這里引申一點,對于timeout,官方有提供專用的操作函式,withTimeout系列,不需要我們手動delay然后繼續呼叫取消,畢竟不是很優雅,

上述代碼也可以寫成如下的形式

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}
fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) {
        simple().collect { value -> println(value) }
    }
    println("done")
}

看到了嗎?直接將launch替換為withTimeoutOrNull就可以做到延時取消效果了,這里簡單做一下原始碼分析,

原始碼
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? {
    ...
    try {
        return suspendCoroutineUninterceptedOrReturn { uCont ->
            val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
            coroutine = timeoutCoroutine
            setupTimeout<T?, T?>(timeoutCoroutine, block)
        }
    } catch (e: TimeoutCancellationException) {
       ...
    }
}
private class TimeoutCoroutine<U, in T: U>(
    @JvmField val time: Long,
    uCont: Continuation<U> // unintercepted continuation
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
    override fun run() {
        cancelCoroutine(TimeoutCancellationException(time, this))
    }
	...
}

private fun <U, T: U> setupTimeout(
    coroutine: TimeoutCoroutine<U, T>,
    block: suspend CoroutineScope.() -> T
): Any? {
    // schedule cancellation of this coroutine on time
    coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
    ...
}
    
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        DefaultDelay.invokeOnTimeout(timeMillis, block, context)

省略了一大堆非核心的代碼,我們直接看延時取消的操作,這里簡單分析一下:

  • 創建TimeoutCoroutine物件,它同時實作了Runnable,在run里面呼叫了取消函式,拋出TimeoutCancellationException,
  • 呼叫當前context的invokeOnTimeout函式,該函式需要一個Runnable,傳入了timeoutCoroutine,此實作使用內置的單執行緒調度執行器服務,會在延時對應的事件后呼叫Runnable的run函式,然后就會取消當前的協程,
  • 在取消協程之后,會取消掉當前背景關系的所有將在完成時呼叫的回呼,disposeOnCompletion函式被呼叫,

Flow 相關運算子

這一塊的運算子,其實是比較多的,但是如果您熟悉RxJava的話,其實都是差不多的,這一塊這里就不做原始碼分析了,只是看一下怎么使用即可,內部其實是創建了新的流回傳出來了,有興趣的話可以自行查看一下原始碼,

中間流運算子

map

映射,看🌰(傳入請求流可以使用 map 運算子映射到結果,即使執行請求是由掛起函式實作的長時間運行的操作)

          
suspend fun performRequest(request: Int): String {
    delay(1000) // 模擬長時間的異步作業
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 轉換為flow
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

結果

response 1
response 2
response 3
filter

過濾操作,看🌰

suspend fun performRequest(request: Int): String {
    delay(1000) // 模擬長時間的異步作業
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 轉換為flow
        .map { request -> performRequest(request) }
        .filter { it == "response 1" }
        .collect { response -> println(response) }
}

結果:僅回傳匹配到的值

response 1

變換運算子

在流變換算子中,最通用的一種叫做變換, 它可以用來模仿簡單的轉換,比如 map 和 filter,也可以實作更復雜的轉換, 使用transform,我們可以發出任意次數的任意值,

例如🌰,使用transform,我們可以在執行長時間運行的異步請求之前發出一個字串,并在其后回應:

(1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }

結果

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

size限制運算子

顧名思義,限制收集的數量,使用運算子take

它會在判斷當發射的值在達到相應限制時取消流程的執行, 因為協程中的取消總是通過拋出例外來執行,所以需要考慮進行相應的例外捕獲來保證后續的流暢正常進行不被取消掉

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("not execute")
        emit(3)    
    } finally {
        println("finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take 兩個值
        .collect { value -> println(value) }
}            

結果

1
2
finally in numbers

終端運算子

終端運算子可以啟動一個流,最基礎的就是上述常提到的collect,但是還有一些其他的終端運算子,它可能會讓一些操作變得更簡單:

轉換為各種集合, toList 和 toSet,
    flowOf(1,2).toList().forEach { 
        println(it)
    }
    flowOf(1,2).toSet().forEach(::println)
first , 確保獲取且進獲取第一個值

回傳流發出的第一個元素然后取消流的集合的終端運算子, 如果流為空,則拋出 NoSuchElementException,

    val value : Int = flowOf(1, 2).first()
    println(value)
使用 reduce 和 fold 將流合并到一個值,
    val sum = (1..5).asFlow()
        .map { it * it } //平方
        .reduce { a, b -> a + b } // 進行累加
    println(sum)
    //結果
    55

fold和reduce使用起來差不多,區別就是fold可以定義初始化,其實很簡單,reduce傳入的lambda前一個引數是每次計算的結果累計,后一個引數是當前需要傳入的值,不明白可以去瞅一眼原始碼,這里不在引申,

onEach

這個運算子也較為常用,這里也介紹一下,回傳在上游流的每個值向下游發出之前呼叫給定操作的流,

🌰

(1..5)
.asFlow()
.onEach {
	println("onEach$it")
}.collect()
//結果
onEach1
onEach2
onEach3
onEach4
onEach5

運算子的順序

除非使用對多個流進行操作的特殊運算子,否則流的每個單獨集合都按順序執行, 該集合直接在呼叫終端運算子的協程中作業, 默認情況下不會啟動新的協程, 每個發出的值都由從上游到下游的所有中間運算子處理,然后交付給終端運算子,

🌰

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }    
//結果  按照順序沒有值依次向下發射
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow 調度器切換

對于UI驅動型的程式來說,需要將長時間計算的任務放在異步執行緒處理,UI展示作業需要放在主執行緒處理,也就是說需要將構建器的代碼放到異步執行緒執行,但是終端運算子,比如collect需要在主執行緒獲取,那么怎么做呢?

使用flowOn運算子

在這之前您可能的了解一下,協程的調度器,可以簡單參考之前寫的一篇文章,有對調度器做簡單介紹:https://blog.csdn.net/weixin_44235109/article/details/119981210

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模擬異步處理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)//使用flowOn傳入Default的調度器
        .collect { value ->
            log("Collected $value")
        }
}

結果

16:39:21:954 [DefaultDispatcher-worker-1] Emitting 1
16:39:21:969 [main] Collected 1
16:39:22:071 [DefaultDispatcher-worker-1] Emitting 2
16:39:22:071 [main] Collected 2
16:39:22:178 [DefaultDispatcher-worker-1] Emitting 3
16:39:22:178 [main] Collected 3

可以很明顯的看出,構建模塊被調度到異步執行緒處理了,而收集的作業還在主執行緒進行,

flowOn負責構建的模塊調度,那么收集的誰負責呢?

其實和例外處理類似,collect受呼叫它的協程背景關系限制,所以最后的執行執行緒和當前協程背景關系的調度器有關,目前我使用的是idea測驗的,默認runBlocking的調度器就是主執行緒,如果是android上面的話,runBlocking可能就需要傳入Dispatchers.Main了,

其實和RxJava還是非常相似的😂,

注意一點,此時其實已經改變流執行的順序了

官方的解釋如下:

Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now collection happens in one coroutine (“coroutine#1”) and emission happens in another coroutine (“coroutine#2”) that is running in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.

這里要注意的另一件事是 flowOn 運算子更改了流的默認順序性質, 現在收集發生在一個協程(“coroutine#1”)中,發射發生在另一個協程(“coroutine#2”)中,該協程與收集協程同時運行在另一個執行緒中, 當必須在其背景關系中更改 CoroutineDispatcher 時, flowOn 運算子為上游流創建另一個協程,

這一塊我簡單看了一下原始碼,這里面不同的調度器會遇到多執行緒的問題,最里面使用了channel進行了調度處理,具體的核心類是ChannelFlow,后面會對flow進行簡單原始碼分析,但篇幅有限不對這一塊過深入分析,感興趣可以自行查看,或找博主私下探討,

其實上面的看不出來會改變流執行的順序,下面改變一下代碼,驗證一下看看🌰

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模擬異步處理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .collect { value ->
            delay(200)
            log("Collected $value")
        }
}

結果

16:52:33:258 [DefaultDispatcher-worker-1] Emitting 1
16:52:33:386 [DefaultDispatcher-worker-1] Emitting 2
16:52:33:482 [main] Collected 1
16:52:33:493 [DefaultDispatcher-worker-1] Emitting 3
16:52:33:684 [main] Collected 2
16:52:33:887 [main] Collected 3

我們只需要將,collect里面增加一個delay即可,發現其實這時候就是發射歸發射,收集歸收集了,不似上面我們寫的程式,發射一個值只有到終端運算子之后才會發射第二個,這里面肯定就會對值進行快取,那么就會牽扯到一個問題,老生常談🙆?♀?,背壓處理

Flow 背壓處理

對于背壓處理,Kotlin 提供三種解決方案:

運算子含義
buffer指定固定容量快取
conflate保留最新的值
collectLatest新值發送時取消之前的

buffer

這里有必要看一下buffer函式的原始碼定義

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND)

可以看出需要兩個引數,都有默認值,

  • 第一個好理解容量,默認等于BUFFERED,這個值其實時64,可以按照使用時需要自己指定具體的數字,
  • 第二個是指定,當buffer溢位時的操作,默認的操作是掛起,還要兩個操作分別是洗掉最舊的值不要掛起或者洗掉當前最新的值不要掛起,可以自行查看原始碼,這里不再引申,

使用🌰

    flow {
        for (i in 1..3) {
            //模擬異步處理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .buffer()
        .collect { value ->
            delay(200)
            log("Collected $value")
        }
//結果       
17:17:28:420 [DefaultDispatcher-worker-1] Emitting 1
17:17:28:536 [DefaultDispatcher-worker-1] Emitting 2
17:17:28:646 [main] Collected 1
17:17:28:646 [DefaultDispatcher-worker-1] Emitting 3
17:17:28:846 [main] Collected 2
17:17:29:049 [main] Collected 3

conflate

這個只獲取最新值也比較好理解,應用場景,比如說獲取下載進度,對于用戶來說其實每次只需要獲取當前最新的進度就好了,不需要把之前的值再去獲取一遍,下面也舉一個例子🌰

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模擬異步處理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .conflate()
        .collect { value ->
            delay(300)//模擬下游處理比較慢
            log("Collected $value")
        }
}  
//結果   第一個值肯定可以拿到 當地一個值處理完成之后  最新的值就是3了 所以丟棄了2
17:21:42:916 [DefaultDispatcher-worker-1] Emitting 1
17:21:43:034 [DefaultDispatcher-worker-1] Emitting 2
17:21:43:140 [DefaultDispatcher-worker-1] Emitting 3
17:21:43:236 [main] Collected 1
17:21:43:546 [main] Collected 3

collectLatest

說明一點:這玩意其實也是一個終端運算子

前兩個可能都比較好理解,那新值發送時取消之前的是什么意思呢?為了便于理解,直接上例子,按照結果說明:

🌰

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模擬異步處理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .collectLatest {
            delay(300)
            log("Collected $it")
        }
}
//結果
17:25:33:916 [DefaultDispatcher-worker-1] Emitting 1
17:25:34:038 [DefaultDispatcher-worker-1] Emitting 2
17:25:34:150 [DefaultDispatcher-worker-1] Emitting 3
17:25:34:453 [main] Collected 3

對比上面的例子,這里只是將collect替換成了collectLatest而已,為什么1沒有了呢?

顯而易見了,這玩意會在最新的到來會直接取消下游上一個消費的處理,因為有delay所以1還沒有來得及列印,就因為下一個值發射了,然后就被取消了!!!您可真霸道呢?🙆?♀?

Flow 例外處理

當運算子內的發射器或代碼拋出例外時,流收集可以以例外結束, 有幾種方法可以處理這些例外,

try…catch

較為簡單,上🌰

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}
//結果  
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            if (value>1){
                throw IllegalStateException("exception value is $value")
            }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

顯而易見,拋出例外之后,收集結束,如果是UI驅動程式比如:Android還是推薦主從作用域,例外不會向上傳播,

思考一個問題,剛剛例外是發生在收集端,如果在構建的時候發生例外呢?

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
        .map { value ->
            if (value>1){
                throw IllegalStateException("exception value is $value")
            }
            "string $value"
        }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//結果
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: exception value is 2

完美:例外仍被捕獲并停止收集

上述代碼的問題就是不夠優雅,還有例外對于流來說必須是透明的,使用try … catch 顯然違反了透明性,所以Kotlin 封裝了try catch.

catch 相關運算子

catch 運算子的主體可以分析例外并根據捕獲的例外以不同的方式對其做出反應:

  • 可以使用 throw 重新拋出例外,
  • 可以使用 catch 主體中的發射將例外轉換為值的發射,
  • 例外可以被其他代碼忽略、記錄或處理,

看🌰

simple()
    .catch { e -> emit("Caught $e") } // 我不僅捕獲到了例外,我還能繼續發射!!!!
    .collect { value -> println(value) }

結果

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: exception value is 2

當然因為拋出了例外,協程還是會終止,但是此時例外是以發生的形式傳遞下去的,

注意:catch 中間運算子,尊重例外透明性,只捕獲上游例外(即來自 catch 之上的所有運算子的例外,但不在其之下), 如果 collect { … } 中的塊(位于 catch 下方)拋出例外,則它不會捕獲:

🌰

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // 不會捕獲到下游的例外
        .collect { value ->
            if(value <= 1) throw IllegalStateException("Collected $value")
            println(value)
        }
}
//
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at org.example.zxf.kotlin11.flow.TestFlowKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)

這個時候可以利用onEach將需要可能捕獲例外的地方前置,如下所示,

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()

onCompletion 相關操作

我們知道和try … catch 搭配的 還有一個finally,所以 flow當然也有一個差不多的了,叫onCompletion

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}
//結果
Emitting 1
1
Emitting 2
2
Emitting 3
3
Done

拋個例外試試,onCompletion可以通過cause進行判斷是否是正常結束.

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
    emit(2)
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause ->
            if (cause != null) {
                println("Flow completed exceptionally")
            }
        }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//結果
1
Flow completed exceptionally
Caught exception

與 catch 運算子的另一個區別是 onCompletion 會看到所有例外,并且僅在成功完成上游流(沒有取消或失敗)時才會收到空例外,

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            if (value>1){
                throw IllegalStateException("exception value is $value")
            }
            println(value)
        }
}
//結果
1
Flow completed with java.lang.IllegalStateException: exception value is 2
Exception in thread "main" java.lang.IllegalStateException: exception value is 2

注意一點:雖然看到了,但是并沒有進行捕獲,例外還是拋出了

Flow 啟動

最后提一點和啟動相關的,先看下面兩個例子:

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- collect 掛起函式阻塞
    println("Done")
}   
//結果
Event: 1
Event: 2
Event: 3
Done

對于上述,沒有異議吧,Done的列印需要等待collect恢復,因為在一個協程內,如果需要不等待呢?那就需要另起一個協程了

🌰

fun main() = runBlocking<Unit> {
    launch {
        events()
            .onEach { event -> println("Event: $event") }
            .collect() // <--- collect 掛起函式阻塞
    }
    println("Done")
}
//結果
Done
Event: 1
Event: 2
Event: 3

但是上述寫法可以等價于,下面的寫法,flow這玩意封裝了另一種啟動方式

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    println("Done")
}
//結果
Done
Event: 1
Event: 2
Event: 3

看一下launchIn原始碼!

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

好吧,其實不就是封裝了一下嘛,主要是原始碼好多地方使用launchIn來著,所以這里提一下🙆?♀?,

Flow原理決議

好吧,激動人心的時刻終于來了,做一些原始碼分析,

先寫一個小🌰

fun main() = runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
    }.collect {
        println(it)
    }
}

好了,我們就分析這玩意怎么實作的!往深篇幅有限寫不下了🙄,如果您想深入交流,歡迎私信交流,

所以,我先列出來flow函式構建的原始碼,只貼出核心代碼:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

可以看出flow構建了一個SafeFlowblock傳入,并且重寫了collectSafely方法,呼叫了block(),這里需要注意一點SafeFlow繼承自AbstractFlow.

好,我們在看一呼叫collect之后發生了什么,

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }	
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

看的出,這時將collect的block封裝成了FlowCollector型別(重寫emit方法用于執行block),此時會呼叫到SafeFlowcollect方法,具體在AbstractFlow里面實作,這里面干了什么呢?看的出,創建了SafeCollector型別的東西,然后將SafeCollector傳遞給了collectSafely,而在上面的分析中,我們知道collectSafely在構建flow時進行了實作,SafeCollector作為receiver以擴展函式的形式呼叫了,flow構建器的block(這里面執行了,我們手動的emit操作),這么一看是不是有點聯系起來了😎

好的,接下來我們繼續分析一下,SafeCollector,那這玩意的原始碼其實也是有點多的,我們應該看哪一個函式呢?通過上面的分析可知,呼叫collect之后,會呼叫到SafeFlowcollect方法,進而會以擴展函式的形式呼叫到flow的block,而在block里面就是我們自己寫的emit了,所以receiver既然是SafeCollector,那肯定就是呼叫SafeCollector的emit了,我們去瞅一瞅SafeCollector的emit函式相關的呼叫鏈:👀

	override suspend fun emit(value: T) {
        	...
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                ...
            }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        ...
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
    

好了,經過簡化之后,就是上面這些玩意兒了,

首先,需要分清幾個東西,

collectorthis的區別

  • collector:還記得上面的原始碼嗎?我們呼叫collect的block封裝到了一個FlowCollector,然后傳遞給了SafeCollector,collector就指FlowCollector,這里面重寫了emit,emit里面呼叫了block,這個block就是我們自己寫的了,這個時間簡單理解呼叫collector的emit就可以呼叫到collect的block了,
  • this:理解了collector,this就指代當前的SafeCollector了,

現在我們繼續看上面的代碼:額 此時好像還需要理解一個東西emitFun

我們把它翻譯成java看一下

   private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
      // $FF: synthetic method
      // $FF: bridge method
      public Object invoke(Object var1, Object var2, Object var3) {
         return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
      }

      @Nullable
      public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
         return p1.emit(p2, continuation);
      }
   }, 3);

簡單明了,就是呼叫了collector的emit函式,但是你可能會說,emit不是支持一個引數嗎?上面傳進去倆啊,但是你別忘了,emit是掛起函式,默認有一個引數的傳遞就是continuation,這一塊Kotlin替我們做了,但是對于Java來說必須要傳遞continuation進去,通過這種方式傳入一樣的continuation,保證了continuation的統一,

好了,總結一下:

分析下來,簡單就是,利用擴展函式的性質,呼叫到flow的block進而呼叫了SafeCollector的emit,而這里的emit會呼叫到傳進來的FlowCollector的emit,而傳進來的emit函式被重寫呼叫block,所以就會呼叫到collect的block了,

另外因為,只有呼叫collect之后,進而才會呼叫到SafeFlow的collect函式,進而才會呼叫到collectSafely函式去執行flow的代碼,所以不呼叫collect的話,flow的代碼構建塊的代碼是不會執行的,最多就是回傳一個SafeFlow的物件而已

對于各種運算子的原始碼,這里不帶著大家看了,不是很難,里面就是饒了一圈,又回傳了流而已,可以自己查看,

另外對于flowOn或者其他運算子,導致調度器不一致時,此時底層將不再上上面這一套的邏輯,內部是使用ChannelFlow實作,這一塊感興趣的也可以瞅瞅原始碼,也可以與博主私信交流,里面還是比較繞,,,

還有還有補充一點,對于ShareFlowStateFlow,博主目前也在積極的籌劃中,想了解的可以關注一波哈,如果您看到這里的話,覺得不錯,希望您可以毫不吝嗇手中的贊👍,鼓勵一下博主,感謝!
在這里插入圖片描述

轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/357160.html

標籤:其他

上一篇:碎片(Fragment)的基本使用

下一篇:Xcode快捷鍵

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【從零開始擼一個App】Dagger2

    Dagger2是一個IOC框架,一般用于Android平臺,第一次接觸的朋友,一定會被搞得暈頭轉向。它延續了Java平臺Spring框架代碼碎片化,注解滿天飛的傳統。嘗試將各處代碼片段串聯起來,理清思緒,真不是件容易的事。更不用說還有各版本細微的差別。 與Spring不同的是,Spring是通過反射 ......

    uj5u.com 2020-09-10 06:57:59 more
  • Flutter Weekly Issue 66

    新聞 Flutter 季度調研結果分享 教程 Flutter+FaaS一體化任務編排的思考與設計 詳解Dart中如何通過注解生成代碼 GitHub 用對了嗎?Flutter 團隊分享如何管理大型開源專案 插件 flutter-bubble-tab-indicator A Flutter librar ......

    uj5u.com 2020-09-10 06:58:52 more
  • Proguard 常用規則

    介紹 Proguard 入口,如何查看輸出,如何使用 keep 設定入口以及使用實體,如何配置壓縮,混淆,校驗等規則。

    ......

    uj5u.com 2020-09-10 06:59:00 more
  • Android 開發技術周報 Issue#292

    新聞 Android即將獲得類AirDrop功能:可向附近設備快速分享檔案 谷歌為安卓檔案管理應用引入可安全隱藏資料的Safe Folder功能 Android TV新主界面將顯示電影、電視節目和應用推薦內容 泄露的Android檔案暗示了傳說中的谷歌Pixel 5a與折疊屏新機 谷歌發布Andro ......

    uj5u.com 2020-09-10 07:00:37 more
  • AutoFitTextureView Error inflating class

    報錯: Binary XML file line #0: Binary XML file line #0: Error inflating class xxx.AutoFitTextureView 解決: <com.example.testy2.AutoFitTextureView android: ......

    uj5u.com 2020-09-10 07:00:41 more
  • 根據Uri,Cursor沒有獲取到對應的屬性

    Android: 背景:呼叫攝像頭,拍攝視頻,指定保存的地址,但是回傳的Cursor檔案,只有名稱和大小的屬性,沒有其他諸如時長,連ID屬性都沒有 使用 cursor.getInt(cursor.getColumnIndexOrThrow(MediaStore.Video.Media.DURATIO ......

    uj5u.com 2020-09-10 07:00:44 more
  • Android連載29-持久化技術

    一、持久化技術 我們平時所使用的APP產生的資料,在記憶體中都是瞬時的,會隨著斷電、關機等丟失資料,因此android系統采用了持久化技術,用于存盤這些“瞬時”資料 持久化技術包括:檔案存盤、SharedPreference存盤以及資料庫存盤,還有更復雜的SD卡記憶體儲。 二、檔案存盤 最基本存盤方式, ......

    uj5u.com 2020-09-10 07:00:47 more
  • Android Camera2Video整合到自己專案里

    背景: Android專案里呼叫攝像頭拍攝視頻,原本使用的 MediaStore.ACTION_VIDEO_CAPTURE, 后來因專案需要,改成了camera2 1.Camera2Video 官方demo有點問題,下載后,不能直接整合到專案 問題1.多次拍攝視頻崩潰 問題2.雙擊record按鈕, ......

    uj5u.com 2020-09-10 07:00:50 more
  • Android 開發技術周報 Issue#293

    新聞 谷歌為Android TV開發者提供多種新功能 Android 11將自動填表功能整合到鍵盤輸入建議中 谷歌宣布Android Auto即將支持更多的導航和數字停車應用 谷歌Pixel 5只有XL版本 搭載驍龍765G且將比Pixel 4更便宜 [圖]Wear OS將迎來重磅更新:應用啟動時間 ......

    uj5u.com 2020-09-10 07:01:38 more
  • 海豚星空掃碼投屏 Android 接收端 SDK 集成 六步驟

    掃碼投屏,開放網路,獨占設備,不需要額外下載軟體,微信掃碼,發現設備。支持標準DLNA協議,支持倍速播放。視頻,音頻,圖片投屏。好點意思。還支持自定義基于 DLNA 擴展的操作動作。好像要收費,沒體驗。 這里簡單記錄一下集成程序。 一 跟目錄的build.gradle添加私有mevan倉庫 mave ......

    uj5u.com 2020-09-10 07:01:43 more
最新发布
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:40:31 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:40:11 more
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:39:36 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:39:13 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:16:23 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:16:15 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:15:46 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:14:53 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:14:08 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:08:34 more