文章目錄
- Flow
- Flow簡介
- 如何回傳多個值?
- For example 1 : list
- For example 2 : Sequences
- For example 3 : 異步計算并回傳?引入flow
- Flow使用
- Flow 簡單使用
- Flow 構建
- Flow 冷流
- Flow 取消
- Flow 相關運算子
- 中間流運算子
- map
- filter
- 變換運算子
- size限制運算子
- 終端運算子
- 轉換為各種集合, toList 和 toSet,
- first , 確保獲取且進獲取第一個值
- 使用 reduce 和 fold 將流合并到一個值,
- onEach
- 運算子的順序
- Flow 調度器切換
- Flow 背壓處理
- buffer
- conflate
- collectLatest
- Flow 例外處理
- try...catch
- catch 相關運算子
- Flow 啟動
- Flow原理決議
Flow
來了來了它終于來了,這篇本應在好幾個個月前就需要發布的文章😂,一拖再拖🙄,畢竟對于flow還是的有敬畏之心的,不好好研究一下真心不敢亂寫,有什么問題,歡迎指出,歡迎私信技術交流😋,那么現在就正式進入Flow的世界吧!
Flow簡介
Flow是什么?
A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.
簡單來說:掛起函式可以異步處理并且回傳單個值,但是如果要回傳多個異步計算的值呢?這就是flow的用處了,下面從如何回傳多個值開始一步一步深入了解flow,
如何回傳多個值?
For example 1 : list
我們可以有一個簡單的函式,它回傳一個包含三個數字的串列,然后使用 forEach 將它們全部列印出來:
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
結果
1
2
3
For example 2 : Sequences
如果我們需要阻塞代碼來計算數字(每次計算需要 100 毫秒),那么我們可以使用序列來表示數字:
fun simple(): Sequence<Int> = sequence { // sequence 構建
for (i in 1..3) {
//模擬計算耗時
Thread.sleep(100)
println("send$i")
// yield 下一個值
yield(i)
}
}
fun main() {
simple().forEach { value -> println("receiver$value") }
println("end")
}
結果
send1
receiver1
send2
receiver2
send3
receiver3
end
For example 3 : 異步計算并回傳?引入flow
有什么辦法可以異步計算多個值并且回傳的嗎?當然可以,我們可以使用掛起函式,使計算程序在異步執行緒執行,最終以list的形式回傳,
舉個🌰
suspend fun simple(): List<Int> {
//模擬耗時操作
delay(1000)
return listOf(1, 2, 3)
}
fun main()= runBlocking {
val job = launch {
simple().forEach { value -> println(value) }
}
launch {
println("other operate")
}
job.join()
}
結果:從結果來看,耗時操作并沒有影響主執行緒的運行😎
other operate
1
2
3
但是,這樣就夠了嗎?no no no !🙅?♀?
使用list意味著我們只能一次性的回傳所有的值,所以為了表示流的計算,引入了flow,就像可以使用Sequence型別用于同步計算值一樣,
Flow使用
Flow 簡單使用
上面介紹了flow要解決什么問題,那么我們就開始使用起來吧,
先看一個簡單的🌰
fun simple(): Flow<Int> = flow { // flow 構建
for (i in 1..3) {
//模擬異步耗時計算
delay(100)
//發射值
emit(i)
}
}
fun main() = runBlocking<Unit> {
// launch 一個協程 同時延時100毫秒列印 校驗主執行緒是否阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
//主執行緒在這個時間段可以干別的事情
delay(100)
}
}
// collect flow value
simple().collect { println(it) }
}
結果:通過執行緒列印 I'm not blocked證明異步計算不會阻塞主執行緒,計算成功之后會resume到collect里面繼續執行,
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
通過上述代碼,需要注意一下幾點:🙆?♀?
- 使用
flow代碼塊構建出來的型別為Flow flow代碼塊里面允許寫掛起函式,比如上面的,delayemit,- 使用
emit進行值的發射,使用collect進行值的收集
Flow 構建
除了上面使用的flow{}進行構建之外,還可以使用其他的方式進行構建,
-
使用
flowOf可以定義一組固定的值fun simple(): Flow<Int> = flowOf(1, 2, 3) -
可以使用
asFlow()擴展函式將各種集合和序列轉換為流,// 將list轉換為flow listOf(1,2,3).asFlow().collect { value -> println(value) }
Flow 冷流
Flow是冷流,構建器代碼在呼叫collect之前是不會進行呼叫的,對于多個呼叫者,都會重新走一遍構建器的代碼,
廢話不多說,上🌰
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
結果
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
每次收集流時都會開始,這就是為什么我們再次呼叫 collect 時會看到“Flow started”的原因,
Flow 取消
如何取消一個Flow呢?
Kotlin官方并沒有提供flow取消的函式,啊 這???😕聽到這個是不是還滿疑惑,且聽我細細道來,
Flow需要在協程里面使用,因為collect是掛起函式,另外基于冷流的特性,不呼叫collect構建器的代碼壓根不會走,所以只能是協程,那 我取消協程不就行了嗎?😮,好像之前有看到過有開發者提出過,是否要給flow單獨加一個取消的函式,被Jetbrains無情的拒絕了,哈哈哈哈很搞笑,下面參考Kotlin官方的一段話,
Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay).
adheres 堅持
Flow 堅持協程的一般協作取消, 像往常一樣,當流在可取消的掛起函式(如延遲)中被掛起時,可以取消流收集,
這個adheres好像就像是在回復廣大的開發者,你取消協程就行了😂😂😂,
好了,下面看取消的🌰
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("emit $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
val job = launch {
simple().collect { println(it) }
}
delay(250)
job.cancel(CancellationException("timeout 250"))
println("done")
}
結果:看我們只需要取消對應的協程即可,對應的flow也會被取消收集,
emit 1
1
emit 2
2
done
這里引申一點,對于timeout,官方有提供專用的操作函式,withTimeout系列,不需要我們手動delay然后繼續呼叫取消,畢竟不是很優雅,
上述代碼也可以寫成如下的形式
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("emit $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) {
simple().collect { value -> println(value) }
}
println("done")
}
看到了嗎?直接將launch替換為withTimeoutOrNull就可以做到延時取消效果了,這里簡單做一下原始碼分析,
原始碼
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? {
...
try {
return suspendCoroutineUninterceptedOrReturn { uCont ->
val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
coroutine = timeoutCoroutine
setupTimeout<T?, T?>(timeoutCoroutine, block)
}
} catch (e: TimeoutCancellationException) {
...
}
}
private class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
uCont: Continuation<U> // unintercepted continuation
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
override fun run() {
cancelCoroutine(TimeoutCancellationException(time, this))
}
...
}
private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
): Any? {
// schedule cancellation of this coroutine on time
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
...
}
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
省略了一大堆非核心的代碼,我們直接看延時取消的操作,這里簡單分析一下:
- 創建TimeoutCoroutine物件,它同時實作了Runnable,在run里面呼叫了取消函式,拋出TimeoutCancellationException,
- 呼叫當前context的invokeOnTimeout函式,該函式需要一個Runnable,傳入了timeoutCoroutine,此實作使用內置的單執行緒調度執行器服務,會在延時對應的事件后呼叫Runnable的run函式,然后就會取消當前的協程,
- 在取消協程之后,會取消掉當前背景關系的所有將在完成時呼叫的回呼,disposeOnCompletion函式被呼叫,
Flow 相關運算子
這一塊的運算子,其實是比較多的,但是如果您熟悉RxJava的話,其實都是差不多的,這一塊這里就不做原始碼分析了,只是看一下怎么使用即可,內部其實是創建了新的流回傳出來了,有興趣的話可以自行查看一下原始碼,
中間流運算子
map
映射,看🌰(傳入請求流可以使用 map 運算子映射到結果,即使執行請求是由掛起函式實作的長時間運行的操作)
suspend fun performRequest(request: Int): String {
delay(1000) // 模擬長時間的異步作業
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 轉換為flow
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
結果
response 1
response 2
response 3
filter
過濾操作,看🌰
suspend fun performRequest(request: Int): String {
delay(1000) // 模擬長時間的異步作業
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 轉換為flow
.map { request -> performRequest(request) }
.filter { it == "response 1" }
.collect { response -> println(response) }
}
結果:僅回傳匹配到的值
response 1
變換運算子
在流變換算子中,最通用的一種叫做變換, 它可以用來模仿簡單的轉換,比如 map 和 filter,也可以實作更復雜的轉換, 使用transform,我們可以發出任意次數的任意值,
例如🌰,使用transform,我們可以在執行長時間運行的異步請求之前發出一個字串,并在其后回應:
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
結果
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
size限制運算子
顧名思義,限制收集的數量,使用運算子take
它會在判斷當發射的值在達到相應限制時取消流程的執行, 因為協程中的取消總是通過拋出例外來執行,所以需要考慮進行相應的例外捕獲來保證后續的流暢正常進行不被取消掉,
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("not execute")
emit(3)
} finally {
println("finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take 兩個值
.collect { value -> println(value) }
}
結果
1
2
finally in numbers
終端運算子
終端運算子可以啟動一個流,最基礎的就是上述常提到的collect,但是還有一些其他的終端運算子,它可能會讓一些操作變得更簡單:
轉換為各種集合, toList 和 toSet,
flowOf(1,2).toList().forEach {
println(it)
}
flowOf(1,2).toSet().forEach(::println)
first , 確保獲取且進獲取第一個值
回傳流發出的第一個元素然后取消流的集合的終端運算子, 如果流為空,則拋出 NoSuchElementException,
val value : Int = flowOf(1, 2).first()
println(value)
使用 reduce 和 fold 將流合并到一個值,
val sum = (1..5).asFlow()
.map { it * it } //平方
.reduce { a, b -> a + b } // 進行累加
println(sum)
//結果
55
fold和reduce使用起來差不多,區別就是fold可以定義初始化,其實很簡單,reduce傳入的lambda前一個引數是每次計算的結果累計,后一個引數是當前需要傳入的值,不明白可以去瞅一眼原始碼,這里不在引申,
onEach
這個運算子也較為常用,這里也介紹一下,回傳在上游流的每個值向下游發出之前呼叫給定操作的流,
🌰
(1..5)
.asFlow()
.onEach {
println("onEach$it")
}.collect()
//結果
onEach1
onEach2
onEach3
onEach4
onEach5
運算子的順序
除非使用對多個流進行操作的特殊運算子,否則流的每個單獨集合都按順序執行, 該集合直接在呼叫終端運算子的協程中作業, 默認情況下不會啟動新的協程, 每個發出的值都由從上游到下游的所有中間運算子處理,然后交付給終端運算子,
🌰
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
//結果 按照順序沒有值依次向下發射
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow 調度器切換
對于UI驅動型的程式來說,需要將長時間計算的任務放在異步執行緒處理,UI展示作業需要放在主執行緒處理,也就是說需要將構建器的代碼放到異步執行緒執行,但是終端運算子,比如collect需要在主執行緒獲取,那么怎么做呢?
使用flowOn運算子
在這之前您可能的了解一下,協程的調度器,可以簡單參考之前寫的一篇文章,有對調度器做簡單介紹:https://blog.csdn.net/weixin_44235109/article/details/119981210
fun main() = runBlocking {
flow {
for (i in 1..3) {
//模擬異步處理
delay(100)
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default)//使用flowOn傳入Default的調度器
.collect { value ->
log("Collected $value")
}
}
結果
16:39:21:954 [DefaultDispatcher-worker-1] Emitting 1
16:39:21:969 [main] Collected 1
16:39:22:071 [DefaultDispatcher-worker-1] Emitting 2
16:39:22:071 [main] Collected 2
16:39:22:178 [DefaultDispatcher-worker-1] Emitting 3
16:39:22:178 [main] Collected 3
可以很明顯的看出,構建模塊被調度到異步執行緒處理了,而收集的作業還在主執行緒進行,
flowOn負責構建的模塊調度,那么收集的誰負責呢?
其實和例外處理類似,collect受呼叫它的協程背景關系限制,所以最后的執行執行緒和當前協程背景關系的調度器有關,目前我使用的是idea測驗的,默認runBlocking的調度器就是主執行緒,如果是android上面的話,runBlocking可能就需要傳入Dispatchers.Main了,
其實和RxJava還是非常相似的😂,
注意一點,此時其實已經改變流執行的順序了,
官方的解釋如下:
Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now collection happens in one coroutine (“coroutine#1”) and emission happens in another coroutine (“coroutine#2”) that is running in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.
這里要注意的另一件事是 flowOn 運算子更改了流的默認順序性質, 現在收集發生在一個協程(“coroutine#1”)中,發射發生在另一個協程(“coroutine#2”)中,該協程與收集協程同時運行在另一個執行緒中, 當必須在其背景關系中更改 CoroutineDispatcher 時, flowOn 運算子為上游流創建另一個協程,
這一塊我簡單看了一下原始碼,這里面不同的調度器會遇到多執行緒的問題,最里面使用了channel進行了調度處理,具體的核心類是ChannelFlow,后面會對flow進行簡單原始碼分析,但篇幅有限不對這一塊過深入分析,感興趣可以自行查看,或找博主私下探討,
其實上面的看不出來會改變流執行的順序,下面改變一下代碼,驗證一下看看🌰
fun main() = runBlocking {
flow {
for (i in 1..3) {
//模擬異步處理
delay(100)
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default)
.collect { value ->
delay(200)
log("Collected $value")
}
}
結果
16:52:33:258 [DefaultDispatcher-worker-1] Emitting 1
16:52:33:386 [DefaultDispatcher-worker-1] Emitting 2
16:52:33:482 [main] Collected 1
16:52:33:493 [DefaultDispatcher-worker-1] Emitting 3
16:52:33:684 [main] Collected 2
16:52:33:887 [main] Collected 3
我們只需要將,collect里面增加一個delay即可,發現其實這時候就是發射歸發射,收集歸收集了,不似上面我們寫的程式,發射一個值只有到終端運算子之后才會發射第二個,這里面肯定就會對值進行快取,那么就會牽扯到一個問題,老生常談🙆?♀?,背壓處理!
Flow 背壓處理
對于背壓處理,Kotlin 提供三種解決方案:
| 運算子 | 含義 |
|---|---|
| buffer | 指定固定容量快取 |
| conflate | 保留最新的值 |
| collectLatest | 新值發送時取消之前的 |
buffer
這里有必要看一下buffer函式的原始碼定義
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND)
可以看出需要兩個引數,都有默認值,
- 第一個好理解容量,默認等于BUFFERED,這個值其實時64,可以按照使用時需要自己指定具體的數字,
- 第二個是指定,當buffer溢位時的操作,默認的操作是掛起,還要兩個操作分別是洗掉最舊的值不要掛起或者洗掉當前最新的值不要掛起,可以自行查看原始碼,這里不再引申,
使用🌰
flow {
for (i in 1..3) {
//模擬異步處理
delay(100)
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default)
.buffer()
.collect { value ->
delay(200)
log("Collected $value")
}
//結果
17:17:28:420 [DefaultDispatcher-worker-1] Emitting 1
17:17:28:536 [DefaultDispatcher-worker-1] Emitting 2
17:17:28:646 [main] Collected 1
17:17:28:646 [DefaultDispatcher-worker-1] Emitting 3
17:17:28:846 [main] Collected 2
17:17:29:049 [main] Collected 3
conflate
這個只獲取最新值也比較好理解,應用場景,比如說獲取下載進度,對于用戶來說其實每次只需要獲取當前最新的進度就好了,不需要把之前的值再去獲取一遍,下面也舉一個例子🌰
fun main() = runBlocking {
flow {
for (i in 1..3) {
//模擬異步處理
delay(100)
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default)
.conflate()
.collect { value ->
delay(300)//模擬下游處理比較慢
log("Collected $value")
}
}
//結果 第一個值肯定可以拿到 當地一個值處理完成之后 最新的值就是3了 所以丟棄了2
17:21:42:916 [DefaultDispatcher-worker-1] Emitting 1
17:21:43:034 [DefaultDispatcher-worker-1] Emitting 2
17:21:43:140 [DefaultDispatcher-worker-1] Emitting 3
17:21:43:236 [main] Collected 1
17:21:43:546 [main] Collected 3
collectLatest
說明一點:這玩意其實也是一個終端運算子
前兩個可能都比較好理解,那新值發送時取消之前的是什么意思呢?為了便于理解,直接上例子,按照結果說明:
🌰
fun main() = runBlocking {
flow {
for (i in 1..3) {
//模擬異步處理
delay(100)
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default)
.collectLatest {
delay(300)
log("Collected $it")
}
}
//結果
17:25:33:916 [DefaultDispatcher-worker-1] Emitting 1
17:25:34:038 [DefaultDispatcher-worker-1] Emitting 2
17:25:34:150 [DefaultDispatcher-worker-1] Emitting 3
17:25:34:453 [main] Collected 3
對比上面的例子,這里只是將collect替換成了collectLatest而已,為什么1沒有了呢?
顯而易見了,這玩意會在最新的到來會直接取消下游上一個消費的處理,因為有delay所以1還沒有來得及列印,就因為下一個值發射了,然后就被取消了!!!您可真霸道呢?🙆?♀?
Flow 例外處理
當運算子內的發射器或代碼拋出例外時,流收集可以以例外結束, 有幾種方法可以處理這些例外,
try…catch
較為簡單,上🌰
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
//結果
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
if (value>1){
throw IllegalStateException("exception value is $value")
}
}
} catch (e: Throwable) {
println("Caught $e")
}
}
顯而易見,拋出例外之后,收集結束,如果是UI驅動程式比如:Android還是推薦主從作用域,例外不會向上傳播,
思考一個問題,剛剛例外是發生在收集端,如果在構建的時候發生例外呢?
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
if (value>1){
throw IllegalStateException("exception value is $value")
}
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
//結果
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: exception value is 2
完美:例外仍被捕獲并停止收集
上述代碼的問題就是不夠優雅,還有例外對于流來說必須是透明的,使用try … catch 顯然違反了透明性,所以Kotlin 封裝了try catch.
catch 相關運算子
catch 運算子的主體可以分析例外并根據捕獲的例外以不同的方式對其做出反應:
- 可以使用 throw 重新拋出例外,
- 可以使用 catch 主體中的發射將例外轉換為值的發射,
- 例外可以被其他代碼忽略、記錄或處理,
看🌰
simple()
.catch { e -> emit("Caught $e") } // 我不僅捕獲到了例外,我還能繼續發射!!!!
.collect { value -> println(value) }
結果
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: exception value is 2
當然因為拋出了例外,協程還是會終止,但是此時例外是以發生的形式傳遞下去的,
注意:catch 中間運算子,尊重例外透明性,只捕獲上游例外(即來自 catch 之上的所有運算子的例外,但不在其之下), 如果 collect { … } 中的塊(位于 catch 下方)拋出例外,則它不會捕獲:
🌰
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // 不會捕獲到下游的例外
.collect { value ->
if(value <= 1) throw IllegalStateException("Collected $value")
println(value)
}
}
//
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at org.example.zxf.kotlin11.flow.TestFlowKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
這個時候可以利用onEach將需要可能捕獲例外的地方前置,如下所示,
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
onCompletion 相關操作
我們知道和try … catch 搭配的 還有一個finally,所以 flow當然也有一個差不多的了,叫onCompletion
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
//結果
Emitting 1
1
Emitting 2
2
Emitting 3
3
Done
拋個例外試試,onCompletion可以通過cause進行判斷是否是正常結束.
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
emit(2)
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause ->
if (cause != null) {
println("Flow completed exceptionally")
}
}
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
//結果
1
Flow completed exceptionally
Caught exception
與 catch 運算子的另一個區別是 onCompletion 會看到所有例外,并且僅在成功完成上游流(沒有取消或失敗)時才會收到空例外,
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
if (value>1){
throw IllegalStateException("exception value is $value")
}
println(value)
}
}
//結果
1
Flow completed with java.lang.IllegalStateException: exception value is 2
Exception in thread "main" java.lang.IllegalStateException: exception value is 2
注意一點:雖然看到了,但是并沒有進行捕獲,例外還是拋出了,
Flow 啟動
最后提一點和啟動相關的,先看下面兩個例子:
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- collect 掛起函式阻塞
println("Done")
}
//結果
Event: 1
Event: 2
Event: 3
Done
對于上述,沒有異議吧,Done的列印需要等待collect恢復,因為在一個協程內,如果需要不等待呢?那就需要另起一個協程了
🌰
fun main() = runBlocking<Unit> {
launch {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- collect 掛起函式阻塞
}
println("Done")
}
//結果
Done
Event: 1
Event: 2
Event: 3
但是上述寫法可以等價于,下面的寫法,flow這玩意封裝了另一種啟動方式
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this)
println("Done")
}
//結果
Done
Event: 1
Event: 2
Event: 3
看一下launchIn原始碼!
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
好吧,其實不就是封裝了一下嘛,主要是原始碼好多地方使用launchIn來著,所以這里提一下🙆?♀?,
Flow原理決議
好吧,激動人心的時刻終于來了,做一些原始碼分析,
先寫一個小🌰
fun main() = runBlocking {
flow {
emit(1)
emit(2)
emit(3)
}.collect {
println(it)
}
}
好了,我們就分析這玩意怎么實作的!往深篇幅有限寫不下了🙄,如果您想深入交流,歡迎私信交流,
所以,我先列出來flow函式構建的原始碼,只貼出核心代碼:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
可以看出flow構建了一個SafeFlow將block傳入,并且重寫了collectSafely方法,呼叫了block(),這里需要注意一點SafeFlow繼承自AbstractFlow.
好,我們在看一呼叫collect之后發生了什么,
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
看的出,這時將collect的block封裝成了FlowCollector型別(重寫emit方法用于執行block),此時會呼叫到SafeFlow的collect方法,具體在AbstractFlow里面實作,這里面干了什么呢?看的出,創建了SafeCollector型別的東西,然后將SafeCollector傳遞給了collectSafely,而在上面的分析中,我們知道collectSafely在構建flow時進行了實作,SafeCollector作為receiver以擴展函式的形式呼叫了,flow構建器的block(這里面執行了,我們手動的emit操作),這么一看是不是有點聯系起來了😎
好的,接下來我們繼續分析一下,SafeCollector,那這玩意的原始碼其實也是有點多的,我們應該看哪一個函式呢?通過上面的分析可知,呼叫collect之后,會呼叫到SafeFlow的collect方法,進而會以擴展函式的形式呼叫到flow的block,而在block里面就是我們自己寫的emit了,所以receiver既然是SafeCollector,那肯定就是呼叫SafeCollector的emit了,我們去瞅一瞅SafeCollector的emit函式相關的呼叫鏈:👀
override suspend fun emit(value: T) {
...
try {
emit(uCont, value)
} catch (e: Throwable) {
...
}
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
...
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
好了,經過簡化之后,就是上面這些玩意兒了,
首先,需要分清幾個東西,
collector和this的區別
- collector:還記得上面的原始碼嗎?我們呼叫
collect的block封裝到了一個FlowCollector,然后傳遞給了SafeCollector,collector就指FlowCollector,這里面重寫了emit,emit里面呼叫了block,這個block就是我們自己寫的了,這個時間簡單理解呼叫collector的emit就可以呼叫到collect的block了, - this:理解了collector,this就指代當前的
SafeCollector了,
現在我們繼續看上面的代碼:額 此時好像還需要理解一個東西emitFun
我們把它翻譯成java看一下
private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
// $FF: synthetic method
// $FF: bridge method
public Object invoke(Object var1, Object var2, Object var3) {
return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
}
@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
return p1.emit(p2, continuation);
}
}, 3);
簡單明了,就是呼叫了collector的emit函式,但是你可能會說,emit不是支持一個引數嗎?上面傳進去倆啊,但是你別忘了,emit是掛起函式,默認有一個引數的傳遞就是continuation,這一塊Kotlin替我們做了,但是對于Java來說必須要傳遞continuation進去,通過這種方式傳入一樣的continuation,保證了continuation的統一,
好了,總結一下:
分析下來,簡單就是,利用擴展函式的性質,呼叫到flow的block進而呼叫了SafeCollector的emit,而這里的emit會呼叫到傳進來的FlowCollector的emit,而傳進來的emit函式被重寫呼叫block,所以就會呼叫到collect的block了,
另外因為,只有呼叫collect之后,進而才會呼叫到SafeFlow的collect函式,進而才會呼叫到collectSafely函式去執行flow的代碼,所以不呼叫collect的話,flow的代碼構建塊的代碼是不會執行的,最多就是回傳一個SafeFlow的物件而已,
對于各種運算子的原始碼,這里不帶著大家看了,不是很難,里面就是饒了一圈,又回傳了流而已,可以自己查看,
另外對于flowOn或者其他運算子,導致調度器不一致時,此時底層將不再上上面這一套的邏輯,內部是使用ChannelFlow實作,這一塊感興趣的也可以瞅瞅原始碼,也可以與博主私信交流,里面還是比較繞,,,
還有還有補充一點,對于ShareFlow和StateFlow,博主目前也在積極的籌劃中,想了解的可以關注一波哈,如果您看到這里的話,覺得不錯,希望您可以毫不吝嗇手中的贊👍,鼓勵一下博主,感謝!

轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/357160.html
標籤:其他
下一篇:Xcode快捷鍵
