主頁 >  其他 > 大資料必學框架-Flume

大資料必學框架-Flume

2021-08-07 07:29:28 其他

大資料必學框架-Flume

Flume簡介

Flume定義:

Flume 是 Cloudera 提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume 基于流式架構,靈活簡單,
在這里插入圖片描述

**官網地址:**https://flume.apache.org

Flume的基礎架構

在這里插入圖片描述

介紹一下Flume組件

Agent

Agent 是一個 JVM 行程,它以事件的形式將資料從源頭送至目的,
Agent 主要有 3 個部分組成,Source、Channel、Sink

Source

Source 是負責接收資料到 Flume Agent 的組件,Source 組件可以處理各種型別、各種格式的日志資料,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy,

Sink

Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,并將這些事件批量寫入到存盤或索引系統、或者被發送到另一個 Flume Agent,
Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定
義,

Channel

Channel 是位于 Source 和 Sink 之間的緩沖區,因此,Channel 允許 Source 和 Sink 運作在不同的速率上,Channel 是執行緒安全的,可以同時處理幾個 Source 的寫入操作和幾個Sink 的讀取操作,
Flume 自帶兩種 Channel:Memory Channel 和 File Channel 以及 Kafka Channel,
Memory Channel 是記憶體中的佇列,Memory Channel 在不需要關心資料丟失的情景下適用,如果需要關心資料丟失,那么 Memory Channel 就不應該使用,因為程式死亡、機器宕機或者重啟都會導致資料丟失,
File Channel 將所有事件寫到磁盤,因此在程式關倍訓機器宕機的情況下不會丟失數
據,

Event

傳輸單元,Flume 資料傳輸的基本單元,以 Event 的形式將資料從源頭送至目的地,
Event 由 Header 和 Body 兩部分組成,Header 用來存放該 event 的一些屬性,為 K-V 結構,Body 用來存放該條資料,形式為位元組陣列,

Flume快速入門

Flume的安裝部署

Flume下載地址

https://flume.apache.org/download.html

檔案查看地址

https://flume.apache.org/FlumeUserGuide.html

安裝Flume

Flume和ZooKeeper一樣,都是由java開發的,所以在安裝之前要確定安裝過了java

下載完得tar包,解壓之后放到相應的檔案夾后,打開flume的組態檔夾conf,會看到一個flume-env.sh.template檔案,這是官方給出的參考模板,可以對其修改名字,然后進去修改配置

flume的配置十分簡單,只需要修改JAVA_HOME的路徑即可

Flume的入門案例

需求:使用Flume監聽一個埠,收集該埠資料,并列印到控制臺

分析:監聽一個埠需要使用netcat工具,輸出到控制臺則是logger
在這里插入圖片描述

撰寫組態檔flume-netcat-logger.conf

添加內容如下: 
# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
 
# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
 
# Describe the sink 
a1.sinks.k1.type = logger 
 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

開啟flume監聽埠

兩種寫法

第一種

bin/flume-ng agent --conf conf/ --name a1  --conf-file  job/flume-netcat-logger.conf  -Dflume.root.logger=INFO,console 

第二種

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f 
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

引數說明

–conf/-c:表示組態檔存盤在 conf/目錄

–name/-n:表示給 agent 起名為 a1

–conf-file/-f:flume 本次啟動讀取的組態檔是在 job 檔案夾下的 flume-telnet.conf

實時 監控單個追加 檔案

**案例需求:實時監控 H ive 日志,并上傳到 HDFS 中 **

分析:
在這里插入圖片描述

Source:監聽檔案,使用exec,exec 即 execute 執行的意思,表示執行 Linux
命令來讀取檔案

Sink:傳輸到HDFS,使用hdfs

Channel:依舊使用memory

實作步驟:

Flume 要想將資料輸出到 HDFS ,須持有 Hadoop 相關 jar 包

