我正在使用管理員客戶端API查詢kafka代理,以獲得CONSUMER_GROUP的承諾的偏移量,使用的代碼如下:
Map<TopicPartition, OffsetAndMetadata> offsets =
admin.listConsumerGroupOffsets(CONSUMER_GROUP)
.partitionsToOffsetAndMetadata().get()。
上述代碼將觸發對特殊創建的__consumer_offsets主題的查詢,以獲得CONSUMER_GROUP負責的主題(s)-磁區中的每個磁區的承諾偏移量。
另一方面,我使用下面的代碼來檢索CONSUMER_GROUP的每個主題磁區的最新(結束)偏移量
。for(TopicPartition tp: offsets.keySet() ) {
requestLatestOffsets.put(tp, OffsetSpec.latest())。
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get()。
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet() {
long latestOffset = latestOffsets.get(e.getKey() ).offset()。
我的問題是,已提交的和最新的偏移量因此是由兩個不同的主題來查詢/請求的。提交的偏移量是從__consumer_offsets主題中請求的,而最新(結束)的偏移量是從CONSUMER_GROUP的實際主題中請求的。
(1) 上述關于請求已承諾和最新偏移量的描述是否準確?
(2)是否可以直接查詢__consumer_offsets主題?
謝謝。
謝謝你。
uj5u.com熱心網友回復:
是的,你的理解是正確的。已承諾的偏移量存盤在
__consumer_offsets主題中,而你需要查詢特定的磁區以獲得它們的終端偏移量。是的,
__consumer_offsets是一個常規主題,如果你想的話,你可以直接消費它。通常情況下,通過提供的API檢索資料要容易得多,但如果你對其內容感興趣,你可以消費它。如果您想了解如何反序列化資料,請查看控制臺格式化程式。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/331560.html
標籤:
