Collecting a file with Flume and saving it to Kafka
Create the file flumeexec.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe and configure the source component: r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/nana/text.log
# Describe and configure the sink component: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumetopic
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Describe and configure the channel component; an in-memory channel is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
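Start Flume:
bin/flume-ng agent -c conf -f conf/flumeexec.conf -n a1 -Dflume.root.logger=INFO,console
Flume starts successfully and the topic is created automatically (this relies on the brokers allowing automatic topic creation, auto.create.topics.enable, which is on by default).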
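To confirm that the topic really exists, one option is to list the topics on a broker and look for an exact name match. The following is only a minimal sketch: `topic_exists` is a hypothetical helper name that is not part of Flume or Kafka, and the usage comment assumes a Kafka release whose `kafka-topics.sh` accepts `--bootstrap-server` (2.2 or later) and is on the PATH.

```shell
# Hypothetical helper: reads topic names on stdin (one per line) and
# succeeds only if one of them equals the given name exactly.
topic_exists() {
  grep -qxF -- "$1"
}

# Usage against the cluster from this article:
#   kafka-topics.sh --bootstrap-server hadoop1:9092 --list | topic_exists flumetopic \
#     && echo "topic found" || echo "topic missing"
```

Topic names are case-sensitive, so the name passed here must match the value of a1.sinks.k1.kafka.topic exactly.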
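Consume the data from Kafka (the topic name is case-sensitive and must match the one in the sink configuration):
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic flumetopic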
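To watch events flow end to end, a few lines can be appended to the file that the exec source is tailing while the consumer is running. This is a sketch rather than part of the original setup: `append_test_lines` is a hypothetical helper name, and the path in the usage comment is the one configured in flumeexec.conf.

```shell
# Hypothetical helper: append <count> numbered lines to <log_file>.
# Each appended line becomes one Flume event once `tail -F` picks it up.
append_test_lines() {
  log_file="$1"
  count="$2"
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'test line %s\n' "$i" >> "$log_file"
    i=$((i + 1))
  done
}

# Usage with the file from flumeexec.conf:
#   append_test_lines /home/nana/text.log 5
```

If everything is wired correctly, the appended lines should show up in the console consumer shortly afterwards.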
Collecting from a directory to Kafka
Create the file flumetaildir.conf:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=TAILDIR
# Position file that records how far each monitored file has been read
a1.sources.r1.positionFile = /usr/local/src/flume/taildir_position.json
# File groups to monitor (directory plus a file-name regex)
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/data/.*log
a1.sources.r1.fileHeader = true
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092
a1.sinks.k1.kafka.topic = testTopic3
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.channels.c1.type=file
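# Checkpoint directory
a1.channels.c1.checkpointDir = /usr/local/src/flume/filechannle/checkpointDir
# Data directories where events are stored
a1.channels.c1.dataDirs = /usr/local/src/flume/filechannle/dataDirs
# Maximum number of events the channel can hold
a1.channels.c1.capacity=1000
# Maximum number of events per transaction (one put by a source or one take by a sink)
a1.channels.c1.transactionCapacity=100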
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
bin/flume-ng agent -c conf -f conf/flumetaildir.conf -n a1 -Dflume.root.logger=INFO,console
Flume starts successfully and the topic is created automatically.
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testTopic3
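To give the TAILDIR source something to pick up, a line can be written to a file in the monitored directory whose name matches the filegroup regex `.*log`. The helper below is a sketch under that assumption: `write_taildir_sample` is a hypothetical name, and the directory in the usage comment is the one from flumetaildir.conf.

```shell
# Hypothetical helper: append one timestamped line to <dir>/<name>.
# Refuses names that would not match the filegroup regex .*log.
write_taildir_sample() {
  dir="$1"
  name="$2"
  case "$name" in
    *log) ;;
    *) echo "file name '$name' does not end in 'log'; TAILDIR would ignore it" >&2
       return 1 ;;
  esac
  printf 'taildir sample written at %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" >> "$dir/$name"
}

# Usage with the directory from flumetaildir.conf:
#   write_taildir_sample /usr/local/src/flume/data app.log
```

Once the line has been delivered, /usr/local/src/flume/taildir_position.json should contain an entry for the file with its inode and read position, which is how the source resumes after a restart.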
Collecting from a port to Kafka
Create the file flumenc.conf:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=testnc
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sinks.k1.kafka.flumeBatchSize=20
a1.sinks.k1.kafka.producer.acks=1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
bin/flume-ng agent -c conf -f conf/flumenc.conf -n a1 -Dflume.root.logger=INFO,console
Flume starts successfully and the topic is created automatically.
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testnc
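The netcat source turns every newline-terminated line it receives on the port into one event, so the pipeline can be exercised by piping a few lines into `nc`. The snippet is a sketch: `make_events` is a hypothetical helper name, and the usage comment assumes an `nc` binary is installed on the agent host.

```shell
# Hypothetical helper: print <count> newline-terminated lines of the
# form <prefix>-<n>; each line becomes one event at the netcat source.
make_events() {
  prefix="$1"
  count="$2"
  i=1
  while [ "$i" -le "$count" ]; do
    printf '%s-%s\n' "$prefix" "$i"
    i=$((i + 1))
  done
}

# Usage (run on the agent host, because the source binds to localhost):
#   make_events nc-test 3 | nc localhost 8888
```

By default the source answers each accepted line with OK. Depending on the nc variant, the connection may stay open after the input ends and need Ctrl-C to close.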
