我有一個 Confluent 接收器連接器,它從 Kafka 主題中獲取資料。然后將其攝取到 S3 存盤桶中。
攝取作業正常,一切都很好,但是現在我需要在將 Avro 資料放入存盤桶之前對其進行壓縮。
我嘗試了以下配置
{
"name":"--private-v1-s3-sink",
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"s3.region":"eu-west-1",
"partition.duration.ms":"3600000",
"rotate.schedule.interval.ms": "3600000",
"topics.dir":"svs",
"flush.size":"2500",
"schema.compatibility":"FULL",
"file.delim":"_",
"topics":"--connect.s3.format.avro.AvroFormat",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"--systems",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "${S3_BUCKET}",
"s3.acl.canned":"bucket-owner-full-control",
"avro.codec": "snappy",
"locale":"en-GB",
"timezone": "GMT",
"errors.tolerance": "all",
"path.format":"'ingest_date'=yyyy-MM-dd",
"timestamp.extractor":"Record"
'avro.code',我認為會壓縮資料,但事實并非如此。取而代之的是,我還嘗試了 '"s3.compression.type": "snappy" ',但仍然沒有運氣!但是這確實適用于 JSON 和 GZIP。
不太確定出了什么問題?
uj5u.com熱心網友回復:
這些設定僅適用于 S3 Avro 撰寫器,不適用于來自生產者的動態資料,這些資料必須在生產者或代理/主題級別而不是連接設定進行“壓縮”。
參考compression.type主題配置
uj5u.com熱心網友回復:
對于那些將來可能會遇到這種情況的人。
我在此設定之間進行了測驗,使用 BZIP2 而不是 snappy,并且沒有啟用壓縮。
這是結果:
No compression 58.2MB / 406 total objects
BZIP 19.9MB / 406 total objects
Snappy 31.1MB / 406 total objects
跑了 24 小時,都從同一個主題中提取出來并放入他們自己的桶中。
正如您所看到的,上面使用 snappy 的配置實際上是有效的。
BZIP 提供更高的壓縮率并且看起來更快。
最終我們不得不使用 snappy,盡管此時 Redshift 攝取只允許使用 snappy 壓縮 Avro,無論如何。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/386949.html
標籤:亚马逊-s3 阿帕奇卡夫卡 阿帕奇卡夫卡连接 汇流平台
上一篇:如何通過使用位置url從Nodemongo中洗掉s3物件?
下一篇:如何從s3存盤桶下載指定的檔案
