文章目錄
- 運維管控
- 接入集群
- 物理集群串列
- 集群概覽
- 實時流量
- 歷史流量
- Broker 資訊
- Leader Rebalance
- Broker詳情
- 基本資訊
- 監控資訊
- Topic資訊 (TODO 頁面跳轉)
- 磁盤資訊 (TODO 頁面跳轉)
- partition資訊
- Topic分析
- 消費者資訊
- Region資訊
- Region串列
- 邏輯集群資訊
- 邏輯集群串列
- Controller資訊
- 限流資訊
- 專欄文章串列
- Finally 官方群
專案地址: didi/Logi-KafkaManager: 一站式Apache Kafka集群指標監控與運維管控平臺
前面的文章簡單介紹了如何接入集群,以及Topic的申請和配額申請,這個時候我們還不是很了解Logi-KafkaManager究竟有哪些優點,如何去管理眾多的kafka集群;
運維管控
運維管控這個選單欄目下面主要是供
運維人員來管理所有集群的;
接入集群
Kafka的靈魂伴侶Logi-KafkaManger一之集群的接入及相關概念講解
物理集群串列

列出了所有物理集群,點擊一個物理集群進去看詳細資訊;
如果沒有資訊請檢查一下是否正確開啟了JMX; ==> JMX-連接失敗問題解決
集群概覽
實時流量
指標說明

因為我發送和消費過訊息, 為了不讓之前的資料干擾; 我們重新把Broker重啟一下,Jmx的資料就會清0了; 歷史資料清楚就去資料庫中把_metrics結尾的表資料全部清空;

