ZooKeeper 的作用
Kafka使用ZooKeeper管理集群,ZooKeeper用于協調服務器或集群拓撲,ZooKeeper是配置資訊的一致性檔案系統,你可以選擇Kafka自帶的Zookeeper,也可以選擇單獨部署,一臺Linux主機開放三個埠即可構建一個簡單的偽ZooKeeper集群,
ZooKeeper可以將拓撲更改發送到Kafka,如果集群中的某臺服務器宕機或者某個topic被添加、洗掉,集群中的每個節點都可以知道新服務器何時加入,ZooKeeper提供Kafka Cluster配置的同步視圖,
Zookeeper簡介
ZooKeeper 是一個開源的分布式框架,提供了協調分布式應用的基本服務,它向外部應用 暴露一組通用服務——分布式同步(Distributed Synchronization)、命名服務(Naming Service)、集群維護(group maintenance)等,簡化分布式應用協調及其管理的難度, 它是 google 的 chubby 一個開源的實作, 它本身可以搭建成一個集群,這個 zk 集群用來對應用程式集群進行管理,監視應用程式集群中各個節點的狀態,并根據應用程式集群中各個節點提交的反饋資訊決定下一步的合理操作, 做分布式鎖很有效 
Zookeeper下載和安裝
實作的是Zookeeper集群的創建,注意要修改zoo.dfg檔案名稱
(base) root@top1server:~# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
(base) root@top1server:~# tar -zvxf zookeeper-3.4.10.tar.gz
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper1
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper2
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper3
(base) root@top1server:~/zookeeper/zookeeper1/conf# mv zoo_sample.cfg zoo.cfg
(base) root@top1server:~/zookeeper/zookeeper1/bin# ./zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper1/bin# ./zkServer.sh status


部署三個節點的Zookeeper偽分布集群
在同一臺服務器上,部署一個 3 個 ZooKeeper 節點組成的集群,這樣的集群叫偽分布式集群,而如果集群中的 3 個節點分別部署在 3 個服務器上,那么這種集群就叫真正的分布式集群,
(base) root@top1server:~/zookeeper# cd zookeeper1
(base) root@top1server:~/zookeeper/zookeeper1# mkdir data
(base) root@top1server:~/zookeeper/zookeeper1# mkdir logs
(base) root@top1server:~/zookeeper/zookeeper1# touch data/myid
(base) root@top1server:~/zookeeper/zookeeper1# cd data/
(base) root@top1server:~/zookeeper/zookeeper1/data# cat myid
1
設定組態檔 zoo.cfg 的內容如下:

組態檔中的配置項的含義參見下面的介紹,
- 用同樣的方法,在 zookeeper2 和 zookeeper3 的相應位置創建 zoo.cfg,檔案內容復制zookeeper1 的 zoo.cfg,
- 只不過需要改動 clientport、dataDir、dataLogDir 三個配置項,zookeeper2 的 clientport 改為 2182,zookeeper3 的 clientport 改為 2183,而 dataDir和 dataLogDir 都修改為相應的目錄,就好了,
啟動 zk 集群
接下來再分別啟動服務
(base) root@top1server:~/zookeeper/zookeeper1/bin# sh zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper2/bin# sh zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper3/bin# sh zkServer.sh start
這樣 zookeeper 集群的 3 個節點都啟動起來了,




