上篇文章咱們基于Flume舉了幾個例子,包括它的扇入扇出等等,這篇文章我們主要來看一下怎樣通過自定義Source和Sink來實作Flume的資料采集,關注專欄《破繭成蝶——大資料篇》,查看更多相關的內容~
目錄
一、自定義Source
1.1 需求說明
1.2 編碼實作
1.3 撰寫Flume組態檔
1.4 測驗自定義的Source
二、自定義Sink
2.1 需求說明
2.2 代碼實作
2.3 撰寫Flume的組態檔
2.4 測驗自定義Sink
一、自定義Source
Source是負責接收資料到Flume Agent的組件,我們不僅可以根據Flume自身提供的API,實作Source的定義,還可以根據專案的實際需求,撰寫自己的Source,比如Source可以是從網路上下載一個檔案,或者是從資料庫中查詢到的資料,Source組件可以處理各種型別、各種格式的日志資料,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy等等,官方提供的source型別已經很多,但是有時候并不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Source,
自定義的Source有兩種型別:PollableSource(輪訓拉取)與EventDrivenSource (事件驅動),兩者的區別在于PollableSource是通過執行緒不斷去呼叫process方法,主動拉取訊息,而EventDrivenSource是需要觸發一個呼叫機制,即被動等待,在利用PollableSource實作自定義Source時還需要實作Configurable介面,以便在專案中初始化某些配置,下面我們就以實作PollableSource為例進行自定義Source的說明,官方也提供了自定義source的介面,可以點擊這里進行查看~
1.1 需求說明
使用Flume接收資料,自定義Source接收資料,并將資料輸出到控制臺,
1.2 編碼實作
首先需要匯入Maven依賴,如下所示:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
實作自定義Source的代碼如下所示:
package com.xzw.source;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
import java.util.Map;
/**
* @author: xzw
* @create_date: 2021/1/14 15:24
* @desc: 自定義source
* @modifier:
* @modified_date:
* @desc:
*/
public class MySource extends AbstractSource implements Configurable, PollableSource {
//定義需要從配置中讀取的欄位
//兩條資料之間的間隔
private long delay;
//模擬資訊
private String field;
/**
* 處理程序
*
* @return
* @throws EventDeliveryException
*/
public Status process() throws EventDeliveryException {
try {
Map<String, String> header = new HashMap<>();
SimpleEvent event = new SimpleEvent();
//拿到資料
for (int i = 0; i < 5; i++) {
event.setHeaders(header);
event.setBody((field + i).getBytes());
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (Exception e) {
return Status.BACKOFF;
}
return Status.READY;
}
/**
* 回滾之后睡眠增加多長時間(每回滾一次增加多長時間)
*
* @return
*/
public long getBackOffSleepIncrement() {
return 0;
}
/**
* 最大的失敗睡眠間隔
*
* @return
*/
public long getMaxBackOffSleepInterval() {
return 0;
}
/**
* 配置方法
*
* @param context
*/
public void configure(Context context) {
delay = context.getLong("delay", 2000l);
field = context.getString("field", "xzw");
}
public static void main(String[] args) {
}
}
1.3 撰寫Flume組態檔
自定義flume-mysource.conf組態檔并添加如下內容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = com.xzw.source.MySource
a1.sources.r1.delay = 5000
#a1.sources.r1.field = xzw
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.4 測驗自定義的Source
將寫好的代碼打包上傳到Flume的lib目錄下,我們這里的路徑是/opt/modules/flume/lib,使用如下命令啟動Flume進行測驗:
bin/flume-ng agent -c conf/ -f conf/flume-mysource.conf -n a1 -Dflume.root.logger=INFO,console
可以發現在控制天列印出了我們傳輸的資料:

二、自定義Sink
Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr等等,但是有時候并不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Sink,官方也提供了自定義source的介面,請點擊這里進行查看~
2.1 需求說明
使用Flume接收資料,并在Sink端給每條資料添加前綴和后綴,最后將添加了后綴的資料輸出到控制臺,
2.2 代碼實作
實作自定義Sink的代碼如下所示:
package com.xzw.source;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author: xzw
* @create_date: 2021/1/15 10:09
* @desc: 自定義sink
* @modifier:
* @modified_date:
* @desc:
*/
public class MySink extends AbstractSink implements Configurable {
//創建Logger物件
private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
//宣告前綴和后綴
private String prefix = "";
private String suffix = "";
@Override
public Status process() throws EventDeliveryException {
//宣告回傳值狀態資訊
Status status = null;
//獲取當前sink系結的channel
Channel channel = getChannel();
//獲取事務
Transaction transaction = channel.getTransaction();
//開啟事務
transaction.begin();
try {
//宣告事件
Event take;
while ((take = channel.take()) == null) {
Thread.sleep(200);
}
//處理事件
LOG.info(prefix + new String(take.getBody()) + suffix);
//事務提交
transaction.commit();
status = Status.READY;
} catch (Throwable e) {
//當遇到例外的時候,回滾事務
transaction.rollback();
status = Status.BACKOFF;
if (e instanceof Error) {
throw (Error) e;
}
} finally {
//關閉事務
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
//讀取組態檔內容,有默認值
prefix = context.getString("prefix", "PREFIX:");
//讀取組態檔內容,沒有默認值
suffix = context.getString("suffix");
}
public static void main(String[] args) {
}
}
2.3 撰寫Flume的組態檔
撰寫flume-mysink.conf組態檔,檔案內容如下所示:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = com.xzw.source.MySink
a1.sinks.k1.suffix = :SUFFIX
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.4 測驗自定義Sink
將撰寫好的代碼打包上傳到Flume的lib目錄下,使用如下命令啟動Flume:
bin/flume-ng agent -c conf/ -f conf/flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console
通過測驗可以發現,資料通過自定義的Sink輸出到控制臺:

OK,本文就介紹到這里,自定義的Source、Sink這部分比較簡單,你們在這個程序中遇到了什么問題,歡迎留言,讓我看看你們遇到了什么問題~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/250652.html
標籤:其他
