我嘗試通過部署ElasticsearchSinkConnector
POST /連接器
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"tasks.max": "1",
"topics": "my_db_server.public.my_table",
"connection.url": "https://my-elasticsearch.com:9200",
"connection.username": "xxx",
"connection.password": "xxx",
"key.ignore": "true",
"schema.ignore": "true",
"elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "path/to/keystore.jks",
"elastic.https.ssl.keystore.password": "xxx",
"elastic.https.ssl.key.password": "xxx",
"elastic.https.ssl.keystore.type": "JKS",
"elastic.https.ssl.truststore.location": "path/to/truststore.jks",
"elastic.https.ssl.truststore.password": "xxx",
"elastic.https.ssl.truststore.type": "JKS",
"elastic.https.ssl.protocol": "TLS"
}
}
但是,當我通過以下方式檢查狀態時,它已成功部署
獲取 /connectors/elasticsearch-sink/status
tasks是空陣列[]:
{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.xxx.xxx.xxx:8083"
},
"tasks": [],
"type": "sink"
}
我發現了這個Kafka Connect:沒有為連接器創建任務
但是,我在里面嘗試了這兩個答案,都更改名稱,洗掉 ElasticsearchSinkConnector 然后重新部署多次對我不起作用。
此外,Kafka Connect pod 中沒有日志。
任何想法?謝謝!
uj5u.com熱心網友回復:
將這兩個添加到 ElasticsearchSinkConnector 配置后
"errors.log.include.messages": "true",
"errors.log.enable": "true"
在配置中如此喜歡
POST /連接器
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"tasks.max": "1",
"topics": "my_db_server.public.my_table",
"connection.url": "https://my-elasticsearch.com:9200",
"connection.username": "xxx",
"connection.password": "xxx",
"key.ignore": "true",
"schema.ignore": "true",
"elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "path/to/keystore.jks",
"elastic.https.ssl.keystore.password": "xxx",
"elastic.https.ssl.key.password": "xxx",
"elastic.https.ssl.keystore.type": "JKS",
"elastic.https.ssl.truststore.location": "path/to/truststore.jks",
"elastic.https.ssl.truststore.password": "xxx",
"elastic.https.ssl.truststore.type": "JKS",
"elastic.https.ssl.protocol": "TLS",
"errors.log.include.messages": "true",
"errors.log.enable": "true"
}
}
而這一次,當我通過
獲取 /connectors/elasticsearch-sink/status
tasks現在顯示錯誤:
{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.xxx.xxx.xxx:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.xxx.xxx.xxx:8083",
"trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
}
],
"type": "sink"
}
現在只需要修復權限問題。
另外,我發現了一篇關于如何除錯的好文章:https ://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/446597.html
標籤:弹性搜索 阿帕奇卡夫卡 apache-kafka-连接
上一篇:彈性:輸入不匹配'-'
