我有一個部署在 Kinesis Data Analytics 上的 Apache Flink 應用程式。
此應用程式從 Kafka 讀取并寫入 S3。它寫入的 S3 存盤桶結構是使用 BucketAssigner 計算的。這里是 BucketAssigner 的精簡版本
我遇到的問題是,假設我們必須寫入這個目錄結構:s3://myBucket/folder1/folder2/folder3/myFile.json
在發出PUT請求之前,它會發出以下HEAD請求:
HEAD /folder1HEAD /folder1/folder2HEAD /folder1/folder2/folder3/
然后它發出PUT請求。
它對每個請求都執行此操作,這導致 S3 速率限制以及我的 FLink 應用程式中的背壓。
我發現有人對 BucketingSink 有類似的問題:https ://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz
那里提到的解決方案是切換到我正在做的 StreamingFileSink。
有關如何在 StreamingFileSink 中解決此問題的任何想法?
我的 SinkConfig 如下:
StreamingFileSink
.forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
.withBucketAssigner(bucketAssigner)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(60000)
.build())
.build()
JsonEncoder 獲取物件并將其轉換為 json 并寫出這樣的位元組
我已經描述了有關整個管道如何在這個問題中作業的更多細節,如果這有幫助的話:Heavy back pressure and huge checkpoint size
uj5u.com熱心網友回復:
Hadoop S3 檔案系統嘗試在 S3 之上模仿檔案系統。這意味著:
- 在寫入鍵之前,它會通過檢查前綴到最后一個“/”的鍵來檢查“父目錄”是否存在
- 它創建空標記檔案來標記這樣一個父目錄的存在
- 所有這些“存在”請求都是 S3 HEAD 請求,它們既昂貴又開始違反一致的 read-after-create 可見性
因此,Hadoop S3 檔案系統具有非常高的“創建檔案”延遲,并且很快達到請求速率限制(HEAD 請求在 S3 上的請求速率限制非常低)。因此,最好找到寫入更少不同檔案的方法。
您還可以探索使用熵注入。熵注入發生在檔案系統級別,因此它應該與 FileSink 一起使用。除了我不確定它將如何與接收器進行的磁區/存盤進行互動,因此您可能會或可能不會發現它在實踐中有用。如果您嘗試過,請反饋!
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/449257.html
標籤:亚马逊-s3 Hadoop apache-flink 亚马逊运动分析
