有人可以幫助我使用logstash。我想用elastic同步PostgreSQL,問題是es中的當前設定重復資料。我知道創建一個欄位“id”可以幫助但仍然logstash重新創建整個索引。
查詢陳述句
SELECT id, title, description, type,"updatedAt" FROM public.videos WHERE ("updatedAt" > :sql_last_value AND "updatedAt" < NOW())
jdbc {
jdbc_driver_library => "/usr/share/logstash/postgresql-jdbc.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://{POSTGRES_URL}"
jdbc_user => "${POSTGRES_USERNAME}"
jdbc_password => "${POSTGRES_PASSWORD}"
jdbc_paging_enabled => true
tracking_column => "updatedAt"
tracking_column_type => "timestamp"
use_column_value => true
schedule => "*/5 * * * * *"
statement_filepath => "/usr/share/logstash/sql/video.sql"
tags => "video"
}
output {
if "video" in [tags] {
elasticsearch {
hosts => ["${ES_HOSTS}"]
index => "video"
# document_id => "%{[@metadata][_id]}"
user => "${ES_USER}"
password => "${ES_PASSWORD}"
cacert => '/etc/logstash/certificates/ca.crt'
doc_as_upsert => true
}
}
}
mysql 官方檔案中的設定
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => <my username>
jdbc_password => <my password>
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
uj5u.com熱心網友回復:
關于您的輸入部分,您需要確保您tracking_column實際上是一個可以跟蹤的真實欄位:
tracking_column => "updatedAt"
您的輸出部分應如下所示。正確設定document_id和[@metadata][_id]沒有隨機值很重要,因此重復。
elasticsearch {
hosts => ["${ES_HOSTS}"]
index => "video"
document_id => "%{id}" <---- change this line
user => "${ES_USER}"
password => "${ES_PASSWORD}"
cacert => '/etc/logstash/certificates/ca.crt'
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/375325.html
