大資料必學框架-Flume
Flume簡介
Flume定義:
Flume 是 Cloudera 提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume 基于流式架構,靈活簡單,

**官網地址:**https://flume.apache.org
Flume的基礎架構

介紹一下Flume組件
Agent
Agent 是一個 JVM 行程,它以事件的形式將資料從源頭送至目的,
Agent 主要有 3 個部分組成,Source、Channel、Sink,
Source
Source 是負責接收資料到 Flume Agent 的組件,Source 組件可以處理各種型別、各種格式的日志資料,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy,
Sink
Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,并將這些事件批量寫入到存盤或索引系統、或者被發送到另一個 Flume Agent,
Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定
義,
Channel
Channel 是位于 Source 和 Sink 之間的緩沖區,因此,Channel 允許 Source 和 Sink 運作在不同的速率上,Channel 是執行緒安全的,可以同時處理幾個 Source 的寫入操作和幾個Sink 的讀取操作,
Flume 自帶兩種 Channel:Memory Channel 和 File Channel 以及 Kafka Channel,
Memory Channel 是記憶體中的佇列,Memory Channel 在不需要關心資料丟失的情景下適用,如果需要關心資料丟失,那么 Memory Channel 就不應該使用,因為程式死亡、機器宕機或者重啟都會導致資料丟失,
File Channel 將所有事件寫到磁盤,因此在程式關倍訓機器宕機的情況下不會丟失數
據,
Event
傳輸單元,Flume 資料傳輸的基本單元,以 Event 的形式將資料從源頭送至目的地,
Event 由 Header 和 Body 兩部分組成,Header 用來存放該 event 的一些屬性,為 K-V 結構,Body 用來存放該條資料,形式為位元組陣列,
Flume快速入門
Flume的安裝部署
Flume下載地址
https://flume.apache.org/download.html
檔案查看地址
https://flume.apache.org/FlumeUserGuide.html
安裝Flume
Flume和ZooKeeper一樣,都是由java開發的,所以在安裝之前要確定安裝過了java
下載完得tar包,解壓之后放到相應的檔案夾后,打開flume的組態檔夾conf,會看到一個flume-env.sh.template檔案,這是官方給出的參考模板,可以對其修改名字,然后進去修改配置
flume的配置十分簡單,只需要修改JAVA_HOME的路徑即可
Flume的入門案例
需求:使用Flume監聽一個埠,收集該埠資料,并列印到控制臺
分析:監聽一個埠需要使用netcat工具,輸出到控制臺則是logger

撰寫組態檔flume-netcat-logger.conf
添加內容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
開啟flume監聽埠
兩種寫法
第一種
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
第二種
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
引數說明
–conf/-c:表示組態檔存盤在 conf/目錄
–name/-n:表示給 agent 起名為 a1
–conf-file/-f:flume 本次啟動讀取的組態檔是在 job 檔案夾下的 flume-telnet.conf
實時 監控單個追加 檔案
**案例需求:實時監控 H ive 日志,并上傳到 HDFS 中 **
分析:

Source:監聽檔案,使用exec,exec 即 execute 執行的意思,表示執行 Linux
命令來讀取檔案
Sink:傳輸到HDFS,使用hdfs
Channel:依舊使用memory
實作步驟:
Flume 要想將資料輸出到 HDFS ,須持有 Hadoop 相關 jar 包
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷貝到/opt/module/flume/lib 檔案夾下,
**創建 flume- file- hdfs.conf 檔案 **
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上傳檔案的前綴
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動檔案夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創建一個新的檔案夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 1000
#設定檔案型別,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的檔案
a2.sinks.k2.hdfs.rollInterval = 30
#設定每個檔案的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
實時監控 目錄 下多個 新 檔案
**案例需求:使用 F lume 監聽整個目錄的檔案 ,并上傳至 HDFS **
分析:
Source:監控整個目錄:Spooldir
Sink:上傳到HDFS:hdfs
Channel:使用memory

實作步驟:
創建組態檔 flume- dir- hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 結尾的檔案,不上傳
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path =
hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上傳檔案的前綴
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照時間滾動檔案夾
a3.sinks.k3.hdfs.round = true
#多少時間單位創建一個新的檔案夾
a3.sinks.k3.hdfs.roundValue = 1
#重新定義時間單位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#設定檔案型別,可支持壓縮
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一個新的檔案
a3.sinks.k3.hdfs.rollInterval = 60
#設定每個檔案的滾動大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
實時監控目錄下的多個追加檔案
Exec source 適用于監控一個實時追加的檔案,但不能保證資料不丟失;Spooldir
Source 能夠保證資料不丟失, 且能夠實作斷點續傳, 但延遲較高, 不能實時監控; 而 Taildir Source 既能夠實作斷點續傳,又可以保證資料不丟失,還能夠進行實時監控,
案例需求:使用Flume監聽整個目錄的實時追加檔案,并且上傳至HDFS
分析:
Source:實時追加監聽:TAILDIR:支持斷點續傳,而exec不支持斷點續傳
Sink:上傳至HDFS:hdfs
Channel:使用memory

實作步驟
**創建組態檔 flume- tail dir- hdfs.conf **
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path =
hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上傳檔案的前綴
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照時間滾動檔案夾
a3.sinks.k3.hdfs.round = true
#多少時間單位創建一個新的檔案夾
a3.sinks.k3.hdfs.roundValue = 1
#重新定義時間單位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#設定檔案型別,可支持壓縮
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一個新的檔案
a3.sinks.k3.hdfs.rollInterval = 60
#設定每個檔案的滾動大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
Taildir 說明:
Taildir Source 維護了一個 json 格式的 position File, 其會定期的往 position File中更新每個檔案讀取到的最新的位置, 因此能夠實作斷點續傳, Position File的格式下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
Flume進階
Flume事務

Flume Agent內部原理

重要組件:
ChannelSelector
ChannelSelector 的作用就是選出 Event 將要被發往哪個 Channel,其共有兩種型別,分別是 Replicating(復制)和 Multiplexing(多路復用) ,
ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相
應的原則,將不同的 Event 發往不同的 Channel,
SinkProcessor
SinkProcessor 共 有 三 種 類 型 , 分 別 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor
DefaultSinkProcessor 對應的是單個的 Sink,LoadBalancingSinkProcessor 和
FailoverSinkProcessor 對應的是 Sink Group,LoadBalancingSinkProcessor 可以實作負載均衡的功能,FailoverSinkProcessor 可以實作故障轉移的功能,
Flume拓撲結構
簡單串聯
前一個Flume的輸出是后一個Flume的輸入

復制和多路復用
一個source,多個channel和sink
Flume 支持將事件流向一個或者多個目的地,這種模式可以將相同資料復制到多個
channel 中,或者將不同資料分發到不同的 channel 中,sink 可以選擇傳送到不同的目的地,

負載均衡和故障轉移
Flume支持使用將多個sink邏輯上分到一個sink組, sink組配合不同的SinkProcessor
可以實作負載均衡和錯誤恢復的功能,

聚合
這種模式是我們最常見的,也非常實用,日常 web 應用通常分布在上百個服務器,大者甚至上千個、上萬個服務器,產生的日志,處理起來也非常麻煩,用 flume 的這種組合方式能很好的解決這一問題, 每臺服務器部署一個 flume 采集日志, 傳送到一個集中收集日志的flume,再由此 flume 上傳到 hdfs、hive、hbase 等,進行日志分析,

Flume企業開發案例
復制和多路復用
案例需求
使用 Flume-1 監控檔案變動,Flume-1 將變動內容傳遞給 Flume-2,Flume-2 負責存盤到HDFS, 同時Flume-1將變動內容傳遞給Flume-3, Flume-3負責輸出到Local FileSystem,
需求分析

