我正在嘗試構建一個具有低延遲的 kafka spark 結構化流有狀態應用程式。通過說低延遲,我的意思是每個作業幾百毫秒。
spark 應用程式從一個磁區號為 executor core 2 倍的 kafka 主題中讀取資料,然后將其處理并輸出到另一個 kafka 主題。資料生成到該主題的速率為 100 條記錄/秒,記錄大小約為 2 kb。作業的DAG表明包括從 kafka 源讀取的階段需要 0.5s 。這個階段基本上將 kafka 中的資料轉換為自定義案例類的資料集,然后是第二階段的 groupByKey 和 flatMapGroupsWithState 函式。Web UI 中的 shuffle 寫入時間為 0 ms(應該很小,因為 shuffle 的資料大小在 10~20kb 左右)。所以 AFAIK 唯一耗時的操作應該是從 kafka 讀取。
我讀過有關 kafka 的性能比這要好得多的資訊。端到端延遲可以小于 100 毫秒。
kafka 代理的負載并不重。我不知道它是否與問題有關,但整個應用程式運行在 kubernetes 集群上。如果可能有幫助,請附上這個階段的圖片和整個查詢的圖片。
抱歉,我無法發布代碼。有什么我可以嘗試做的嗎?
此致
uj5u.com熱心網友回復:
今天我發現第一階段的一些任務需要0.5s,而有些則不需要,這對我來說很可疑。我更深入地研究了 kafka 設定。有一個名為 fetch.max.wait.ms 的配置,默認情況下會阻止消費者任務停止等待新訊息 500 毫秒。減少此配置后,一切正常。更多資訊:fetch.max.wait.ms
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/509909.html
下一篇:如何為repartitionByCassandraReplica.JoinWIthCassandraTable()與DirectJoin=AlwaysOn計算輸入大小?