一個leader多個server,也遵循2n+1的選舉策略,想保證高可用至少有三臺機器
客戶端接入集群
進入 zookeeper 集群中任意一個節點的 bin 目錄下,啟動一個客戶端,接入已經啟動好的zookeeper 集群,
這里的 server 可以填寫集群中的任何一個節點的 ip,埠號是對應 ip的節點的組態檔中 clientport 的值,
./zkCli.sh –server 127.0.0.1:2181
真實分布式集群需要注意的地方
真正的分布式集群和偽分布式集群不一樣的地方在于組態檔,
- clientport 埠各個節點一樣就行,
- server.1=127.0.0.1:8880:7770 中的 ip 要修改成對應的 server 的 ip,后邊的兩個埠號不需要不同,各個節點都一樣就可以了,
其他地方偽分布式和真正分布式都是一樣的,
ZooKeeper 組態檔中的配置項的含義
組態檔中配置項的含義:
- tickTime: zookeeper 中使用的基本時間單位,毫秒值,比如可以設為 1000,那么基本時間單位就是 1000ms,也就是 1s,
- initLimit: zookeeper 集群中的包含多臺 server,其中一臺為 leader,集群中其余的 server 為 follower,initLimit 引數配置初始化連接時,follower 和 leader 之間的最長 心 跳 時 間 , 如 果 該 參 數 設 置 為 5 , 就 說 明 時 間 限 制 為 5 倍 tickTime , 即5*1000=5000ms=5s,
- syncLimit: 該引數配置 leader 和 follower 之間發送訊息,請求和應答的最大時間長度,如果該引數設定為 2,說明時間限制為 2 倍 tickTime,即 2000ms,
- dataDir: 資料目錄. 可以是任意目錄,一般是節點安裝目錄下 data 目錄,
- dataLogDir: log目錄, 同樣可以是任意目錄,一般是節點安裝目錄下的logs目錄,如果沒有設定該引數,將使用和 dataDir 相同的設定,
- clientPort: 監聽 client 連接的埠號,
- server.X=A:B:C 其中 X 是一個數字, 表示這是第幾號 server,它的值和 myid 檔案中的值對應,A 是該 server 所在的 IP 地址,B 是配置該 server 和集群中的 leader 交換訊息所使用的埠,C 配置選舉 leader 時所使用的埠,由于配置的是偽集群模式,所以各個 server 的 B, C 引數必須不同,如果是真正分布式集群,那么 B 和 C 在各個節點上可以相同,因為即使相同由于節點處于不同的服務器也不會導致埠沖突
Zookeeper 常用命令
- 啟動 ZK 服務: bin/zkServer.sh start
- 查看 ZK 服務狀態: bin/zkServer.sh status
- 停止 ZK 服務: bin/zkServer.sh stop
- 重啟 ZK 服務: bin/zkServer.sh restart
- 連接服務器: zkCli.sh -server 127.0.0.1:2181

創建節點
使用 create 命令,可以創建一個 Zookeeper 節點, 如
create [-s] [-e] path data acl
其中,-s 或-e 分別指定節點特性,順序或臨時節點,若不指定,則表示持久節點;acl 用來進行權限控制,

順序節點可以看到創建的節點后面添加了一串數字以示區別,


臨時節點在退出會自動洗掉
使用 create /zk-permanent 123 命令創建 zk-permanent 永久節點
可以看到永久節點不同于順序節點,不會自動在后面添加一串數字,

讀取節點
與讀取相關的命令有 ls 命令和 get 命令,ls 命令可以列出 Zookeeper 指定節點下的所有子節點,只能查看指定節點下的第一級的所有子節點;
get 命令可以獲取 ZK 指定節點的資料內容和屬性資訊,其用法分別如下
ls path [watch]
get path [watch]
ls2 path [watch]
ls2可以查看所有內容
get可以獲取系欸但的資料內容和屬性
更新節點
使用 set 命令,可以更新指定節點的資料內容,用法如下
set path data [version]
其中,data 就是要更新的新內容,version 表示資料版本,如將/zk-permanent 節點的資料更新為 456,可以使用如下命令:set /zk-permanent 456
dataVersion 會變為 1 了,表示進行了更新,
洗掉節點
使用 delete 命令可以洗掉 Zookeeper 上的指定節點,用法如下
delete path [version]
其 中 version 也 是 表 示 數 據 版 本 , 使 用 delete /zk-permanent 命 令 即 可 刪 除/zk-permanent 節點
值得注意的是,若洗掉節點存在子節點,那么無法洗掉該節點,必須先洗掉子節點,再洗掉父節點
zk的使用都是靠這些節點使用
分布式鎖的生成是用臨時順序節點來實作的
Kafka概述
Kafka訊息佇列