實作步驟
創建flume-file-flume.conf
配置 1 個接收日志檔案的 source 和兩個 channel、兩個 sink,分別輸送給 flume-flume-hdfs 和 flume-flume-dir,
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 將資料流復制給所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一個資料發送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
創建 flume- flume- hdfs .conf
配置上級 Flume 輸出的 Source,輸出是到 HDFS 的 Sink,
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一個資料接收服務
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#上傳檔案的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動檔案夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創建一個新的檔案夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#設定檔案型別,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的檔案
a2.sinks.k1.hdfs.rollInterval = 600
#設定每個檔案的滾動大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
創建 flume- flume- dir .conf
配置上級 Flume 輸出的 Source,輸出是到本地目錄的 Sink,
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
自定義 Interceptor
案例需求
使用 Flume 采集服務器本地日志, 需要按照日志型別的不同, 將不同種類的日志發往不同的分析系統,
需求分析
在實際的開發中, 一臺服務器產生的日志型別可能有很多種, 不同型別的日志可能需要發送到不同的分析系統, 此時會用到 Flume 拓撲結構中的 Multiplexing 結構, Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同型別的 event 的 Header 中的 key 賦予不同的值,

實作步驟
新建maven工程,匯入依賴
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
自定義的攔截器需要實作Interceptor介面,然后重寫四個方法
//初始化方法
public void initialize(){}
//處理單個事件,Event中封裝了Header和Body,可以根據body內容設定為回應的//Header,然后Flume會以Header進行區分
public Event intercept(Event event){}
//處理多個事件,可以呼叫處理單個事件的intercept()
public List<Event> intercept(List<Event> list){}
public void close();
//注意:還要定義一個靜態內部類,實作Interceptor.Builder,在實作的build方法中,回傳一個本類實體

