我有一個這樣的基于回呼的 API:
class CallbackApi {
fun addListener(callback: Callback) {
// todo
}
fun removeListener(callback: Callback) {
// todo
}
interface Callback {
fun onResult(result: Int)
}
}
和一個擴展函式,其中API轉換為熱冷流:
fun CallbackApi.toFlow() = callbackFlow<Int> {
val callback = object : CallbackApi.Callback {
override fun onResult(result: Int) {
trySendBlocking(result)
}
}
addListener(callback)
awaitClose { removeListener(callback) }
}
您是否介意建議如何撰寫一個單元測驗來確保 API 正確轉換為熱流?
這是我的嘗試。通過反復試驗,我想出了這個解決方案。
@Test
fun callbackFlowTest() = runBlocking {
val callbackApi = mockk<CallbackApi>()
val callbackSlot = slot<CallbackApi.Callback>()
every { callbackApi.addListener(capture(callbackSlot)) } just Runs
every { callbackApi.removeListener(any()) } just Runs
val list = mutableListOf<Int>()
val flow: Flow<Int> = callbackApi.toFlow().onEach { list.add(it) }
val coroutineScope = CoroutineScope(this.coroutineContext SupervisorJob())
flow.launchIn(coroutineScope)
yield()
launch {
callbackSlot.captured.onResult(10)
callbackApi.removeListener(mockk()) // this was a misunderstanding
}.join()
assert(list.single() == 10)
}
但我不明白這個解決方案的兩個部分。
1- 如果沒有SupervisorJob(),測驗似乎永遠不會結束。也許出于某種原因收集流量永遠不會結束,我不明白。我在一個單獨的協程中提供捕獲的回呼。
2- 如果我移除其中的launch主體callbackSlot.captured.onResult(10),測驗將因此錯誤而失敗UninitializedPropertyAccessException: lateinit property captured has not been initialized。我認為這yield應該開始流動。
uj5u.com熱心網友回復:
以及將 API 轉換為熱流的擴展函式
這個擴展看起來是正確的,但是流程并不熱(也不應該如此)。它僅在實際收集開始時注冊回呼,并在收集器取消時取消注冊(這包括收集器使用限制專案數量的終端運算子時,例如.first()or .take(n))。
對于您的其他問題,這是一個非常重要的注意事項。
在沒有該 SupervisorJob() 的情況下,測驗似乎永遠不會結束。也許出于某種原因收集流量永遠不會結束,我不明白
如上所述,由于流的構造方式(以及CallbackApi作業方式),流收集不能由生產者(回呼 API)決定結束。它只能通過取消收集器來停止,這也會取消注冊相應的回呼(這很好)。
您的自定義作業允許測驗結束的原因可能是因為您通過在背景關系中由沒有當前作業作為父作業的自定義作業覆寫背景關系中的作業來逃避結構化并發。但是,您可能仍然會從永遠不會取消的范圍中泄漏那個永無止境的協程。
我在一個單獨的協程中提供捕獲的回呼。
這是正確的,雖然我不明白你為什么removeListener從這個單獨的協程中呼叫。你在這里注銷什么回呼?請注意,這也不會對流程產生任何影響,因為即使您可以取消注冊在callbackFlow構建器中創建的回呼,它也不會神奇地關閉 的通道callbackFlow,因此流程無論如何都不會結束(我' m 假設是你試圖在這里做的)。
此外,從外部取消注冊回呼會阻止您檢查它實際上是否被您的生產代碼取消注冊。
2- 如果我洗掉其中 callbackSlot.captured.onResult(10) 的啟動體,測驗將失敗并出現此錯誤 UninitializedPropertyAccessException: lateinit property capture has not been initialized。我認為產量應該開始流動。
yield()很脆。如果你使用它,你必須非常清楚每個并發協程的代碼當前是如何撰寫的。它脆弱的原因是它只會將執行緒讓給其他協程,直到下一個暫停點。你無法預測 yield 時會執行哪個協程,也無法預測執行緒到達掛起點后會恢復哪個協程。如果有幾個停賽,所有賭注都取消。如果還有其他正在運行的協程,那么所有的賭注也都取消了。
更好的方法是使用kotlinx-coroutines-testwhich 提供的實用程式,例如advanceUntilIdle確保其他協程都已完成或等待暫停點。
現在如何修復這個測驗?我現在無法測驗任何東西,但我可能會這樣處理:
- 使用
runTestfromkotlinx-coroutines-test而不是runBlocking更好地控制其他協程運行時(并等待流集合執行某些操作) - 在協程中啟動流收集(僅
launch/launchIn(this)沒有自定義范圍)并保留已啟動的句柄Job(launch/ 的回傳值launchIn) - 用一個值呼叫捕獲的回呼,
advanceUntilIdle()以確保流收集器的協程可以處理它,然后斷言串列獲得了元素(注意:由于一切都是單執行緒的并且回呼沒有掛起,如果沒有緩沖區,這將死鎖,但callbackFlow使用默認緩沖區,所以應該沒問題) - 可選:用不同的值重復以上幾次,并確認它們被流收集
- 取消收集作業,
advanceUntilIdle()然后測驗回呼是否未注冊(我不是 Mockk 專家,但應該有一些東西可以檢查是否removeListener被呼叫)
注意:也許我是老派,但如果你CallbackApi是一個介面(在你的例子中它是一個類,但我不確定它在多大程度上反映了現實),我寧愿使用通道手動實作一個模擬來模擬事件和斷言期望。我發現推理和除錯更容易。這是我的意思的一個例子
uj5u.com熱心網友回復:
這是我根據@Joffrey 有用的指南找到的解決方案:
@Test
fun callbackFlowTestSolution() = runTest {
val callbackApi = mockk<CallbackApi>()
val callbackSlot = slot<CallbackApi.Callback>()
every { callbackApi.addListener(capture(callbackSlot)) } just Runs
every { callbackApi.removeListener(any()) } just Runs
val itemsToSend = Array(9) { index -> index } // [0, 1, 2, 3, 4, 5, 6, 7, 8]
val collectedItems = mutableListOf<Int>()
val flow: Flow<Int> = callbackApi.toFlow().onEach { collectedItems.add(it) }
val collectJob = launch { flow.collect() }
advanceUntilIdle() // wait for callbackFlow builder to call addListener
itemsToSend.forEach { callbackSlot.captured.onResult(it) }
advanceUntilIdle() // wait for flow collection
collectJob.cancel()
advanceUntilIdle() // wait for awaitClose
verify { callbackApi.removeListener(callbackSlot.captured) }
assertArrayEquals(itemsToSend.toIntArray(), collectedItems.toIntArray())
}
請考慮升級kotlinx-coroutines-test:1.6.0以使用runTest. 在舊版本中,我們可以使用runBlockingTest,但已棄用。
更新:如果您想檢查流程是否可以一項一項地收集專案
@Test
fun callbackFlowTestSolution() = runTest {
val callbackApi = mockk<CallbackApi>()
val callbackSlot = slot<CallbackApi.Callback>()
every { callbackApi.addListener(capture(callbackSlot)) } just Runs
every { callbackApi.removeListener(any()) } just Runs
val itemsToSend = Array(9) { index -> index } // [0, 1, 2, 3, 4, 5, 6, 7, 8]
var lastCollectedItem: Int? = null
val flow: Flow<Int> = callbackApi.toFlow().onEach { lastCollectedItem = it }
val collectJob = launch { flow.collect() }
advanceUntilIdle() // wait for callbackFlow builder to call addListener
itemsToSend.forEach {
callbackSlot.captured.onResult(it)
advanceUntilIdle() // wait for flow collection
assertEquals(it, lastCollectedItem)
}
collectJob.cancel()
advanceUntilIdle() // wait for awaitClose
verify { callbackApi.removeListener(callbackSlot.captured) }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/406797.html
標籤:
下一篇:當使用Mockito.spy()模擬一個方法被另一個方法呼叫時,為什么我會得到一個UnnecessaryStubbingException?
