kafka是什么
在回答這個問題之前,我們需要先了解另一個東西--event streaming,
什么是event streaming
我覺得,event streaming 是一個動態的概念,它描述了一個個 event ( "something happened" in the world ) 在不同主體間連續地、正確地流動的狀態,(這里我想搞個動圖的,不過 plantuml 不支持,所以只能靠想象了,,)

event source 產生 event,event source 可以是資料庫、傳感器、移動設備、應用程式,等等,
event broker 持久化 event,以備 event sink 可以隨時獲取它們,
event sink 實時或回顧性地從 broker 中獲取 event 進行處理,
有的人可能會問,為什么需要 broker,event 從 source 直接流到 sink 不行嗎?當然可以,但是不夠解耦,要么 event source 需要事先知道誰需要這些 event,要么 event sink 需要知道 event 從哪里來,
現在,我們可以在腦子里想象出 event streaming 的樣子:event 由 source 產生,然后流向 broker,在 broker 被持久化,再流到 sink,并不復雜對吧?
event streaming用來干嘛
我們可以在很多的應用場景中找到 event streaming 的身影,例如:
-
實時處理支付、金融交易、客戶訂單等等;
-
實時跟蹤和監控物流進度;
-
持續捕獲和分析來自物聯網設備或其他設備的傳感器資料;
-
不同資料源的資料連接;
-
作為資料平臺、事件驅動架構和微服務等的技識訓礎;
等等,
kafka是什么
現在我們回過頭來回答問題:kafka 是什么?
我認為,如果說 event streaming 是一種規范的話,那么 kafka 就是 event streaming 的一種具體實作,
kafka的架構
概念視圖
從最上層的抽象看,kafka 由三個部分組成:

其中,producer 發布 event,broker 持久化 even,consumer 訂閱 event,其中,producer 和 consumer 完全解耦,互不知曉,
不過,這是概念視圖,不是物理視圖,具體實作會因為 source 或 sink 的不同而有所不同,
物理視圖
Producer/Consumer API
當 event source 為普通應用程式時,可以在程式中引入 Producer API 和 Consumer API 來完成與 broker 的互動,這些 API 涵蓋了大部分主流語言,例如 Java、Scala、Go、Python、C/C++,除此之外,我們也可以直接使用 REST API 呼叫,

Connector
但是,并不是所有 source 或 sink 都能使用 API 的方式,例如,實時捕獲資料庫的更改、檔案的更改,從 RabbitMQ 匯入匯出訊息,等等,
這個時候就需要使用 connector 來完成集成,通常情況下,connector 并不需要我們自己開發,kafka 社區為我們提供了大量的 connector 來滿足我們的使用需求,

topic&partition
接下來我們再來補充下 broker 的一些細節,//zzs001
通常情況下,我們的 broker 會接收到很多不同型別的 event ,broker 需要區分它們,以便正確地路由,topic 就發揮了作用,它有點類似檔案系統的目錄,而 event 就類似于目錄里的檔案,sink 想要什么 event,只要找到對應的 topic 就行了,
同一 topic 可以有零個或多個 producer 和 consumer,不同于傳統 MQ,kafka 的 event 消費后并不洗掉,為什么這么做呢?這個我們后續的博客會說的,

除此之外,一個 topic 會劃分成一個或多個 partition,這些 partition 一般分布在不同的 broker 實體,producer 發布的 event 會根據某種策略分配到不同的 partition,這樣做的好處是,consumer 可以同時從多臺 broker 讀取 event,從而大大提高吞吐量,另外,為了高可用,同一個 partition 還會有多個副本,它們分布在不同的 broker 實體,

需要注意一下,當同一 topic 的 event 被分發到多個 partition 時,寫入和讀取的順序就不能保證了,對于需要嚴格控制順序的 topic,partition 需要設定為 1,
Streams
kafka 那么受歡迎,還有一個很重要的原因,就是它提供了流式處理類別庫,支持對存盤于Kafka內的資料進行流式處理和分析,這部分內容,我也是剛入門而已,后續博客再好好研究,

如何使用kafka
環境說明
kafka:3.2.1
os:CentOS Linux release 8.3.2011
JDK:1.8.0_291
注意,kafka 3.2.1 要求本地環境安裝 Java 8 及以上版本
下載安裝
從 下載頁面下載安裝包,

解壓安裝包,
tar -xzf kafka_2.13-3.2.1.tgz
啟動broker
進入到解壓目錄,我們看看 kafka 的目錄結構,
cd kafka_2.13-3.2.1
ls -al

