大資料之 Flume 對 接 Kafka 完整使用
- 一、Flume 對 接 Kafka
- 1)配置 flume(flume-kafka.conf)
- 2) 啟動 kafkaIDEA 消費者
- 3) 進入 flume 根目錄下,啟動 flume
- 4) 向 /opt/module/data/flume.log 里追加資料,查看 kafka 消費者消費情況
- 二、為什么要kafka對接Flume
- 1、問題
- 三、kafka對接Flume (資料分類)
- 1、編碼
- 2、丟到服務器
- 3、在flume 的job里面新增分類檔案如下
- 4、啟動兩個消費者
- 5、啟動flume
- 6、開啟發送資料埠
一、Flume 對 接 Kafka
1)配置 flume(flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers =
hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 啟動 kafkaIDEA 消費者
3) 進入 flume 根目錄下,啟動 flume
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4) 向 /opt/module/data/flume.log 里追加資料,查看 kafka 消費者消費情況
$ echo hello >> /opt/module/data/flume.log
二、為什么要kafka對接Flume
1、問題
采集日志給多個人使用
如果使用flume、那就的再多加一個channel、不能動態加業務線
增加業務線動態增加(類似消費者可以動態增加、副本數不變)
三、kafka對接Flume (資料分類)
1、編碼
監聽頭部資訊 headers.put(“topic”, “first”);
package org.example.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 使用flume 判斷 卡法卡 走那個主題 自定義攔截器
* flum 自定義攔截器
* 定義型別攔截器
*/
public class TypeInterceptor implements Interceptor {
//宣告一個集合、用于存放攔截器處理后的事件
private List<Event> addHeaderEvents;
@Override
public void initialize() {
//初始化集合用于存放攔截器處理后的事件
addHeaderEvents = new ArrayList<>();
}
/**
* 單個事件處理方法
* event 包含body 和header
* https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
*
* @param event
* @return
*/
@Override
public Event intercept(Event event) {
//1、獲取事件中的頭部資訊 header & body
Map<String, String> headers = event.getHeaders();
//獲取事件中的body資訊
String body = new String(event.getBody());
//根據body中是否有hello 來決定添加怎樣的頭部資訊
if (body.contains("hello")) {
headers.put("topic", "first");
} else {
headers.put("topic", "second");
}
//回傳資料
return event;
}
/**
* 批量事件處理方法
*
* @param list
* @return
*/
@Override
public List<Event> intercept(List<Event> list) {
//清空集合
addHeaderEvents.clear();
for (Event event : list) {
//交給單個Event 處理
addHeaderEvents.add(intercept(event));
}
//回傳資料
return addHeaderEvents;
}
@Override
public void close() {
}
/**
* a1.sources = r1
* a1.sinks = k1
* a1.channels = c1
* a1.sources.r1.interceptors = i1 i2
* a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
* a1.sources.r1.interceptors.i1.preserveExisting = false
* a1.sources.r1.interceptors.i1.hostHeader = hostname
* a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
* a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
* a1.sinks.k1.channel = c1
* 幫助構建攔截器物件 $Builder
*/
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
2、丟到服務器
位置是flume的lib下
3、在flume 的job里面新增分類檔案如下

新增配置屬性
#Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.interceptor.TypeInterceptor$Builder
#Chabbel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#事務容量
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、啟動兩個消費者
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic second
5、啟動flume
bin/flume-ng agent -c conf/ -f job/type_kafka.conf -n a1
6、開啟發送資料埠
nc localhost 44444
查看效果


轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/356278.html
標籤:其他
上一篇:2021第六屆數維杯A題新冠肺炎背景下的港口資源配置優化策略
下一篇:pandas使用groupby函式、agg函式獲取每個分組聚合對應的均值(mean)實戰:計算分組聚合單資料列的均值、計算分組聚合多資料列的均值
