數倉采集之環境搭建hadoop,zookeeper,kafka
前期的阿里云ECS環境已裝好,現在開始正式搭建專案的環境
hadoop安裝配置
1.集群規劃
| 服務器hadoop102 | 服務器hadoop103 | 服務器hadoop104 | |
|---|---|---|---|
| HDFS | NameNodeDataNode | DataNode | DataNodeSecondaryNameNode |
| Yarn | NodeManager | ResourcemanagerNodeManager | NodeManager |
2.部署安裝
hadoop我這里使用的版本是hadoop-3.1.3.tar.gz
# 解壓
[atguigu@hadoop102 software]$ pwd
/opt/software
[atguigu@hadoop102 software]$ ll |grep hadoop
-rw-rw-r-- 1 atguigu atguigu 338075860 Oct 16 21:37 hadoop-3.1.3.tar.gz
[atguigu@hadoop102 software]$ tar -zxvf hadoop-3.1.3.tar.gz -C ../module/
# 配置環境變數 /etc/profile.d/my_env.sh, 然后 source /etc/profile
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
# hadoop version 檢驗hadoop是否安裝成功

3.配置集群
core-site.xml,在configuration中添加如下配置
<!-- 指定NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop102:8020</value>
</property>
<!-- 指定hadoop資料的存盤目錄 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>
<!-- 配置HDFS網頁登錄使用的靜態用戶為atguigu -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>atguigu</value>
</property>
<!-- 配置該atguigu(superUser)允許通過代理訪問的主機節點 -->
<property>
<name>hadoop.proxyuser.atguigu.hosts</name>
<value>*</value>
</property>
<!-- 配置該atguigu(superUser)允許通過代理用戶所屬組 -->
<property>
<name>hadoop.proxyuser.atguigu.groups</name>
<value>*</value>
</property>
<!-- 配置該atguigu(superUser)允許通過代理的用戶-->
<property>
<name>hadoop.proxyuser.atguigu.groups</name>
<value>*</value>
</property>
hdfs-site.xml
<!-- nn web端訪問地址-->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop102:9870</value>
</property>
<!-- 2nn web端訪問地址-->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:9868</value>
</property>
yarn-site.xml
<!-- 指定MR走shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定ResourceManager的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop103</value>
</property>
<!-- 環境變數的繼承 -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<!-- yarn容器允許分配的最大最小記憶體 -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<!-- yarn容器允許管理的物理記憶體大小 默認8個G-->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>
<!-- 關閉yarn對物理記憶體和虛擬記憶體的限制檢查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
mapred-site.xml
<!-- 指定MapReduce程式運行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
配置works
hadoop102
hadoop103
hadoop104
4.配置歷史服務器
mapred-site.xml
<!-- 歷史服務器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop102:10020</value>
</property>
<!-- 歷史服務器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop102:19888</value>
</property>
5.配置日志聚集
yarn-site.xml
<!-- 開啟日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 設定日志聚集服務器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- 設定日志保留時間為7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
ok,配到這里,所有的配置已經配置完畢,接下來就把整個hadoop的安裝目錄分發到hadoop103,hadoop104機器,環境變數也分發一下,
[atguigu@hadoop102 module]$ pwd
/opt/module
# 分發hadoop安裝目錄
[atguigu@hadoop102 module]$ my_rsync.sh hadoop-3.1.3
# 分發環境變數
[atguigu@hadoop102 module]$ my_rsync.sh /etc/profile.d/my_env.sh
# 查看各個機器的hadoop環境變數是否生效
[atguigu@hadoop102 module]$ all.sh hadoop version
測驗完成,現在所有機器的hadoop的環境已經裝好
6.啟動hadoop
# 102
[atguigu@hadoop102 module]$ hdfs namenode -format
# 102
[atguigu@hadoop102 module]$ start-dfs.sh
# 103
[atguigu@hadoop102 module]$ start-yarn.sh
ok.啟動成功,
# namenode web地址
hadoop102:9870
# yarn web地址
hadoop103:8088
7.簡單跑個mr測驗一下
hadoop fs -mkdir /input
hadoop fs -put READ.txt /input
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output
# 啟動歷史服務器
mapred --daemon start historyserver
多目錄存盤
先熟悉一下,將來生產環境一定會用到
1.生產環境服務器磁盤情況

