日志分析系統ELK(中)之logstash
- 1、什么是logstash?
- 2、Logstash安裝
- 3、logstash簡單命令列測驗
- 4、logstash檔案測驗
- (1)命令列輸入,輸出到檔案
- (2)命令列輸入,輸出到elasticsearch
- (3)檔案輸入,輸出到elasticsearch
- 5、logstash可以偽裝成日志服務器,直接接受遠程日志
- 6、多行過濾插件
- (1)命令列多行輸入,檔案輸出
- (2)檔案多行輸入,輸出到elasticsearch
- 7、grok切片過濾插件
- (1)命令列輸入,過濾,命令列輸出
- (2)apache日志輸入,切片,es輸出
接上篇,server3、server4、server5,是Elasticsearch集群,本文介紹logstash
1、什么是logstash?
Logstash是一個開源的服務器端資料處理管道,聚合器,logstash擁有200多個插件,能夠同時從多個來源采集資料,轉換資料,過濾資料,然后將資料發送到您最喜歡的 “存盤庫” 中,(大多都是 Elasticsearch,)
Logstash管道有兩個必需的元素,輸入和輸出,以及一個可選元素過濾器,如下圖

(1)輸入:采集各種樣式、大小和來源的資料
Logstash 支持各種輸入選擇 ,同時從眾多常用來源捕捉事件,
能夠以連續的流式傳輸方式,輕松地從您的日志、指標、Web 應用、資料存盤以及各種 AWS 服務采集資料,

(2)過濾器:實時決議和轉換資料
資料從源傳輸到存盤庫的程序中,Logstash 過濾器能夠決議各個事件,識別已命名的欄位以構建結構,并將它們轉換成通用格式,以便更輕松、更快速地分析和實作商業價值,
- 利用 Grok 從非結構化資料中派生出結構
- 從 IP 地址破譯出地理坐標
- 將 PII 資料匿名化,完全排除敏感欄位
- 簡化整體處理,不受資料源、格式或架構的影響
(3)輸出:選擇您的存盤庫,匯出您的資料
盡管 Elasticsearch 是我們的首選輸出方向,能夠為我們的搜索和分析帶來無限可能,但它并非唯一選擇,Logstash 提供眾多輸出選擇,您可以將資料發送到您要指定的地方,并且能夠靈活地解鎖眾多下游用例,

2、Logstash安裝
官網https://elasticsearch.cn/download/下載Logstash軟體包,并準備java的jdk包
準備新的虛擬機serever6(172.25.77.6),分配記憶體1G
server6安裝jdk和logstash

3、logstash簡單命令列測驗
找到logstash命令的路徑執行,標準輸入到標準輸出,即命令列輸入,命令列輸出

輸入hello word,標準輸出hello word;輸入test,標準輸出test

4、logstash檔案測驗
(1)命令列輸入,輸出到檔案
在/etc/logstash/conf.d/目錄下.conf結尾的檔案都可以讀到,

編輯test.conf檔案如下
[root@server6 conf.d]# cat test.conf
input {
stdin {} %輸入來自命令列標準輸入
}
output {
file { %輸出到/tmp/testfile檔案中,格式為custom format: {輸入內容}
path => "/tmp/testfile"
codec => line { format => "custom format: %{message}"}
}
}

執行test.conf檔案

命令列輸入heiheihei,輸出到了/tmp/testfile檔案中

上面的方法無法直接看到結果,不太舒服,修改test.conf檔案
[root@server6 conf.d]# cat test.conf
input {
stdin {}
}
output {
stdout{} %一份標準輸出到命令列
file { %一份輸出到/tmp/testfile檔案中
path => "/tmp/testfile"
codec => line { format => "custom format: %{message}"}
}
}

執行test.conf檔案

輸入linux,可以看到標準輸出,也輸出一份到/tmp/testfile檔案中

