主頁 >  其他 > 數倉采集之環境搭建hadoop,zookeeper,kafka,flume

數倉采集之環境搭建hadoop,zookeeper,kafka,flume

2021-10-19 08:49:39 其他

數倉采集之環境搭建hadoop,zookeeper,kafka

前期的阿里云ECS環境已裝好,現在開始正式搭建專案的環境

hadoop安裝配置

1.集群規劃

服務器hadoop102服務器hadoop103服務器hadoop104
HDFSNameNodeDataNodeDataNodeDataNodeSecondaryNameNode
YarnNodeManagerResourcemanagerNodeManagerNodeManager

2.部署安裝

hadoop我這里使用的版本是hadoop-3.1.3.tar.gz

# 解壓
[atguigu@hadoop102 software]$ pwd
/opt/software
[atguigu@hadoop102 software]$ ll |grep hadoop
-rw-rw-r-- 1 atguigu atguigu 338075860 Oct 16 21:37 hadoop-3.1.3.tar.gz

[atguigu@hadoop102 software]$ tar -zxvf hadoop-3.1.3.tar.gz -C ../module/

# 配置環境變數 /etc/profile.d/my_env.sh, 然后 source /etc/profile
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

# hadoop version 檢驗hadoop是否安裝成功

在這里插入圖片描述

3.配置集群

core-site.xml,在configuration中添加如下配置

<!-- 指定NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:8020</value>
</property>
<!-- 指定hadoop資料的存盤目錄 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
</property>

<!-- 配置HDFS網頁登錄使用的靜態用戶為atguigu -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>
</property>

<!-- 配置該atguigu(superUser)允許通過代理訪問的主機節點 -->
    <property>
        <name>hadoop.proxyuser.atguigu.hosts</name>
        <value>*</value>
</property>
<!-- 配置該atguigu(superUser)允許通過代理用戶所屬組 -->
    <property>
        <name>hadoop.proxyuser.atguigu.groups</name>
        <value>*</value>
</property>
<!-- 配置該atguigu(superUser)允許通過代理的用戶-->
    <property>
        <name>hadoop.proxyuser.atguigu.groups</name>
        <value>*</value>
</property>

hdfs-site.xml

<!-- nn web端訪問地址-->
<property>
    <name>dfs.namenode.http-address</name>
    <value>hadoop102:9870</value>
</property>
<!-- 2nn web端訪問地址-->
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop104:9868</value>
</property>

yarn-site.xml

<!-- 指定MR走shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
	</property>
<!-- 指定ResourceManager的地址-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
	</property>
<!-- 環境變數的繼承 -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
	</property>
<!-- yarn容器允許分配的最大最小記憶體 -->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>512</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
	</property>
<!-- yarn容器允許管理的物理記憶體大小 默認8個G-->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
	</property>
<!-- 關閉yarn對物理記憶體和虛擬記憶體的限制檢查 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>

mapred-site.xml

<!-- 指定MapReduce程式運行在Yarn上 -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value> 
    </property>

配置works

hadoop102
hadoop103
hadoop104

4.配置歷史服務器

mapred-site.xml

<!-- 歷史服務器端地址 -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop102:10020</value>
</property>

<!-- 歷史服務器web端地址 -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop102:19888</value>
</property>

5.配置日志聚集

yarn-site.xml

<!-- 開啟日志聚集功能 -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<!-- 設定日志聚集服務器地址 -->
<property>  
    <name>yarn.log.server.url</name>
    <value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- 設定日志保留時間為7天 -->
<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>

ok,配到這里,所有的配置已經配置完畢,接下來就把整個hadoop的安裝目錄分發到hadoop103,hadoop104機器,環境變數也分發一下,

[atguigu@hadoop102 module]$ pwd
/opt/module
# 分發hadoop安裝目錄
[atguigu@hadoop102 module]$ my_rsync.sh hadoop-3.1.3
# 分發環境變數
[atguigu@hadoop102 module]$ my_rsync.sh /etc/profile.d/my_env.sh

# 查看各個機器的hadoop環境變數是否生效
[atguigu@hadoop102 module]$ all.sh hadoop version

測驗完成,現在所有機器的hadoop的環境已經裝好

6.啟動hadoop

