主頁 > 資料庫 > 手記系列之六 ----- 分享個人使用kafka經驗

手記系列之六 ----- 分享個人使用kafka經驗

2023-06-09 08:20:22 資料庫

前言

本篇文章主要介紹的關于本人從剛作業到現在使用kafka的經驗,內容非常多,包含了kafka的常用命令,在生產環境中遇到的一些場景處理,kafka的一些web工具推薦等等,由于kafka這塊的記錄以及經驗是從我剛開始使用kafka,從2017年開始,可能里面有些內容過時,請見諒,溫馨提醒,本文有3w多字,建議收藏觀看~

Kafka理論知識

kafka基本介紹

Kafka是一種高吞吐量的分布式發布訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料,

Kafka 有如下特性:

-以時間復雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間復雜度的訪問性能,

-高吞吐率,即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上訊息的傳輸,

- 支持KafkaServer間的訊息磁區,及分布式消費,同時保證每個Partition內的訊息順序傳輸,

- 同時支持離線資料處理和實時資料處理,

- Scale out:支持在線水平擴展,

kafka的術語

- Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker,

-Topic:每條發布到Kafka集群的訊息都有一個類別,這個類別被稱為Topic,(物理上不同Topic的訊息分開存盤,邏輯上一個Topic的訊息雖然保存于一個或多個broker上但用戶只需指定訊息的Topic即可生產或消費資料而不必關心資料存于何處)

-Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition,

- Producer:負責發布訊息到Kafka broker,

- Consumer:訊息消費者,向Kafka broker讀取訊息的客戶端,

- Consumer Group:每個Consumer屬于一個特定的Consumer
Group(可為每個Consumer指定group name,若不指定group
name則屬于默認的group),

kafka核心Api

kafka有四個核心API

- 應用程式使用producer API發布訊息到1個或多個topic中,

- 應用程式使用consumer API來訂閱一個或多個topic,并處理產生的訊息,

- 應用程式使用streams
API充當一個流處理器,從1個或多個topic消費輸入流,并產生一個輸出流到1個或多個topic,有效地將輸入流轉換到輸出流,

- connector
API允許構建或運行可重復使用的生產者或消費者,將topic鏈接到現有的應用程式或資料系統,

示例圖如下:

kafka面試問題

Kafka的用途有哪些?使用場景如何?

使用kafka的目的是為了解耦、異步、削峰,

訊息系統: Kafka
和傳統的訊息系統(也稱作訊息中間件)都具備系統解耦、冗余存盤、流量削峰、緩沖、異步通信、擴展性、可恢復性等功能,與此同時,Kafka
還提供了大多數訊息系統難以實作的訊息順序性保障及回溯消費的功能,

存盤系統: Kafka
把訊息持久化到磁盤,相比于其他基于記憶體存盤的系統而言,有效地降低了資料丟失的風險,也正是得益于
Kafka 的訊息持久化功能和多副本機制,我們可以把 Kafka
作為長期的資料存盤系統來使用,只需要把對應的資料保留策略設定為"永久"或啟用主題的日志壓縮功能即可,

流式處理平臺: Kafka
不僅為每個流行的流式處理框架提供了可靠的資料來源,還提供了一個完整的流式處理類別庫,比如視窗、連接、變換和聚合等各類操作,

Kafka中的ISR、AR又代表什么?ISR的伸縮又指什么

磁區中的所有副本統稱為 AR(Assigned Replicas),所有與 leader
副本保持一定程度同步的副本(包括 leader 副本在內)組成ISR(In-Sync
Replicas),ISR 集合是 AR 集合中的一個子集,

ISR的伸縮:

leader 副本負責維護和跟蹤 ISR 集合中所有 follower 副本的滯后狀態,當
follower 副本落后太多或失效時,leader 副本會把它從 ISR 集合中剔除,如果
OSR 集合中有 follower 副本"追上"了 leader 副本,那么 leader 副本會把它從
OSR 集合轉移至 ISR 集合,默認情況下,當 leader 副本發生故障時,只有在
ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR
集合中的副本則沒有任何機會(不過這個原則也可以通過修改相應的引數配置來改變),

replica.lag.time.max.ms : 這個引數的含義是 Follower 副本能夠落后 Leader
副本的最長時間間隔,當前默認值是 10 秒,
unclean.leader.election.enable:是否允許 Unclean 領導者選舉,開啟
Unclean 領導者選舉可能會造成資料丟失,但好處是,它使得磁區 Leader
副本一直存在,不至于停止對外提供服務,因此提升了高可用性,

Kafka中的HW、LEO、LSO、LW等分別代表什么?

HW 是 High Watermark
的縮寫,俗稱高水位,它標識了一個特定的訊息偏移量(offset),消費者只能拉取到這個
offset 之前的訊息,

