基本概念
簡介
Kafka 最初是由 LinkedIn 即領英公司基于 Scala 和 Java 語言開發的分布式訊息發布-訂閱系統,現已捐獻給Apache 軟體基金會,其具有高吞吐、低延遲的特性,許多大資料實時流式處理系統比如 Storm、Spark、Flink等都能很好地與之集成,
總的來講,Kafka 通常具有 3 重角色:
- 存盤系統:通常訊息佇列會把訊息持久化到磁盤,防止訊息丟失,保證訊息可靠性,Kafka 的訊息持久化機制和多副本機制使其能夠作為通用資料存盤系統來使用,
- 訊息系統:Kafka 和傳統的訊息佇列比如 RabbitMQ、RocketMQ、ActiveMQ 類似,支持流量削峰、服務解耦、異步通信等核心功能, ==》 先進先出 ==》 只針對磁區,不是全域的
- 流處理平臺:Kafka 不僅能夠與大多數流式計算框架完美整合,并且自身也提供了一個完整的流式處理庫,即 Kafka Streaming,Kafka Streaming 提供了類似 Flink 中的視窗、聚合、變換、連接等功能,
一句話概括:Kafka 是一個分布式的基于發布/訂閱模式的訊息中間件,在業界主要應用于大資料實時流式計算領域,起解耦合和削峰填谷的作用,
特點
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, 由多個consumer group 對partition進行consume操作,
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:訊息被持久化到本地磁盤,并且支持資料備份防止資料丟失
- 容錯性:允許集群中有節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高并發:支持數千個客戶端同時讀寫
Kafka在各種應用場景中,起到的作用可以歸納為這么幾個術語:削峰填谷,解耦!
在大資料流式計算領域中,kafka主要作為計算系統的前置快取和輸出結果快取;
安裝部署
kafka基于Zookeeper, 因此需要先安裝Zookeeper, 詳見https://www.cnblogs.com/paopaoT/p/17461562.html
- 上傳安裝包
- 解壓
tar -zxvf kafka_2.11-2.2.2.tgz tar -C /opt/apps/
- 修改組態檔
# 進入組態檔目錄
cd kafka_2.12-2.3.1/config
# 編輯組態檔
vi server.properties
# 為依次增長的:0、1、2、3、4,集群中唯一 id
broker.id=0
# 資料存盤的?錄
log.dirs=/opt/data/kafka
# 底層存盤的資料(日志)留存時長(默認7天)
log.retention.hours=168
# 底層存盤的資料(日志)留存量(默認1G)
log.retention.bytes=1073741824
# 指定zk集群地址
zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
- 環境變數
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
- 分發安裝包
for i in {2..3}
do
scp -r kafka_2.11-2.2.2 linux0$i:$PWD
done
# 安裝包分發后,記得修改config/server.properties中的 配置引數: broker.id
# 注意:還需要分發環境變數
- 啟停集群(在各個節點上啟動)
bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties
# 停止集群
bin/kafka-server-stop.sh
- 一鍵啟停腳本:
#!/bin/bash
case $1 in
"start"){
for i in linux01 linux02 linux03
do
echo ---------- kafka $i 啟動 ------------
ssh $i "source /etc/profile; /opt/app/kafka2.4.1/bin/kafka-server-start.sh -daemon /opt/app/kafka2.4.1/config/server.properties"
done
};;
"stop"){
for i in linux01 linux02 linux03
do
echo ---------- kafka $i 停止 ------------
ssh $i "source /etc/profile; /opt/app/kafka2.4.1/bin/kafka-server-stop.sh "
done
};;
esac
基本操作
概述
Kafka 中提供了許多命令列工具(位于$KAFKA_HOME/bin 目錄下)用于管理集群的變更,
| 腳本 | 作用 |
|---|---|
| kafka-console-producer.sh | 生產訊息 |
| kafka-topics.sh | 管理主題 |
| kafka-server-stop.sh | 關閉Kafka服務 |
| kafka-server-start.sh | 啟動Kafka服務 |
| kafka-configs.sh | 配置管理 |
| kafka-consumer-perf-test.sh | 測驗消費性能 |
| kafka-producer-perf-test.sh | 測驗生產性能 |
| kafka-dump-log.sh | 查看資料日志內容 |
| kafka-preferred-replica-election.sh | 優先副本的選舉 |
| kafka-reassign-partitions.sh | 磁區重分配 |
管理操作:kafka-topics
創建topic
--bootstrap-server 和 --zookeeper一樣的效果 ,新版本建議使用 --bootstrap-server
kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --create --topic test01 --partitions 3 --replication-factor 3
引數解釋:
--replication-factor 副本數量
--partitions 磁區數量
--topic topic名稱
# 本方式,副本的存盤位置是系統自動決定的
# 手動指定分配方案:磁區數,副本數,存盤位置
kafka-topics.sh --create --topic tpc-1 --zookeeper linux01:2181 --replica-assignment 0:1:3,1:2:6
該topic,將有如下partition:(2個磁區 3個副本)
partition0 ,所在節點: broker0、broker1、broker3
partition1 ,所在節點: broker1、broker2、broker6
# 查看topic的狀態資訊
kafka-topics.sh --describe --topic tpc-1 --zookeeper linux01:2181
Topic: tpc-1 PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: tpc-1 Partition: 0 Leader: 0 Replicas: 0,1,3 Isr: 0,1
Topic: tpc-1 Partition: 1 Leader: 1 Replicas: 1,2,6 Isr: 1,2
查看topic串列
kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --list
kafka-topics.sh --list --zookeeper linux01:2181
__consumer_offsets
tpc-1
查看topic狀態資訊
kafka-topics.sh --describe --zookeeper linux01:2181 --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# topic的磁區數量,以及每個磁區的副本數量,以及每個副本所在的broker節點,以及每個磁區的leader副本所在broker節點,以及每個磁區的ISR副本串列;
# ISR: in sync replica ,同步副同步本(當然也包含leader自身,replica.lag.time.max.ms =30000)
# OSR:out of sync replicas 失去同步的副本(該副本上次請求leader同步資料距現在的時間間隔超出配置閾值)
# ISR同步副本串列
# ISR概念:(同步副本),每個磁區的leader會維護一個ISR串列,ISR串列里面就是follower副本的Borker編號,只有跟得上Leader的 follower副本才能加入到 ISR里面
# 這個是通過replica.lag.time.max.ms =30000(默認值)引數配置的,只有ISR里的成員才有被選為 leader 的可能,
踢出ISR和重新加入ISR的條件:
- 踢出ISR的條件: 由replica.lag.time.max.ms =30000決定,如上圖;
- 重新加入ISR的條件: OSR副本的LEO(log end offset)追上leader的LEO;
洗掉topic
bin/kafka-topics.sh --zookeeper linux01:2181 --delete --topic test
# 洗掉topic,server.properties中需要一個引數處于啟用狀態: delete.topic.enable = true(默認是true)
# 使用 kafka-topics .sh 腳本洗掉主題的行為本質上只是在 ZooKeeper 中的 /admin/delete_topics 路徑下建一個與待洗掉主題同名的節點,以標記該主題為待洗掉的狀態,然后由 Kafka控制器異步完成,
增加磁區數
kafka-topics.sh --zookeeper linux01:2181 --alter --topic paopao --partitions 3
# Kafka只支持增加磁區,不支持減少磁區
# 原因是:減少磁區,代價太大(資料的轉移,日志段拼接合并)
# 如果真的需要實作此功能,則完全可以重新創建一個磁區數較小的主題,然后將現有主題中的訊息按照既定的邏輯復制過去;
動態配置topic引數(不常用)
# 通過管理命令,可以為已創建的topic增加、修改、洗掉topic level引數
# 添加/修改 指定topic的配置引數:
kafka-topics.sh --zookeeper linux01:2181 --alter --topic tpc2 --config compression.type=gzip
# --config compression.type=gzip 修改或添加引數配置
# --add-config compression.type=gzip 添加引數配置
# --delete-config compression.type 洗掉配置引數
生產者:kafka-console-producer
kafka-console-producer.sh --broker-list linux01:9092 --topic test01
>a
>b
>c
>hello
>hi
>hadoop
>hive
順序輪詢(老版本)
順序分配,訊息是均勻的分配給每個 partition,即每個磁區存盤一次訊息,輪詢策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪詢策略的話,Kafka 默認會使用順序輪訓策略的方式,
隨機分配
實作隨機分配的代碼只需要兩行,如下
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
消費者:kafka-console-consumer
消費者在消費的時候,需要指定要訂閱的主題,還可以指定消費的起始偏移量
起始偏移量的指定策略有3中:
- earliest 起始點
- latest 最新
- 指定的offset( 磁區號:偏移量) ==》 必須的告訴他是哪個topic 的哪個磁區的哪個offset
- 從之前所記錄的偏移量開始消費
在命令列中,可以指定從什么地方開始消費
- 加上引數 --from-beginning 指定從最前面開始消費
- 如果不加--from-beginning 就需要分情況討論了,如果之前記錄過消費的位置,那么就從之前消費的位置開始消費,如果說之前沒有記錄過之前消費的偏移量,那么就從最新的位置開始消費
kafka的topic中的訊息,是有序號的(序號叫訊息偏移量),而且訊息的偏移量是在各個partition中獨立維護的,在各個磁區內,都是從0開始遞增編號!
# 消費訊息
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test01 --from-beginning
hive
hello
hadoop
# 指定從最前面開始消費
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --from-beginning
hadoop
list
hello
kafka
# 不指定他消費的位置的時候,就是從最新的地方開始消費
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao
# 指定要消費的磁區,和要消費的起始offset
# 從指定的offset(需要指定偏移量和磁區號)
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --offset 2 --partition 0
yy
abc
3333
2222
消費組
- 消費組是kafka為了提高消費并行度的一種機制!
- 在kafka的底層邏輯中,任何一個消費者都有自己所屬的組(如果沒有指定,系統會自己給你分配一個組id)
- 組和組之間,沒有任何關系,大家都可以消費到目標topic的所有資料
- 但是組內的各個消費者,就只能讀到自己所分配到的partitions
- KAFKA中的消費組,可以動態增減消費者,而且消費組中的消費者數量發生任意變動,都會重新分配磁區消費任務(消費者組在均衡策略)
如何讓多個消費者組成一個組: 就是讓這些消費者的groupId相同即可!
消費位移的記錄
kafka的消費者,可以記錄自己所消費到的訊息偏移量,記錄的這個偏移量就叫(消費位移);
記錄這個消費到的位置,作用就在于消費者重啟后可以接續上一次消費到位置來繼續往后面消費;
消費位移,是組內共享的!!!消費位置記錄在一個內置的topic中 ,默認是5s提交一次位移更新,
引數:auto.commit.interval.ms 默認是5s記錄一次
# 可以使用特定的工具類 決議內置記錄偏移量的topic
kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
# 通過指定formatter工具類,來對__consumer_offsets主題中的資料進行決議;
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=https://www.cnblogs.com/paopaoT/archive/2023/06/07/, commitTimestamp=1659889851318, expireTimestamp=None)
[g01,linux01,2]::OffsetAndMetadata(offset=17, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,1]::OffsetAndMetadata(offset=13, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
# 如果需要獲取某個特定 consumer-group的消費偏移量資訊,則需要計算該消費組的偏移量記錄所在磁區: Math.abs(groupID.hashCode()) % numPartitions(50)
# 根據組id的hash取值%50 確定具體是將這個組具體每個磁區消費到了哪里
# __consumer_offsets的磁區數為:50
配置管理 kafka-config
kafka-configs.sh 腳本是專門用來進行動態引數配置操作的,這里的操作是運行狀態修改原有的配置,如此可以達到動態變更的目的;一般情況下不會進行動態修改 ,
動態配置的引數,會被存盤在zookeeper上,因而是持久生效的
可用引數的查閱地址: https://kafka.apache.org/documentation/#configuration
# kafka-configs.sh 腳本包含:變更alter、查看describe 這兩種指令型別;
# kafka-configs. sh 支持主題、 broker 、用戶和客戶端這4個型別的配置,
# kafka-configs.sh 腳本使用 entity-type 引數來指定操作配置的型別,并且使 entity-name引數來指定操作配置的名稱,
# 比如查看topic的配置可以按如下方式執行:
kafka-configs.sh --zookeeper linux01:2181 --describe --entity-type topics --entity-name paopao
# 查看broker的動態配置可以按如下方式執行:
kafka-configs.sh --describe --entity-type brokers --entity-name 0 --zookeeper linux01:2181
entity-type和entity-name的對應關系

# 示例:添加topic級別引數
kafka-configs.sh --zookeeper linux01:2181 --alter --entity-type topics --entity-name paopao --add-config cleanup.policy=compact,max.message.bytes=10000
# 示例:添加broker引數
kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server linux01:9092,linux02:9092,linux03:9092
動態配置topic引數
通過管理命令,可以為已創建的topic增加、修改、洗掉topic level引數
添加/修改 指定topic的配置引數:
kafka-topics.sh --topic paopao --alter --config compression.type=gzip --zookeeper linux01:2181
# 如果利用 kafka-configs.sh 腳本來對topic、producer、consumer、broker等進行引數動態
# 添加、修改配置引數
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --add-config compression.type=gzip
# 洗掉配置引數
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --delete-config compression.type
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/554640.html
標籤:其他
上一篇:《SQL 必知必會》全決議
下一篇:返回列表