# 102
[atguigu@hadoop102 module]$ hdfs namenode -format
# 102
[atguigu@hadoop102 module]$ start-dfs.sh
# 103
[atguigu@hadoop102 module]$ start-yarn.sh

ok.啟動成功,

# namenode web地址   
hadoop102:9870
# yarn web地址
hadoop103:8088

7.簡單跑個mr測驗一下

hadoop fs -mkdir /input
hadoop fs -put READ.txt /input
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output

# 啟動歷史服務器
mapred --daemon start historyserver

多目錄存盤

先熟悉一下,將來生產環境一定會用到

1.生產環境服務器磁盤情況

在這里插入圖片描述

2.在hdfs-site.xml檔案中配置多目錄,注意新掛載磁盤的訪問權限問題,

HDFS的DataNode節點保存資料的路徑由dfs.datanode.data.dir引數決定,其默認值為file://${hadoop.tmp.dir}/dfs/data,若服務器有多個磁盤,必須對該引數進行修改,如服務器磁盤如上圖所示,則該引數應修改為如下的值,
<property>

  <name>dfs.datanode.data.dir</name>

<value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>

</property>
注意:每臺服務器掛載的磁盤不一樣,所以每個節點的多目錄配置可以不一致,單獨配置即可,
不慌,到時候看下磁盤掛載情況,然后datanode屬性dfs.datanode.data.dir相應的配置就行

注意:每臺服務器掛載的磁盤不一樣,所以每個節點的多目錄配置可以不一致,單獨配置即可,

集群資料均衡

節點間資料均衡

開啟資料均衡命令:
start-balancer.sh -threshold 10
對于引數10,代表的是集群中各個節點的磁盤空間利用率相差不超過10%,可根據實際情況進行調整,

停止資料均衡命令:
stop-balancer.sh

磁盤間資料均衡

(1)生成均衡計劃(我們只有一塊磁盤,不會生成計劃)
hdfs diskbalancer -plan hadoop103
(2)執行均衡計劃
hdfs diskbalancer -execute hadoop103.plan.json
(3)查看當前均衡任務的執行情況
hdfs diskbalancer -query hadoop103
(4)取消均衡任務
hdfs diskbalancer -cancel hadoop103.plan.json

LZO壓縮配置

hadoop本身并不支持lzo壓縮,故需要使用twitter提供的hadoop-lzo開源組件,hadoop-lzo需依賴hadoop和lzo進行編譯,編譯步驟略,編譯后打成jar包: hadoop-lzo-0.4.20.jar

# 1.將編譯好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/
[atguigu@hadoop102 common]$ pwd
/opt/module/hadoop-3.1.3/share/hadoop/common
[atguigu@hadoop102 common]$ ls
hadoop-lzo-0.4.20.jar
# 2.同步hadoop-lzo-0.4.20.jar到hadoop103、hadoop104
[atguigu@hadoop102 common]$ my_rsync.sh hadoop-lzo-0.4.20.jar
# 3.core-site.xml增加配置支持LZO壓縮
<configuration>
    <property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>

    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
</configuration>

# 4.同步core-site.xml到hadoop103、hadoop104
[atguigu@hadoop102 hadoop]$ my_rsync.sh core-site.xml
# 5.啟動及查看集群
[atguigu@hadoop102 hadoop-3.1.3]$ start-dfs.sh
[atguigu@hadoop103 hadoop-3.1.3]$ start-yarn.sh


# 5.測驗lzo壓縮
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /input /lzo-output


# 5.測驗lzo切片
# 注意:lzo的切片必須要創建索引,執行后會新建一個后綴為.index的檔案
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /lzo-input/bigtable.lzo

# 測驗lzo切片(把默認的TextInputFormat替換為LzoInputFormat) 
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount - Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /lzo-input /lzo-output

基準測驗

測驗HDFS寫性能

# 測驗命令(-nrFiles設定為cpu數量-2)hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 4 -fileSize 128MB

在這里插入圖片描述

測驗HDFS讀性能

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 4 -fileSize 128MB

洗掉測驗生成的資料

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

hadoop引數調優

HDFS引數調優hdfs-site.xml

The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一個作業執行緒池,用來處理不同DataNode的并發心跳以及客戶端并發的元資料操作,
對于大集群或者有大量客戶端的集群來說,通常需要增大引數dfs.namenode.handler.count的默認值10,

