目錄
#--------1 使用思路
#--------2 組件的選型
1 source
2 channel
3 sink
#--------3 .conf檔案的撰寫:
1 先定義
2 撰寫各個組件的屬性
3 將各個組件進行連接
#--------4 flune的拓撲結構 (avro源和avro槽是關鍵)
#--------5 監控程式ganglia
#--------6 結語
#--------1 使用思路
0 還是得查官方網站最有用!! https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
1 想好用什么源source
2 使用什么通道channel
3 采集的目的地sink
4 撰寫好conf檔案放在指定目錄下
5 使用命令進行實作,命令格式:
/root/flume/bin/flume-ng agent -n agent的名字 -c /root/flume/conf/ -f job/對應conf檔案名
#--------2 組件的選型
#============source
《注意:由于后續版本的更新推出的Taildir source功能過于強大所以基本不會使用Exec和Spooldir 》
適用面 source 優點/缺點
>監聽埠發送的資訊 netcat 監聽埠資料
例如
a2.sources.r1.type = netcat
a2.sources.r1.bind = node01
a2.sources.r1.port = 44444
>監控本地單個檔案上傳(以命令的形式) Exec 缺點:不能斷電續傳
例如
a1.sources.r1.type = exec
#linux命令tail -F監聽一個檔案(不能監聽一個檔案夾)
a1.sources.r1.command = tail -F /root/hive-3.1.2/logs/root/hive.log
a1.sources.r1.shell = /bin/bash -c
>監控單目錄中新檔案的上傳 Spooldir 缺點:不能動態追加
例如
a3.sources.r3.type = spooldir
#監聽的檔案夾
a3.sources.r3.spoolDir = /root/flume/upload
#給上傳之后的檔案加上的后綴
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 結尾的檔案,不上傳
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
>監控多目錄多檔案(可以追加) Taildir 優點:既能斷電續傳也能動態追加
缺點:檔案改名之后會重復上傳,例如hive.log第二天
會更名為hive.log.2022-01-09這樣第二天更名之后的文
件又會被上傳一次(可以通過修改原始碼的方式改進)
例如
a3.sources.r3.type = TAILDIR
#存放標記檔案的位置
a3.sources.r3.positionFile = /root/flume/tail_dir.json
#監聽的檔案組
a3.sources.r3.filegroups = f1 f2
#只監聽file1檔案夾下的包含file的檔案
a3.sources.r3.filegroups.f1 = /root/flume/files/.*file.*
#只監聽file2檔案夾下的包含log的檔案
a3.sources.r3.filegroups.f2 = /root/flume/files2/.*log.*
>復雜拓撲連接 avro 用于復雜拓撲的跨機器跨埠連接源
例如
a3.sources.r1.type = avro
#連接的avro sink的ip地址
a3.sources.r1.bind = node04
#連接的avro sink的埠號
a3.sources.r1.port = 4141
>fulme-kafka-flume標準日志采集 kafka 作為消費者接收kafka中topic的資料
例如
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#設定topic通道中一次傳輸的個數為5000個不滿足5000個,
a1.sources.r1.batchSize = 5000
#但是時間到達2000ms時也進行傳輸
a1.sources.r1.batchDurationMillis = 2000
#系結kafka集群的地址可以系結多的防止所連接的那一臺kafka所在機器宕機無法訪問
a1.sources.r1.kafka.bootstrap.servers = warehouse:9092
#連接kafka集群中的哪幾個topic,可以使用kafka.topics.regex,使用正則運算式連接多個topic
a1.sources.r1.kafka.topics=topic_log
#定義攔截器(定義名字,可以定義多個,攔截器需要自定義(自定義攔截器這一塊我還不是很熟練后續要多練習))
a1.sources.r1.interceptors = i1
#時間攔截器用于解決 零點漂移問題 這個問題大概就是日志生產時間明明是8.9日 23:59但是由于
#在flume傳輸的程序中消耗了一部分時間所以到達kafka中的時間已經是8.10日之后,那么這個日志
#將會后移至8.10日在進行處理,通過在event中的頭檔案中用攔截器加上日志生產時間的方式可以解決
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
#=============channel
memory 記憶體傳輸 可靠性低 傳輸效率高
例如
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
file 磁盤傳輸 可靠性高 傳輸效率低
例如
## channel1
a1.channels.c1.type = file
#備份記憶體中索引資料的磁盤路徑
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
#存放channel中資料的磁盤路徑
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
kafka channel 資料存盤在kafka
存盤在磁盤中可靠性高
傳輸效率高
例如
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#=============sink
avro 拓撲連接
例如
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node01
a1.sinks.k1.port = 4141
hdfs 采集到hdfs
例如
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://node01:8020/flume2/%Y%m%d/%H
#上傳檔案的前綴
a3.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動檔案夾
a3.sinks.k1.hdfs.round = true
#多少時間單位創建一個新的檔案夾
a3.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a3.sinks.k1.hdfs.batchSize = 100
#設定檔案型別,可支持壓縮
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的檔案
a3.sinks.k1.hdfs.rollInterval = 60
#設定每個檔案的滾動大小大概是 128M
a3.sinks.k1.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a3.sinks.k1.hdfs.rollCount = 0
logger 采集為日志,可以直接列印到控制臺
例如
a3.sinks.k1.type = logger
file_roll 采集到本地
例如
a2.sinks.k1.type = file_roll
a2.sinks.k1.sink.directory = /root/tmp/flume/hive_logs_local
kafka 采集到kafka
#--------3 .conf檔案的撰寫:
(多查詢官方檔案https://flume.apache.org/releases/content/1.9.0/FlumeUtaserGuide.html)
1 先定義
定義agent(其實在flume中一個agent相當于一個JVM行程)的名字
定義sources的名字
定義channels的名字
定義sinks的名字
例如
a3.sources = r3
a3.sinks = k3
a3.channels = c3
2 撰寫各個組件的屬性
參考上面的組件選型
撰寫source的屬性
撰寫channel的屬性
撰寫sink的屬性
3 將各個組件進行連接
將source與channels進行連接(一個源可以連多個通道)
a1.sources.r1.channels = c1 c2
將sinks與channel進行連接(一個通道可以連接多個槽)
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
#--------4 flune的拓撲結構 (avro源和avro槽是關鍵)
(avro源是服務端,應該先啟動這個agent)
1 簡單串聯 avro源(服務器server)和avro槽(客戶端client)
2 復制 需要選擇器Replicating ChannelSelector
3 多路復用 需要攔截器 自定義 Interceptor 不同的資訊發往不同的sink
4 負載均衡 需要sink group 策略 load_balance(配置rand隨機或輪詢的作業策略)
5 故障轉移 需要sink group 策略 failover(配置組內的sink優先級)
6 聚合 兩個avro槽同時指向一個avro源
7 f-k-f模式 指的是使用flume作為kafka集群的生產者接收日志服務器中的資料然后通過kafka快取之后再由一個fluem作為消費者消費kafka中topic的資料,然后經由這個fluem傳輸到對應的sink中
#--------5 監控程式ganglia
(這里并沒有列出怎么安裝和使用如果有興趣后續可能會更新)
啟動/關閉在node01上/root/shell/ganglia.sh bash ganglia.sh
web端頁面,在電腦登錄web頁面: 網站 http://node01/ganglia/
監控的是flume的各項指標如下
EventPutAttemptCount source 嘗試寫入 channel 的事件總數量
EventPutSuccessCount 成功寫入 channel 且提交的事件總數量
EventTakeAttemptCount sink 嘗試從 channel 拉取事件的總數量
EventTakeSuccessCount sink 成功讀取的事件的總數量
ChannelSize 目前 channel 中事件的總數量
ChannelCapacity channel 的容量
某一時刻 EventPutSuccessCount= EventTakeSuccessCount + ChannelSize
這些指標中能看出集群的flume是否丟失資料是否正常運行
#--------6 結語
說實話是第一次寫博客,寫的哪里不好或者有錯誤歡迎指出,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/428571.html
標籤:其他
上一篇:Flink CEP結合案例詳解
下一篇:R語言使用head函式獲取dataframe的頭部資料、使用tail函式獲取dataframe的尾部資料、使用引數n指定獲取的個數