commons-configuration-1.6.jar、 
hadoop-auth-2.7.2.jar、 
hadoop-common-2.7.2.jar、 
hadoop-hdfs-2.7.2.jar、 
commons-io-2.4.jar、 
htrace-core-3.1.0-incubating.jar 

拷貝到/opt/module/flume/lib 檔案夾下,

**創建 flume- file- hdfs.conf 檔案 **

# Name the components on this agent 
a2.sources = r2 
a2.sinks = k2 
a2.channels = c2 
 
# Describe/configure the source 
a2.sources.r2.type = exec 
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log 
a2.sources.r2.shell = /bin/bash -c 
 
# Describe the sink 
a2.sinks.k2.type = hdfs 
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H 
#上傳檔案的前綴 
a2.sinks.k2.hdfs.filePrefix = logs- 
#是否按照時間滾動檔案夾 
a2.sinks.k2.hdfs.round = true 
#多少時間單位創建一個新的檔案夾 
a2.sinks.k2.hdfs.roundValue = 1 
#重新定義時間單位 
a2.sinks.k2.hdfs.roundUnit = hour 
#是否使用本地時間戳 
a2.sinks.k2.hdfs.useLocalTimeStamp = true 
#積攢多少個 Event 才 flush 到 HDFS 一次 
a2.sinks.k2.hdfs.batchSize = 1000 
#設定檔案型別,可支持壓縮 
a2.sinks.k2.hdfs.fileType = DataStream 
#多久生成一個新的檔案 
a2.sinks.k2.hdfs.rollInterval = 30 
#設定每個檔案的滾動大小 
a2.sinks.k2.hdfs.rollSize = 134217700 
#檔案的滾動與 Event 數量無關 
a2.sinks.k2.hdfs.rollCount = 0 
 
# Use a channel which buffers events in memory 
a2.channels.c2.type = memory 
a2.channels.c2.capacity = 1000 
a2.channels.c2.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a2.sources.r2.channels = c2 
a2.sinks.k2.channel = c2 

實時監控 目錄 下多個 新 檔案

**案例需求:使用 F lume 監聽整個目錄的檔案 ,并上傳至 HDFS **

分析:

Source:監控整個目錄:Spooldir

Sink:上傳到HDFS:hdfs

Channel:使用memory
在這里插入圖片描述

實作步驟:

創建組態檔 flume- dir- hdfs.conf

a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 
 
# Describe/configure the source 
a3.sources.r3.type = spooldir 
a3.sources.r3.spoolDir = /opt/module/flume/upload 
a3.sources.r3.fileSuffix = .COMPLETED 
a3.sources.r3.fileHeader = true 
#忽略所有以.tmp 結尾的檔案,不上傳 
a3.sources.r3.ignorePattern = ([^ ]*\.tmp) 
 
# Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path  = 
hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H 
#上傳檔案的前綴 
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照時間滾動檔案夾 
a3.sinks.k3.hdfs.round = true 
#多少時間單位創建一個新的檔案夾 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定義時間單位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地時間戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#積攢多少個 Event 才 flush 到 HDFS 一次 
a3.sinks.k3.hdfs.batchSize = 100 
#設定檔案型別,可支持壓縮 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一個新的檔案 
a3.sinks.k3.hdfs.rollInterval = 60 
#設定每個檔案的滾動大小大概是 128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#檔案的滾動與 Event 數量無關 
a3.sinks.k3.hdfs.rollCount = 0 
 
# Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3 

實時監控目錄下的多個追加檔案

Exec source 適用于監控一個實時追加的檔案,但不能保證資料不丟失;Spooldir
Source 能夠保證資料不丟失, 且能夠實作斷點續傳, 但延遲較高, 不能實時監控; 而 Taildir Source 既能夠實作斷點續傳,又可以保證資料不丟失,還能夠進行實時監控,

案例需求:使用Flume監聽整個目錄的實時追加檔案,并且上傳至HDFS

分析:

Source:實時追加監聽:TAILDIR:支持斷點續傳,而exec不支持斷點續傳

Sink:上傳至HDFS:hdfs

