本文原始碼:GitHub·點這里 || GitEE·點這里
一、Kafka集群環境
1、環境版本
版本:kafka2.11,zookeeper3.4
注意:這里zookeeper3.4也是基于集群模式部署,
2、解壓重命名
tar -zxvf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 kafka2.11
創建日志目錄
[root@en-master kafka2.11]# mkdir logs
注意:以上操作需要同步到集群下其他服務上,
3、添加環境變數
vim /etc/profile
export KAFKA_HOME=/opt/kafka2.11
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
4、修改核心配置
[root@en-master /opt/kafka2.11/config]# vim server.properties
-- 核心修改如下
# 唯一編號
broker.id=0
# 開啟topic洗掉
delete.topic.enable=true
# 日志地址
log.dirs=/opt/kafka2.11/logs
# zk集群
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
注意:broker.id安裝集群服務個數編排即可,集群下不能重復,
5、啟動kafka集群
# 啟動命令
[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties
# 停止命令
[root@node02 kafka2.11]# bin/kafka-server-stop.sh
# 行程查看
[root@node02 kafka2.11]# jps
注意:這里默認啟動了zookeeper集群服務,并且集群下的kafka分別啟動,
6、基礎管理命令
創建topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--create --replication-factor 3 --partitions 1 --topic one-topic
引數說明:
- replication-factor 定義副本個數
- partitions 定義磁區個數
- topic:定義topic名稱
查看topic串列
bin/kafka-topics.sh --zookeeper zk01:2181 --list
修改topic磁區
bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5
查看topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--describe --topic one-topic
發送訊息
bin/kafka-console-producer.sh \
--broker-list 192.168.72.133:9092 --topic one-topic
消費訊息
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic
洗掉topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--delete --topic first
7、Zk集群用處
Kafka集群中有一個broker會被選舉為Controller,Controller依賴Zookeeper環境,管理集群broker的上下線,所有topic的磁區副本分配和leader選舉等作業,
二、訊息攔截案例
1、攔截器簡介
Kafka中間件的Producer攔截器主要用于實作訊息發送的自定義控制邏輯,用戶可以在訊息發送前以及回呼邏輯執行前有機會對訊息做一些自定義,比如訊息修改等,發送狀態監控等,用戶可以指定多個攔截器按順序執行攔截,
核心方法
- configure:獲取配置資訊和初始化資料時呼叫;
- onSend:訊息被序列化以及和計算磁區前呼叫該方法,可以對訊息做操作;
- onAcknowledgement:訊息發送到Broker之后,或發送程序失敗時呼叫;
- close:關閉攔截器呼叫,執行一些資源清理作業;
注意:這里說的攔截器是針對訊息發送流程,
2、自定義攔截
定義方式:實作ProducerInterceptor介面即可,
攔截器一:在onSend方法中,對攔截的訊息進行修改,
@Component
public class SendStartInterceptor implements ProducerInterceptor<String, String> {
private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");
@Override
public void configure(Map<String, ?> configs) {
LOGGER.info("configs...");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 修改訊息內容
return new ProducerRecord<>(record.topic(), record.partition(),
record.timestamp(), record.key(),
"onSend:{" + record.value()+"}");
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
LOGGER.info("onAcknowledgement...");
}
@Override
public void close() {
LOGGER.info("SendStart close...");
}
}
攔截器二:在onAcknowledgement方法中,判斷訊息是否發送成功,
@Component
public class SendOverInterceptor implements ProducerInterceptor<String, String> {
private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");
@Override
public void configure(Map<String, ?> configs) {
LOGGER.info("configs...");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
LOGGER.info("record...{}", record.value());
return record ;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception != null){
LOGGER.info("Send Fail...exe-msg",exception.getMessage());
}
LOGGER.info("Send success...");
}
@Override
public void close() {
LOGGER.info("SendOver close...");
}
}
加載攔截器:基于一個KafkaProducer配置Bean,加入攔截器,
@Configuration
public class KafkaConfig {
@Bean
public Producer producer (){
Properties props = new Properties();
// 省略其他配置...
// 添加攔截器
List<String> interceptors = new ArrayList<>();
interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");
interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
return new KafkaProducer<>(props) ;
}
}
3、代碼案例
@RestController
public class SendMsgWeb {
@Resource
private KafkaProducer<String,String> producer ;
@GetMapping("/sendMsg")
public String sendMsg (){
producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));
return "success" ;
}
}
基于上述自定義Bean型別,進行訊息發送,關注攔截器中列印日志資訊,
三、Kafka存盤分析
說明:該程序基于上述案例producer.send方法追蹤的原始碼執行流程,原始碼中的程序相對清楚,涉及的核心流程如下,
1、訊息生成程序