package com.gis.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author LnnuUser
* @create 2021-08-03-下午8:16
*/
public class TypeInterceptor implements Interceptor {
//宣告一個存放事件的集合
private ArrayList<Event> addHeaderEvents;
@Override
public void initialize() {
//初始化存放事件的集合
addHeaderEvents = new ArrayList<>();
}
/**
* 處理單個事件
* @param event
* @return
*/
@Override
public Event intercept(Event event) {
//1、獲取頭資訊
Map<String, String> headers = event.getHeaders();
//2、獲取body
String body = new String(event.getBody());
//3、根據body中是否有“hello”來決定添加怎樣的頭資訊
if (body.contains("hello")){
//4、添加頭資訊
headers.put("type","GIS");
}else {
//4、添加頭資訊
headers.put("type","AIS");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
//1、清空集合
addHeaderEvents.clear();
//2、遍歷events,給每一個事件添加頭資訊
for (Event event : list) {
//3、給每一個事件添加頭資訊
addHeaderEvents.add(intercept(event));
}
//4、回傳結果
return addHeaderEvents;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
**編輯 flume 組態檔 **
添加攔截器,攔截器就是自己寫的全類名,要打包放到flume的lib目錄中
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =
com.gis.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
全部的配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =
com.gis.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
自定義 Source
Source 是負責接收資料到 Flume Agent 的組件,Source 組件可以處理各種型別、各種格式的日志資料,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy,官方提供的 source 型別已經很多,但是有時候并不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 source,
官方也提供了自定義 source 的介面:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根據官方說明自定義
MySource 需要繼承 AbstractSource 類并實作 Configurable 和 PollableSource 介面,
實作步驟
自定義類繼承AbstractSource并且實作Configurable、PollableSource,然后重寫四個方法
//配置方法,從中讀取組態檔中的資訊
public void configure(Context context) {}
//主要的方法,邏輯都寫在該方法中,回傳值為Status狀態,執行成功Status.READY,執行失敗Status.BACKOFF
public Status process(){}
//這兩個方法很少用到
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
代碼實作
package com.gis.source;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.StandardCharsets;
/**
* @author LnnuUser
* @create 2021-08-04-上午9:19
*/
public class MySource extends AbstractSource implements Configurable, PollableSource {
//定義全域的前綴和后綴
private String prefix;
private String subfix;
Status status = null;
/**
*
* @param context
*/
@Override
public void configure(Context context) {
//讀取配置資訊,給前后綴賦值
prefix = context.getString("prefix");
subfix = context.getString("subfix","gis");
}
/**
* 1.接受資料(for回圈造資料)
* 2.封裝事件
* 3.將事件傳給channel
*
* @return
* @throws EventDeliveryException
*/
@Override
public Status process() throws EventDeliveryException {
try {
//1.接受資料
for (int i = 0; i < 5; i++){
//2.構建事件物件
SimpleEvent event = new SimpleEvent();
//3.給事件設定值
event.setBody((prefix + "--" + i + subfix).getBytes());
//4.將事件傳給channel
getChannelProcessor().processEvent(event);
status = Status.READY;
}
} catch (Exception e) {
e.printStackTrace();
status = Status.BACKOFF;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}

組態檔
在組態檔中,sink和channel都一樣,只是source需要指定自己自定義的,將代碼打包放到flume的lib中
#宣告三個組件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#宣告source的屬性
a1.sources.r1.type=com.gis.source.MySource
a1.sources.r1.prefix=lnnu
a1.sources.r1.subfix=done
#宣告sink的屬性
a1.sinks.k1.type=logger
#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#建立聯系
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
自定義Sink
Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,并將這些事件批量寫入到存盤或索引系統、或者被發送到另一個 Flume Agent,
Sink 是完全事務性的,在從 Channel 批量洗掉資料之前,每個 Sink 用 Channel 啟動一個事務,批量事件一旦成功寫出到存盤系統或下一個 Flume Agent,Sink 就利用 Channel 提交事務,事務一旦被提交,該 Channel 從自己的內部緩沖區洗掉事件,
Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、
自定義,官方提供的 Sink 型別已經很多,但是有時候并不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 Sink,
自定義Sink步驟
自定義一個類繼承AbstractSink并且實作Configurable介面,實作里面的兩個方法
代碼撰寫
package com.gis.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author LnnuUser
* @create 2021-08-04-上午10:22
*/
public class MySink extends AbstractSink implements Configurable {
//定義兩個屬性,前后綴
private String prefix;
private String subfix;
//獲取logger物件
private Logger logger = LoggerFactory.getLogger(MySink.class);
@Override
public void configure(Context context) {
//讀取組態檔,為前后綴賦值
prefix = context.getString("prefix");
subfix = context.getString("subfix","lnnu");
}
/**
* 1.獲取channel
* 2.從channel獲取事務以及資料
* 3.發送資料
* @return
* @throws EventDeliveryException
*/
@Override
public Status process() throws EventDeliveryException {
//1.定義回傳值
Status status = null;
//2.獲取channel
Channel channel = getChannel();
//3.從channel獲取事務
Transaction transaction = channel.getTransaction();
//4.開啟事務
transaction.begin();
try {
//5.從channel獲取資料
Event event = channel.take();
//6.處理事務
if (event != null) {
String body = new String(event.getBody());
logger.info(prefix + body + subfix);
}
//7.提交事務
transaction.commit();
//8.成功提交,修改狀態資訊
status = Status.READY;
} catch (ChannelException e) {
//9.提交事務失敗
transaction.rollback();
//10.修改狀態
status = Status.BACKOFF;
e.printStackTrace();
}finally {
//11.最終關閉事務
transaction.close();
}
//12.回傳狀態資訊
return status;
}
}
Flume的組態檔
Source和Channel都是正常的寫法,只是Sink需要指定為我們自定義的全類名,要先打包之后上傳到flume的lib目錄下

a1.sources=r1
a1.sinks=k1
a1.channels=c1
#宣告source的屬性
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333
#宣告sink的屬性
a1.sinks.k1.type=com.gis.sink.MySink
a1.sinks.k1.prefix=lnnu
a1.sinks.k1.subfix=gis
#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#建立聯系
a1.sources.r1.channels=c1
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#宣告source的屬性
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333
#宣告sink的屬性
a1.sinks.k1.type=com.gis.sink.MySink
a1.sinks.k1.prefix=lnnu
a1.sinks.k1.subfix=gis
#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#建立聯系
a1.sources.r1.channels=c1
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#宣告source的屬性
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333
#宣告sink的屬性
a1.sinks.k1.type=com.gis.sink.MySink
a1.sinks.k1.prefix=lnnu
a1.sinks.k1.subfix=gis
#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#建立聯系
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/292169.html
標籤:其他