Channel:使用memory
在這里插入圖片描述

實作步驟

**創建組態檔 flume- tail dir- hdfs.conf **

a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 
 
# Describe/configure the source 
a3.sources.r3.type = TAILDIR 
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json 
a3.sources.r3.filegroups = f1 
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.* 
 
# Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path  = 
hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H 
#上傳檔案的前綴 
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照時間滾動檔案夾 
a3.sinks.k3.hdfs.round = true 
#多少時間單位創建一個新的檔案夾 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定義時間單位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地時間戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#積攢多少個 Event 才 flush 到 HDFS 一次 
a3.sinks.k3.hdfs.batchSize = 100 
#設定檔案型別,可支持壓縮 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一個新的檔案 
a3.sinks.k3.hdfs.rollInterval = 60 
#設定每個檔案的滾動大小大概是 128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#檔案的滾動與 Event 數量無關 
a3.sinks.k3.hdfs.rollCount = 0 
 
# Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3

Taildir 說明:

Taildir Source 維護了一個 json 格式的 position File, 其會定期的往 position File中更新每個檔案讀取到的最新的位置, 因此能夠實作斷點續傳, Position File的格式下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"} 
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"} 

Flume進階

Flume事務

在這里插入圖片描述

Flume Agent內部原理

在這里插入圖片描述

重要組件:

ChannelSelector

ChannelSelector 的作用就是選出 Event 將要被發往哪個 Channel,其共有兩種型別,分別是 Replicating(復制)和 Multiplexing(多路復用) ,

ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相
應的原則,將不同的 Event 發往不同的 Channel,

SinkProcessor

SinkProcessor 共 有 三 種 類 型 , 分 別 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor

DefaultSinkProcessor 對應的是單個的 Sink,LoadBalancingSinkProcessor 和
FailoverSinkProcessor 對應的是 Sink Group,LoadBalancingSinkProcessor 可以實作負載均衡的功能,FailoverSinkProcessor 可以實作故障轉移的功能,

Flume拓撲結構

簡單串聯

前一個Flume的輸出是后一個Flume的輸入

在這里插入圖片描述

復制和多路復用

一個source,多個channel和sink

Flume 支持將事件流向一個或者多個目的地,這種模式可以將相同資料復制到多個
channel 中,或者將不同資料分發到不同的 channel 中,sink 可以選擇傳送到不同的目的地,
在這里插入圖片描述

負載均衡和故障轉移

Flume支持使用將多個sink邏輯上分到一個sink組, sink組配合不同的SinkProcessor
可以實作負載均衡和錯誤恢復的功能,
在這里插入圖片描述

聚合

這種模式是我們最常見的,也非常實用,日常 web 應用通常分布在上百個服務器,大者甚至上千個、上萬個服務器,產生的日志,處理起來也非常麻煩,用 flume 的這種組合方式能很好的解決這一問題, 每臺服務器部署一個 flume 采集日志, 傳送到一個集中收集日志的flume,再由此 flume 上傳到 hdfs、hive、hbase 等,進行日志分析,
在這里插入圖片描述

Flume企業開發案例

復制和多路復用

案例需求

使用 Flume-1 監控檔案變動,Flume-1 將變動內容傳遞給 Flume-2,Flume-2 負責存盤到HDFS, 同時Flume-1將變動內容傳遞給Flume-3, Flume-3負責輸出到Local FileSystem,

需求分析
在這里插入圖片描述

實作步驟

創建flume-file-flume.conf

配置 1 個接收日志檔案的 source 和兩個 channel、兩個 sink,分別輸送給 flume-flume-hdfs 和 flume-flume-dir,

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 k2 
a1.channels = c1 c2 
 
# 將資料流復制給所有 channel 
a1.sources.r1.selector.type = replicating 
 
# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log 
a1.sources.r1.shell = /bin/bash -c 
 
# Describe the sink 
# sink 端的 avro 是一個資料發送者 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141 
 
