主頁 > 後端開發 > KafkaScala:如何從OffsetAndMetadata類中獲取偏移值

KafkaScala:如何從OffsetAndMetadata類中獲取偏移值

2022-05-24 02:20:58 後端開發

我回圈三次offsets

  1. 獲取TopicPartition&offset
  2. 獲取TopicPartition&OffsetAndMetadata
  3. 獲取生產者和消費者之間的增量

我想知道我是否可以從中獲得offset價值,OffsetAndMetadata但我不確定如何。我在網上找不到示例來獲得這個值。任何幫助表示贊賞,謝謝!

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    event.progress.sources
      // Ignoring sqs / jms sources their offsets
      .filter(source => source.description.toLowerCase().contains("kafka"))
      // ex offset :
      //    "endOffset" : {
      //      "rcs-enriched-event" : {
      //        "8" : 31376,
      //        "11" : 39114,
      //        "2" : 39376,
      //      } ...
      .foreach(source => {

        /// Map[Topic,Map[Partition, CurrentOffset]]
        val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
        jsonOffsets.keys.filter(key => topics.contains(key))
          .foreach(topic => {
            val offsets: Map[String, Int] = jsonOffsets(topic)
            val consumedPartitions = new ListBuffer[TopicPartition]()
            val topicOffsetList = new ListBuffer[Int]()

            val mapTopicPartitionOffset = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val offset = offsets(partition).toLong
                (tp -> offset)
              })
              .toMap

            val mapTopicPartition = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val oam = new OffsetAndMetadata(offsets(partition).toLong)
                (tp -> oam)
              })
              .toMap


            for(topicPartition <- mapTopicPartitionOffset){
              consumedPartitions  = topicPartition
            }

            try {
              val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)

              for((topicPartition,offset) <- mapTopicPartitionOffset){

                val bbCurrentOffset =  offset

                // latest offset
                val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)

                // Partition offset delta
                val delta = partitionLatestOffset - bbCurrentOffset

                topicOffsetList  = delta.abs
              }
            } catch {
              case e: Exception => {
                log.error(s"${consumerGroupId} Could not get Kafka offset", e)
              }
            }

            try {
              kafkaConsumer.commitSync(mapTopicPartition.asJava)
            } catch {
              case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
            }

            //log.info have the group id (unique id), the topic, cumulative consumer lag delta
            log.info("consumerGroupId: "   consumerGroupId   " topic: "   topic   " lagDeltaSum: "   topicOffsetList.sum)
          })

      })
  }

uj5u.com熱心網友回復:

如果沒有主題和磁區,您將無法獲得偏移值。您已經在為每個主題進行迭代,并且您的 JSON 顯示您在磁區映射中具有鍵,所以這個問題只是“如何從映射中獲取值?”

更重要的是,您是否真的需要OffsetAndMetadata從其他兩個地方具有相同值時獲得偏移量?

更具體地說,您不需要同時使用mapTopicPartitionOffsetmapTopicPartition每個都包含完全相同的資訊,只有一個物件作為具有 null 元資料屬性的值。

您還有offsetsmap,其值是您正在迭代的當前主題的所有偏移量,鍵是它的(消耗的)磁區。

每個都包含您想要的偏移值

你只需要mapTopicPartition使用 with commitSync,所以試試這個

.foreach(topic => {
    val offsets: Map[String, Int] = jsonOffsets(topic)
    // for getting end offsets
    val consumedPartitions = new ListBuffer[TopicPartition]()
    // convert to values accepted by Kafka Consumer API
    val toCommit = offsets
          .keys
          .map(partition => {
            val tp = new TopicPartition(topic, partition.toInt)
            consumedPartitions  = tp
            val oam = new OffsetAndMetadata(offsets.get(partition).toLong)
            (tp -> oam)
         })
        .toMap

    // could also use toCommit.keys instead of separate list 
    val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
 
    for((topicPartition,oam) <- toCommit) {

        val bbCurrentOffset = oam.offset() 
        // or offsets.get(String.valueOf(topicPartition.partition))

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/480334.html

標籤:斯卡拉

上一篇:如何對從Play中的特征繼承的案例類進行Json編碼?

下一篇:返回列表

標籤雲
其他(140642) Python(33512) JavaScript(21255) Java(15659) C(13883) 區塊鏈(8222) AI(7469) C#(6582) 基礎類(6313) 爪哇(6248) MySQL(6172) html(5698) 熊猫(5272) sql(5226) PHP(5141) 腳本語言(PerlPython)(5129) 非技術區(4971) Linux(4823) 数组(4745) R(4566) Android(4340) 反应(4258) 数据框(3668) css(3633) 节点.js(3345) C語言(3288) C++語言(3117) json(2799) Java相關(2746) 疑難問題(2699) 列表(2697) 扑(2582) 安卓(2573) VBA(2502) 單片機工控(2479) 打字稿(2469) 细绳(2173) ASP.NET(2122) iOS(2078) MongoDB(1976) Web開發(1951) 字典(1927) 麻木的(1901) 正则表达式(1887) 循环(1878) 擅长(1816) 镖(1802) 網絡通信(1793) 迅速(1779) 蟒蛇-3.x(1774) 數據庫相關(1767) VB基礎類(1755) .NETCore(1739) Unity3D(1687) C++(1660) 功能(1649) 開發(1646) .NET技术(1635) 系統維護與使用區(1617) HtmlCss(1594)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布