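I am using the Elasticsearch Sink Connector to stream data from Kafka to Elasticsearch, and I have the following problem.
My Kafka topic "document" holds records with the following structure: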
Partition : 0
Offset: 0
Key:
Value:
{
"attributes": {
"3": "Mike"
}
}
Timestamp: 2022-11-03 19:03:34.866
For this data, I have the following index template in Elasticsearch:
{
"version": 1,
"index_patterns": [
"documents-*"
],
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"cashier": {
"type": "text"
}
}
}
}
This is my Elasticsearch Sink Connector configuration:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "document, document-processing-error",
"key.ignore": "true",
"schema.ignore": "true",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"name": "elasticsearch-sink",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"flush.synchronously": "true",
"transforms": "appendTimestampToIX",
"transforms.appendTimestampToIX.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.appendTimestampToIX.topic.format": "${topic}-${timestamp}",
"transforms.appendTimestampToIX.timestamp.format": "yyyy-MM-dd"
}
}
As output, the index document-2022-11-03 contains the following data:
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "document-2022-11-03",
"_type": "_doc",
"_id": "document-2022-11-03 0 0",
"_score": 1.0,
"_source": {
"attributes": {
"3": "Mike"
}
}
}
]
}
}
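This works fine, but I need an additional transformation of my data. For example, if attributes contains the key 3, I need to replace that field with the key cashier, and flatten the structure into a flat JSON document with a random document id. The final output should look like this (for example):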
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "document-2022-11-03",
"_type": "_doc",
"_id": "134DaBfWAE6AZUyKUAbjRksjXHTmP6hDxedGm4YhBnZW",
"_score": 1.0,
"_source": {
"cashier": "Mike"
}
}
]
}
}
I tried the following configuration to rename the field, but it does not work for me:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "arrtubites.3:cashier"
How can I achieve this?
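Answer from a uj5u.com user:
The ReplaceField transform does not work with nested attributes such as Maps or Objects; it only operates on top-level fields of either.
If you want to convert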
{
"attributes": {
"3": "Mike"
}
}
into
{
"cashier": "Mike"
}
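then Kafka Streams or ksqlDB is the usual recommendation (that is, consume the topic elsewhere and produce a new topic using the logic you want to apply).
Logstash may also be an option instead of Kafka Connect.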
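As a rough illustration of the "consume elsewhere and produce a new topic" approach, here is a minimal Python sketch of the per-record logic only. It is not a Kafka Streams or ksqlDB implementation, and it leaves out the Kafka client code entirely. The names ATTRIBUTE_NAMES, flatten_document and to_output_record are hypothetical, and the only mapping taken from the question is 3 to cashier.

```python
import json
import uuid
from typing import Dict, Tuple

# Hypothetical lookup table from attribute keys to flat field names.
# Only "3" -> "cashier" comes from the question; extend it as needed.
ATTRIBUTE_NAMES: Dict[str, str] = {"3": "cashier"}


def flatten_document(value: dict) -> dict:
    """Lift known entries of value["attributes"] to top-level fields.

    Attribute keys that are not in ATTRIBUTE_NAMES are dropped.
    """
    attributes = value.get("attributes") or {}
    return {
        ATTRIBUTE_NAMES[key]: attr_value
        for key, attr_value in attributes.items()
        if key in ATTRIBUTE_NAMES
    }


def to_output_record(raw_value: bytes) -> Tuple[str, bytes]:
    """Build the (key, value) pair to produce to the new topic.

    The key is a random UUID string, intended to be used as the document id.
    """
    flat = flatten_document(json.loads(raw_value))
    return str(uuid.uuid4()), json.dumps(flat).encode("utf-8")


if __name__ == "__main__":
    key, value = to_output_record(b'{"attributes": {"3": "Mike"}}')
    print(key, value.decode("utf-8"))
```

A consumer loop would call to_output_record on each message value and produce the result to a new topic, which the sink connector then reads in place of the original one. For the random key to become the Elasticsearch _id, the connector would need "key.ignore": "false"; with "key.ignore": "true", as in the configuration above, the id is built from topic, partition and offset.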