a1.sinks.k2.type = avro 
a1.sinks.k2.hostname = hadoop102 
a1.sinks.k2.port = 4142 
 
# Describe the channel 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
 
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000 
a1.channels.c2.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 c2 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2

創建 flume- flume- hdfs .conf

配置上級 Flume 輸出的 Source,輸出是到 HDFS 的 Sink,

# Name the components on this agent 
a2.sources = r1 
a2.sinks = k1 
a2.channels = c1 
 
# Describe/configure the source 
# source 端的 avro 是一個資料接收服務 
a2.sources.r1.type = avro 
a2.sources.r1.bind = hadoop102 
a2.sources.r1.port = 4141 
 
# Describe the sink 
a2.sinks.k1.type = hdfs 
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H 
#上傳檔案的前綴 
a2.sinks.k1.hdfs.filePrefix = flume2- 
#是否按照時間滾動檔案夾 
a2.sinks.k1.hdfs.round = true 
#多少時間單位創建一個新的檔案夾 
a2.sinks.k1.hdfs.roundValue = 1 
#重新定義時間單位 
a2.sinks.k1.hdfs.roundUnit = hour 
#是否使用本地時間戳 
a2.sinks.k1.hdfs.useLocalTimeStamp = true 
#積攢多少個 Event 才 flush 到 HDFS 一次 
a2.sinks.k1.hdfs.batchSize = 100 
#設定檔案型別,可支持壓縮 
a2.sinks.k1.hdfs.fileType = DataStream 
#多久生成一個新的檔案 
a2.sinks.k1.hdfs.rollInterval = 600 
#設定每個檔案的滾動大小大概是 128M 
a2.sinks.k1.hdfs.rollSize = 134217700 
#檔案的滾動與 Event 數量無關 
a2.sinks.k1.hdfs.rollCount = 0 
 
# Describe the channel 
a2.channels.c1.type = memory 
a2.channels.c1.capacity = 1000 
a2.channels.c1.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1

創建 flume- flume- dir .conf

配置上級 Flume 輸出的 Source,輸出是到本地目錄的 Sink,

# Name the components on this agent 
a3.sources = r1 
a3.sinks = k1 
a3.channels = c2 
 
# Describe/configure the source 
a3.sources.r1.type = avro 
a3.sources.r1.bind = hadoop102 
a3.sources.r1.port = 4142 

# Describe the sink 
a3.sinks.k1.type = file_roll 
a3.sinks.k1.sink.directory = /opt/module/data/flume3 
 
# Describe the channel 
a3.channels.c2.type = memory 
a3.channels.c2.capacity = 1000 
a3.channels.c2.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a3.sources.r1.channels = c2 
a3.sinks.k1.channel = c2 

自定義 Interceptor

案例需求

使用 Flume 采集服務器本地日志, 需要按照日志型別的不同, 將不同種類的日志發往不同的分析系統,

需求分析

在實際的開發中, 一臺服務器產生的日志型別可能有很多種, 不同型別的日志可能需要發送到不同的分析系統, 此時會用到 Flume 拓撲結構中的 Multiplexing 結構, Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同型別的 event 的 Header 中的 key 賦予不同的值,
在這里插入圖片描述

實作步驟

新建maven工程,匯入依賴

<dependency> 
    <groupId>org.apache.flume</groupId> 
    <artifactId>flume-ng-core</artifactId> 
    <version>1.7.0</version> 
</dependency> 

自定義的攔截器需要實作Interceptor介面,然后重寫四個方法

//初始化方法
public void initialize(){} 

//處理單個事件,Event中封裝了Header和Body,可以根據body內容設定為回應的//Header,然后Flume會以Header進行區分
public Event intercept(Event event){}

//處理多個事件,可以呼叫處理單個事件的intercept()
public List<Event> intercept(List<Event> list){}

public void close();

//注意:還要定義一個靜態內部類,實作Interceptor.Builder,在實作的build方法中,回傳一個本類實體

在這里插入圖片描述