LSO是LogStartOffset,一般情況下,日志檔案的起始偏移量 logStartOffset
等于第一個日志分段的 baseOffset,但這并不是絕對的,logStartOffset
的值可以通過 DeleteRecordsRequest 請求(比如使用 KafkaAdminClient 的
deleteRecords()方法、使用 kafka-delete-records.sh
腳本、日志的清理和截斷等操作進行修改,

如上圖所示,它代表一個日志檔案,這個日志檔案中有9條訊息,第一條訊息的
offset(LogStartOffset)為0,最后一條訊息的 offset 為8,offset
為9的訊息用虛線框表示,代表下一條待寫入的訊息,日志檔案的 HW
為6,表示消費者只能拉取到 offset 在0至5之間的訊息,而 offset
為6的訊息對消費者而言是不可見的,

LEO 是 Log End Offset 的縮寫,它標識當前日志檔案中下一條待寫入訊息的
offset,上圖中 offset 為9的位置即為當前日志檔案的 LEO,LEO
的大小相當于當前日志磁區中最后一條訊息的 offset 值加1,磁區 ISR
集合中的每個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即為磁區的
HW,對消費者而言只能消費 HW 之前的訊息,

LW 是 Low Watermark 的縮寫,俗稱"低水位",代表 AR 集合中最小的
logStartOffset
值,副本的拉取請求(FetchRequest,它有可能觸發新建日志分段而舊的被清理,進而導致
logStartOffset 的增加)和洗掉訊息請求(DeleteRecordRequest)都有可能促使 LW
的增長,

Kafka中是怎么體現訊息順序性的?

可以通過磁區策略體現訊息順序性,

磁區策略有輪詢策略、隨機策略、按訊息鍵保序策略,

按訊息鍵保序策略:一旦訊息被定義了 Key,那么你就可以保證同一個 Key
的所有訊息都進入到相同的磁區里面,由于每個磁區下的訊息處理都是有順序的,故這個策略被稱為按訊息鍵保序策略

Kafka中的磁區器、序列化器、攔截器是否了解?它們之間的處理順序是什么?

序列化器:生產者需要用序列化器(Serializer)把物件轉換成位元組陣列才能通過網路發送給
Kafka,而在對側,消費者需要用反序列化器(Deserializer)把從 Kafka
中收到的位元組陣列轉換成相應的物件,

磁區器:磁區器的作用就是為訊息分配磁區,如果訊息 ProducerRecord
中沒有指定 partition 欄位,那么就需要依賴磁區器,根據 key 這個欄位來計算
partition 的值,

Kafka 一共有兩種攔截器:生產者攔截器和消費者攔截器,

生產者攔截器既可以用來在訊息發送前做一些準備作業,比如按照某個規則過濾不符合要求的訊息、修改訊息的內容等,也可以用來在發送回呼邏輯前做一些定制化的需求,比如統計類作業,

消費者攔截器主要在消費到訊息或在提交消費位移時進行一些定制化的操作,

訊息在通過 send() 方法發往 broker
的程序中,有可能需要經過攔截器(Interceptor)、序列化器(Serializer)和磁區器(Partitioner)的一系列作用之后才能被真正地發往
broker,攔截器(下一章會詳細介紹)一般不是必需的,而序列化器是必需的,訊息經過序列化之后就需要確定它發往的磁區,如果訊息
ProducerRecord 中指定了 partition 欄位,那么就不需要磁區器的作用,因為
partition 代表的就是所要發往的磁區號,

處理順序 :攔截器->序列化器->磁區器

KafkaProducer 在將訊息序列化和計算磁區之前會呼叫生產者攔截器的 onSend()
方法來對訊息進行相應的定制化操作,

然后生產者需要用序列化器(Serializer)把物件轉換成位元組陣列才能通過網路發送給
Kafka,

最后可能會被發往磁區器為訊息分配磁區,

Kafka生產者客戶端的整體結構是什么樣子的?

整個生產者客戶端由兩個執行緒協調運行,這兩個執行緒分別為主執行緒和 Sender
執行緒(發送執行緒),

在主執行緒中由 KafkaProducer
創建訊息,然后通過可能的攔截器、序列化器和磁區器的作用之后快取到訊息累加器(RecordAccumulator,也稱為訊息收集器)中,

Sender 執行緒負責從 RecordAccumulator 中獲取訊息并將其發送到 Kafka 中,

RecordAccumulator 主要用來快取訊息以便 Sender
執行緒可以批量發送,進而減少網路傳輸的資源消耗以提升性能,

Kafka生產者客戶端中使用了幾個執行緒來處理?分別是什么?

整個生產者客戶端由兩個執行緒協調運行,這兩個執行緒分別為主執行緒和 Sender
執行緒(發送執行緒),在主執行緒中由 KafkaProducer
創建訊息,然后通過可能的攔截器、序列化器和磁區器的作用之后快取到訊息累加器(RecordAccumulator,也稱為訊息收集器)中,Sender
執行緒負責從 RecordAccumulator 中獲取訊息并將其發送到 Kafka 中,

Kafka的舊版Scala的消費者客戶端的設計有什么缺陷?

老版本的 Consumer Group 把位移保存在 ZooKeeper 中,Apache ZooKeeper
是一個分布式的協調服務框架,Kafka
重度依賴它實作各種各樣的協調管理,將位移保存在 ZooKeeper
外部系統的做法,最顯而易見的好處就是減少了 Kafka Broker
端的狀態保存開銷,

ZooKeeper 這類元框架其實并不適合進行頻繁的寫更新,而 Consumer Group
的位移更新卻是一個非常頻繁的操作,這種大吞吐量的寫操作會極大地拖慢
ZooKeeper 集群的性能

"消費組中的消費者個數如果超過topic的磁區,那么就會有消費者消費不到資料"這句話是否正確?如果正確,那么有沒有什么hack的手段?

一般來說如果消費者過多,出現了消費者的個數大于磁區個數的情況,就會有消費者分配不到任何磁區,

開發者可以繼承AbstractPartitionAssignor實作自定義消費策略,從而實作同一消費組內的任意消費者都可以消費訂閱主題的所有磁區:

消費者提交消費位移時提交的是當前消費到的最新訊息的offset還是offset+1?

在舊消費者客戶端中,消費位移是存盤在 ZooKeeper
中的,而在新消費者客戶端中,消費位移存盤在 Kafka
內部的主題__consumer_offsets 中,

當前消費者需要提交的消費位移是offset+1

有哪些情形會造成重復消費?

Rebalance

一個consumer正在消費一個磁區的一條訊息,還沒有消費完,發生了rebalance(加入了一個consumer),從而導致這條訊息沒有消費成功,rebalance后,另一個consumer又把這條訊息消費一遍,

消費者端手動提交

如果先消費訊息,再更新offset位置,導致訊息重復消費,

消費者端自動提交

設定offset為自動提交,關閉kafka時,如果在close之前,呼叫
consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費,

生產者端

生產者因為業務問題導致的宕機,在重啟之后可能資料會重發

那些情景下會造成訊息漏消費?

自動提交

設定offset為自動定時提交,當offset被自動定時提交時,資料還在記憶體中未處理,此時剛好把執行緒kill掉,那么offset已經提交,但是資料未處理,導致這部分記憶體中的資料丟失,

生產者發送訊息

發送訊息設定的是fire-and-forget(發后即忘),它只管往 Kafka
中發送訊息而并不關心訊息是否正確到達,不過在某些時候(比如發生不可重試例外時)會造成訊息的丟失,這種發送方式的性能最高,可靠性也最差,

消費者端

先提交位移,但是訊息還沒消費完就宕機了,造成了訊息沒有被消費,自動位移提交同理

acks沒有設定為all

如果在broker還沒把訊息同步到其他broker的時候宕機了,那么訊息將會丟失

KafkaConsumer是非執行緒安全的,那么怎么樣實作多執行緒消費?#

執行緒封閉,即為每個執行緒實體化一個 KafkaConsumer 物件

一個執行緒對應一個 KafkaConsumer
實體,我們可以稱之為消費執行緒,一個消費執行緒可以消費一個或多個磁區中的訊息,所有的消費執行緒都隸屬于同一個消費組,

消費者程式使用單或多執行緒獲取訊息,同時創建多個消費執行緒執行訊息處理邏輯,

獲取訊息的執行緒可以是一個,也可以是多個,每個執行緒維護專屬的 KafkaConsumer
實體,處理訊息則交由特定的執行緒池來做,從而實作訊息獲取與訊息處理的真正解耦,具體架構如下圖所示:

簡述消費者與消費組之間的關系

Consumer Group 下可以有一個或多個 Consumer
實體,這里的實體可以是一個單獨的行程,也可以是同一行程下的執行緒,在實際場景中,使用行程更為常見一些,

Group ID 是一個字串,在一個 Kafka 集群中,它標識唯一的一個 Consumer
Group,

Consumer Group 下所有實體訂閱的主題的單個磁區,只能分配給組內的某個
Consumer 實體消費,這個磁區當然也可以被其他的 Group 消費,

當你使用kafka-topics.sh創建(洗掉)了一個topic之后,Kafka背后會執行什么邏輯?

在執行完腳本之后,Kafka 會在 log.dir 或 log.dirs
引數所配置的目錄下創建相應的主題磁區,默認情況下這個目錄為/tmp/kafka-logs/,

在 ZooKeeper
的/brokers/topics/目錄下創建一個同名的實節點,該節點中記錄了該主題的磁區副本分配方案,示例如下:

[zk: localhost:2181/kafka(CONNECTED) 2] get
/brokers/topics/topic-create

{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}

topic的磁區數可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?

可以增加,使用 kafka-topics 腳本,結合 --alter
引數來增加某個主題的磁區數,命令如下:

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter
--topic <topic_name> --partitions <新磁區數>

當磁區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance,

首先,Rebalance 程序對 Consumer Group 消費程序有極大的影響,在 Rebalance
程序中,所有 Consumer 實體都會停止消費,等待 Rebalance 完成,這是
Rebalance 為人詬病的一個方面,

其次,目前 Rebalance 的設計是所有 Consumer
實體共同參與,全部重新分配所有磁區,其實更高效的做法是盡量減少分配方案的變動,

最后,Rebalance 實在是太慢了,

topic的磁區數可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?

不支持,因為洗掉的磁區中的訊息不好處理,如果直接存盤到現有磁區的尾部,訊息的時間戳就不會遞增,如此對于
Spark、Flink
這類需要訊息時間戳(事件時間)的組件將會受到影響;如果分散插入現有的磁區,那么在訊息量很大的時候,內部的資料復制會占用很大的資源,而且在復制期間,此主題的可用性又如何得到保障?與此同時,順序性問題、事務性問題,以及磁區和副本的狀態機切換問題都是不得不面對的,

創建topic時如何選擇合適的磁區數?副本數?

磁區

在 Kafka
中,性能與磁區數有著必然的關系,在設定磁區數時一般也需要考慮性能的因素,對不同的硬體而言,其對應的性能也會不太一樣,

可以使用Kafka 本身提供的用于生產者性能測驗的 kafka-producer-
perf-test.sh 和用于消費者性能測驗的
kafka-consumer-perf-test.sh來進行測驗,

增加合適的磁區數可以在一定程度上提升整體吞吐量,但超過對應的閾值之后吞吐量不升反降,如果應用對吞吐量有一定程度上的要求,則建議在投入生產環境之前對同款硬體資源做一個完備的吞吐量相關的測驗,以找到合適的磁區數閾值區間,

磁區數的多少還會影響系統的可用性,如果磁區數非常多,如果集群中的某個
broker 節點宕機,那么就會有大量的磁區需要同時進行 leader
角色切換,這個切換的程序會耗費一筆可觀的時間,并且在這個時間視窗內這些磁區也會變得不可用,

磁區數越多也會讓 Kafka
的正常啟動和關閉的耗時變得越長,與此同時,主題的磁區數越多不僅會增加日志清理的耗時,而且在被洗掉時也會耗費更多的時間,

如何設定合理的磁區數量

可以遵循一定的步驟來嘗試確定磁區數:創建一個只有1個磁區的topic,然后測驗這個topic的producer吞吐量和consumer吞吐量,假設它們的值分別是Tp和Tc,單位可以是MB/s,然后假設總的目標吞吐量是Tt,那么磁區數 = Tt / max(Tp, Tc)
說明:Tp表示producer的吞吐量,測驗producer通常是很容易的,因為它的邏輯非常簡單,就是直接發送訊息到Kafka就好了,Tc表示consumer的吞吐量,測驗Tc通常與應用的關系更大, 因為Tc的值取決于你拿到訊息之后執行什么操作,因此Tc的測驗通常也要麻煩一些,

副本

Producer在發布訊息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然后無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該訊息發送到該Partition的Leader,Leader會將該訊息寫入其本地Log,每個Follower都從Leader pull資料,這種方式上,Follower存盤的資料順序與Leader保持一致,
Kafka分配Replica的演算法如下:
將所有Broker(假設共n個Broker)和待分配的Partition排序
將第i個Partition分配到第(imod n)個Broker上
將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

如何保證kafka的資料完整性

生產者不丟資料:

  1. 設定 acks=all,leader會等待所有的follower同步完成,這個確保訊息不會丟失,除非kafka集群中所有機器掛掉,這是最強的可用性保證,
    2.retries = max ,客戶端會在訊息發送失敗時重新發送,直到發送成功為止,為0表示不重新發送,

訊息佇列不丟資料:
1.replication.factor 也就是topic的副本數,必須大于1
2.min.insync.replicas 也要大于1,要求一個leader至少感知到有至少一個follower還跟自己保持聯系

消費者不丟資料:
改為手動提交,

kafka配置引數

kafka配置引數

  • broker.id:broker的id,id是唯一的非負整數,集群的broker.id不能重復,

  • log.dirs:kafka存放資料的路徑,可以是多個,多個使用逗號分隔即可,

  • port:server接受客戶端連接的埠,默認6667

  • zookeeper.connect:zookeeper集群連接地址,
    格式如:zookeeper.connect=server01:2181,server02:2181,server03:2181,
    如果需要指定zookeeper集群的路徑位置,可以:zookeeper.connect=server01:2181,server02:2181,server03:2181/kafka/cluster,這樣設定后,在啟動kafka集群前,需要在zookeeper集群創建這個路徑/kafka/cluster,

  • message.max.bytes:server可以接受的訊息最大尺寸,默認1000000,
    重要的是,consumer和producer有關這個屬性的設定必須同步,否則producer發布的訊息對consumer來說太大,

  • num.network.threads:server用來處理網路請求的執行緒數,默認3,

  • num.io.threads:server用來處理請求的I/O執行緒數,這個執行緒數至少等于磁盤的個數,

  • background.threads:用于后臺處理的執行緒數,例如檔案的洗掉,默認4,

  • queued.max.requests:在網路執行緒停止讀取新請求之前,可以排隊等待I/O執行緒處理的最大請求個數,默認500,

  • host.name:broker的hostname
    如果hostname已經設定的話,broker將只會系結到這個地址上;如果沒有設定,它將系結到所有介面,并發布一份到ZK

  • advertised.host.name:如果設定,則就作為broker
    的hostname發往producer、consumers以及其他brokers

  • advertised.port:此埠將給與producers、consumers、以及其他brokers,它會在建立連接時用到;
    它僅在實際埠和server需要系結的埠不一樣時才需要設定,

  • socket.send.buffer.bytes:SO_SNDBUFF 快取大小,server進行socket
    連接所用,默認100*1024,

  • socket.receive.buffer.bytes:SO_RCVBUFF快取大小,server進行socket連接時所用,默認100
    * 1024,

  • socket.request.max.bytes:server允許的最大請求尺寸;這將避免server溢位,它應該小于Java
    heap size,

  • num.partitions:如果創建topic時沒有給出劃分partitions個數,這個數字將是topic下partitions數目的默認數值,默認1,

  • log.segment.bytes:topic
    partition的日志存放在某個目錄下諸多檔案中,這些檔案將partition的日志切分成一段一段的;這個屬性就是每個檔案的最大尺寸;當尺寸達到這個數值時,就會創建新檔案,此設定可以由每個topic基礎設定時進行覆寫,默認1014*1024*1024

  • log.roll.hours:即使檔案沒有到達log.segment.bytes,只要檔案創建時間到達此屬性,就會創建新檔案,這個設定也可以有topic層面的設定進行覆寫,默認24*7

  • log.cleanup.policy:log清除策略,默認delete,

  • log.retention.minutes和log.retention.hours:每個日志檔案洗掉之前保存的時間,默認資料保存時間對所有topic都一樣,

  • log.retention.minutes 和 log.retention.bytes
    都是用來設定洗掉日志檔案的,無論哪個屬性已經溢位,這個屬性設定可以在topic基本設定時進行覆寫,

  • log.retention.bytes:每個topic下每個partition保存資料的總量,
    注意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic保存的資料總量,如果log.retention.hours和log.retention.bytes都設定了,則超過了任何一個限制都會造成洗掉一個段檔案,注意,這項設定可以由每個topic設定時進行覆寫,

  • log.retention.check.interval.ms:檢查日志分段檔案的間隔時間,以確定是否檔案屬性是否到達洗掉要求,默認5min,

  • log.cleaner.enable:當這個屬性設定為false時,一旦日志的保存時間或者大小達到上限時,就會被洗掉;如果設定為true,則當保存屬性達到上限時,就會進行log
    compaction,默認false,

  • log.cleaner.threads:進行日志壓縮的執行緒數,默認1,

  • log.cleaner.io.max.bytes.per.second:進行log compaction時,log
    cleaner可以擁有的最大I/O數目,這項設定限制了cleaner,以避免干擾活動的請求服務,

  • log.cleaner.io.buffer.size:log
    cleaner清除程序中針對日志進行索引化以及精簡化所用到的快取大小,最好設定大點,以提供充足的記憶體,默認500*1024*1024,

  • log.cleaner.io.buffer.load.factor:進行log cleaning時所需要的I/O
    chunk尺寸,你不需要更改這項設定,默認512*1024,

  • log.cleaner.io.buffer.load.factor:log
    cleaning中所使用的hash表的負載因子;你不需要更改這個選項,默認0.9

  • log.cleaner.backoff.ms:進行日志是否清理檢查的時間間隔,默認15000,

  • log.cleaner.min.cleanable.ratio:這項配置控制log
    compactor試圖清理日志的頻率(假定log compaction是打開的),默認避免清理壓縮超過50%的日志,這個比率系結了備份日志所消耗的最大空間(50%的日志備份時壓縮率為50%),更高的比率則意味著浪費消耗更少,也就可以更有效的清理更多的空間,這項設定在每個topic設定中可以覆寫,

  • log.cleaner.delete.retention.ms:保存時間;保存壓縮日志的最長時間;也是客戶端消費訊息的最長時間,與log.retention.minutes的區別在于一個控制未壓縮資料,一個控制壓縮后的資料;會被topic創建時的指定時間覆寫,

  • log.index.size.max.bytes:每個log
    segment的最大尺寸,注意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要產生新的log
    segment,默認10*1024*1024,

  • log.index.interval.bytes:當執行一次fetch后,需要一定的空間掃描最近的offset,設定的越大越好,一般使用默認值就可以,默認4096,

  • log.flush.interval.messages:log檔案"sync"到磁盤之前累積的訊息條數,
    因為磁盤IO操作是一個慢操作,但又是一個"資料可靠性"的必要手段,所以檢查是否需要固化到硬碟的時間間隔,需要在"資料可靠性"與"性能"之間做必要的權衡,如果此值過大,將會導致每次"發sync"的時間過長(IO阻塞),如果此值過小,將會導致"fsync"的時間較長(IO阻塞),導致"發sync"的次數較多,這也就意味著整體的client請求有一定的延遲,物理server故障,將會導致沒有fsync的訊息丟失,

  • log.flush.scheduler.interval.ms:檢查是否需要fsync的時間間隔,默認Long.MaxValue

  • log.flush.interval.ms:僅僅通過interval來控制訊息的磁盤寫入時機,是不足的,這個數用來控制"fsync"的時間間隔,如果訊息量始終沒有達到固化到磁盤的訊息數,但是離上次磁盤同步的時間間隔達到閾值,也將觸發磁盤同步,

  • log.delete.delay.ms:檔案在索引中清除后的保留時間,一般不需要修改,默認60000,

  • auto.create.topics.enable:是否允許自動創建topic,如果是true,則produce或者fetch
    不存在的topic時,會自動創建這個topic,否則需要使用命令列創建topic,默認true,

  • controller.socket.timeout.ms:partition管理控制器進行備份時,socket的超時時間,默認30000,

  • controller.message.queue.size:controller-to-broker-channles的buffer尺寸,默認Int.MaxValue,

  • default.replication.factor:默認備份份數,僅指自動創建的topics,默認1,

  • replica.lag.time.max.ms:如果一個follower在這個時間內沒有發送fetch請求,leader將從ISR重移除這個follower,并認為這個follower已經掛了,默認10000,

  • replica.lag.max.messages:如果一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,并認為這個follower已經掛了,默認4000,

  • replica.socket.timeout.ms:leader
    備份資料時的socket網路請求的超時時間,默認30*1000

  • replica.socket.receive.buffer.bytes:備份時向leader發送網路請求時的socket
    receive buffer,默認64*1024,

  • replica.fetch.max.bytes:備份時每次fetch的最大值,默認1024*1024,

  • replica.fetch.max.bytes:leader發出備份請求時,資料到達leader的最長等待時間,默認500,

  • replica.fetch.min.bytes:備份時每次fetch之后回應的最小尺寸,默認1,

  • num.replica.fetchers:從leader備份資料的執行緒數,默認1,

  • replica.high.watermark.checkpoint.interval.ms:每個replica檢查是否將最高水位進行固化的頻率,默認5000.

  • fetch.purgatory.purge.interval.requests:fetch
    請求清除時的清除間隔,默認1000

  • producer.purgatory.purge.interval.requests:producer請求清除時的清除間隔,默認1000

  • zookeeper.session.timeout.ms:zookeeper會話超時時間,默認6000

  • zookeeper.connection.timeout.ms:客戶端等待和zookeeper建立連接的最大時間,默認6000

  • zookeeper.sync.time.ms:zk follower落后于zk leader的最長時間,默認2000

  • controlled.shutdown.enable:是否能夠控制broker的關閉,如果能夠,broker將可以移動所有leaders到其他的broker上,在關閉之前,這減少了不可用性在關機程序中,默認true,

  • controlled.shutdown.max.retries:在執行不徹底的關機之前,可以成功執行關機的命令數,默認3.

  • controlled.shutdown.retry.backoff.ms:在關機之間的backoff時間,默認5000

  • auto.leader.rebalance.enable:如果這是true,控制者將會自動平衡brokers對于partitions的leadership,默認true,

  • leader.imbalance.per.broker.percentage:每個broker所允許的leader最大不平衡比率,默認10,

  • leader.imbalance.check.interval.seconds:檢查leader不平衡的頻率,默認300

  • offset.metadata.max.bytes:允許客戶端保存他們offsets的最大個數,默認4096

  • max.connections.per.ip:每個ip地址上每個broker可以被連接的最大數目,默認Int.MaxValue,

  • max.connections.per.ip.overrides:每個ip或者hostname默認的連接的最大覆寫,

  • connections.max.idle.ms:空連接的超時限制,默認600000

  • log.roll.jitter.{ms,hours}:從logRollTimeMillis抽離的jitter最大數目,默認0

  • num.recovery.threads.per.data.dir:每個資料目錄用來日志恢復的執行緒數目,默認1,

  • unclean.leader.election.enable:指明了是否能夠使不在ISR中replicas設定用來作為leader,默認true

  • delete.topic.enable:能夠洗掉topic,默認false,

  • offsets.topic.num.partitions:默認50,
    由于部署后更改不受支持,因此建議使用更高的設定來進行生產(例如100-200),

  • offsets.topic.retention.minutes:存在時間超過這個時間限制的offsets都將被標記為待洗掉,默認1440,

  • offsets.retention.check.interval.ms:offset管理器檢查陳舊offsets的頻率,默認600000,

  • offsets.topic.replication.factor:topic的offset的備份份數,建議設定更高的數字保證更高的可用性,默認3

  • offset.topic.segment.bytes:offsets topic的segment尺寸,默認104857600

  • offsets.load.buffer.size:這項設定與批量尺寸相關,當從offsets
    segment中讀取時使用,默認5242880

  • offsets.commit.required.acks:在offset
    commit可以接受之前,需要設定確認的數目,一般不需要更改,默認-1,

kafka生產者配置引數

  • boostrap.servers:用于建立與kafka集群連接的host/port組,
    資料將會在所有servers上均衡加載,不管哪些server是指定用于bootstrapping,
    這個串列格式:host1:port1,host2:port2,...

  • acks:此配置實際上代表了資料備份的可用性,

  • acks=0:
    設定為0表示producer不需要等待任何確認收到的資訊,副本將立即加到socket
    buffer并認為已經發送,沒有任何保障可以保證此種情況下server已經成功接收資料,同時重試配置不會發生作用

  • acks=1:
    這意味著至少要等待leader已經成功將資料寫入本地log,但是并沒有等待所有follower是否成功寫入,這種情況下,如果follower沒有成功備份資料,而此時leader又掛掉,則訊息會丟失,

  • acks=all:
    這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失資料,這是最強的保證,

  • buffer.memory:producer可以用來快取資料的記憶體大小,如果資料產生速度大于向broker發送的速度,producer會阻塞或者拋出例外,以"block.on.buffer.full"來表明,

  • compression.type:producer用于壓縮資料的壓縮型別,默認是無壓縮,正確的選項值是none、gzip、snappy,壓縮最好用于批量處理,批量處理訊息越多,壓縮性能越好,

  • retries:設定大于0的值將使客戶端重新發送任何資料,一旦這些資料發送失敗,注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同,允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是發送到同一個partition,則第一個訊息失敗第二個發送成功,則第二條訊息會比第一條訊息出現要早,

  • batch.size:producer將試圖批處理訊息記錄,以減少請求次數,這將改善client與server之間的性能,這項配置控制默認的批量處理訊息位元組數,

  • client.id:當向server發出請求時,這個字串會發送給server,目的是能夠追蹤請求源頭,以此來允許ip/port許可串列之外的一些應用可以發送資訊,這項應用可以設定任意字串,因為沒有任何功能性的目的,除了記錄和跟蹤,

  • linger.ms:producer組將會匯總任何在請求與發送之間到達的訊息記錄一個單獨批量的請求,通常來說,這只有在記錄產生速度大于發送速度的時候才能發生,

  • max.request.size:請求的最大位元組數,這也是對最大記錄尺寸的有效覆寫,注意:server具有自己對訊息記錄尺寸的覆寫,這些尺寸和這個設定不同,此項設定將會限制producer每次批量發送請求的數目,以防發出巨量的請求,

  • receive.buffer.bytes:TCP receive快取大小,當閱讀資料時使用,

  • send.buffer.bytes:TCP send快取大小,當發送資料時使用,

  • timeout.ms:此配置選項控制server等待來自followers的確認的最大時間,如果確認的請求數目在此時間內沒有實作,則會回傳一個錯誤,這個超時限制是以server端度量的,沒有包含請求的網路延遲,

  • block.on.buffer.full:當我們記憶體快取用盡時,必須停止接收新訊息記錄或者拋出錯誤,
    默認情況下,這個設定為真,然而某些阻塞可能不值得期待,因此立即拋出錯誤更好,設定為false則會這樣:producer會拋出一個例外錯誤:BufferExhaustedException,
    如果記錄已經發送同時快取已滿,

  • metadata.fetch.timeout.ms:是指我們所獲取的一些元素據的第一個時間資料,元素據包含:topic,host,partitions,此項配置是指當等待元素據fetch成功完成所需要的時間,否則會拋出例外給客戶端,

  • metadata.max.age.ms:以微秒為單位的時間,是在我們強制更新metadata的時間間隔,即使我們沒有看到任何partition
    leadership改變,

  • metric.reporters:類的串列,用于衡量指標,實作MetricReporter介面,將允許增加一些類,這些類在新的衡量指標產生時就會改變,JmxReporter總會包含用于注冊JMX統計

  • metrics.num.samples:用于維護metrics的樣本數,

  • metrics.sample.window.ms:metrics系統維護可配置的樣本數量,在一個可修正的window
    size,這項配置配置了視窗大小,例如,我們可能在30s的期間維護兩個樣本,當一個視窗推出后,我們會擦除并重寫最老的視窗,

  • recoonect.backoff.ms:連接失敗時,當我們重新連接時的等待時間,這避免了客戶端反復重連,

  • retry.backoff.ms:在試圖重試失敗的produce請求之前的等待時間,避免陷入發送-失敗的死回圈中,

kafka消費者配置引數

  • group.id:用來唯一標識consumer行程所在組的字串,如果設定同樣的group
    id,表示這些processes都是屬于同一個consumer group,

  • zookeeper.connect:指定zookeeper的連接的字串,格式是hostname:port,
    hostname:port...

  • consumer.id:不需要設定,一般自動產生

  • socket.timeout.ms:網路請求的超時限制,真實的超時限制是max.fetch.wait+socket.timeout.ms,默認3000

  • socket.receive.buffer.bytes:socket用于接收網路請求的快取大小,默認64*1024,

  • fetch.message.max.bytes:每次fetch請求中,針對每次fetch訊息的最大位元組數,默認1024*1024
    這些位元組將會督導用于每個partition的記憶體中,因此,此設定將會控制consumer所使用的memory大小,
    這個fetch請求尺寸必須至少和server允許的最大訊息尺寸相等,否則,producer可能發送的訊息尺寸大于consumer所能消耗的尺寸,

  • num.consumer.fetchers:用于fetch資料的fetcher執行緒數,默認1

  • auto.commit.enable:如果為真,consumer所fetch的訊息的offset將會自動的同步到zookeeper,這項提交的offset將在行程掛掉時,由新的consumer使用,默認true,

  • auto.commit.interval.ms:consumer向zookeeper提交offset的頻率,單位是秒,默認60*1000,

  • queued.max.message.chunks:用于快取訊息的最大數目,每個chunk必須和fetch.message.max.bytes相同,默認2,

  • rebalance.max.retries:當新的consumer加入到consumer
    group時,consumers集合試圖重新平衡分配到每個consumer的partitions數目,如果consumers集合改變了,當分配正在執行時,這個重新平衡會失敗并重入,默認4

  • fetch.min.bytes:每次fetch請求時,server應該回傳的最小位元組數,如果沒有足夠的資料回傳,請求會等待,直到足夠的資料才會回傳,

  • fetch.wait.max.ms:如果沒有足夠的資料能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間,默認100

  • rebalance.backoff.ms:在重試reblance之前backoff時間,默認2000

  • refresh.leader.backoff.ms:在試圖確定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間,默認200

  • auto.offset.reset:zookeeper中沒有初始化的offset時,如果offset是以下值的回應:

  • lastest:自動復位offset為lastest的offset

  • earliest:自動復位offset為earliest的offset

  • none:向consumer拋出例外

  • consumer.timeout.ms:如果沒有訊息可用,即使等待特定的時間之后也沒有,則拋出超時例外

  • exclude.internal.topics:是否將內部topics的訊息暴露給consumer,默認true,

  • paritition.assignment.strategy:選擇向consumer
    流分配partitions的策略,可選值:range,roundrobin,默認range,

  • client.id:是用戶特定的字串,用來在每次請求中幫助跟蹤呼叫,它應該可以邏輯上確認產生這個請求的應用,

  • zookeeper.session.timeout.ms:zookeeper 會話的超時限制,默認6000
    如果consumer在這段時間內沒有向zookeeper發送心跳資訊,則它會被認為掛掉了,并且reblance將會產生

  • zookeeper.connection.timeout.ms:客戶端在建立通zookeeper連接中的最大等待時間,默認6000

  • zookeeper.sync.time.ms:ZK follower可以落后ZK leader的最大時間,默認1000

  • offsets.storage:用于存放offsets的地點:
    zookeeper或者kafka,默認zookeeper,

  • offset.channel.backoff.ms:重新連接offsets
    channel或者是重試失敗的offset的fetch/commit請求的backoff時間,默認1000

  • offsets.channel.socket.timeout.ms:當讀取offset的fetch/commit請求回應的socket
    超時限制,此超時限制是被consumerMetadata請求用來請求offset管理,默認10000,

  • offsets.commit.max.retries:重試offset
    commit的次數,這個重試只應用于offset commits在shut-down之間,默認5,

  • dual.commit.enabled:如果使用"kafka"作為offsets.storage,你可以二次提交offset到zookeeper(還有一次是提交到kafka),
    在zookeeper-based的offset storage到kafka-based的offset
    storage遷移時,這是必須的,對任意給定的consumer
    group來說,比較安全的建議是當完成遷移之后就關閉這個選項

  • partition.assignment.strategy:在"range"和"roundrobin"策略之間選擇一種作為分配partitions給consumer
    資料流的策略,
    回圈的partition分配器分配所有可用的partitions以及所有可用consumer執行緒,它會將partition回圈的分配到consumer執行緒上,如果所有consumer實體的訂閱都是確定的,則partitions的劃分是確定的分布,
    回圈分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的資料流,(2)訂閱的topic的集合對于consumer
    group中每個consumer實體來說都是確定的

kafka ack容錯機制(應答機制)

在Producer(生產者)向kafka集群發送訊息,kafka集群會在接受完訊息后,給出應答,成功或失敗,如果失敗,producer(生產者)會再次發送,直到成功為止,

producer(生產者)發送資料給kafka集群,kafka集群反饋有3種模式:

  • 0:producer(生產者)不會等待kafka集群發送ack,producer(生產者)發送完訊息就算成功,

  • 1:producer(生產者)等待kafka集群的leader接受到訊息后,發送ack,producer(生產者)接收到ack,表示訊息發送成功,

  • -1:producer(生產者)等待kafka集群所有包含磁區的follower都同步訊息成功后,發送ack,producer(生產者)接受到ack,表示訊息發送成功,

kafka segment

在Kafka檔案存盤中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1,

每個partion(目錄)相當于一個巨型檔案被平均分配到多個大小相等segment(段)資料檔案中,但每個段segment
file訊息數量不一定相等,這種特性方便old segment
file快速被洗掉,默認保留7天的資料,

每個partiton只需要支持順序讀寫就行了,segment檔案生命周期由服務端配置引數決定,(什么時候創建,什么時候洗掉)

資料有序性:只有在一個partition磁區內,資料才是有序的,

Segment file組成:由2大部分組成,分別為i**ndex file**和data
file,此2個檔案一一對應,成對出現,后綴".index"和".log"分別表示為segment索引檔案、資料檔案,(在目前最新版本,又添加了另外的約束),

Segment檔案命名規則:partion全域的第一個segment從0開始,后續每個segment檔案名為上一個segment檔案最后一條訊息的offset值,數值最大為64位long大小,19位數字字符長度,沒有數字用0填充,

索引檔案存盤大量元資料,資料檔案存盤大量訊息,索引檔案中元資料指向對應資料檔案中message的物理偏移地址,

segment機制的作用:

- 可以通過索引快速找到訊息所在的位置,

用于超過kafka設定的默認時間,清除比較方便,

kafka從零開始使用

這里之前寫過一些kafka使用的文章,這里就不在復制到此文章上面來了,以免文章內容太多太多了,

kafka安裝

文章:

  • Kafka安裝教程

kafka的可視化軟體

kafka-eagle

地址:https://github.com/smartloli/kafka-eagle

下載之后解壓

需要配置環境

Windows環境

KE_HOME = D:\\kafka_eagle\\kafka-eagle-web-1.2.3

LINUX環境

export KE_HOME=/home/jars/kafka_eagle/kafka-eagle-web-1.2.3

配置mysql,執行ke.sql
腳本,然后在D:\kafka_eagle\kafka-eagle-web-1.2.0\conf
中修改system-config.properties 組態檔

zookeeper 服務的配置地址,支持多個集群,多個用逗號隔開

kafka.eagle.zk.cluster.alias=cluster1,cluster2

cluster1.zk.list=192.169.0.23:2181,192.169.0.24:2181,192.169.0.25:2181

cluster2.zk.list=192.169.2.156:2181,192.169.2.98:2181,192.169.2.188:2181

然后配置mysql服務的地址

kafka.eagle.driver=com.mysql.jdbc.Driver

kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull

kafka.eagle.username=root

kafka.eagle.password=123456

配置完成之后,在切換到/bin目錄下,windows雙擊ke.bat ,linux輸入 ke.sh
start,啟動程式,然后在瀏覽器輸入ip:port/ke

進入登錄界面,輸入ke資料庫中的ke_users設定的用戶名和密碼,即可查看,

kafka-manager

地址:https://github.com/yahoo/kafka-manager

下載編譯

git clone https://github.com/yahoo/kafka-manager
cd kafka-manager 
sbt clean distcd target/ 

編譯完成之后,解壓該檔案
在 conf/application.properties路徑下找到 kafka-manager.zkhosts 配置,添加zookeeper的地址,如果是多個,用逗號隔開,

kafka-manager.zkhosts = master:2181,slave1:2181,slave2:2181

修改完成之后,就可以進行啟動了,
kafka-manager 默認的埠是9000,我們可以通過 -Dhttp.port來指定埠,

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8765 &

啟動成功之后,在瀏覽器輸入地址即可進行訪問了,

Kafka Tool(offset Explorer)

地址:https://www.kafkatool.com/

Offset Explorer(以前稱為Kafka Tool)是一個用于管理和使用Apache Kafka ?集群的GUI應用程式,它提供了一個直觀的用戶界面,允許人們快速查看其中的物件 一個 Kafka 集群以及存盤在集群主題中的訊息,它包含面向開發人員和管理員的功能,一些主要功能包括

  • 快速查看所有 Kafka 集群,包括其代理、主題和使用者
  • 查看磁區中的訊息內容并添加新訊息
  • 查看消費者的偏移量,包括 Apache Storm Kafka 噴口消費者
  • 以漂亮的列印格式顯示 JSON、XML 和 Avro 訊息
  • 添加和洗掉主題以及其他管理功能
  • 將磁區中的單個訊息保存到本地硬碟驅動器
  • 撰寫自己的插件,允許您查看自定義資料格式
  • Offset Explorer 可在 Windows、Linux 和 Mac OS 上運行

demo代碼

文章:

  • Kafka 使用Java實作資料的生產和消費demo
  • 關于Kafka 的 consumer 消費者手動提交詳解

代碼地址:
https://github.com/xuwujing/kafka-study
https://github.com/xuwujing/java-study/tree/master/src/main/java/com/pancm/mq/kafka

kafka生產環境問題排查和解決方案

這里主要是記錄在使用kafka的時候遇到的一些生產環境問題和解決方案,有的可能不是問題,而是需求,有的問題解決方案按照現在來說不完美,畢竟很多時候,快速解決才是第一要素,總之這些就按照我之前的筆記記錄進行分享吧,如有更好的思路或者解決辦法,歡迎提出!
先介紹一些kafka的常用命令

kafka常用命令

官方檔案: http://kafka.apache.org/quickstart

1.啟動和關閉kafka

bin/kafka-server-start.sh config/server.properties \>\>/dev/null 2\>&1 &

bin/kafka-server-stop.sh

zookeeper啟動命令:
./zookeeper-server-start.sh -daemon
../config/zookeeper.properties

kafka啟用命令:
./kafka-server-start.sh -daemon
../config/server.properties

2.查看kafka集群中的訊息佇列和具體佇列

查看集群所有的topic

kafka-topics.sh \--zookeeper master:2181,slave1:2181,slave2:2181 \--list

查看一個topic的資訊

kafka-topics.sh \--zookeeper master:2181 \--describe \--topic
1004_INSERT

查看kafka consumer消費的offset

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \--zookeeper
master:2181 \--group groupB \--topic KAFKA_TEST

在kafka中查詢資料

./kafka-console-consumer.sh \--zookeeper 172.16.253.91:2181 \--topic
MO_RVOK \--from-beginning \| grep -c \'13339309600\'

3.創建Topic

partitions指定topic磁區數,replication-factor指定topic每個磁區的副本數

kafka-topics.sh \--zookeeper master:2181 \--create \--topic t_test
\--partitions 30 \--replication-factor 1

4.生產資料和消費資料

kafka-console-producer.sh \--broker-list master:9092 \--topic t_test

Ctrl+D 退出

kafka-console-consumer.sh \--zookeeper master:2181 \--topic t_test
\--from-beginning

kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--topic
t_test \--from-beginning

--from-beginning 是表示從頭開始消費

Ctrl+C 退出

5.kafka的洗掉命令

1.kafka命令洗掉

kafka-topics.sh \--delete \--zookeeper
master:2181,slave1:2181,slave2:2181 \--topic test

注:如果出現 This will have no impact if delete.topic.enable is not set
to true. 表示沒有徹底的洗掉,而是把topic標記為:marked for deletion
,可以在server.properties中配置delete.topic.enable=true 來洗掉,

2.進入zk洗掉

zkCli.sh -server master:2181,slave1:2181,slave2:2181

找到topic所在的目錄:ls /brokers/topics

找到要洗掉的topic,執行命令:rmr /brokers/topics/【topic
name】即可,此時topic被徹底洗掉,

進入/admin/delete_topics目錄下,找到洗掉的topic,洗掉對應的資訊,

6.添加磁區

kafka-topics.sh \--alter \--topic INSERT_TEST1 \--zookeeper master:2181
\--partitions 15

7.查看消費組

查看所有

kafka-consumer-groups.sh \--bootstrap-server master:9092 \--list

查看某一個消費組

kafka-consumer-groups.sh \--bootstrap-server master:9092 \--describe
\--group groupT

8.查看offset的值

最小值:

kafka-run-class.sh kafka.tools.GetOffsetShell \--broker-list master:9092
-topic KAFKA_TEST \--time -2

最大值:

kafka-run-class.sh kafka.tools.GetOffsetShell \--broker-list master:9092
-topic KAFKA_TEST \--time -1

9,查看kafka日志檔案中某一個topic占用的空間

du -lh --max-depth=1 TEST*

遇到的問題

offset下標丟失問題

kafka版本:V1.0
因為某種原因,kafka集群中的topic資料有一段時間(一天左右)沒有被消費,再次消費的時候,所有的消費程式讀取的offset是從頭開始消費,產生了大量重復資料,

問題原因:offset的過期時間惹的禍,offsets.retention.minutes這個過期時間在kafka低版本的默認配置時間是1天,如果超過1天沒有消費,那么offset就會過期清理,因此導致資料重復消費,在2.0之后的版本這個默認值就設定為了7天,

解決辦法:

臨時解決辦法,根據程式最近列印的日志內容,找到最后消費的offset值,然后批量更改kafka集群的offset,
kafka 的offset偏移量更改
首先通過下面的命令查看當前消費組的消費情況:

kafka-consumer-groups.sh \--bootstrap-server master:9092 \--group groupA
\--describe

current-offset 和 log-end-offset還有 lag
,分別為當前偏移量,結束的偏移量,落后的偏移量,

然后進行offset更改
這是一個示例,offset(所有磁區)更改為100之后

kafka-consumer-groups.sh \--bootstrap-server master:9092 \--group groupA
\--topic KAFKA_TEST2 \--execute \--reset-offsets \--to-offset 100

--group 代表你的消費者分組

--topic 代表你消費的主題

--execute 代表支持復位偏移

--reset-offsets 代表要進行偏移操作

--to-offset 代表你要偏移到哪個位置,是long型別數值,只能比前面查詢出來的小

還一種臨時方案,就是更改代碼,指定kafka的磁區和offset,從指定點開始消費!對應的代碼示例也在上述貼出的github鏈接中,

最終解決辦法:將offset的過期時間值(offsets.retention.minutes)設定調大,

Kafka增加節點資料重新分配

背景:為了緩解之前kafka集群服務的壓力,需要新增kafka節點,并且對資料進行重新分配,

解決方案:利用kafka自身的磁區重新分配原理進行資料重新分配,需要提前將新增的kafka節點添加到zookeeper集群中,可以在zookeeper里面通過ls /brokers/ids 查看節點名稱,

1,創建檔案

創建一個topics-to-move.json的檔案,檔案中編輯如下引數,多個topic用逗號隔開,

{\"topics\": \[{\"topic\": \"t1\"},{\"topic\": \"t2\"}\],\"version\":1}

命令示例:

touch topics-to-move.json

vim topics-to-move.json

2,獲取建議資料遷移文本

在${kakfa}/bin 目錄下輸入如下命令,檔案和命令可以放在同一級,

命令示例:

./kafka-reassign-partitions.sh \--zookeeper 192.168.124.111:2181 
\--topics-to-move-json-file topics-to-move.json \--broker-list
\"111,112,113,114\" \--generate

broker-list
后面的數字就是kafka每個節點的名稱,需要填寫kafka所有集群的節點名稱,

執行完畢之后,復制Proposed partition reassignment configuration
下的文本到一個新的json檔案中,命名為reality.json,

3,執行重新分配任務

執行如下命令即可,

./kafka-reassign-partitions.sh \--zookeeper 192.168.124.111:2181
\--reassignment-json-file reality.json \--execute

出現successfully表示執行成功完畢

查看執行的任務進度,輸入以下命令即可:

kafka-reassign-partitions.sh \--zookeeper ip:host,ip:host,ip:host 
\--reassignment-json-file reality.json \--verify

kafka集群同步

背景:因機房問題,需要將kafka集群進行遷移,并且保證資料同步,

解決方案:使用MirrorMaker進行同步,

1.介紹

MirrorMaker是為解決Kafka跨集群同步、創建鏡像集群而存在的,下圖展示了其作業原理,該工具消費源集群訊息然后將資料又一次推送到目標集群,

2.使用

這里分為兩個kafka集群,名稱為源kafka集群和目標kafka集群,我們是要把源kafka集群的資料同步到目標kafka集群中,可以指定全部的topic或部分的topic進行同步,

其中同步的topic的名稱須一致,需提前創建好,磁區數和副本可以不一致!

主要參數說明:

1\. --consumer.config:消費端相關組態檔 

2\. --producer.config:生產端相關組態檔 

3\. --num.streams: consumer的執行緒數  默認1

4\. --num.producers: producer的執行緒數  默認1

5\. --blacklist: 不需要同步topic的黑名單,支持Java正則運算式

6.--whitelist:需要同步topic的白名單,符合java正則表達式形式

7\. -queue.size:consumer和producer之間快取的queue size,默認10000

在源kafka集群創建consumer.config和producer.config檔案,然后配置如下資訊:

consumer.config配置

bootstrap.servers=192.169.2.144:9092

group.id=MW-MirrorMaker

auto.commit.enable=true

auto.commit.interval.ms=1000

fetch.min.bytes=6553600

auto.offset.reset = earliest

max.poll.records = 1000

producer.config配置

bootstrap.servers=192.169.2.249:9092

retries = 3

acks = all

batch.size = 16384

producer.type=sync

batch.num.messages=1000

其中 consumer.config的
bootstrap.servers是源kafka集群的地址,producer.config是目標kafka的地址,可以填寫多個,用逗號隔開!

同步啟動命令示例:

nohup ../bin/kafka-mirror-maker.sh \--consumer.config consumer.config
\--num.streams 10 \--producer.config producer.config ---num.producers 10
\--whitelist \"MT_RVOK_TEST9\" \>/dev/null 2\>&1 &

可以使用jps在行程中查詢得到,查看具體同步資訊可以查看kafka消費組的offset得到,

3.測驗

用程式往 MT_RVOK_TEST9
先往源kafka(192.169.2.144:9092)發送10000條資料,然后啟動同步命令,查看目標kafka集群(192.169.2.249:9092),同步成功!



內外網kafka穿透(網閘)

背景:因為傳輸原因,需要kafka能夠在內外網傳輸,通過網閘,

解決方案:

1.網閘kafka內外網傳輸必要條件
1.網閘內外網可用,且網閘開放的埠和kakfa開放的埠必須一致,比如kafka默認是9092,那么網閘開放的埠也是9092;
2.網閘開放埠,必須雙向資料同步,不能只單向傳輸,網閘和外網以及kakfa內網之間互信;
3.kafka配置需要添加額外配置引數,server.properties核心配置如下:

listeners=PLAINTEXT://kafka-cluster:9092
advertised.listeners=PLAINTEXT://kafka-cluster:9092

1.kafka服務、內網訪問服務、外網訪問服務,均需設定ip和域名映射,linux在/etc/hosts檔案中,添加ip和域名映射關系,內網訪問,則ip為kafka內網的ip,外網訪問則ip為網閘的ip;
內網訪問kafka的檔案配置示例:

192.168.0.1  kafka-cluster

外網Windows的host檔案配置示例:

100.100.100.100  kafka-cluster

2.測驗步驟

1.依次啟動zookeeper和kafka服務,可以使用jps命令查看是否啟動;

1.使用如下命令在kakfa的bin目錄下進行生產和消費測驗:

生產命令

./kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic test_3

消費命令:

./kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic test_3

示例:
在生產的客戶端隨意資料資料,查看消費端是否有資料消費

1.在外網使用kafka消費程式進行測驗

資料消費成功示例:

其他問題

kafka資料丟失:
一般來說分為發送訊息丟失和消費丟失,區別方式可以用kafka命令進行消費判斷,如果是發送訊息丟失,那么一般是配置或網路問題;如果是消費訊息丟失,多半是自動提交或無事務;找到資料源頭解決就行,組態檔更改配置,可參考上面我發出的配置,自動提交改成手動提交,入庫失敗不提交,

kafka重復消費:
要么是重復發送,要么是消費之后未提交且進行過重啟,要么是更換了消費組(group),還有就是offset重置了這種,可根據原因對癥下藥解決即可,解決辦法可參考本文上述示例,

kafka訊息積壓(堵塞):
生產的訊息速度遠遠大于消費的速度,導致訊息積壓,
常見情況一、磁區設定不合理,磁區個數太少,比如默認磁區5個,導致消費執行緒最多只有5個,可選辦法有增加磁區,然后在增加消費執行緒;
常見情況二、消費端處理過于耗時,拿到訊息之后,遲遲未提交,導致消費速率太慢,可選辦法有將耗時處理方法抽出,比如在進行一次異步處理,確保拿到kafka訊息到入庫這塊效率;
常見情況三、IO問題或kafka集群問題,寬帶升級或集群擴容,
不常見問題、配置設定問題,一般而言,kafka的配置除了必要的配置,大部分配置是不用更改,若資料量實在太大,需要調優,則可根據官方提供的配置進行除錯,

其他

本以為寫這種型別文章不太耗時,沒想到一看又是凌晨了,整理筆記、文字排版還是真的有點費時,后續在更新一下linux的就結束這個系列吧,至于其他的各種知識有空的就在其他的篇章系列繼續更新吧~

手記系列

記載個人從剛開始作業到現在各種雜談筆記、問題匯總、經驗累積的系列,

手記系列

  • 手記系列之一 ----- 關于微信公眾號和小程式的開發流程
  • 手記系列之二 ----- 關于IDEA的一些使用方法經驗
  • 手記系列之三 ----- 關于使用Nginx的一些使用方法和經驗
  • 手記系列之四 ----- 關于使用MySql的經驗
  • 手記系列之五 ----- SQL使用經驗分享

一首很帶感的動漫鋼琴曲~

<iframe frameborder="no" border="0" margin marginheight="0" height="86" src="https://www.cnblogs.com//music.163.com/outchain/player?type=2&id=38574228&auto=0&height=66"></iframe>

原創不易,如果感覺不錯,希望給個推薦!您的支持是我寫作的最大動力!
著作權宣告:
作者:虛無境
博客園出處:http://www.cnblogs.com/xuwujing
CSDN出處:http://blog.csdn.net/qazwsxpcm    
個人博客出處:https://xuwujing.github.io/

如果你對生活感覺到了絕望,請不要氣餒,因為這樣只會讓你更加絕望! 所謂的希望往往都是在絕望中萌發的,所以,請不要放棄希望!

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/554722.html

標籤:大數據

上一篇:Kafka的系統架構和API開發

下一篇:返回列表

標籤雲
其他(160655) Python(38218) JavaScript(25488) Java(18210) C(15237) 區塊鏈(8270) C#(7972) AI(7469) 爪哇(7425) MySQL(7238) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5873) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4588) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2435) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1984) 功能(1967) HtmlCss(1956) Web開發(1951) C++(1933) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1880) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • 手記系列之六 ----- 分享個人使用kafka經驗

    ## 前言 本篇文章主要介紹的關于本人從剛作業到現在使用kafka的經驗,內容非常多,包含了kafka的常用命令,在生產環境中遇到的一些場景處理,kafka的一些web工具推薦等等。由于kafka這塊的記錄以及經驗是從我剛開始使用kafka,從2017年開始,可能里面有些內容過時,請見諒。溫馨提醒, ......

    uj5u.com 2023-06-09 08:20:22 more
  • Kafka的系統架構和API開發

    # 系統架構 **主題topic和磁區partition** - topic Kafka中存盤資料的邏輯分類;你可以理解為資料庫中“表”的概念;比如,將app端日志、微信小程式端日志、業務庫訂單表資料分別放入不同的topic - partition磁區(提升kafka吞吐量) topic中資料的具體 ......

    uj5u.com 2023-06-09 08:20:03 more
  • Centos 7 通過 targz 檔案安裝 Elastic Search 服務

    區別于通過發行版自帶的倉庫, 介紹如何通過 targz 檔案安裝 Elastic Search 服務, 使用的 Linux 為 Centos 7 ......

    uj5u.com 2023-06-09 08:19:56 more
  • 國產資料庫的譜系

    資料庫產品的成功絕對不是技術堆疊的成功,而是需要有大量的應用場景磨合才能逐步成功的。如果僅僅依靠自己那幾百個用戶,想要發展出成熟的高水平的商用資料庫產品來,那幾乎是不太能的。依靠開源社區的廣大用戶來研發自己的資料庫產品不失為一種比較好的策略。 ......

    uj5u.com 2023-06-08 09:44:26 more
  • 手記系列之五 ----- SQL使用經驗分享

    ## 前言 本篇文章主要介紹的關于本人從剛作業到現在使用Sql一些使用方法和經驗,從最基本的SQL函式使用,到一些場景的業務場景SQL撰寫。 ## SQL基礎函式使用 ### 1.欄位轉換 CASE WHEN 意義: If(a==b) a=c; 用法: 1, CASE 欄位 WHEN 欄位結果1 T ......

    uj5u.com 2023-06-08 09:43:44 more
  • Hive執行計劃之hive依賴及權限查詢和常見使用場景

    [TOC] ## 概述 Hive查看執行計劃的命令中還有兩個不怎么常用但很重要的命令,接下來詳細介紹一下。 有一個問題:**如何在hiveSQL執行之前就探查到這段邏輯的血緣依賴關系?** hive血緣是很多生產級數倉必須要提供的功能,大多數解決方案都是**使用hive hooks的方法通過SQL執 ......

    uj5u.com 2023-06-08 09:43:21 more
  • kafka的安裝和基本操作

    # 基本概念 ## 簡介 Kafka 最初是由 LinkedIn 即領英公司基于 Scala 和 Java 語言開發的分布式訊息發布-訂閱系統,現已捐獻給Apache 軟體基金會。其具有高吞吐、低延遲的特性,許多大資料實時流式處理系統比如 Storm、Spark、Flink等都能很好地與之集成。 總 ......

    uj5u.com 2023-06-08 09:43:14 more
  • 《SQL 必知必會》全決議

    > 不要哀求,學會爭取。若是如此,終有所獲。 > > 原文:https://mp.weixin.qq.com/s/zbOqyAtsWsocarsFIGdGgw ## 前言 你是否還在煩惱 SQL 該從何學起,或者學了 SQL 想找個地方練練手?好巧不巧,最近在作業之余登上牛客,發現了牛客不知道啥時候 ......

    uj5u.com 2023-06-08 09:43:09 more
  • GaussDB(DWS)查詢過濾器原理與應用

    摘要:GaussDB(DWS)查詢過濾器(黑名單)提供查詢過濾功能,支持自動隔離反復被終止的查詢,防止爛SQL再次執行。 本文分享自華為云社區《GaussDB(DWS)查詢過濾器原理與應用》,作者:門前一棵葡萄樹 。 一、概述 GaussDB(DWS)查詢過濾器(黑名單)提供查詢過濾功能,支持自動隔 ......

    uj5u.com 2023-06-08 09:43:01 more
  • SQL Server 補丁理解及安裝 內附完整版下載地址及sp1/2/3補丁

    啟動安裝程式 下載sqlserver2014,雙擊startup.exe進行安裝 系統配置檢查器 使用系統配置檢查器,看系統是否符合安裝sqlserver2014的所有要求 開始安裝 然后點擊安裝,全新sqlserver獨立安裝或向現有安裝添加功能 安裝規則 然后就是使用默認的設定,點開詳細資訊,可 ......

    uj5u.com 2023-06-08 09:42:22 more