1. 概述
logspout收集資料以后,就會把資料發送給logstash進行處理,本文主要講解logstash的input, filter, output處理
2. input
資料的輸入處理
支持tcp,udp等協議
晚上找資料建議在使用 LogStash::Inputs::Syslog 的時候走 TCP 協議來傳輸資料,
因為具體實作中,UDP 監聽器只用了一個執行緒,而 TCP 監聽器會在接收每個連接的時候都啟動新的執行緒來處理后續步驟,
如果你已經在使用 UDP 監聽器收集日志,用下行命令檢查你的 UDP 接收佇列大小:# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096
228096 是 UDP 接收佇列的默認最大大小,這時候 linux 內核開始丟棄資料包了!
2.1. 語法
基本語法如下:
input{ tcp { mode => "server" port => 5000 codec => json_lines tags => ["data-http"] }}
2.2. multiline
有時候日志是這樣多行顯示的:
[2019-10-12 15:24:50 ACCOUNT 97364 4658800064 INFO] http_ip=127.0.0.1 http_uri=/account/v1/binding http_method=POST http_time=182ms http_status=401 http_headers=Content-Type:application/x-www-form-urlencodedContent-Length:27Accept-Encoding:identityHost:localhost:8800User-Agent:Python-urllib/3.6Key:424518e4d27b11e8ada274e5f95979aeVersion:1.1.0Time:1570865090.412524Token:y66AHLNmRoscIIsoWnKzxosojSg=User-Id:0Connection:close http_kwargs={'sns_type': 'wechat', 'code': 'CG9DEj', 'user_id': 0, 'language': 1} http_response={"code":"usr_sns_code_error","message":"\u7b2c\u4e09\u65b9sns\u5e10\u53f7code\u65e0\u6548"}
默認情況下logstash會把一行日志轉換成elasticsearch的一個doc,上面這個日志就會存盤成15條日志,這樣就不能滿足我們的需求,我們只是想要一條日志
我們可以這么配置input:
input{ tcp { port => 5001 type => syslog tags => ["syslog"] codec=>multiline{ pattern => "\[%{TIMESTAMP_ISO8601:timestamp}" negate => true what => "previous" } }}紅色代碼的作用是:匹配到以[2019-10-08 16:57:42開頭的一行日志作為previous,不是以這個格式開頭的將作為子行出現,然后把多行記錄合并成一行記錄
3. filter
資料的過濾轉化處理
3.1. 語法
基本語法如下:
filter {
grok {
match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
overwrite => [ "message" ]
}
}
3.2. grok范式匹配
grok適合用來決議syslog,apache,mysql等日志
假如你的日志格式是這樣的
[2019-10-12 15:44:52 ACCOUNT 1 140058162291304 WARNING] HashCache::_rds_get, cache not existed!!! id_ls:[]
日志的格式是這樣的:
"[%(asctime)s %(service)s %(process)d %(thread)d %(levelname)s] %(message)s"
那么針對這樣有特定格式的日志,我們要怎樣提取這里面的欄位呢?
可以這么配置你的filter:
filter{ if [type] == "syslog" { grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp} %{DATA:service} %{DATA:pid} %{DATA:tid} %{LOGLEVEL:log-level}\] %{GREEDYDATA:msg_body}" } } }}使用grok的match正則運算式匹配可以方便的從message中提取欄位
從elasticsearch可以發現增加了timestamp、server、pid、tid和log-level等欄位,

附上官網檔案:
# grok除錯器
https://grokdebug.herokuapp.com =>debugger
# 官方檔案
https://www.elastic.co/guide/en/kibana/7.3/xpack-grokdebugger.html
3.3. gsub字串替換
經過logspout處理以后,會增加一些metadata(container name, container id, etc)
紅色部分是logspout添加的:
<14>1 2019-10-08T18:00:15Z zfswalk0 mage-device-11283 16901 - - [2019-10-09 09:49:08 WARNING SACCOUNT C P1 T140004171454120 P1 P2 P3] start listen on HTTP:0.0.0.0:17698, start listen on HTTP:0.0.0.0:17698
如何去除這部分多余的資料呢?
logstash需要使用gsub進行字串替換:
filter{ if [type] == "syslog" { mutate { gsub => [ "message", "<\d+>.*?- -", "" ] } }}
這個正則運算式的意義是選擇從“<14>”開始到“- -”結束的子字串,然后替換成空字串,實作metadata的洗掉
3.4. remove_filed洗掉欄位
ELK是采用json字典的方式來存盤資料的
如果你有哪些欄位是不需要的,可以通過remove_field來洗掉
假如你不想要grop決議出來的msg_body欄位和test欄位,可以這么操作,那么最后存盤到elasticsearch那邊將不會出現這2欄位
filter{ if [type] == "syslog" { mutate { remove_field => [ "msg_body", "test" ] } }}
3.5. kv過濾器決議kv資料
官方檔案kv filter:https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
動態的決議kv可以很方便的支持日志擴展,不需要后期去修改
它會把這個字串:ip=1.2.3.4 error=REFUSED決議成kv字典形式:{"ip":"1.2.3.4", "error": "REFUSED"}
filter{ if [type] == "syslog" { kv { source => "msg_body" field_split => "\t\t" } }}這邊的配置意思是:從msg_body這個欄位去決議kv欄位,欄位的分隔符是"\t\t"
當然這也要求日志寫入的時候需要采用"\t\t"來區分多個欄位,類似這樣:
[2019-10-12 15:24:50 ACCOUNT 97364 4658800064 INFO] http_ip=192.168.1.136 http_uri=/account/v1/binding http_method=POSThttp_ip=127.0.0.1、http_uri=/account/v1/binding與http_method=POST這三個欄位是采用'\t\t'分割的
這樣kv filter就會決議成功,并往doc里面設定http_ip, http_uri,http_method這三個值:

4. output
過濾轉化后的資料的輸出處理
這里是把資料存盤到elasticsearch的9200埠,index是"syslog-%{+YYYY.MM.dd}"
output{ if "syslog" in [tags]{ elasticsearch{ hosts=>["elasticsearch:9200"] index => "syslog-%{+YYYY.MM.dd}" } stdout{codec => rubydebug} }}然后elasticsearch就能得到資料了
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/46441.html
標籤:架構設計
上一篇:一個極簡的分布式檔案系統