package com.gis.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author LnnuUser
 * @create 2021-08-03-下午8:16
 */
public class TypeInterceptor implements Interceptor {

    //宣告一個存放事件的集合
    private ArrayList<Event> addHeaderEvents;

    @Override
    public void initialize() {

        //初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }

    /**
     * 處理單個事件
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {

        //1、獲取頭資訊
        Map<String, String> headers = event.getHeaders();

        //2、獲取body
        String body = new String(event.getBody());

        //3、根據body中是否有“hello”來決定添加怎樣的頭資訊
        if (body.contains("hello")){

            //4、添加頭資訊
            headers.put("type","GIS");
        }else {

            //4、添加頭資訊
            headers.put("type","AIS");
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        //1、清空集合
        addHeaderEvents.clear();

        //2、遍歷events,給每一個事件添加頭資訊
        for (Event event : list) {

            //3、給每一個事件添加頭資訊
            addHeaderEvents.add(intercept(event));

        }

        //4、回傳結果
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {

            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

**編輯 flume 組態檔 **

添加攔截器,攔截器就是自己寫的全類名,要打包放到flume的lib目錄中

a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type  = 
com.gis.flume.interceptor.CustomInterceptor$Builder 
a1.sources.r1.selector.type = multiplexing 

a1.sources.r1.selector.header = type 
a1.sources.r1.selector.mapping.letter = c1 
a1.sources.r1.selector.mapping.number = c2 

全部的配置

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 k2 
a1.channels = c1 c2 
 
# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type  = 
com.gis.flume.interceptor.CustomInterceptor$Builder 
a1.sources.r1.selector.type = multiplexing 

a1.sources.r1.selector.header = type 
a1.sources.r1.selector.mapping.letter = c1 
a1.sources.r1.selector.mapping.number = c2 
 
# Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = hadoop103 
a1.sinks.k1.port = 4141 
 
a1.sinks.k2.type=avro 
a1.sinks.k2.hostname = hadoop104 
a1.sinks.k2.port = 4242 
 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
 
# Use a channel which buffers events in memory 
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000 
a1.channels.c2.transactionCapacity = 100 
 
 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 c2 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 

自定義 Source

Source 是負責接收資料到 Flume Agent 的組件,Source 組件可以處理各種型別、各種格式的日志資料,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy,官方提供的 source 型別已經很多,但是有時候并不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 source,
官方也提供了自定義 source 的介面:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根據官方說明自定義
MySource 需要繼承 AbstractSource 類并實作 Configurable 和 PollableSource 介面,

實作步驟

自定義類繼承AbstractSource并且實作Configurable、PollableSource,然后重寫四個方法

//配置方法,從中讀取組態檔中的資訊
public void configure(Context context) {}

//主要的方法,邏輯都寫在該方法中,回傳值為Status狀態,執行成功Status.READY,執行失敗Status.BACKOFF
public Status process(){}

//這兩個方法很少用到
@Override
public long getBackOffSleepIncrement() {
    return 0;
}

@Override
public long getMaxBackOffSleepInterval() {
    return 0;
}

代碼實作

package com.gis.source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;

/**
 * @author LnnuUser
 * @create 2021-08-04-上午9:19
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    //定義全域的前綴和后綴
    private String prefix;
    private String subfix;

    Status status = null;

    /**
     *
     * @param context
     */
    @Override
    public void configure(Context context) {

        //讀取配置資訊,給前后綴賦值
        prefix = context.getString("prefix");
        subfix = context.getString("subfix","gis");
    }

    /**
     * 1.接受資料(for回圈造資料)
     * 2.封裝事件
     * 3.將事件傳給channel
     *
     * @return
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {

        try {
            //1.接受資料
            for (int i = 0; i < 5; i++){

                //2.構建事件物件
                SimpleEvent event = new SimpleEvent();

                //3.給事件設定值
                event.setBody((prefix + "--" + i + subfix).getBytes());

                //4.將事件傳給channel
                getChannelProcessor().processEvent(event);

                status = Status.READY;
            }
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

在這里插入圖片描述

組態檔

在組態檔中,sink和channel都一樣,只是source需要指定自己自定義的,將代碼打包放到flume的lib中

#宣告三個組件
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#宣告source的屬性
a1.sources.r1.type=com.gis.source.MySource
a1.sources.r1.prefix=lnnu
a1.sources.r1.subfix=done

#宣告sink的屬性
a1.sinks.k1.type=logger

#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#建立聯系
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

自定義Sink

Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,并將這些事件批量寫入到存盤或索引系統、或者被發送到另一個 Flume Agent,

Sink 是完全事務性的,在從 Channel 批量洗掉資料之前,每個 Sink 用 Channel 啟動一個事務,批量事件一旦成功寫出到存盤系統或下一個 Flume Agent,Sink 就利用 Channel 提交事務,事務一旦被提交,該 Channel 從自己的內部緩沖區洗掉事件,

Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、
自定義,官方提供的 Sink 型別已經很多,但是有時候并不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 Sink,

自定義Sink步驟

自定義一個類繼承AbstractSink并且實作Configurable介面,實作里面的兩個方法

代碼撰寫

package com.gis.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author LnnuUser
 * @create 2021-08-04-上午10:22
 */
public class MySink extends AbstractSink implements Configurable {

    //定義兩個屬性,前后綴
    private String prefix;
    private String subfix;

    //獲取logger物件
    private Logger logger = LoggerFactory.getLogger(MySink.class);

    @Override
    public void configure(Context context) {

        //讀取組態檔,為前后綴賦值
        prefix = context.getString("prefix");
        subfix = context.getString("subfix","lnnu");
    }

    /**
     * 1.獲取channel
     * 2.從channel獲取事務以及資料
     * 3.發送資料
     * @return
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {

        //1.定義回傳值
        Status status = null;

        //2.獲取channel
        Channel channel = getChannel();

        //3.從channel獲取事務
        Transaction transaction = channel.getTransaction();

        //4.開啟事務
        transaction.begin();

        try {
            //5.從channel獲取資料
            Event event = channel.take();

            //6.處理事務
            if (event != null) {

                String body = new String(event.getBody());
                logger.info(prefix + body + subfix);
            }

            //7.提交事務
            transaction.commit();

            //8.成功提交,修改狀態資訊
            status = Status.READY;

        } catch (ChannelException e) {

            //9.提交事務失敗
            transaction.rollback();

            //10.修改狀態
            status = Status.BACKOFF;
            e.printStackTrace();
        }finally {

            //11.最終關閉事務
            transaction.close();
        }

        //12.回傳狀態資訊
        return status;
    }
}

Flume的組態檔

Source和Channel都是正常的寫法,只是Sink需要指定為我們自定義的全類名,要先打包之后上傳到flume的lib目錄下
在這里插入圖片描述

a1.sources=r1
a1.sinks=k1
a1.channels=c1

#宣告source的屬性
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333

#宣告sink的屬性
a1.sinks.k1.type=com.gis.sink.MySink
a1.sinks.k1.prefix=lnnu
a1.sinks.k1.subfix=gis

#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#建立聯系
a1.sources.r1.channels=c1
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#宣告source的屬性
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333

#宣告sink的屬性
a1.sinks.k1.type=com.gis.sink.MySink
a1.sinks.k1.prefix=lnnu
a1.sinks.k1.subfix=gis

#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#建立聯系
a1.sources.r1.channels=c1
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#宣告source的屬性
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333

#宣告sink的屬性
a1.sinks.k1.type=com.gis.sink.MySink
a1.sinks.k1.prefix=lnnu
a1.sinks.k1.subfix=gis

#宣告channel的屬性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#建立聯系
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/292169.html

標籤:其他

上一篇:【秋招面試】貨拉拉面試(1面程序)

下一篇:大資料原理與技術(六):大資料離線計算框架MapReduce

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more