<property>
    <name>dfs.namenode.handler.count</name>
    <value>10</value>
</property>

計算公式:

在這里插入圖片描述

YARN引數調優yarn-site.xml

(1)情景描述:總共7臺機器,每天幾億條資料,資料源->Flume->Kafka->HDFS->Hive
面臨問題:資料統計主要用HiveSQL,沒有資料傾斜,小檔案已經做了合并處理,開啟的JVM重用,而且IO沒有阻塞,記憶體用了不到50%,但是還是跑的非常慢,而且資料量洪峰過來時,整個集群都會宕掉,基于這種情況有沒有優化方案,

(2)解決辦法:
記憶體利用率不夠,這個一般是Yarn的2個配置造成的,單個任務可以申請的最大記憶體大小,和Hadoop單個節點可用記憶體大小,調節這兩個引數能提高系統記憶體的利用率,
(a)yarn.nodemanager.resource.memory-mb
表示該節點上YARN可使用的物理記憶體總量,默認是8192(MB),注意,如果你的節點記憶體資源不夠8GB,則需要調減小這個值,而YARN不會智能的探測節點的物理記憶體總量,
(b)yarn.scheduler.maximum-allocation-mb
單個任務可申請的最多物理記憶體量,默認是8192(MB),

zookeeper安裝配置

集群規劃

服務器hadoop102服務器hadoop103服務器hadoop104
ZookeeperZookeeperZookeeperZookeeper

解壓安裝

# 解壓
[atguigu@hadoop102 module]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C ../module/
# 配置環境變數
# zookeeper
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export PATH=$PATH:$ZOOKEEPER_HOME/bin

# 修改zookeeper的組態檔(創建zkData檔案夾,myid檔案,不重復id:2,3,4,)
# 還要zoo.sample.conf 改名為 zoo.cfg
dataDir=/opt/module/zookeeper-3.5.7/zkData
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

# 檔案分發到103,104
[atguigu@hadoop102 module]$ my_rsync.sh zookeeper-3.5.7/
# 分發后各自改 myid檔案3,4
# 分發環境變數
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/  
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/

zk群啟腳本zk_cluster.sh

#!/bin/bash
if [ $# -lt 1 ]
then
  echo "USAGE: zk.sh {start|stop|status}"
  exit
fi  
case $1 in
start)
	for i in hadoop102 hadoop103 hadoop104
	do
	  echo "=================> START $i ZK <================="
	  ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh start
	done
;;
stop)
	for i in hadoop102 hadoop103 hadoop104
	do
	  echo "=================> STOP $i ZK <================="
	  ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh stop
	done
;;

status)
	for i in hadoop102 hadoop103 hadoop104
	do
	  echo "=================> STATUS $i ZK <================="
	  ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh status
	done
;;

*)
  echo "USAGE: zk.sh {start|stop|status}"
  exit
;;
esac

kafka安裝配置

集群規劃

服務器hadoop102服務器hadoop103服務器hadoop104
KafkaKafkaKafkaKafka

解壓安裝

# 解壓
[atguigu@hadoop102 config]$ tar -zxvf kafka_2.11-2.4.1.tgz -C ../module/

# 在安裝目錄新建data目錄,用來存訊息資料
# 修改組態檔
broker.id=2
log.dirs=/opt/module/kafka_2.11-2.4.1/datas
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

# 撰寫環境變數
#kafka
export KAFKA_HOME=/opt/module/kafka_2.11-2.4.1
export PATH=$PATH:$KAFKA_HOME/bin


# 分發kafka安裝目錄(各自修改broker.id)以及環境變數
[atguigu@hadoop102 module]$ my_rsync.sh kafka_2.11-2.4.1/
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop103:/etc/profile  
[atguigu@hadoop102 module]$ scp /etc/profile.d/my_env.sh root@hadoop104:/etc/profile

kafka群啟腳本

#!/bin/bash
if [ $# -lt 1 ]
then
  echo "USAGE: kafka.sh {start|stop}"
  exit
fi  
case $1 in
start)
	for i in hadoop102 hadoop103 hadoop104
	do
	  echo "=================> START $i KF <================="
	  ssh $i /opt/module/kafka_2.11-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties
	done