執行下面的代碼,驗證一下實時流量的指標準不準確;
下面的代碼表示的是: 60S秒發送60條訊息; 每條訊息大小1個位元組; 但是在這60S內只發送一次訊息; 因為將linger.ms=60000, 設定為60秒后發送;
那么期望中的實時指標是;
@Test
void contextLoads() {
Properties props = new Properties();
props.put("bootstrap.servers", "xxxxxxx");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384000);
props.put("linger.ms", 60000);
props.put("buffer.memory", 335544320);
props.put("client.id", "appId_000001_cn.Test2");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 60; i++){
//將一個訊息設定大一點
byte[] log = new byte[1024];
String slog = new String(log);
producer.send(new ProducerRecord<String, String>("Test2",0, Integer.toString(i), slog));
}
try {
Thread.sleep(62000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
messagesIn:每秒發送到kafka的訊息條數 = 1條
byteIn:每秒發送到kafka的位元組數 = 1位元組
totalProduceReques:每秒總共發送的請求數 = 1/60=0.017 這里是請求數量,因為60s內實際上只發送了一次請求;
執行代碼然后看結果

基本上是符合我們預期的,實時流量資料還是準確的;
除了上面幾個指標,我們應該還要關注下面幾個例外指標,正常情況下他們都是0; 如果不為0的情況說明可能就有例外了,運維同學就應該去查查例外日志了;
byteRejected(B/s) 每秒被拒絕的位元組數
failedFetchRequest 每秒拉取失敗的請求數
failedProduceRequest 每秒發送失敗的請求數
messageIn/totalProduceRequest 訊息條數/總請求數 也可以關注一下; 假如他們的結果=1; 說明沒有批量發送,一條訊息就發送了一個請求了
歷史流量
指標說明

歷史資料都存放在_metrics結尾的表中;
Broker 資訊

上面左邊部分是對所有Broker峰值使用率的看板, 可以通過這個圖簡單了解一下Broker的峰值情況, 那么這個使用率是怎么計算的,計算的到底準不準確,得需要去原始碼里面看看, 這個圖我們可以作為一個參考值來了解;

副本狀態圖, 可以理解為在 ISR中的是同步;不在ISR中的是未同步;
我們現在把其中一臺Broker 1 關機 模擬Broker宕機等例外情況; 可以看到變成了下面這樣子;

圖中可以看到, 1的狀態為未使用, 0,2 兩臺broker的副本狀態都變成了未同步 ;
副本狀態:
失效副本磁區的個數 大于0 則這個副本狀態就展示 未同步 ; 失效副本磁區的個數UnderReplicatedPartitions 是通過JMX訪問kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions獲取到的值;如果獲取的UnderReplicatedPartitions值大于0,有可能是某個Broker的問題,也有可能引申到整個集群的問題,也許還要引入其他一些資訊、指標等配合找出問題之所在,
注意:如果Kafka集群正在做磁區遷移(kafka-reassign-partitions.sh)的時候,這個值也會大于0,
更多關于失效副本磁區數例外問題排查請看 失效副本的診斷及預警
理解了副本狀態的意思,那上圖我們就可以理解了; 之所以Broker[0,2] 都顯示未同步,是因為 Broker 2中含有[0,2]的副本; Broker2宕機了,失效副本磁區的個數就大于0了
洗掉操作:
當Broker下線的時候,可以執行洗掉操作, 一般是當你把這個Broker移除集群的時候你就可以去洗掉掉他, 不過洗掉之后,如果重新加入到集群還是會被添加回來的; 如果僅僅只是Broker宕機就不要洗掉了;
Leader Rebalance

想要知道這個功能用來干什么, 那么我們得先了解一個概念 leader 均衡機制;
Leader 均衡機制(auto.leader.rebalance.enable=true)
當一個broker停止或崩潰時,這個broker中所有磁區的leader將轉移給其他副本,這意味著在默認情況下,當這個broker重新啟動之后,它的所有磁區都將僅作為follower,不再用于客戶端的讀寫操作,
為了避免這種不平衡,Kafka有一個首選副本的概念,如果一個磁區的副本串列是1,5,9,節點1將優先作為其他兩個副本5和9的leader,因為它較早存在于副本中,你可以通過運行以下命令讓Kafka集群嘗試恢復已恢復正常的副本的leader地位:,不會導致負載不均衡和資源浪費,這就是leader的均衡機制
# kafka版本 <= 2.4 > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot # kafka新版本 > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:portkafka平衡leader
在組態檔conf/ server.properties中配置開啟(默認就是開啟)auto.leader.rebalance.enable = true
與其相關的配置還有
leader.imbalance.check.interval.seconds partition 檢查重新 rebalance 的周期時間 ; 默認300秒;
leader.imbalance.per.broker.percentage 標識每個 Broker 失去平衡的比率,如果超過改比率,則執行重新選舉 Broker 的 leader;默認比例是10%;
上面幾個配置都是 && 的關系; 同時滿足才能觸發再平衡;
調優建議:考慮到leader重選舉的代價比較大,可能會帶來性能影響,也可能會引發客戶端的阻塞,生產環境建議設定為false,或者周期設定長一點,比如一天一次;
那么如果我們關閉了 均衡機制 , 或者周期時間比較長, 也就有可能造成上面說的問題, 那么Kafka-manager就提供了一個手動再平衡的操作;
假如有一臺Broker宕機了, 等它重啟之后, 并且等它副本同步完成之后(為了副本同步與再平衡錯開一下), 運維管理人員 就可以操作一下這個 Leader Rebalance ;手動觸發一下再平衡;
舉個栗子🌰
- 首先將broker的自動均衡關閉
auto.leader.rebalance.enable = false; 并且逐個重啟 - 查看一下某個Topic在各個broker的 Leader分布情況 ;
我們這里看看TEST3這個TOPIC的情況;
Broker-0

Broker-1

Broker-2

在逐個啟動完成的時候 他們的Leader分布情況如下;
| Broker | Leader |
|---|---|
| 0 | |
| 1 | |
| 2 | 0,1,2,3 |
因為Broker-2是我啟的第一臺; 所以所有磁區的Leader都集中在這一臺機器上; 而后面啟動的Broker都沒有分配到Leader;
這樣的情況明顯不合理; 所以我們需要執行一次 再均衡;
- 手動執行 再均衡策略;下拉選中的Broker; 這里選擇Broker的作用是選擇這臺Broker上的所有Topic來進行再均衡

再均衡之后再看看Leader情況
| Broker | Leader |
|---|---|
| 0 | 2,3 |
| 1 | 0 |
| 2 | 1 |
可以看到均衡之后的結果,Broker-0 分配了2個Leader ; 自動恢復到了之前的分配情況;
PS: Leader Rebalance 時候選擇的Broker的作用是針對該Broker下面的所有Topic來進行再均衡; 假如你3臺Broker上的Topic都一樣,那選哪個Broker都一樣
Broker詳情
基本資訊
展示了當前Broker的基本資訊和 實時流量 歷史流量 ; 注意 這里的流量資訊展示的是當前這一臺Broker的流量; 集群概覽那里展示的是整個物理集群的所有流量資訊(Brokers之和);
監控資訊
按照時間軸展示多個指標資訊,當然指標也是當前選中的Broker的指標資訊;

Topic資訊 (TODO 頁面跳轉)
展示當前Broker下有哪些Topic; 更為詳細的介紹情況 TODO…
磁盤資訊 (TODO 頁面跳轉)
展示當前Broker的一些磁盤資訊; 但是此功能 需要 接入 滴滴的 kafka-gatway 組件才可以生效; 目前該組件為企業服務,暫未有開源計劃; 更為詳細的介紹請看 TODO…
partition資訊
展示當前Broker的partition資訊, 列出當前Broker所有Topic的 Leader 和副本 以及未同步副本 情況;
在上面的 Leader Rebalance 模塊中,其實已經說明講解了一部分這里的資訊情況;
例如Broker-0宕機了,可以看到那些在Broker-0中存在對應副本的Topic, 清晰的展示了哪些副本是沒有同步的; 像下面的TEST2在Broker-0中不含有副本,所有它的狀態是 已同步;

Topic分析
當前Broker的Topic基本資訊,其實這里的資訊在 最左邊的基本資訊里面有了
不過這里展示的是最近一分鐘的資料,而且把所有Topic的資料列出來展示對比;
我們模擬一下批量發送訊息,給TEST2 TEST3的TOPIC發個1萬條訊息
bin/kafka-producer-perf-test.sh --topic TEST3 --num-records 10000 --record-size 100 --throughput 100 --producer-props bootstrap.servers=xxx:9092,xxx:9092,xxx:9092
bin/kafka-producer-perf-test.sh --topic TEST2 --num-records 10000 --record-size 100 --throughput 100 --producer-props bootstrap.servers=xxx:9092,xxx:9092,xxx:9092
看看展示的資料

通過這個資料可以看到當前Broker下最近一分鐘的Topic活動狀態; 可以看到哪個Topic比較活躍; 圖中的百分比應該算的有問題,去提一個BUG;
消費者資訊

展示當前Broker下的所有消費組資訊, Location 表示資料是從Broker上獲取的(老版本存放在ZK中); 注意剛啟動的時候這里可能為空,一分鐘之后執行獲取Consumer的任務才會獲取到
Region資訊
Region串列
展示的是當前物理集群下劃分的所有Region;

我們主要看上面的幾個引數
預估容量: 很多人對這個數值比較疑惑, 也不知道怎來的; 我們找找原始碼就知道它是怎么來的了;
此數值計算的是 當前Region下面能夠承受的最大流量值 ;比如上面的表示最大支持 360M/s; 但是這個值其實是一個非常模糊的預估值,是需要運維管理人員 去設定的,如果沒有設定默認的就是每臺Broker 最大流量值是 120M/s;
運維管理人員 需要對自己的Broker能夠承受的峰值流量有個數; 然后設定完成可以直觀的了解到此Region是否能夠承受住峰值流量;
實際流量: 從歷史資料中計算一下實際的峰值流量;
預估流量: 實際流量+ 新申請的Topic的預估流量;
解釋一下; 我們新申請的Topic,這個時候還沒有流量進來, 但是我們要給這個新申請的Topic預留一個Buffer; 我們在申請Topic的時候不是有讓填寫一個預估峰值流量么;
但是當前代碼里面實際流量=預估流量; 待優化
那么如何修改Broker能夠承受的峰值流量呢?
點擊 運維管控 ->平臺管理->平臺配置 填寫如下資訊

配制鍵: REGION_CAPACITY_CONFIG
配置值Json串; 它是一個array
[{
"clusterId": 4, //物理集群的ID
"duration": 10, //持續時間,為了最大值最小值對實際流量產生的紊亂,才有這么一個值,具體含義就不分析了,默認值就是10
"latestTimeUnitMs": 604800000,// 表示計算的是最近多少天內的資料;比如這個默認值是7天;7 * 24 * 60 * 60 * 1000L
"maxCapacityUnitB": 125829120 //預估容量;默認值是120 * 1024 * 1024 ;也就是120M; 針對的是單臺Broker
}, {
"clusterId": 5,
"duration": 10,
"latestTimeUnitMs": 604800000,
"maxCapacityUnitB": 125829120
}]
PS: 上面的配置每個都是針對物理集群下面的所有Broker; 比如我設定的clusterId=4的物理集群的maxCapacityUnitB=125829120;(120M),那么這個物理集群下面的所有Region下面的Broker;給的預估容量都是120M
上面的計算是每隔12小時才會計算一次;
針對這一塊,后續社區應該會做優化改造,或者讓預估容量可以自動計算 平臺配置那里也不方便; 或者社區也會做修改
邏輯集群資訊
邏輯集群串列
展示當前物理集群的所有邏輯集群資訊;

創建邏輯集群講解請看 【KafkaManager 二 】集群的接入及相關概念講解
Controller資訊
展示Controller的變更歷史 和 設定候選Controller
關于Controller
控制器組件(Controller),是 Apache Kafka 的核心組件,它的主要作用是在 Apache ZooKeeper 的幫助下管理和協調整個 Kafka 集群,集群中任意一臺 Broker 都能充當控制器的角色,但是,在運行程序中,只能有一個 Broker 成為控制器,行使其管理和協調的職責
更為詳細內容請參考Kafka的Controller Broker是什么

設定了候選Controller之后 : Controller將會優先從選中的Broker中選舉 ; 這個功能使用的場景可能是
你知道哪幾臺Broker比較空閑 , 想讓他們承擔Controller的責任;
限流資訊
這里展示的是當前物理集群中此時此刻正在被限流的所有Topic資訊;
還記得我們上一篇文章有也有講過限流的相關么 【KafkaManager 三】kafka針對Topic粒度的配額管理(限流)
那里是查看當前的Topic是否被限流了

關于kafka的配額限流 kafka中的配額管理(限速)機制
那么我們這里的限流資訊怎么看呢?什么時候出來呢?
那我們來制造一個Producer發生限流的場景;
1.設定一個限流配置
// 添加限流資訊
sh bin/kafka-configs.sh --bootstrap-server broker1:9092 --alter --add-config 'producer_byte_rate=100,consumer_byte_rate=100' --entity-type clients --entity-name appId_000001_cn.Test2
上面的命令的意思是 在broker1:9092上 添加一個針對客戶端clientName = appId_000001_cn.Test2 加上一個限流配置;
生產者producer 的速率是100b/s ; 消費組consumer的速率是100b/s ;
不放心我們也可以去zk上看看是不是配置成功了

2.生產訊息
@Test
void contextLoads() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 110);
props.put("linger.ms", 0);
props.put("client.id", "appId_000001_cn.Test2");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 1000; i++){
//將一個訊息設定大一點
byte[] log = new byte[100];
String slog = new String(log);
producer.send(new ProducerRecord<String, String>("Test2",0, Integer.toString(i), slog));
System.out.println("i="+i);
}
try {
Thread.sleep(62000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
上面的代碼表示 每次發送100b的訊息出去,并且是立即發送; 因為我們設定的限流速度 是100b/s; 那么妥妥的就被限流了嘛;
注意:客戶端id設定的為 appId_000001_cn.Test2 ;跟我們上面針對的客戶端限流名稱一樣才會生效;
執行代碼之后我們再看看效果;
限流的Topic就展示出來了,當然這個展示的是當前限流的;等它不限流了 就會訊息;

PS:這里有個要注意的地方就是,這里展示的是針對單個Topic的限流資訊; 我們知道kafka當前是不支持針對Topic這一維度來進行限流配置的; 當然想要自己實作針對Topic限流也很簡單,只需要讓每個Topic的client.id不一樣;然后針對每個topic的client.id做限流配置就行; 看上面我設定的客戶端是 appId_000001_cn.Test2 這樣的格式; 但是自己這樣去做非常麻煩;不建議自己去做; 上篇文章有講過 【KafkaManager 三】kafka針對Topic粒度的配額管理(限流)
如果只是開源版本的話 這一塊功能還是用不上了(自己做麻煩主要是)
不過滴滴的kafka-gateway 是支持這個功能的; 但是kafka-gateway 是滴滴的商業服務,暫未開源; kafka-gateway 在原生的kafka上做了很多的增強; 想要使用kafka-gateway的開源聯系滴滴官方
專欄文章串列
Kafka的靈魂伴侶Logi-KafkaManger一之集群的接入及相關概念講解
Kafka的靈魂伴侶Logi-KafkaManger二之kafka針對Topic粒度的配額管理(限流)
Kafka的靈魂伴侶Logi-KafkaManger三之運維管控–集群串列
Kafka的靈魂伴侶Logi-KafkaManger(4)之運維管控–集群運維(任務遷移和集群在線升級)
Finally 官方群
有疑問可以加官方開源群


如果文章對你有幫助的話, 麻煩給博主一鍵三連呀, 原創不易 你的支持是我輸出的動力 ?🏻
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/280379.html
標籤:AI
上一篇:阿里巴巴實習面試經歷