Producer發送訊息采用的是異步發送的方式,訊息發送程序如下:
- Producer發送訊息之后,經過攔截器,序列化,事務判斷;
- 流程執行后,訊息內容放入容器中;
- 容器在指定時間內如果裝滿(size),會喚醒Sender執行緒;
- 容器如果在指定時間內沒有裝滿,也會執行一次Sender執行緒喚醒;
- 喚醒Sender執行緒之后,把容器資料拉取到topic中;
絮叨一句:讀這些中間件的原始碼,不僅能開闊思維,也會讓自己意識到平時寫的代碼可能真的叫搬磚,
2、存盤機制
Kafka中訊息是以topic進行標識分類,生產者面向topic生產訊息,topic磁區(partition)是物理上的存盤,基于訊息日志檔案的方式,

- 每個partition對應于一個log檔案,發送的訊息不斷追加到該log檔案末端;
- log檔案中存盤的就是producer生產的訊息資料,采用分片和索引機制;
- partition分為多個segment,每個segment對應兩個(.index)和(.log)檔案;
- index檔案型別存盤的索引資訊;
- log檔案存盤訊息的資料;
- 索引檔案中的元資料指向對應資料檔案中message的物理偏移地址;
- 消費者組中的每個消費者,都會實時記錄消費的訊息offset位置;
- 當然訊息消費出錯時,恢復是從上次的記錄位置繼續消費;
3、事務控制機制

Kafka支持訊息的事務控制
Producer事務
跨磁區跨會話的事務原理,引入全域唯一的TransactionID,并將Producer獲得的PID和TransactionID系結,Producer重啟后可以通過正在進行的TransactionID獲得原來的PID,
Kafka基于TransactionCoordinator組件管理Transaction,Producer通過和TransactionCoordinator互動獲得TransactionID對應的任務狀態,TransactionCoordinator將事務狀態寫入Kafka的內部Topic,即使整個服務重啟,進行中的事務狀態可以得到恢復,
Consumer事務
Consumer訊息消費,事務的保證強度很低,無法保證訊息被精確消費,因為同一事務的訊息可能會出現重啟后已經被洗掉的情況,
四、源代碼地址
GitHub·地址
https://github.com/cicadasmile/data-manage-parent
GitEE·地址
https://gitee.com/cicadasmile/data-manage-parent

推薦關聯閱讀:資料源管理系列
| 序號 | 標題 |
|---|---|
| 01 | 資料源管理:主從庫動態路由,AOP模式讀寫分離 |
| 02 | 資料源管理:基于JDBC模式,適配和管理動態資料源 |
| 03 | 資料源管理:動態權限校驗,表結構和資料遷移流程 |
| 04 | 資料源管理:關系型分庫分表,列式庫分布式計算 |
| 05 | 資料源管理:PostGreSQL環境整合,JSON型別應用 |
| 06 | 資料源管理:基于DataX組件,同步資料和原始碼分析 |
| 07 | 資料源管理:OLAP查詢引擎,ClickHouse集群化管理 |
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/167448.html
標籤:Java
上一篇:JVM常見面試題決議