(1)點對點模式(一對一,消費者主動拉取資料,訊息收到后訊息清除)
點對點模型通常是一個基于拉取或者輪詢的訊息傳送模型,這種模型從佇列中請求資訊,而不是將訊息推送到客戶端,這個模型的特點是發送到佇列的訊息被一個且只有一個接收者接收處理,即使有多個訊息監聽者也是如此,
(2)發布/訂閱模式(一對多,資料生產后,推送給所有訂閱者)
發布訂閱模型則是一個基于推送的訊息傳送模型,發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收訊息,而持久訂閱者則監聽主題的所有訊息,即使當前訂閱者不可用,處于離線狀態,
為什么需要訊息佇列
1)解耦:
允許你獨立的擴展或修改兩邊的處理程序,只要確保它們遵守同樣的介面約束,
2)冗余:
訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險,許多訊息佇列所采用的"插入-獲取-洗掉"范式中,在把一個訊息從佇列中洗掉之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的保存直到你使用完畢,
3)擴展性:
因為訊息佇列解耦了你的處理程序,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理程序即可,
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見,如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費,使用訊息佇列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰,
5)可恢復性:
系統的一部分組件失效時,不會影響到整個系統,訊息佇列降低了行程間的耦合度,所以即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理,
6)順序保證:
在大多使用場景下,資料處理的順序都很重要,大部分訊息佇列本來就是排序的,并且能保證資料會按照特定的順序來處理,(Kafka 保證一個 Partition 內的訊息的有序性)
7)緩沖:
有助于控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況,
8)異步通信:
很多時候,用戶不想也不需要立即處理訊息,訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它,想向佇列中放入多少訊息就放多少,然后在需要的時候再去處理它們,
什么是 Kafka
在流式計算中,Kafka 一般用來快取資料,Storm 通過消費 Kafka 的資料進行計算,
1)Apache Kafka 是一個開源訊息系統,由 Scala 寫成,是由 Apache 軟體基金會開發的一個開源訊息系統專案,
2)Kafka 最初是由 LinkedIn 公司開發,并于 2011 年初開源,2012 年 10 月從 Apache Incubator畢業,該專案的目標是為處理實時資料提供一個統一、高通量、低等待的平臺,
3)Kafka 是一個分布式訊息佇列,Kafka 對訊息保存時根據 Topic 進行歸類,發送訊息者稱為Producer,訊息接受者稱為 Consumer,此外 kafka 集群有多個 kafka 實體組成,每個實體(server)稱為broker,
4)無論是 kafka 集群,還是 consumer 都依賴于 zookeeper 集群保存一些 meta 資訊,來保證系統可用性,

1)Producer :訊息生產者,就是向 kafka broker 發訊息的客戶端;
2)Consumer :訊息消費者,向 kafka broker 取訊息的客戶端;
3)Topic :可以理解為一個佇列;
4) Consumer Group (CG):這是 kafka 用來實作一個 topic 訊息的廣播(發給所有的 consumer)和單播(發給任意一個 consumer)的手段,一個 topic 可以有多個 CG,topic 的訊息會復制(不是真的復制,是概念上的)到所有的 CG,但每個 partion 只會把訊息發給該 CG 中的一個 consumer,如果需要實作廣播,只要每個 consumer 有一個獨立的 CG 就可以了,要實作單播只要所有的consumer 在同一個 CG,用 CG 還可以將 consumer 進行自由的分組而不需要多次發送訊息到不同的 topic;
5)Broker :一臺 kafka 服務器就是一個 broker,一個集群由多個 broker 組成,一個 broker 可以容納多個 topic;
6)Partition:為了實作擴展性,一個非常大的 topic 可以分布到多個 broker(即服務器)上,一個 topic可以分為多個 partition,每個 partition 是一個有序的佇列,partition 中的每條訊息都會被分配一個有序的 id(offset),kafka 只保證按一個 partition 中的順序將訊息發給 consumer,不保證一個 topic 的整體(多個 partition 間)的順序;
7)Offset:kafka 的存盤檔案都是按照 offset.kafka 來命名,用 offset 做名字的好處是方便查找,例如你 想 找 位 于 2049 的 位 置 , 只 要 找 到 2048.kafka 的 文 件 即 可 , 當 然 the first offset 就 是
00000000000.kafka,
Kafka 單節點運行方式
https://kafka.apache.org/
下載代碼
下載 kafka_2.13-2.7.0 版本并且解壓,
https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
wget https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -zvxf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0/
首先為每個 broker 創建一個組態檔,直接復制多個server.properties,并且修改他們的log日志和id
Kafka 集群部署方式
配置broker的id

設定log檔案地址

關聯zookeeper

啟動服務
現在啟動 Kafka 服務器:
bin/kafka-server-start.sh config/server.properties
創建topic
創建一個名為“test”的 topic,它有一個磁區和一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
運行 list(串列)命令來查看這個 topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181 test
除了手工創建 topic 外,你也可以配置你的 broker,當發布一個不存在的 topic 時自動創建 topic,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/274972.html
標籤:Java