(2)命令列輸入,輸出到elasticsearch
編輯es.conf 檔案
[root@server6 conf.d]# cat es.conf
input {
stdin {}
}
output {
stdout {} %標準輸出一份
elasticsearch { %給elasticsearch輸出一份
hosts => ["172.25.77.3:9200"] %目標elasticsearch主機ip
index => "logstash-%{+yyyy.MM.dd}" %索引格式為logstash-年月日
}
}

執行es.conf 檔案

輸入linux haha,標準輸出一份

資料瀏覽->指定索引logstash-2021.08.14,可以看到elasticsearch輸出一份

(3)檔案輸入,輸出到elasticsearch
現在我們想把日志檔案作為輸入,首先要把權限改為644,因為logstash讀取時是logstash身份,所以必須開放讀的權力,

修改es.conf 檔案
[root@server6 conf.d]# cat es.conf
input {
file { %從檔案/var/log/messages輸入,從頭開始輸入
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
stdout {} %標準輸出
elasticsearch { %輸出elasticsearch
hosts => ["172.25.77.3:9200"]
index => "logstash-%{+yyyy.MM.dd}"
}
}

執行es.conf 檔案

可以看到輸入了很多/var/log/messages

elasticsearch也可以看到很多資料

假如我們把剛才創建的索引洗掉了,再次創建可以恢復嗎?

洗掉索引

再次執行es.conf 檔案,會發現沒有資料輸入

由于終端被占用了,再開啟一個視窗,輸入一條日志

現在再回傳去看,有輸入了,一個是遠程登錄,一個是剛創建的日志,都是新的日志,那舊的日志呢?我寫的從頭開始輸入阿

資料輸入到elasticsearch了

再次洗掉索引

在/usr/share/logstash/data/plugins/inputs/file/目錄下,有一個.sincedb的檔案,它負責記錄資料偏移量,之前看到那個位置了,不會重復輸入,所以洗掉他,就可以重新全部輸入了

這里補充一下,sincedb檔案一共6個欄位,分別表示inode編號、檔案系統的主要設備號、檔案系統的次要設備號、檔案中的當前位元組偏移量、最后一個活動時間戳(浮點數)、與此記錄匹配的最后一個已知路徑

再次執行es.conf 檔案

/var/log/messages的資料又全部輸入了一遍

資料輸入到elasticsearch了

5、logstash可以偽裝成日志服務器,直接接受遠程日志
如果按照前面的方法收集日志資訊,需要每臺服務器上都部署logstash,這樣太累了,那能不能讓logstash偽裝成日志服務器,每個節點服務器遠程發送日志給logstash呢?
編輯es.conf 檔案
[root@server6 conf.d]# cat es.conf
input {
#file {
# path => "/var/log/messages"
# start_position => "beginning"
#}
syslog { %偽裝syslog,開放埠514
port => 514
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.25.77.3:9200"]
index => "syslog-%{+yyyy.MM.dd}" %索引為syslog-年月日
}
}

執行es.conf 檔案

新開一個視窗查看埠514已開放

遠程主機server3編輯/etc/rsyslog.conf檔案
打開514埠

所有的日志發送給172.25.77.6:514一份

server3重啟rsyslog服務

server6的視窗有輸入了

現在elasticsearch就可以看到server3的日志了

同理,修改遠程主機server4的/etc/rsyslog.conf檔案


server4重啟rsyslog服務

server6的視窗又有輸入了

elasticsearch可以看到server4的日志了

這時server6查看514埠有三個,第一個是自己的接受埠,第二個是server3的發送埠,第三個是server4的發送埠

6、多行過濾插件
錯誤日志一般都有很多行,如果按照前面的做法會分成很多條,分開讀,單獨看根本不知道什么意思,多行過濾可以把多行日志記錄合并為一行輸出,
(1)命令列多行輸入,檔案輸出
編輯test.conf檔案
[root@server6 conf.d]# cat test.conf
input {
stdin {
codec => multiline { %多行輸入
pattern => "EOF" %結束標志詞為EOF
negate => "true"
what => "previous"
}
}
}
output {
stdout {}
file {
path => "/tmp/testfile" %輸出到檔案/tmp/testfile
codec => line { format => "custom format: %{message}"} %格式為custom format:{資料}
}
}

執行test.conf檔案

測驗:多行輸入,以EOF結束輸入,可以看到標準輸出是一條

/tmp/testfile檔案中也是一條

(2)檔案多行輸入,輸出到elasticsearch
接下來用檔案輸入來測驗,使用es集群的server3的日志(之前有一些報錯日志),把my-es.log發給server6的/var/log

先看一下正確的日志都是以時間開頭的,并且被中括號[ ]括起來的一行,而錯誤日志有很多行,比如下圖的at org開頭的這些,他們合起來應該是一條錯誤日志,

修改test.conf檔案,先不加多行輸入模塊,看效果
[root@server6 conf.d]# cat test.conf
input {
file {
path => "/var/log/my-es.log" %檔案/var/log/my-es.log作為輸入
start_position => "beginning" %從頭開始輸入
# codec => multiline {
# pattern => "EOF"
# negate => "true"
# what => "previous"
# }
}
}
output {
stdout {}
elasticsearch { %輸出到es
hosts => ["172.25.77.3:9200"]
index => "eslog-%{+yyyy.MM.dd}"
}
}

執行test.conf檔案

顯示輸入了很多資料

在es中查看eslog索引,搜索at org,可以看到他們分成了一條一條的單獨的資料

現在洗掉該索引

并洗掉相關的sincedb

修改test.conf檔案
[root@server6 conf.d]# cat test.conf
input {
file {
path => "/var/log/my-es.log"
start_position => "beginning"
codec => multiline {
pattern => "^\[" %以[開頭的為結束詞
negate => "true"
what => "previous"
}
}
}
output {
stdout {}
elasticsearch { %輸出到es
hosts => ["172.25.77.3:9200"]
index => "eslog-%{+yyyy.MM.dd}"
}
}

再次執行test.conf檔案

看到這個是一條資料

當然也可以在es中查看,是一條資料

7、grok切片過濾插件
我們平時查看日志,比如查看apache的日志,可以發現很有規律,如下圖,先是訪問的ip,時間等等,那么能不能只看其中一組資料,比如只想要ip這一列,現在就需要logstash的切片這個功能了

我們可以根據日志的特征自定義grok的書寫,得到想要的切片
(1)命令列輸入,過濾,命令列輸出
編輯grok.conf檔案
[root@server6 conf.d]# cat grok.conf
input {
stdin {}
}
filter {
grok { %把輸入切片成五塊,分別對應
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output {
stdout {}
}

執行grok.conf檔案

輸入一串資料,根據設定的切片方法,一一對應

(2)apache日志輸入,切片,es輸出
server6安裝apache

開啟apache,寫入默認發布目錄

真機訪問172.25.77.6

server6查看apache的日志

在/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns目錄下,有很多軟體的日志的輸出形式

看httpd的規定,如何寫日志已經提前用變數的方法定義了,所以我們只需要按照這個規定切片就好了

把apache的日志作為grok的輸入,日志檔案需要給讀的權限,日志檔案的目錄需要給讀和執行的權限,讀的時候是logstash的身份

修改grok.conf檔案
[root@server6 conf.d]# cat grok.conf
input {
file {
path => "/var/log/httpd/access_log" %/var/log/httpd/access_log檔案作為輸入
start_position => "beginning" %從頭開始
}
}
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" } %按照默認的HTTPD_COMBINEDLOG方式切片
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.25.77.3:9200"]
index => "apachelog-%{+yyyy.MM.dd}" %索引名字叫apachelog
}
}

執行grok.conf檔案

按照默認定義好的模式切片

es查看,成功切片

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/294343.html
標籤:其他
上一篇:JVM垃圾回收
下一篇:docker相關命令以及報錯處理
