我回圈三次offsets
:
- 獲取
TopicPartition
&offset
- 獲取
TopicPartition
&OffsetAndMetadata
- 獲取生產者和消費者之間的增量
我想知道我是否可以從中獲得offset
價值,OffsetAndMetadata
但我不確定如何。我在網上找不到示例來獲得這個值。任何幫助表示贊賞,謝謝!
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
event.progress.sources
// Ignoring sqs / jms sources their offsets
.filter(source => source.description.toLowerCase().contains("kafka"))
// ex offset :
// "endOffset" : {
// "rcs-enriched-event" : {
// "8" : 31376,
// "11" : 39114,
// "2" : 39376,
// } ...
.foreach(source => {
/// Map[Topic,Map[Partition, CurrentOffset]]
val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
jsonOffsets.keys.filter(key => topics.contains(key))
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
val consumedPartitions = new ListBuffer[TopicPartition]()
val topicOffsetList = new ListBuffer[Int]()
val mapTopicPartitionOffset = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val offset = offsets(partition).toLong
(tp -> offset)
})
.toMap
val mapTopicPartition = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val oam = new OffsetAndMetadata(offsets(partition).toLong)
(tp -> oam)
})
.toMap
for(topicPartition <- mapTopicPartitionOffset){
consumedPartitions = topicPartition
}
try {
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,offset) <- mapTopicPartitionOffset){
val bbCurrentOffset = offset
// latest offset
val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)
// Partition offset delta
val delta = partitionLatestOffset - bbCurrentOffset
topicOffsetList = delta.abs
}
} catch {
case e: Exception => {
log.error(s"${consumerGroupId} Could not get Kafka offset", e)
}
}
try {
kafkaConsumer.commitSync(mapTopicPartition.asJava)
} catch {
case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
}
//log.info have the group id (unique id), the topic, cumulative consumer lag delta
log.info("consumerGroupId: " consumerGroupId " topic: " topic " lagDeltaSum: " topicOffsetList.sum)
})
})
}
uj5u.com熱心網友回復:
如果沒有主題和磁區,您將無法獲得偏移值。您已經在為每個主題進行迭代,并且您的 JSON 顯示您在磁區映射中具有鍵,所以這個問題只是“如何從映射中獲取值?”
更重要的是,您是否真的需要OffsetAndMetadata
從其他兩個地方具有相同值時獲得偏移量?
更具體地說,您不需要同時使用mapTopicPartitionOffset
和mapTopicPartition
。每個都包含完全相同的資訊,只有一個物件作為具有 null 元資料屬性的值。
您還有offsets
map,其值是您正在迭代的當前主題的所有偏移量,鍵是它的(消耗的)磁區。
每個都包含您想要的偏移值
你只需要mapTopicPartition
使用 with commitSync
,所以試試這個
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
// for getting end offsets
val consumedPartitions = new ListBuffer[TopicPartition]()
// convert to values accepted by Kafka Consumer API
val toCommit = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
consumedPartitions = tp
val oam = new OffsetAndMetadata(offsets.get(partition).toLong)
(tp -> oam)
})
.toMap
// could also use toCommit.keys instead of separate list
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,oam) <- toCommit) {
val bbCurrentOffset = oam.offset()
// or offsets.get(String.valueOf(topicPartition.partition))
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/480334.html
標籤:斯卡拉
上一篇:如何對從Play中的特征繼承的案例類進行Json編碼?
下一篇:返回列表