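I have implemented a synchronous iterator that fetches batches of data from an external service (each fetch may take a while or block).
I would like to fetch the next batch asynchronously while the calling code iterates over the entries of the current batch.
Ideally, I would like to use Kotlin coroutines.
Desired procedure:
- The iterator fetches the first batch
- The iterator starts fetching the second batch in the background
- Meanwhile, the caller processes the first batch
- After finishing the first batch, the caller can immediately process the second batch, which the iterator fetched in the background
- The iterator starts fetching the third batch in the background
- After finishing the second batch, the caller can immediately process the third batch, which the iterator fetched in the background
- The iterator starts fetching the fourth batch in the background
- And so on.
My synchronous implementation: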
fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return object : Iterator<LogPayloadType>, AutoCloseable {
        val logging: Logging = options.service
        var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        var i = 0
        var batch: List<LogEntry> = currentPage.values.toList()
        var isClosed = false
        override fun close() {
            logging.close()
            isClosed = true
        }
        override fun hasNext(): Boolean {
            if (isClosed)
                return false
            val hasNext = i < batch.size || currentPage.hasNextPage()
            if (!hasNext) {
                logging.close()
            }
            return hasNext
        }
        override fun next(): LogPayloadType {
            if (!hasNext()) {
                throw NoSuchElementException()
            }
            // Move to the next page only once the current batch is exhausted,
            // so the last entry of each batch is not skipped.
            if (i == batch.size && currentPage.hasNextPage()) {
                currentPage = currentPage.nextPage
                batch = currentPage.values.toList()
                i = 0
            }
            val logEntry = batch[i++]
            return logEntry.getPayload()
        }
    }.asSequence()
}
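Of course, I am open to completely different solutions, as long as they can be wrapped in a Kotlin Sequence.
Edit
Here is an asynchronous implementation that uses thread { }. I could not get the same thing working with coroutines.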
fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return object : Iterator<LogPayloadType> {
        private var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        private var i = 0
        private var batch: List<LogEntry> = currentPage.values.toList()
        private var nextPage: Page<LogEntry>? = null
        private var job: Thread? = null
        override fun hasNext() = i < batch.size || currentPage.hasNextPage()
        override fun next(): LogPayloadType {
            if (!hasNext()) {
                throw NoSuchElementException()
            }
            if (currentPage.hasNextPage()) {
                // Start prefetching the next page on the first call.
                if (nextPage == null && job == null) {
                    job = thread { nextPage = readNextPage(currentPage) }
                }
                // Current batch exhausted: wait for the prefetch, swap pages, prefetch again.
                if (i == batch.size) {
                    job!!.join()
                    currentPage = nextPage!!
                    job = thread { nextPage = readNextPage(currentPage) }
                    batch = currentPage.values.toList()
                    i = 0
                }
            }
            val logEntry = batch[i++]
            return logEntry.getPayload<Payload<*>>()
        }
        private fun readNextPage(curPage: Page<LogEntry>): Page<LogEntry>? = curPage.nextPage
    }.asSequence()
}
Answer:
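Keep in mind that I can't test any of the code below, because I don't know all the classes you are using, so there may be errors.
First, the thread-based code can be simplified considerably with the sequence builder: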
// Not sure about how this should behave as you treat it like a blocking function.
// I return null when it's exhausted to simplify while loop.
private fun <T> readNextPageOrNull(page: Page<T>): Page<T>? =
    if (page.hasNextPage()) page.nextPage!! else null
fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return sequence {
        var jobResult: Page<LogEntry>? = null
        var job = thread {
            jobResult = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            job.join()
            val page = jobResult ?: break
            job = thread { jobResult = readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}
You can simplify it further, and use a thread pool instead of spawning a new thread for every page, by replacing thread {} with CompletableFuture.supplyAsync {}:
fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return sequence {
        var job = CompletableFuture.supplyAsync {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = job.join() ?: break
            job = CompletableFuture.supplyAsync { readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}
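Since I can't run the version above against your service, here is a minimal, self-contained sketch of the same prefetch pattern that needs only the JDK and the Kotlin standard library. FakePage, fetchPage and prefetchingSequence are hypothetical stand-ins I made up for illustration; they are not part of the Logging API.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for Page<LogEntry>: a batch of values plus the index of the next page.
class FakePage(val values: List<Int>, private val nextIndex: Int?) {
    fun hasNextPage(): Boolean = nextIndex != null
    fun nextPage(): FakePage = fetchPage(nextIndex!!)
}

// Hypothetical blocking call that simulates a slow remote service with three pages.
fun fetchPage(index: Int): FakePage {
    Thread.sleep(50)
    val values = List(3) { index * 3 + it }
    return FakePage(values, if (index < 2) index + 1 else null)
}

// Yields every value while the next page is already being loaded in the background.
fun prefetchingSequence(): Sequence<Int> = sequence {
    var job: CompletableFuture<FakePage?> = CompletableFuture.supplyAsync { fetchPage(0) }
    while (true) {
        // Blocks only if the background fetch has not finished yet.
        val page = job.join() ?: break
        // Start the next fetch before handing the current batch to the caller.
        job = CompletableFuture.supplyAsync { if (page.hasNextPage()) page.nextPage() else null }
        yieldAll(page.values)
    }
}

fun main() {
    println(prefetchingSequence().toList()) // prints [0, 1, 2, 3, 4, 5, 6, 7, 8]
}
```

Two things to keep in mind with this shape: the sequence is lazy, so the first fetch only starts when the caller begins iterating, and if the caller stops early, one prefetched page may have been loaded for nothing.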
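We can convert this to use coroutines, but bridging coroutine code into blocking code is awkward. You have to use runBlocking. This doesn't buy you much, but it does use the coroutine Dispatcher thread pools, which you are probably already using if the rest of your code uses coroutines. Here, coroutineScope is whatever scope is appropriate in the current class.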
fun readStructuredLogs(): Sequence<Payload.JsonPayload> {
    return sequence {
        var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = runBlocking { job.await() } ?: break
            job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}
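If you are already using coroutines, you might consider using a Flow instead of a Sequence, so that waiting for the next item suspends instead of blocking. There may be a simpler way to do this with Flow operators, but I just made a quick modification of the sequence code above: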
fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
    return flow {
        var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = job.await() ?: break
            job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
            emitAll(page.values.asFlow())
        }
    }
}
Edit: Possible way to do this with buffer:
fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
    return flow {
        var page = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        while (true) {
            emit(page)
            page = readNextPageOrNull(page) ?: break
        }
    }
        .flowOn(Dispatchers.IO)
        .buffer(1)
        .flatMapConcat { it.values.asFlow() }
}
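To try the buffer approach without the Logging classes, here is a small sketch against a fake source. It assumes kotlinx-coroutines-core is on the classpath; fetchBatchOrNull and batchedFlow are hypothetical names I made up for illustration. I used transform instead of flatMapConcat only to avoid the opt-in annotation that flatMapConcat requires in some versions of the library.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical blocking fetch: returns the batch for a page index,
// or null once the (three-page) source is exhausted.
fun fetchBatchOrNull(index: Int): List<Int>? {
    Thread.sleep(50)
    return if (index < 3) List(3) { index * 3 + it } else null
}

fun batchedFlow(): Flow<Int> =
    flow<List<Int>> {
        var index = 0
        while (true) {
            val batch = fetchBatchOrNull(index++) ?: break
            emit(batch) // emits whole batches; they are flattened downstream
        }
    }
        .flowOn(Dispatchers.IO) // run the blocking fetches on the IO dispatcher
        .buffer(1)              // let the producer run ahead of the collector
        .transform<List<Int>, Int> { batch -> batch.forEach { emit(it) } }

fun main() = runBlocking {
    println(batchedFlow().toList()) // prints [0, 1, 2, 3, 4, 5, 6, 7, 8]
}
```

As far as I understand, the producer here can get slightly more than one batch ahead, because it may fetch another batch while one is still sitting in the buffer. If that matters for your service, it is worth measuring.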
