目錄
- 1 MySql資料到Elasticsearch
- 1.1 下載logstash
- 1.2 解壓logstash
- 1.3 在logstash 目錄創建 mysql 檔案夾
- 1.4 將 mysql 驅動檔案和資料庫查詢檔案 放進mysql中
- 1.5 在config 目錄下創建 mysqltoes.conf 檔案
- 1.6 mysqltoes.conf 配置
- 1.7 啟動 logstash
- 2 配置語法講解
- 3 啟動方式
- 4 filebeat基本講解
- 5 實戰運用
- 5.1 業務到redis到es之間遷移
- 5.2 業務到redis到mongo
1 MySql資料到Elasticsearch
1.1 下載logstash

官網
https://www.elastic.co/cn/logstash/


wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
1.2 解壓logstash
tar -zxvf logstash-6.6.0.tar.gz
1.3 在logstash 目錄創建 mysql 檔案夾
[root@bigdata01 logstash-6.6.0]# mkdir mysql
1.4 將 mysql 驅動檔案和資料庫查詢檔案 放進mysql中

1.5 在config 目錄下創建 mysqltoes.conf 檔案

1.6 mysqltoes.conf 配置
# Logstash pipeline: poll MySQL through the jdbc input on a schedule, index
# every returned row into Elasticsearch, and echo each event to stdout.
input {
  # To sync several tables, declare one jdbc block per table.
  jdbc {
    # MySQL connection string; "mall" is the database name.
    jdbc_connection_string => "jdbc:mysql://ip:3306/mall?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    # Database user name and password.
    jdbc_user => "root"
    jdbc_password => "root"
    # Path to the JDBC driver jar.
    jdbc_driver_library => "/usr/local/logstash-6.6.0/mysql/mysql-connector-java-8.0.16.jar"
    # Driver class name. Connector/J 8.x ships the driver as
    # com.mysql.cj.jdbc.Driver; the old com.mysql.jdbc.Driver name is deprecated.
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Enable paging and set the page size.
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # Alternative: run an inline SQL statement.
    # statement => "select * from t_item"
    # Path and name of the SQL file to execute.
    statement_filepath => "/usr/local/logstash-6.6.0/mysql/item.sql"
    # Column names are lower-cased by default; "false" keeps them as returned.
    lowercase_column_names => "false"
    # Polling schedule in cron syntax. Fields, left to right: minute, hour,
    # day of month, month, day of week. All "*" means run every minute.
    schedule => "* * * * *"
    # Event type.
    # type => "jdbc"
  }
}
output {
  elasticsearch {
    # Elasticsearch host and port.
    hosts => ["http://ip:9200"]
    # Index name (user-defined).
    index => "mall"
    # Document type.
    document_type => "mall_item"
    # Use a column of the database row as the document id.
    # NOTE(review): "iteId" must match a column returned by item.sql exactly
    # (case is preserved because lowercase_column_names is "false") - confirm
    # it is not a typo for "itemId".
    document_id => "%{iteId}"
  }
  stdout {
    codec => json_lines
  }
}
1.7 啟動 logstash
前臺啟動:
[root@bigdata01 bin]# ./logstash -f ../config/mysqltoes.conf
后臺啟動:
[root@bigdata01 bin]# nohup ./logstash -f ../config/mysqltoes.conf >logstash.log &
2 配置語法講解
logstash使用{ }來定義配置區域,區域內又可以包含其插件的區域配置,
# Minimal configuration file: it must contain both an input and an output section.
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
}
# To process the data in flight, add a filter section between input and output.
input {
  stdin {}
}
filter {
  # Data-processing plugins go here, e.g. grok (text parsing), kv (key/value
  # extraction), field additions, geoip (geographic lookup), and so on.
}
output {
  stdout {
    codec => rubydebug
  }
}
# Several input sources and several output destinations may be declared.
input {
  stdin {}
  file {
    # System log file; this is the same path the Filebeat example in this
    # article reads ("/var/log/messages", not "/var/log/message").
    path => ["/var/log/messages"]
    type => "system"
    start_position => "beginning"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  file {
    path => "/var/datalog/mysystem.log.gz"
    gzip => true
  }
}
3 啟動方式
# Start logstash by explicitly specifying the configuration file
/bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf
# To run it as a daemon, append an & to the command
/bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf &
# If logstash was installed from the rpm package, the bundled init script can be used
/etc/init.d/logstash start
# When started this way, logstash automatically loads the configuration files under /etc/logstash/conf.d/
4 filebeat基本講解
filebeat是基于原先 logstash-forwarder 的原始碼開發而來,無需JAVA環境,運行起來更輕便,無疑是業務服務器端的日志收集工具,
配 置
# Configuration file path: "/etc/filebeat/filebeat.yml"
# One configuration file may define several prospectors, and each prospector
# may list several paths.
filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    - paths:
        - /var/log/messages
      input_type: log
      document_type: messages
    - paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
    - paths:
        - /alidata/www/storage/logs/laravel.log
      input_type: log
      document_type: larlog
    - paths:
        - /alidata/www/500_error/500_error.log
      input_type: log
      document_type: error500
    - paths:
        - /alidata/www/deposit/deposit.log
      input_type: log
      document_type: deposit
    - paths:
        - /alidata/www/call_error.log
      input_type: log
      document_type: call_error
    - paths:
        - /alidata/www/weixin_deposit.log
      input_type: log
      document_type: weixin_deposit
    - paths:
        - /alidata/log/php/php-fpm.log.slow
      input_type: log
      document_type: phpslowlog
      # Multi-line handling
      multiline:
        pattern: '^[[:space:]]'
        negate: true
        match: after
  # Additional prospector
  registry_file: /var/lib/filebeat/registry
############################# Libbeat Config ##################################
# Base config file used by all other beats for using libbeat features
############################# Output ##########################################
output:
  # Send data to redis
  redis:
    host: "10.122.52.129"
    port: 6379
    password: "123456"
  # Send data to logstash; normally only one of the two outputs is used
  logstash:
    hosts: ["10.160.8.221:5044"]
############################# Shipper #########################################
shipper:
  # Tag events with the server name
  name: "host_2"
############################# Logging #########################################
logging:
  files:
    rotateeverybytes: 10485760  # = 10MB
filebeat主要配置就是這個組態檔了,設定好之后啟動服務就會自動從源拉取資料發送到指定位置,當資料為普通行資料時,filebeat會自動為其添加欄位資訊,其中一項欄位 @timestamp 為filebeat讀取到這條資料的時間,默認格式為UTC時間,比中國大陸時間(UTC+8)晚8小時,
如果資料為json格式,而資料中已包含@timestamp欄位,filebeat處理時會把@timestamp欄位值替換為filebeat讀取到該行資料的當前UTC時間,
5 實戰運用
5.1 業務到redis到es之間遷移

nginx 日志格式配置
# Defines an access-log format named "json" that writes each request as a single
# JSON object. "size" and "responsetime" are emitted unquoted (as JSON numbers);
# every other value is emitted as a quoted string.
log_format json '{"@timestamp":"$time_iso8601",'
'"slbip":"$remote_addr",'
'"clientip":"$http_x_forwarded_for",'
'"serverip":"$server_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"domain":"$host",'
'"method":"$request_method",'
'"requesturi":"$request_uri",'
'"url":"$uri",'
'"appversion":"$HTTP_APP_VERSION",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';
filebeat 配置
filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    - paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
############################# Output ##########################################
output:
  logstash:
    hosts: ["10.160.8.221:5044"]
# The remaining configuration sections are omitted here.
logstash 配置 (此處logstash用于接收filebeat的資料,然后轉存redis)
# Receives JSON events from Filebeat on port 5044, enriches nginx access-log
# events with geoip data, and pushes events onto a redis list chosen by type.
input {
  beats {
    port => 5044
    codec => "json"
  }
}

filter {
  if [type] == "nginxacclog" {
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/u01/elk/logstash/GeoLiteCity.dat"
      # The same field is added twice so that [geoip][coordinates] holds
      # the longitude followed by the latitude.
      add_field => [ "[geoip][coordinates]","%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]","%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]","float" ]
    }
  }
}