接下來,我們啟動 broker 的部分,需要按照順序依次啟動 zookeeper 和 kafka server,
先啟動 zookeeper(后續版本可能不再需要 zookeeper),
bin/zookeeper-server-start.sh config/zookeeper.properties
打開另一個會話,再啟動 kafka server,
bin/kafka-server-start.sh config/server.properties
現在,單機版 broker 已經就緒,我們可以開始使用了,
創建topic
producer 發布的 event 會持久化在對應的 topic 中,才能路由給正確的 consumer,所以,在讀寫 event 之前,我們需要先創建 topic,
打開另一個會話,執行以下命令,
# 創建topic zzs001
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# 查詢topic
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

簡單的讀寫event
接下來我們用 kafka 自帶的 console-consumer 和 console-producer 讀寫 event,
使用 console-producer 寫 event 時,我們每輸入一行并回車,就會向 topic 寫入一個 event,
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

寫完之后我們可以按 Ctrl + C 退出,
接著,我們使用 console-consumer 讀 event,可以看到,剛寫的 event 被讀到了,
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

讀完我們按 Ctrl + C 退出,
我們可以在兩個會話中保持 producer 和 consumer 不退出,當我們在 producer 寫入 event 時, consumer 將實時讀取到,
前面提到過,topic 的 event 會被持久化下來,而且被消費過的 event 并不會洗掉,這一點很容易驗證,我們可以再開一個 consumer 來讀取,它還是能讀到被別人讀過的 event,
使用connect匯入匯出
前面提到過,有的 source 或 sink 需要依賴 connector 來讀寫 event,接下來我們以檔案為例,演示如何從已有檔案中將 event 匯入 topic,并從 topic 中匯出到另一個檔案中,
首先我們需要一個可以匯入匯出檔案的 connector,默認情況下,在 kafka 的 libs 目錄就有這樣一個 jar 包--connect-file-3.2.1.jar,我們需要在 connect 的配置中引入這個包,
vi config/connect-standalone.properties
按 i 進入編輯,添加或修改plugin.path=libs/connect-file-3.2.1.jar,
按 ESC 后輸入 :wq 保存并退出,除此之外,這個檔案還可以用來配置需要連接哪個 broker,以及 event 的序列化方式等,

然后,我們創建一個 test.txt 作為 event source,并寫入 event,
echo -e "foo\nbar" > test.txt

現在我們先啟動 event source 的 connector,將 test.txt 的 event 寫入名為 connect-test 的 topic,config/connect-file-source.properties 已經配置好了connector 名稱、event source 的檔案、topic,等等,
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
執行片刻后我們可以按 Ctrl + C 退出,
這時,我們可以先通過 consumer-console 查看 topic 上是否有這些 event,可以看到,event 已經成功匯入,至于格式為什么是這樣的,這個以后再說明,
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

現在我們啟動 event sink 的 connector,將 topic 的 event 匯入到 test.sink.txt,connect-file-sink.properties 已經配置好了connector 名稱、event source 的檔案、topic,等等 ,
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
執行片刻后我們可以按 Ctrl + C 退出,
這時查看 test.sink.txt,可以看到 event 成功匯出,

和前面一樣,這里我們也可以保持 event source 和 event sink 的 connector 不退出,測驗實時生產和消費 event,
使用streams處理
這部分內容后續再補充,
停止
走到這一步,我們已經完成了 kafka 的入門學習,
接下來,我們可以通過以下步驟關閉 kafka,
-
如果 producer 或 consumer 還在運行,Ctrl + C 退出;
-
Ctrl + C 退出 kafka server;
-
Ctrl + C 退出 zookeeper;
如果想清除 kafka 的資料,包括我們創建的 topic 和 event、日志等,執行以下命令:
rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/connect.offsets
結語
以上內容是最近學習 kafka 的一些思考和總結(主要參考官方檔案),如有錯誤,歡迎指正,
任何的事物,都可以被更簡單、更連貫、更系統地了解,希望我的文章能夠幫到你,
最后,感謝閱讀,
參考資料
Apache Kafka 官方檔案
相關原始碼請移步:https://github.com/ZhangZiSheng001/kafka-demo
分層,抽象,高內聚,低耦合本文為原創文章,轉載請附上原文出處鏈接:https://www.cnblogs.com/ZhangZiSheng001/p/16641755.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/503247.html
標籤:Java
上一篇:因勢而變,因時而動,Go lang1.18入門精煉教程,由白丁入鴻儒,Go lang泛型(generic)的使用EP15
下一篇:MVC架構-01