2.在hdfs-site.xml檔案中配置多目錄,注意新掛載磁盤的訪問權限問題,
HDFS的DataNode節點保存資料的路徑由dfs.datanode.data.dir引數決定,其默認值為file://${hadoop.tmp.dir}/dfs/data,若服務器有多個磁盤,必須對該引數進行修改,如服務器磁盤如上圖所示,則該引數應修改為如下的值,
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>
注意:每臺服務器掛載的磁盤不一樣,所以每個節點的多目錄配置可以不一致,單獨配置即可,
不慌,到時候看下磁盤掛載情況,然后datanode屬性dfs.datanode.data.dir相應的配置就行
注意:每臺服務器掛載的磁盤不一樣,所以每個節點的多目錄配置可以不一致,單獨配置即可,
集群資料均衡
節點間資料均衡
開啟資料均衡命令:
start-balancer.sh -threshold 10
對于引數10,代表的是集群中各個節點的磁盤空間利用率相差不超過10%,可根據實際情況進行調整,
停止資料均衡命令:
stop-balancer.sh
磁盤間資料均衡
(1)生成均衡計劃(我們只有一塊磁盤,不會生成計劃)
hdfs diskbalancer -plan hadoop103
(2)執行均衡計劃
hdfs diskbalancer -execute hadoop103.plan.json
(3)查看當前均衡任務的執行情況
hdfs diskbalancer -query hadoop103
(4)取消均衡任務
hdfs diskbalancer -cancel hadoop103.plan.json
LZO壓縮配置
hadoop本身并不支持lzo壓縮,故需要使用twitter提供的hadoop-lzo開源組件,hadoop-lzo需依賴hadoop和lzo進行編譯,編譯步驟略,編譯后打成jar包: hadoop-lzo-0.4.20.jar
# 1.將編譯好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/
[atguigu@hadoop102 common]$ pwd
/opt/module/hadoop-3.1.3/share/hadoop/common
[atguigu@hadoop102 common]$ ls
hadoop-lzo-0.4.20.jar
# 2.同步hadoop-lzo-0.4.20.jar到hadoop103、hadoop104
[atguigu@hadoop102 common]$ my_rsync.sh hadoop-lzo-0.4.20.jar
# 3.core-site.xml增加配置支持LZO壓縮
<configuration>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>
# 4.同步core-site.xml到hadoop103、hadoop104
[atguigu@hadoop102 hadoop]$ my_rsync.sh core-site.xml
# 5.啟動及查看集群
[atguigu@hadoop102 hadoop-3.1.3]$ start-dfs.sh
[atguigu@hadoop103 hadoop-3.1.3]$ start-yarn.sh
# 5.測驗lzo壓縮
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /input /lzo-output
# 5.測驗lzo切片
# 注意:lzo的切片必須要創建索引,執行后會新建一個后綴為.index的檔案
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /lzo-input/bigtable.lzo
# 測驗lzo切片(把默認的TextInputFormat替換為LzoInputFormat)
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount - Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /lzo-input /lzo-output
基準測驗
測驗HDFS寫性能
# 測驗命令(-nrFiles設定為cpu數量-2)hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 4 -fileSize 128MB

測驗HDFS讀性能
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 4 -fileSize 128MB
洗掉測驗生成的資料
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
hadoop引數調優
HDFS引數調優hdfs-site.xml
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一個作業執行緒池,用來處理不同DataNode的并發心跳以及客戶端并發的元資料操作,
對于大集群或者有大量客戶端的集群來說,通常需要增大引數dfs.namenode.handler.count的默認值10,
<property>
<name>dfs.namenode.handler.count</name>
<value>10</value>
</property>
計算公式:

YARN引數調優yarn-site.xml
(1)情景描述:總共7臺機器,每天幾億條資料,資料源->Flume->Kafka->HDFS->Hive
面臨問題:資料統計主要用HiveSQL,沒有資料傾斜,小檔案已經做了合并處理,開啟的JVM重用,而且IO沒有阻塞,記憶體用了不到50%,但是還是跑的非常慢,而且資料量洪峰過來時,整個集群都會宕掉,基于這種情況有沒有優化方案,
(2)解決辦法:
記憶體利用率不夠,這個一般是Yarn的2個配置造成的,單個任務可以申請的最大記憶體大小,和Hadoop單個節點可用記憶體大小,調節這兩個引數能提高系統記憶體的利用率,
(a)yarn.nodemanager.resource.memory-mb
表示該節點上YARN可使用的物理記憶體總量,默認是8192(MB),注意,如果你的節點記憶體資源不夠8GB,則需要調減小這個值,而YARN不會智能的探測節點的物理記憶體總量,
(b)yarn.scheduler.maximum-allocation-mb
單個任務可申請的最多物理記憶體量,默認是8192(MB),
zookeeper安裝配置
集群規劃
| 服務器hadoop102 | 服務器hadoop103 | 服務器hadoop104 | |
|---|---|---|---|
| Zookeeper | Zookeeper | Zookeeper | Zookeeper |
解壓安裝
# 解壓
[atguigu@hadoop102 module]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C ../module/
# 配置環境變數
# zookeeper
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# 修改zookeeper的組態檔(創建zkData檔案夾,myid檔案,不重復id:2,3,4,)
# 還要zoo.sample.conf 改名為 zoo.cfg
dataDir=/opt/module/zookeeper-3.5.7/zkData
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
# 檔案分發到103,104
[atguigu@hadoop102 module]$ my_rsync.sh zookeeper-3.5.7/
# 分發后各自改 myid檔案3,4
# 分發環境變數
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/
zk群啟腳本zk_cluster.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE: zk.sh {start|stop|status}"
exit
fi
case $1 in
start)
for i in hadoop102 hadoop103 hadoop104
do
echo "=================> START $i ZK <================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh start
done
;;
stop)
for i in hadoop102 hadoop103 hadoop104
do
echo "=================> STOP $i ZK <================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh stop
done
;;
status)
for i in hadoop102 hadoop103 hadoop104
do
echo "=================> STATUS $i ZK <================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh status
done
;;
*)
echo "USAGE: zk.sh {start|stop|status}"
exit
;;
esac
kafka安裝配置
集群規劃
| 服務器hadoop102 | 服務器hadoop103 | 服務器hadoop104 | |
|---|---|---|---|
| Kafka | Kafka | Kafka | Kafka |
解壓安裝
# 解壓
[atguigu@hadoop102 config]$ tar -zxvf kafka_2.11-2.4.1.tgz -C ../module/
# 在安裝目錄新建data目錄,用來存訊息資料
# 修改組態檔
broker.id=2
log.dirs=/opt/module/kafka_2.11-2.4.1/datas
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
# 撰寫環境變數
#kafka
export KAFKA_HOME=/opt/module/kafka_2.11-2.4.1
export PATH=$PATH:$KAFKA_HOME/bin
# 分發kafka安裝目錄(各自修改broker.id)以及環境變數
[atguigu@hadoop102 module]$ my_rsync.sh kafka_2.11-2.4.1/
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop103:/etc/profile
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop104:/etc/profile
kafka群啟腳本
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE: kafka.sh {start|stop}"
exit
fi
case $1 in
start)
for i in hadoop102 hadoop103 hadoop104
do
echo "=================> START $i KF <================="
ssh $i /opt/module/kafka_2.11-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties
done
;;
stop)
for i in hadoop102 hadoop103 hadoop104
do
echo "=================> STOP $i KF <================="
ssh $i /opt/module/kafka_2.11-2.4.1/bin/kafka-server-stop.sh
done
;;
*)
echo "USAGE: kafka.sh {start|stop}"
exit
;;
esac
kafka壓力測驗
用Kafka官方自帶的腳本,對Kafka進行壓測,Kafka壓測時,可以查看到哪個地方出現了瓶頸(CPU,記憶體,網路IO),一般都是網路IO達到瓶頸,
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
# Kafka Producer壓力測驗
(1)在/opt/module/kafka/bin目錄下面有這兩個檔案,我們來測驗一下
[atguigu@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 說明:
record-size是一條資訊有多大,單位是位元組,
num-records是總共發送多少條資訊,
throughput 是每秒多少條資訊,設成-1,表示不限流,可測出生產者最大吞吐量,
(2)Kafka會列印下面的資訊
100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.
引數決議:本例中一共寫入10w條訊息,吞吐量為9.14 MB/sec,每次寫入的平均延遲為187.68毫秒,最大的延遲為424.00毫秒,
# Kafka Consumer壓力測驗
Consumer的測驗,如果這四個指標(IO,CPU,記憶體,網路)都不能改變,考慮增加磁區數來提升性能,
[atguigu@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
引數說明:
--zookeeper 指定zookeeper的鏈接資訊
--topic 指定topic的名稱
--fetch-size 指定每次fetch的資料的大小
--messages 總共要消費的訊息個數
測驗結果說明:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153
開始測驗時間,測驗結束資料,共消費資料9.5368MB,吞吐量2.0714MB/s,共消費100010條,平均每秒消費21722.4153條,
kafka機器數量計算
經驗計算公式
Kafka機器數量(經驗公式)=2*(峰值生產速度*副本數/100)+1
先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署Kafka的數量,
比如我們的峰值生產速度是50M/s,副本數為2,
Kafka機器數量=2*(50*2/100)+ 1=3臺
kafka磁區計算公式
https://blog.csdn.net/weixin_42641909/article/details/89294698
1)創建一個只有1個磁區的topic
2)測驗這個topic的producer吞吐量和consumer吞吐量,
3)假設他們的值分別是Tp和Tc,單位可以是MB/s,
4)然后假設總的目標吞吐量是Tt,那么磁區數=Tt / min(Tp,Tc)
例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;
磁區數=100 / 20 =5磁區
磁區數一般設定為:3-10個
flume安裝配置
解壓安裝
# 解壓
[atguigu@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ../module/
# 配置環境變數
#flume
export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin
# 重繪配置
[atguigu@hadoop102 lib]$ source /etc/profile
# 洗掉/opt/module/flume-1.9.0/lib目錄下的 guava jar包
[atguigu@hadoop102 lib]$ rm -rf guava-11.0.2.jar
專案架構圖

flume采集通道規劃
第一層flume:
source: taildir
channel: kafka channel
sink:無
第二層flume:
source: 無 source: kafka source 使用第二種:可以在source這里加攔截器
channel: kafka channel channel: file channel
sink: hdfs sink sink: hdfs sink
攔截器配置
思考一個問題:就在在第一層的flume讀取資料時,可能存在不合法的資料(比如不是json格式),如果這樣這樣的資料進入到hdfs,將來使用決議時必然決議不了,所以有必要在第一層flume這里添加一個攔截器,
pom.xml
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
//資料采集的攔截器
package com.pihao.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class EtlLogInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//取出body
String body = new String(event.getBody(), StandardCharsets.UTF_8);
//判斷是否是json格式
try {
JSON.parseObject(body);
}catch (JSONException e){
return null;
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
List<Event> list1 = new ArrayList<>();
for (Event event : list) {
Event newEvent = intercept(event);
if(null != newEvent){
list1.add(newEvent);
}
}
return list1;
}
@Override
public void close() {
}
/**
* builder內部類,用來實體化上面的interceptor類
*/
public static class MyBuilder implements Builder{
@Override
public Interceptor build() {
return new EtlLogInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
Maven打包,將帶依賴的jar包上傳到/flume/lib目錄中
第一層flume配置與啟停腳本
source: taildir
channel: kafka channel
sink:無
在/opt/module/flume-1.9.0/jobs/gmall創建logserver-flume-kafka.conf
#name
a1.sources = r1
a1.channels = c1
#source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
#攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.EtlLogInterceptor$MyBuilder
#channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
# bind
a1.sources.r1.channels = c1
先啟動kafka集群,創建一個消費者去消費topic_log主題
再執行flume命令測驗
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
撰寫啟停腳本f1.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE: f1.sh {start|stop}"
exit
fi
case $1 in
start)
for i in hadoop102 hadoop103
do
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
done
;;
stop)
for i in hadoop102 hadoop103
do
ssh $i "ps -ef | grep logserver-flume-kafka.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
done
;;
*)
echo "USAGE: f1.sh {start|stop}"
exit
;;
esac
102自測通過后將flume的安裝目錄發送到103機器,以及環境變數
[atguigu@hadoop102 module]$ scp -r flume-1.9.0/ hadoop103:/opt/module/
第二層flume配置與啟停腳本
source: kafka source 可以在source這里加攔截器
channel: file channel
sink: hdfs sink在第二層,我們將flume設定在104機器上,它是用來消費kafka中的資料存在hdfs
思考:這個添加的攔截器主要是干什么的呢?主要是用來覆寫kafka source里頭的默認的系統時候,換成日志里面的實際時間戳,不然這個資料會出現一些問題,本來是是23:59:59的資料,如果使用本地時間,可能就變成了第二天的資料了,所以這個問題必須要處理,用攔截器來做,
撰寫攔截器
package com.pihao.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class TimeStampInterceptor implements Interceptor {
@Override
public void initialize() {
}
//使用新的ts替換原來的timestamp
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSON.parseObject(body);
String ts = jsonObject.getString("ts");
event.getHeaders().put("timestamp",ts);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
/**
* builder內部類,用來實體化上面的interceptor類
*/
public static class MyBuilder implements Builder{
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
將flume的安裝目錄發送到104機器,并且分發環境變數
在104機器創建 /opt/module/flume-1.9.0/jobs/gmall創建kafka-flume-hdfs.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = gmall
a1.sources.r1.batchDurationMillis = 2000
#攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.TimeStampInterceptor$MyBuilder
#channel
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpoint
#a1.channels.c1.useDualCheckpoints = true
#a1.channels.c1.backupCheckpointDir = /opt/module/flume-1.9.0/jobs/checkpoint-bk
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 5
#hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
#控制輸出檔案是原生檔案(下面的壓縮配置會報錯)
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = lzop
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動hdfs
104執行flume命令
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/kafka-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
撰寫啟停腳本f2.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE: f2.sh {start|stop}"
exit
fi
case $1 in
start)
for i in hadoop104
do
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/kafka-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
done
;;
stop)
for i in hadoop104
do
ssh $i "ps -ef | grep kafka-flume-hdfs.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
done
;;
*)
echo "USAGE: f2.sh {start|stop}"
exit
;;
esac

測驗成功
flume記憶體優化
1)問題描述:如果啟動消費Flume拋出如下例外
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2)解決方案步驟:
(1)在hadoop102服務器的/opt/module/flume/conf/flume-env.sh檔案中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
(2)同步配置到hadoop103、hadoop104服務器
[atguigu@hadoop102 conf]$ xsync flume-env.sh
3)Flume記憶體引數設定及優化
JVM heap一般設定為4G或更高
-Xmx與-Xms最好設定一致,減少記憶體抖動帶來的性能影響,如果設定不一致容易導致頻繁fullgc,
-Xms表示JVM Heap(堆記憶體)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆記憶體)最大允許的尺寸,按需分配,如果不設定一致,容易在初始化時,由于記憶體不夠,頻繁觸發fullgc,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/323374.html
標籤:其他