output {
  # nginx access-log events go to the "nginxacclog" list.
  if [type] == "nginxacclog" {
    redis {
      data_type => "list"
      key => "nginxacclog"
      host => "127.0.0.1"
      port => "26379"
      password => "123456"
      db => "0"
    }
  }
  # System message events go to the "messages" list.
  if [type] == "messages" {
    redis {
      data_type => "list"
      key => "messages"
      host => "127.0.0.1"
      port => "26379"
      password => "123456"
      db => "0"
    }
  }
}
logstash 配置 (此處logstash用于讀取redis list中的資料,然后轉存elasticsearch)
# Reads JSON events from two redis lists and indexes them into daily
# Elasticsearch indices, one index family per event type.
# NOTE(review): flush_size, idle_flush_time and workers may not be accepted by
# newer versions of the elasticsearch output plugin - confirm against the
# Logstash version in use.
input {
  # nginx access-log events.
  redis {
    host => "10.10.1.2"
    port => "26379"
    db => "0"
    key => "nginxacclog"
    threads => 300
    password => "123456"
    data_type => "list"
    codec => "json"
  }
  # System message events.
  redis {
    host => "10.10.1.2"
    port => "26379"
    db => "0"
    key => "messages"
    password => "123456"
    threads => 50
    data_type => "list"
    codec => "json"
  }
}