;;
stop)
	for i in hadoop102 hadoop103 hadoop104
	do
	  echo "=================> STOP $i KF <================="
	  ssh $i /opt/module/kafka_2.11-2.4.1/bin/kafka-server-stop.sh
	done
;;

*)
  echo "USAGE: kafka.sh {start|stop}"
  exit
;;
esac

kafka壓力測驗

用Kafka官方自帶的腳本,對Kafka進行壓測,Kafka壓測時,可以查看到哪個地方出現了瓶頸(CPU,記憶體,網路IO),一般都是網路IO達到瓶頸,

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

# Kafka Producer壓力測驗
(1)在/opt/module/kafka/bin目錄下面有這兩個檔案,我們來測驗一下
[atguigu@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 說明:
record-size是一條資訊有多大,單位是位元組,
num-records是總共發送多少條資訊,
throughput 是每秒多少條資訊,設成-1,表示不限流,可測出生產者最大吞吐量,

(2)Kafka會列印下面的資訊
100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.
引數決議:本例中一共寫入10w條訊息,吞吐量為9.14 MB/sec,每次寫入的平均延遲為187.68毫秒,最大的延遲為424.00毫秒,

# Kafka Consumer壓力測驗
Consumer的測驗,如果這四個指標(IO,CPU,記憶體,網路)都不能改變,考慮增加磁區數來提升性能,
[atguigu@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
引數說明:
--zookeeper 指定zookeeper的鏈接資訊
--topic 指定topic的名稱
--fetch-size 指定每次fetch的資料的大小
--messages 總共要消費的訊息個數
測驗結果說明:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153
開始測驗時間,測驗結束資料,共消費資料9.5368MB,吞吐量2.0714MB/s,共消費100010條,平均每秒消費21722.4153條,

kafka機器數量計算

經驗計算公式

Kafka機器數量(經驗公式)=2*(峰值生產速度*副本數/100)+1
先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署Kafka的數量,
比如我們的峰值生產速度是50M/s,副本數為2,
Kafka機器數量=2*(50*2/100)+ 1=3

kafka磁區計算公式

https://blog.csdn.net/weixin_42641909/article/details/89294698

1)創建一個只有1個磁區的topic
2)測驗這個topic的producer吞吐量和consumer吞吐量,
3)假設他們的值分別是Tp和Tc,單位可以是MB/s,
4)然后假設總的目標吞吐量是Tt,那么磁區數=Tt / min(Tp,Tc)
例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;
磁區數=100 / 20 =5磁區
磁區數一般設定為:3-10個

flume安裝配置

解壓安裝

