前提
- 準備三臺服務器或者三臺虛擬機,使用虛擬機,強烈建議關閉防火墻,等配置完成后,再配置防火墻.
192.168.1.32
192.168.1.33
192.168.1.34 - 在每一臺虛擬機上安裝好JDK. 下載好kafka的壓縮包.
一
在每一臺虛擬機上,創建kafka和zookeeper的檔案和日志目錄
mkdir -p /data/zookeeper/{data,log}
mkdir -p /data/kafka/{data,log}
二
第一臺服務器寫入
echo "1" >/data/zookeeper/data/myid
第二臺服務器寫入
echo "2" >/data/zookeeper/data/myid
第三臺服務器寫入
echo "3" >/data/zookeeper/data/myid
三.修改組態檔
- 把kafka壓縮包解壓, kafka不用安裝, 解壓即用.我的三個虛擬機的檔案目錄是一樣的. 我的kafka的位置在
/usr/local/install目錄下. 進入kafka的conf目錄. 首先修改zookeeper組態檔vi zookeeper.properties.
# 把配置好的zookeeper日志和檔案,目錄配置一下
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
#maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=10
syncLimit=5
# 然后添加這三個
server.1=192.168.1.32:2888:3888
server.2=192.168.1.33:2888:3888
server.3=192.168.1.34:2888:3888
三臺虛擬機的zookeeper的組態檔做同樣的修改.
- 然后修改kafka的組態檔,
vi server.properties, 以192.168.1.32為例, 為了方便看清, 洗掉了大部分注釋.
############################# Server Basics #############################
# 這里的broker.id是唯一的, 三臺虛擬機依次配置成1, 2, 3
broker.id=1
############################# Socket Server Settings #############################
# 這里的IP需要修改成當前虛擬機的IP
advertised.listeners=PLAINTEXT://192.168.1.32:9092
num.network.threads=3
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka的日志目錄
log.dirs=/data/kafka/log
# 磁區數.可以寫成3個
num.partitions=3
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# zookeeper集群地址
zookeeper.connect=192.168.1.32:2181,192.168.1.33:2181,192.168.1.34:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
四. 啟動集群
PS: 一開始本來是使用獨立的zookeeper, 不用kafka自帶. 跑通后,但是就是這啟動腳本實在是不會寫. 而后轉自帶的zookeeper. 說多了都是淚. 這該死的知識儲備
- kafka依賴zookeeper, 啟動kafka前必須先啟動zookeeper
- 把zookeeper和kafka做成服務, 便于開機自啟動
- 撰寫啟動代碼 zookeeper.service
vi /lib/systemd/system/zookeeper.service
寫入:
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/install/jdk1.8.0_152/bin"
User=root
Group=root
ExecStart=/usr/local/install/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh /usr/local/install/kafka_2.12-2.7.0/config/zookeeper.properties
ExecStop=/usr/local/install/kafka_2.12-2.7.0/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
kafka.service
vi /lib/systemd/system/kafka.service
寫入:
[Unit]
Description=Apache Kafka server (broker1)
After=network.target zookeeper.service
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/install/jdk1.8.0_152/bin"
User=root
Group=root
ExecStart=/usr/local/install/kafka_2.12-2.7.0/bin/kafka-server-start.sh /usr/local/install/kafka_2.12-2.7.0/config/server.properties
ExecStop=/usr/local/install/kafka_2.12-2.7.0/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
三臺虛擬機做同樣的修改
- 輸入
systemctl daemon-reload - 設定集群開機自啟動
systemctl enable zookeeper
systemctl enable kafka
- 啟動zookeeper
systemctl start zookeeper
查看狀態
systemctl status zookeeper

- 啟動kafka
systemctl start kafka
查看kafka狀態
systemctl status kafka

五. 測驗
在kafka解壓目錄/usr/local/install/kafka_2.12-2.7.0, 以啟動一臺虛擬機為生產者, 另外兩臺為消費者測驗
- 查看topic串列
bin/kafka-topics.sh --list --zookeeper 192.168.1.34:2181 - 創建topic
bin/kafka-topics.sh --describe --zookeeper 192.168.1.34:2181 --topic dcy-test - 在其中一臺服務器創建生產者
bin/kafka-console-producer.sh --broker-list 192.168.1.32:9092,192.168.1.33:9092,192.168.1.34:9092 --topic dcy-test

- 另外兩臺服務器創建消費者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.32:9092,192.168.1.33:9092,192.168.1.34:9092 --topic dcy-test --from-beginning

測驗通過!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/250651.html
標籤:其他
上一篇:記錄一次electron踩坑
