視頻地址
B站視頻地址:Logstash如何成為鎮得住場面的資料管道
公眾號視頻地址:Logstash如何成為鎮得住場面的資料管道
知乎視頻地址:Logstash如何成為鎮得住場面的資料管道
內容
首先我們延續上一期視頻中日志采集架構的案例,Filebeat采集日志并推送Kafka訊息佇列進行分發,再由Logstash消費日志訊息,并將日志資料最終落地在Elasticsearch集群索引當中,Kafka作為訊息佇列分發服務需要將收集到的日志訊息繼續分發下去,最終資料落地在Elasticsearch集群索引當中,
那么連接整個程序的主角Logstash是如何作業的,就是我們今天講解的重點,
Logstash作業程序分為三個部分:Input輸入、Filter過濾、Output輸出,它們共同協作形成了完整的Logstash資料管道傳輸機制
我們先從一個最簡單的例子演示開始,看看Logstash是怎么輸入和輸出的,這一次先跳過filter過濾環節,
下面查看已經預置好的一個組態檔01-kafka-elastic-nginx.conf
首先是input輸入配置點,從Kafka訂閱訊息,Kafka集群地址與filebeat中都指向了一個地址,其他配置我們先略過,后續Kafka專題再說
下來看到要訂閱的Topic主題TestT3,我們先不用json格式解碼訊息,默認就是純文本的方式
一樣的,這一步先略過過濾環節,直接看看output輸出配置點,目標是給Elasticsearch輸出資料,并指定了elasticsearch集群的三個節點
輸出環節創建需要寫入的elasticsearch日志索引,我們先按照默認的filebeat采集時間,進行日期格式化,按照每個小時建立一個索引,這塊會有時間問題,一會兒再說,
讓資料輸出到終端,方便我們除錯結果,
通過演示中最簡單的配置方式,這時候的Logstash已經成為連接Kafka和Elastisearch之間的資料管道了!
好,接下來我們將所有系統運行起來,并生成一條nginx請求日志,看看管道各個階段的資料變化,
首先nginx日志資料被filebeat采集,是一條典型的無結構的文本日志資料,大家注意紅色標注的時間是2021年2月21日13時
接著這條日志資料通過Kafka進入到了Logstash管道的輸入階段,
Logstash為這條日志生成了更為非常龐大的Json資料,里面包括了所有被采集主機的資訊,以及nginx日志,實際上這些原始資訊并沒有被良好的進行資料清洗與結構化
最后資料被寫入到Elastisearch一個按小時劃分的索引當中,對應時間為2021年2月21日5時
我們發現Logstash對原始資料在沒有任何處理的情況下,會很不方便將來資料的使用;
這次我們利用Logstash json解碼器讓管道重新再來一次,
接下來我們進入Logstash中對應的組態檔,并找到input輸入點的codec配置,刪掉注釋,打開Logstash對輸入資料的json解碼方式·,
我們看看再次進入管道中的日志資料,Logstash首先對原始日志資料進行Json決議
這時候我們再看Json決議后的資料,是不是就清晰多了,filebeat采集到的本地機器資料、以及紅色框中Nginx HTTP日志資料、以及其他標簽資料都進行了欄位分離
做到這一步其實還是不夠好,為什么呢?一方面因為我們依然希望將Nginx HTTP的日志資料也進行結構化處理,
另一個方面,Filebeat傳遞給Logstash的系統時間是慢了8個小時的UTC時間標準,反而Nginx日志中的時間是我們本地的北京時間標準,因此我們希望用Nginx日志時間作為創建Elasticsearch日志索引的唯一依據
這時候我們就要使用Logstash的過濾機制了,我們繼續進入Logstash對應的配置中,刪掉過濾配置中的注釋,讓Logstash過濾最常用插件grok、date、ruby、mutate起作用
grok插件是專業處理非結構化資料的能手,通過自定義的Nginx日志正則運算式,就能實作Nginx日志的結構化決議
date插件用于處理時間問題,我們通過date插件將nginx日志中的時間轉換成Logstash時間物件,并賦給一個新的臨時時間欄位indextime
ruby就是在過濾程序中可以插入ruby腳本語言來進行程式級處理,我們通過ruby語言對indextime時間格式化,生成一個精確到小時的字串欄位index.date,用于elasticsearch索引名稱
mutate是最常用的可以對管道中資料欄位進行操作的插件了,我們的目的是洗掉臨時時間欄位indextime
最后我們還需要將output輸出中的索引生成方式修改一下,注釋掉原來用filebeat生默認時間生成的索引,改成nginx日志時間生成的索引,
我們重新運行Logstash,資料經過了Input解碼、日志grok結構化處理、本地時間物件創建,并進行日期格式化,為了生成新的Elasticsearch索引欄位,并對臨時欄位進行洗掉,最終經過Output輸出階段,創建Elasticsearch索引或寫入日志資料
讓我們看看Elasticsearch最終保存的資料效果,index索引對應的時間來自過濾器創建的index.date欄位,index.date欄位又來自nginx日志中分離出的本地時間,這樣我們就不用再去修改Logstash的系統時間了
我們看到菱形標注的欄位資料就是由過濾器對nginx http日志進行結構化抽取的結果,
同樣elasticsearch依然保存著nginx日志的原始資料以備不時之需
前往讀位元組的知乎——了解更多關于大資料的知識
公眾號 "讀位元組" 大資料(技術、架構、應用)的深度,專業解讀
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/270976.html
標籤:大數據
上一篇:大資料之BI開發 - 維度創建

