1. First I started the ZooKeeper and Kafka cluster. Once it was up, jps showed these processes:
[root@flink102 kafka-2.11]# jps
15459 QuorumPeerMain
21466 Kafka
2. The Kafka topic had already been created. Listing all topics on the server:
[root@flink102 kafka-2.11]# bin/kafka-topics.sh --zookeeper flink102:2181 --list
ct
3. Next I started a Kafka console consumer:
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --from-beginning --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
4. In the workProject directory I created the file flume-kafka.conf:
[root@flink102 ~]# cd /opt/workProject/
[root@flink102 workProject]# ll
total 32
-rw-r--r-- 1 root root 4312 Mar 27 15:10 call.log
-rw-r--r-- 1 root root 543 Mar 24 12:26 contact.log
-rw-r--r-- 1 root root 14155 Mar 24 12:53 ct-producer.jar
-rw-r--r-- 1 root root 683 Mar 27 14:37 flume-kafka.conf
drwxr-xr-x 2 root root 24 Mar 25 11:11 log
[root@flink102 workProject]# vim flume-kafka.conf
// Configuration added to the file:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/workProject/call.log
a1.sources.r1.shell = /bin/bash -c
# # sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers =flink102:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# # channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
call.log does contain data, as shown here:
[root@flink102 workProject]# tail -f call.log
15884588694 19154926260 20180721043739 1172
16574556259 19154926260 20180311120306 0942
15280214634 15647679901 20180904154615 0234
16160892861 14171709460 20181223154548 1720
15244749863 19342117869 20180404160230 2565
15647679901 14171709460 20180801213806 0758
15884588694 14397114174 20180222050955 0458
19154926260 16569963779 20180715235743 1489
14171709460 19602240179 20181120075855 2488
19683537146 16574556259 20180724031723 0652
5. Start Flume to collect the data:
[root@flink102 ~]# cd /usr/hadoop/module/flume/flume-1.7.0/
[root@flink102 flume-1.7.0]# bin/flume-ng agent -c conf/ -f /opt/workProject/flume-kafka.conf
The agent started loading.
[Screenshot: Flume agent startup output]
6. In the Kafka consumer no data appeared and nothing could be consumed. It stayed stuck at:
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --from-beginning --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2020-03-30 10:59:11,139] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
7. Checking the logs under the flume-1.7.0 directory showed this error:
30 Mar 2020 11:15:33,808 ERROR [main] (org.apache.flume.node.Application.main:348) - A fatal error occurred while running. Exception follows.
org.apache.commons.cli.MissingOptionException: Missing required option: n
at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
at org.apache.commons.cli.Parser.parse(Parser.java:231)
at org.apache.commons.cli.Parser.parse(Parser.java:85)
at org.apache.flume.node.Application.main(Application.java:263)
Could anyone tell me what causes this and how to fix it? Thanks!
Reply from a uj5u.com user:
# # source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/workProject/call.log
a1.sources.r1.shell = /bin/bash -c
tail -F -c +0 /opt/workProject/call.log is probably where the problem is. Copy that part out and get it working in a shell first, then paste it back in. It should be -f rather than -F.
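The check suggested in this reply can be run on its own before touching the config. A minimal sketch, assuming GNU coreutils (tail and timeout) and a throwaway temporary file standing in for /opt/workProject/call.log:

```shell
# Throwaway stand-in for /opt/workProject/call.log (an assumption for this demo).
log=$(mktemp)
printf '%s\n' '15884588694 19154926260 20180721043739 1172' \
              '16574556259 19154926260 20180311120306 0942' > "$log"

# tail keeps following the file, so stop it after 2 seconds with timeout.
# "|| true" swallows the non-zero exit status that timeout returns.
out=$(timeout 2 tail -F -c +0 "$log" 2>/dev/null || true)

# Show what the exec source would have read from the file.
echo "$out"
rm -f "$log"
```

If both lines are printed, the command itself is fine. With GNU tail both -f and -F are accepted; -F additionally reopens the file by name if it is rotated or recreated, so the flag alone would not be expected to cause the MissingOptionException in the log.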
Reply from a uj5u.com user:
OK, thanks, I'll take a look.
Reply from a uj5u.com user:
I just tried your suggestion:
# # source
a1.sources.r1.type = exec
a1.sources.r1.command =tail -f -c +0 /opt/workProject/call.log
a1.sources.r1.shell = /bin/bash -c
[Screenshot: the edited source section of flume-kafka.conf]
After that I restarted the Kafka consumer:
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Then I started Flume again:
[root@flink102 flume-1.7.0]# bin/flume-ng agent -c conf/ a1 -f /usr/hadoop/module/flume/flume-1.7.0/conf/flume-kafka.conf
The Flume agent appeared to load normally.
In the end, the Kafka consumer still received no data:
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2020-03-30 16:30:28,065] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
Finally I checked Flume's log file, and the error is still there:
[root@flink102 flume-1.7.0]# tail -f logs/flume.log
at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
at org.apache.commons.cli.Parser.parse(Parser.java:231)
at org.apache.commons.cli.Parser.parse(Parser.java:85)
at org.apache.flume.node.Application.main(Application.java:263)
30 Mar 2020 16:38:04,434 ERROR [main] (org.apache.flume.node.Application.main:348) - A fatal error occurred while running. Exception follows.
org.apache.commons.cli.MissingOptionException: Missing required option: n
at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
at org.apache.commons.cli.Parser.parse(Parser.java:231)
at org.apache.commons.cli.Parser.parse(Parser.java:85)
at org.apache.flume.node.Application.main(Application.java:263)
Reply from a uj5u.com user:
I missed it earlier: your Flume start command is missing the agent name (-n a1):
bin/flume-ng agent -n a1 -c conf/ -f /usr/hadoop/module/flume/flume-1.7.0/conf/flume-kafka.conf
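The value given to -n has to match the prefix used in the properties file (a1 in the config above), otherwise the agent starts with no sources, channels, or sinks. A small sketch for checking which agent names a file declares; list_agents is a hypothetical helper written for this illustration, and the temporary file only mimics the "define" part of flume-kafka.conf:

```shell
# Hypothetical helper: print the agent names declared in a Flume properties file.
list_agents() {
  # Keep lines such as "a1.sources = r1" and print the part before the first dot.
  grep -E '^[[:space:]]*[A-Za-z0-9_-]+\.(sources|sinks|channels)[[:space:]]*=' "$1" \
    | sed -E 's/^[[:space:]]*([A-Za-z0-9_-]+)\..*/\1/' \
    | sort -u
}

# Demo against a throwaway copy of the first lines of flume-kafka.conf.
conf=$(mktemp)
printf '%s\n' 'a1.sources = r1' 'a1.sinks = k1' 'a1.channels = c1' \
  'a1.sources.r1.type = exec' > "$conf"
agents=$(list_agents "$conf")
echo "$agents"
rm -f "$conf"
```

When starting the agent for debugging, appending -Dflume.root.logger=INFO,console to the flume-ng command sends the log to the terminal, so startup errors show up immediately instead of only in logs/flume.log.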
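Reply from a uj5u.com user:
OK, I just tried it, and the agent name was indeed missing. I ran it again:
[root@flink102 flume-1.7.0]# bin/flume-ng agent -n a1 -c conf/ -f /usr/hadoop/module/flume/flume-1.7.0/conf/flume-kafka.conf
[Screenshot: Flume agent startup output]
The Kafka consumer still doesn't seem to receive anything: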
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
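The deprecation warning itself points at one thing worth trying: the new consumer, which talks to the broker directly instead of going through ZooKeeper. A sketch only, assuming the broker address flink102:9092 from the sink config and that the command is run from the Kafka install directory; it is not confirmed in the thread that this resolves the problem:

```shell
# Values taken from the thread: broker address from the sink config, topic name "ct".
broker="flink102:9092"
topic="ct"

# Assemble the new-consumer invocation suggested by the deprecation warning.
cmd="bin/kafka-console-consumer.sh --bootstrap-server $broker --topic $topic --from-beginning"
echo "$cmd"

# Only execute when the script is actually present in the current directory.
if [ -x bin/kafka-console-consumer.sh ]; then
  $cmd
fi
```

Reading with --from-beginning also shows any records that the sink delivered before the consumer was started.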