output {
  if [type] == "nginxacclog" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
      manage_template => true
      flush_size => 50000
      idle_flush_time => 10
      workers => 2
    }
  }
  if [type] == "messages" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "logstash-messages-%{+YYYY.MM.dd}"
      manage_template => true
      flush_size => 50000
      idle_flush_time => 30
      workers => 1
    }
  }
}
關鍵指令解釋:
threads 開啟多少個執行緒讀取redis資料,也就是從redis輸入到logstash的速度,執行緒越多讀取速度越快,但是根據接收節點的接收速度來設定,如果輸入過快,接收速度不夠,則會出現丟資料的情況,設定一個最佳的threads值需要和接收節點做反復測驗才能得出,
flush_size 控制logstash向Elasticsearch批量發送資料,上面的配置表示,logstash會努力攢到50000條資料一次發送給Elasticsearch,
idle_flush_time 控制logstash多長時間向Elasticsearch發送一次資料,默認為1秒,根據以上配置,logstash積攢資料未到flush_size 10秒后也會向Elasticsearch發送一次資料,
workers 建議設定為1或2,如果機器性能不錯可以設定為2. 不建議設定的更高,
5.2 業務到redis到mongo

filebeat 配置(從日志檔案讀取到的資料直接快取至redis佇列)
filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    - paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
############################# Output ##########################################
output:
  redis:
    host: "10.160.8.221"
    port: 26379
    password: "123456"
document_type 自定義日志型別,在logstash中可通過type判斷做不同的處理,
logstash 配置 (此處logstash用于讀取redis list中的資料,然后轉存mongodb)
# Reads events from two redis lists and stores them in MongoDB, using one
# date-suffixed collection per event type.
input {
  # Events from the "filebeat" list; no type is set here, so the output
  # conditionals rely on the type already carried by each event.
  redis {
    host => "10.160.8.221"
    port => "26379"
    key => "filebeat"
    data_type => "list"
    password => "123456"
    threads => 50
  }
  # Events from the "mycat" list are tagged with type "mycat".
  redis {
    host => "10.160.8.221"
    port => "26379"
    key => "mycat"
    data_type => "list"
    password => "123456"
    threads => 50
    type => "mycat"
  }
}
output {
  if [type] == "mycat" {
    mongodb {
      collection => "mycat%{+yyyyMMdd}"
      isodate => true
      database => "logdb"
      uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb"
    }
  }
  # Test the same "type" field as the branch above; the Filebeat config in
  # this article sets document_type to "nginxacclog".
  if [type] == "nginxacclog" {
    mongodb {
      # Date-suffixed collection, using the same format as the mycat branch.
      collection => "nginx_accress%{+yyyyMMdd}"
      isodate => true
      database => "logdb"
      uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb"
    }
  }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/354568.html
標籤:其他
上一篇:Kafka作業原理
