1、首先自己啟動zookeeper、kafka集群后,集群啟動起來后,行程查看如下:
[root@flink102 kafka-2.11]# jps
15459 QuorumPeerMain
21466 Kafka
2、自己已經把kafka的topic創建出來了,查看當前服務器中的所有topic如下:
[root@flink102 kafka-2.11]# bin/kafka-topics.sh --zookeeper flink102:2181 --list
ct
3、接著自己創建kafka消費者
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --from-beginning --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
4、自己在在workProject檔案目錄下創建 flume-kafka.conf檔案
[root@flink102 ~]# cd /opt/workProject/
[root@flink102 workProject]# ll
total 32
-rw-r--r-- 1 root root 4312 Mar 27 15:10 call.log
-rw-r--r-- 1 root root 543 Mar 24 12:26 contact.log
-rw-r--r-- 1 root root 14155 Mar 24 12:53 ct-producer.jar
-rw-r--r-- 1 root root 683 Mar 27 14:37 flume-kafka.conf
drwxr-xr-x 2 root root 24 Mar 25 11:11 log
[root@flink102 workProject]# vim flume-kafka.conf
//添加配置引數:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/workProject/call.log
a1.sources.r1.shell = /bin/bash -c
# # sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers =flink102:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# # channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
其中, call.log是有資料的,如下:
[root@flink102 workProject]# tail -f call.log
15884588694 19154926260 20180721043739 1172
16574556259 19154926260 20180311120306 0942
15280214634 15647679901 20180904154615 0234
16160892861 14171709460 20181223154548 1720
15244749863 19342117869 20180404160230 2565
15647679901 14171709460 20180801213806 0758
15884588694 14397114174 20180222050955 0458
19154926260 16569963779 20180715235743 1489
14171709460 19602240179 20181120075855 2488
19683537146 16574556259 20180724031723 0652
5、啟動flume做資料采集
[root@flink102 ~]# cd /usr/hadoop/module/flume/flume-1.7.0/
[root@flink102 flume-1.7.0]# bin/flume-ng agent -c conf/ -f /opt/workProject/flume-kafka.conf
執行加載資料的程序,如圖所示:


6、在kafka消費者查看,資料發現沒有,無法消費資料

一直停留在:
[root@flink102 kafka-2.11]# bin/kafka-console-consumer.sh --zookeeper flink102:2181 --from-beginning --topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2020-03-30 10:59:11,139] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
問下,大佬們,這是什么原因,如何解決,謝謝!
uj5u.com熱心網友回復:
檢查一下你的flume-kafka.conf組態檔是否匹配你的flume和kafka版本轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/28257.html
標籤:分布式計算/Hadoop
上一篇:Sd卡不能格式化,求方法,