# 解壓
[atguigu@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ../module/
# 配置環境變數
#flume
export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin

# 重繪配置
[atguigu@hadoop102 lib]$ source /etc/profile

# 洗掉/opt/module/flume-1.9.0/lib目錄下的  guava   jar包
[atguigu@hadoop102 lib]$ rm -rf guava-11.0.2.jar 

專案架構圖

在這里插入圖片描述

flume采集通道規劃

第一層flume:

source: taildir
channel: kafka channel
sink:無

第二層flume:

source: 無						source: kafka source      使用第二種:可以在source這里加攔截器
channel: kafka channel			 channel: file channel
sink: hdfs sink  				 sink: hdfs sink

攔截器配置

思考一個問題:就在在第一層的flume讀取資料時,可能存在不合法的資料(比如不是json格式),如果這樣這樣的資料進入到hdfs,將來使用決議時必然決議不了,所以有必要在第一層flume這里添加一個攔截器,

pom.xml

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
//資料采集的攔截器
package com.pihao.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class EtlLogInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //取出body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        //判斷是否是json格式
        try {
            JSON.parseObject(body);
        }catch (JSONException e){
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> list1 = new ArrayList<>();
        for (Event event : list) {
            Event newEvent = intercept(event);
            if(null != newEvent){
                list1.add(newEvent);
            }
        }
        return list1;
    }

    @Override
    public void close() {

    }


    /**
     * builder內部類,用來實體化上面的interceptor類
     */
    public static class MyBuilder implements Builder{
        @Override
        public Interceptor build() {
            return new EtlLogInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

Maven打包,將帶依賴的jar包上傳到/flume/lib目錄中

第一層flume配置與啟停腳本

source: taildir
channel: kafka channel
sink:無

在/opt/module/flume-1.9.0/jobs/gmall創建logserver-flume-kafka.conf

#name
a1.sources = r1
a1.channels = c1 

#source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

#攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.EtlLogInterceptor$MyBuilder

#channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# bind
a1.sources.r1.channels = c1

先啟動kafka集群,創建一個消費者去消費topic_log主題

再執行flume命令測驗

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

撰寫啟停腳本f1.sh

#!/bin/bash
if [ $# -lt 1 ]
then 
  echo "USAGE: f1.sh {start|stop}"
  exit
fi

case $1 in
start)
	for i in hadoop102 hadoop103 
	do
	  ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
	done  
;;

stop)
	for i in hadoop102 hadoop103 
	do
	  ssh $i "ps -ef | grep logserver-flume-kafka.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
	done 
;;

*)
  echo "USAGE: f1.sh {start|stop}"
  exit
;;
esac 

102自測通過后將flume的安裝目錄發送到103機器,以及環境變數

[atguigu@hadoop102 module]$ scp -r flume-1.9.0/ hadoop103:/opt/module/

第二層flume配置與啟停腳本

source: kafka source 可以在source這里加攔截器
channel: file channel
sink: hdfs sink

在第二層,我們將flume設定在104機器上,它是用來消費kafka中的資料存在hdfs

思考:這個添加的攔截器主要是干什么的呢?主要是用來覆寫kafka source里頭的默認的系統時候,換成日志里面的實際時間戳,不然這個資料會出現一些問題,本來是是23:59:59的資料,如果使用本地時間,可能就變成了第二天的資料了,所以這個問題必須要處理,用攔截器來做,

撰寫攔截器

package com.pihao.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() {
        
    }
	
    //使用新的ts替換原來的timestamp
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSON.parseObject(body);
        String ts = jsonObject.getString("ts");
        event.getHeaders().put("timestamp",ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    /**
     * builder內部類,用來實體化上面的interceptor類
     */
    public static class MyBuilder implements Builder{
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

將flume的安裝目錄發送到104機器,并且分發環境變數

在104機器創建 /opt/module/flume-1.9.0/jobs/gmall創建kafka-flume-hdfs.conf

#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = gmall
a1.sources.r1.batchDurationMillis = 2000

#攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.TimeStampInterceptor$MyBuilder

#channel
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpoint
#a1.channels.c1.useDualCheckpoints = true 
#a1.channels.c1.backupCheckpointDir = /opt/module/flume-1.9.0/jobs/checkpoint-bk
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 5


#hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
#控制輸出檔案是原生檔案(下面的壓縮配置會報錯)
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = lzop

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

啟動hdfs

104執行flume命令

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/kafka-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

撰寫啟停腳本f2.sh

#!/bin/bash
if [ $# -lt 1 ]
then 
  echo "USAGE: f2.sh {start|stop}"
  exit
fi

case $1 in
start)
	for i in hadoop104 
	do
	  ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/gmall/kafka-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
	done  
;;

stop)
	for i in hadoop104 
	do
	  ssh $i "ps -ef | grep kafka-flume-hdfs.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
	done 
;;

*)
  echo "USAGE: f2.sh {start|stop}"
  exit
;;
esac 

在這里插入圖片描述

測驗成功

flume記憶體優化

1)問題描述:如果啟動消費Flume拋出如下例外
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解決方案步驟:
(1)在hadoop102服務器的/opt/module/flume/conf/flume-env.sh檔案中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop103、hadoop104服務器
[atguigu@hadoop102 conf]$ xsync flume-env.sh

3)Flume記憶體引數設定及優化
JVM heap一般設定為4G或更高
-Xmx與-Xms最好設定一致,減少記憶體抖動帶來的性能影響,如果設定不一致容易導致頻繁fullgc,
-Xms表示JVM Heap(堆記憶體)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆記憶體)最大允許的尺寸,按需分配,如果不設定一致,容易在初始化時,由于記憶體不夠,頻繁觸發fullgc,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/323374.html

標籤:其他

上一篇:Flink On Yarn模式配置

下一篇:Pyhton操作Neo4j圖資料庫實踐(南北朝隋唐歷史北朝主要人物知識圖譜)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more