文章目錄
- 一、前言
- 二、Flow的簡單演示
- 三、Flow的取消
- 四、構建Flow
- 五、過度流運算子
- 六、轉換運算子
- 七、限長運算子
- 八、末端流操作
- 九、流是連續的
- 十、Flow背景關系
- 十一、withContext 發出錯誤
- 十二、flowOn 運算子
- 十三、緩沖
- 十四、合并
- 十五、處理最新值
- 十六、Zip
- 十七、Combine
- 十八、flatMapConcat 與 flattenConcat
- 十九、flatMapMerge 與 flattenMerge
- 二十、flatMapLatest
- 二十一、Flow的例外
- 二十二、例外的透明性
- 二十三、完成情況
- 二十四、collect和launchIn
- 二十五、參考鏈接
一、前言
? 掛起函式可以回傳一個值,但是沒有辦法回傳多個值,List可以回傳多個值,但是它是一次性回傳的,Sequence可以根據需要回傳多個值,但它是阻塞的,所以可以使用Flow來對資料進行處理,而且Flow是資料流可以對資料進行各種復雜操作,并且簡化異步操作,
二、Flow的簡單演示
fun simple(): Flow<Int> = flow { // 流構建器
for (i in 1..3) {
delay(100) // 假裝我們在這里做了一些有用的事情
emit(i) // 發送下一個值
}
}
@Test
fun flowSample(){
runBlocking {
// 啟動并發的協程以驗證主執行緒并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集這個流
simple().collect { value -> println(value) }
}
}
運行結果如下:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
這段代碼在不阻塞主執行緒的情況下每等待 100 毫秒列印一個數字,如果換成List或者Sequence就會有不一樣的結果,
這里面有以下注意點:
- 名為 flow 的 Flow 型別構建器函式,
flow { ... }構建塊中的代碼可以掛起,- 流使用 emit 函式 發射值,
- 流使用 collect 函式收集值,
Flow是冷流,也就是說Flow是在資料被收集的時候才開始運行,以下是示例
fun simple(): Flow<Int> = flow { // 流構建器
for (i in 1..3) {
delay(100) // 假裝我們在這里做了一些有用的事情
emit(i) // 發送下一個值
}
}
@Test
fun coldFlow(){
runBlocking {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
}
運行結果如下:
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
從結果可以看到,Flow只有開始收集的時候才開始運行
三、Flow的取消
Flow是可以取消的,這里用withTimeoutOrNull來進行超時取消演示,如下
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
@Test
fun withTimeoutCancel() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超時
simple().collect { value -> println(value) }
}
println("Done")
}
運行結果如下
Emitting 1
1
Emitting 2
2
Done
可以看到超出時間后就取消了,
Flow對每次發射對值進行了ensureActive 監測,意味著可以隨時取消,如下
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
可以看到結果如下
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
實際上,由于性能問題,很多其它流操作不會對其進行取消監測,如下
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
結果如下
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
可以看到在執行完畢后才會出現例外,并沒有按照預期那樣取消,因此如果想達到預期效果必須明確表示是否監測取消,可以使用.onEach { currentCoroutineContext().ensureActive() },但是Flow也提供了現成的運算子 cancellable
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
運行結果如下
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
四、構建Flow
之前演示了使用flow{}進行構建,除了它還有其它的構建方式
-
flowOf 構建器定義了一個發射固定值集的流,
flowOf(1, 2, 3) -
使用
.asFlow()擴展函式,可以將各種集合與序列轉換為流,// 將一個整數區間轉化為流 (1..3).asFlow().collect { value -> println(value) }
五、過度流運算子
Flow與LiveData除了執行緒切換的區別外,還有一個最大的特點就是,Flow擁有眾多運算子可以對資料進行操作,過渡運算子應用于上游流,并回傳下游流,而且這些運算子不是掛起函式,回傳很快,回傳新的Flow內容,
基礎運算子和集合一樣擁有類似的名字,如 map和filter,而且這些運算子中可以執行掛起操作,這是序列Sequence所不能做到的
如下所示:
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿長時間運行的異步作業
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一個請求流
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
結果如下
response 1
response 2
response 3
六、轉換運算子
轉換運算子可以對資料進行比map更復雜的操作,最常用的為transform,比如
使用 transform 我們可以在執行長時間運行的異步請求之前發射一個字串并跟蹤這個回應:
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿長時間運行的異步任務
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一個請求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
結果如下
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
從結果可以看到每次都會發送兩條資料,當然也可以根據其它情況然后按需發送,比如只發送偶數的資料
七、限長運算子
限長運算子顧名思義就是限制發射長度,假如有五個資料,可以限制發送兩個,但是該運算子會出現例外,所以需要進行處理
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 只獲取前兩個
.collect { value -> println(value) }
}
結果如下
1
2
Finally in numbers
八、末端流操作
在流結束的時候我們也可以做各種操作,collect()只是其中的基本操作,還有一些其它操作,如
- 轉化為各種集合,例如 toList 與 toSet,
- 獲取第一個(first)值與確保流發射單個(single)值的運算子,
- 使用 reduce 與 fold 將流規約到單個值,
val sum = (1..5).asFlow()
.map { it * it } // 數字 1 至 5 的平方
.reduce { a, b -> a + b } // 求和(末端運算子)
println(sum)
結果如下
55
九、流是連續的
流的每次單獨收集都是按順序執行的,除非進行特殊操作的運算子使用多個流,該收集程序直接在協程中運行,該協程呼叫末端運算子, 默認情況下不啟動新協程, 從上游到下游每個過渡運算子都會處理每個發射出的值然后再交給末端運算子,所以假設有五個值,那么會進行五次收集,每次收集就會執行所有流程,這里我們通過一個例子進行演示,該示例過濾偶數并將其映射到字串:
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
其結果如下
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
可以看出,假如條件不符合就不會執行map操作
十、Flow背景關系
Flow的收集程序總是在協程的背景關系中執行,例如,如果有一個流 simple,然后以下代碼在它的撰寫者指定的背景關系中運行,而無論流 simple 的實作細節如何,通常情況下,流的背景關系不允許切換,流的該屬性稱為 背景關系保存,如下
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
withContext(context) {
simple().collect { value ->
println(value) // 運行在指定背景關系中
}
}
@Test
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
結果如下
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
由于 simple().collect 是在主執行緒呼叫的,那么 simple 的流主體也是在主執行緒呼叫的, 這是快速運行或異步代碼的理想默認形式,它不關心執行的背景關系并且不會阻塞呼叫者,
十一、withContext 發出錯誤
然而有時候有些需要在后臺執行緒運行,更新ui需要在主執行緒運行,在這里貿然使用withContext就會報錯. flow {...} 構建器中的代碼必須遵循背景關系保存屬性,并且不允許從其他背景關系中發射(emit),所以下述代碼就會出錯
fun simple(): Flow<Int> = flow {
// 在流構建器中更改消耗 CPU 代碼的上下文的錯誤方式
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 假裝我們以消耗 CPU 的方式進行計算
emit(i) // 發射下一個值
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
大致錯誤如下
Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5cbe206b, BlockingEventLoop@5ae98b8c],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@78f05337, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
java.lang.IllegalStateException: Flow invariant is violated:
十二、flowOn 運算子
根據上述例外可以知道通過flowOn進行背景關系切換,
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假裝我們以消耗 CPU 的方式進行計算
log("Emitting $i")
emit(i) // 發射下一個值
}
}.flowOn(Dispatchers.Default) // 在流構建器中改變消耗 CPU 代碼背景關系的正確方式
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
這里需要注意的是flowOn創建了新的協程
十三、緩沖
如果Flow在生產和收集階段都很耗時間,可以使用緩沖技術,如
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假裝我們異步等待了 100 毫秒
emit(i) // 發射下一個值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 假裝我們花費 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
}
這個例子會很慢,因為生產時候耗時100ms,收集時候耗時300ms,所以我們可以通過緩沖技術,修改如下
val time = measureTimeMillis {
simple()
.buffer() // 緩沖發射項,無需等待
.collect { value ->
delay(300) // 假裝我們花費 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
它產生了相同的數字,只是更快了,由于我們高效地創建了處理流水線, 僅僅需要等待第一個數字產生的 100 毫秒以及處理每個數字各需花費的 300 毫秒,這種方式大約花費了 1000 毫秒來運行,主要就是將生產和收集并行運行
十四、合并
當流代表部分操作結果或操作狀態更新時,可能沒有必要處理每個值,而是只處理最新的那個,在本示例中,當收集器處理它們太慢的時候, conflate 運算子可以用于跳過中間值,構建前面的示例:
val time = measureTimeMillis {
simple()
.conflate() // 合并發射項,不對每個值進行處理
.collect { value ->
delay(300) // 假裝我們花費 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
結果如下
1
3
Collected in 758 ms
雖然第一個數字仍在處理中,但第二個和第三個數字已經產生,因此第二個是 conflated ,只有最新的(第三個)被交付給收集器,所以知道假如之前生產的內容沒有被消費時候,就會被舍棄,
十五、處理最新值
conflate()是通過舍棄生產的值達到快速處理的目的的,另一種是通過取消收集器,每次發射新值時候再次啟動,這里可以使用collectLatest來進行操作
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 取消并重新發射最后一個值
println("Collecting $value")
delay(300) // 假裝我們花費 300 毫秒來處理它
println("Done $value")
}
}
println("Collected in $time ms")
十六、Zip
Flow的處理中可以將多個Flow合并在一起,稱之為組合,比如Zip就是一種簡單的組合方式,效果如下
val nums = (1..3).asFlow() // 數字 1..3
val strs = flowOf("one", "two", "three") // 字串
nums.zip(strs) { a, b -> "$a -> $b" } // 組合單個字串
.collect { println(it) } // 收集并列印
結果如下
1 -> one
2 -> two
3 -> three
十七、Combine
我們知道conflation運算子,假如一個Flow根據另外一個Flow的最新值進行變動時候,這時候使用zip可能會出現問題,如下
@Test
fun moreFlowConflate(){
runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) } // 發射數字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”組合單個字串
.collect { value -> // 收集并列印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
}
該代碼一個300ms發射一個數字,一個是400ms更新一次字串,合并后的預期效果應該是,每一個更新都會觸發另外一個更新,比如300ms后發射數字后會列印一次值,100ms后輪到字串發射了,又會更新一次字串,然后200ms后又輪到數字發射了,這時候字串還沒有發射,所以以應該會產生兩個相同的字串,但是實際上并不是,實際結果如下
1 -> one at 430 ms from start
2 -> two at 830 ms from start
3 -> three at 1232 ms from start
解決方式如下
@Test
fun moreFlowConflate(){
runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) } // 發射數字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.combine(strs) { a, b -> "$a -> $b" } // 使用“combine”組合單個字串
.collect { value -> // 收集并列印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
}
結果如下
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
十八、flatMapConcat 與 flattenConcat
像集合一樣可能存在集合嵌套,需要我們進行平鋪,Flow因為運算子可以進行異步請求,所以也會出現類似的情況,如下
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
(1..3).asFlow().map { requestFlow(it) }
但是Flow并不能像以前的集合一樣使用 flatten 與 flatMap ,需要使用 flatMapConcat 與 flattenConcat 來進行處理,如下
val startTime = System.currentTimeMillis() // 記錄開始時間
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發射一個數字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并列印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
結果如下
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
十九、flatMapMerge 與 flattenMerge
上述的效率是按順序執行的,速度可能會比較慢,FLow也支持并行收集所有傳入的流,這里可以使用flatMapMerge 與 flattenMerge 來實作,并發的話需要對并發的流的個數concurrency進行限制,默認為 DEFAULT_CONCURRENCY,
val startTime = System.currentTimeMillis() // 記錄開始時間
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發射一個數字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并列印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
結果如下
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
注意:
flatMapMerge 會順序呼叫代碼塊(本示例中的 { requestFlow(it) }),但是并發收集結果流,相當于執行順序是首先執行 map { requestFlow(it) } 然后在其回傳結果上呼叫 flattenMerge,
二十、flatMapLatest
跟collectLatest一樣,平鋪操作中也有類似的操作flatMapLatest
val startTime = System.currentTimeMillis() // 記錄開始時間
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發射一個數字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并列印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
結果如下
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
二十一、Flow的例外
如果Flow的運行出現錯誤時,可以對其進行處理,這里使用try{}catch{}進行處理
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 發射下一個值
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
二十二、例外的透明性
如果在發射emit時候出現例外該怎么辦呢?使用try{}catch{},但是被處理掉的代碼外部無法得知,除非使用emit把例外Exeption發射出去,所以大致有以下幾種方式
- 可以使用
throw重新拋出例外, - 可以使用 catch 代碼塊中的 emit 將例外轉換為值發射出去,
- 可以將例外忽略,或用日志列印,或使用一些其他代碼處理它,
simple()
.catch { e -> emit("Caught $e") } // 發射一個例外
.collect { value -> println(value) }
我么可以捕獲發射時候的例外,但是假如收集collect() 出現例外是否依然可以這樣,看以下代碼
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // 不會捕獲下游例外
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
期待列印出catch{}中的“Caught …”例外訊息,實際上并非如此
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...
這里需要使用onEach進行幫助,并保證collect沒有引數
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
注意該代碼順序,這樣就看到以下結果
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
二十三、完成情況
當一個Flow完成時候,可以使用這兩個方式處理,
-
命令式 finally 塊
fun simple(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking<Unit> { try { simple().collect { value -> println(value) } } finally { println("Done") } } -
onCompletion 運算子
simple() .onCompletion { println("Done") } .collect { value -> println(value) }該運算子與
catch運算子配合使用可以知道Flow是正常結束還是例外結束fun simple(): Flow<Int> = flow { emit(1) throw RuntimeException() } fun main() = runBlocking<Unit> { simple() .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") } .catch { cause -> println("Caught exception") } .collect { value -> println(value) } }onCompletion接收一個引數cause,當上游流沒有錯誤時候,會傳一個null,
二十四、collect和launchIn
在實際使用中,我么會發現collect會阻塞下一步的運行,比如以下方式
fun simple(): Flow<Int> = flow { // 流構建器
for (i in 1..4) {
delay(100) // 假裝我們在這里做了一些有用的事情
emit(i) // 發送下一個值
}
}
@Test
fun coldFlow(){
runBlocking {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect...end")
}
}
結果如下
Calling simple function...
Calling collect...
1
2
3
4
Calling collect...end
當然也可以使用將Flow包括在一個新的協程里面,不過Flow已經對此作了處理,使用launchIn即可完成該功能,不過由于launchIn只能傳入一個協程作用域,因此里面不能像collect一樣執行代碼,所以這里需要使用onEach { ... }進行配合,實際上onEach { ... }.launchIn(scope)也是成對出現的
fun simple(): Flow<Int> = flow { // 流構建器
for (i in 1..4) {
delay(100) // 假裝我們在這里做了一些有用的事情
emit(i) // 發送下一個值
}
}
@Test
fun coldFlow(){
runBlocking {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.onEach { value -> println(value)}.launchIn(this)
println("Calling collect...end")
}
}
二十五、參考鏈接
-
kotlin協程的資料流
https://www.kotlincn.net/docs/reference/coroutines/flow.html
-
transform
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/292209.html
標籤:其他
