安裝kafka
下載地址
? https://kafka.apache.org/documentation
解壓
tar -zxvf kafka_2.12-2.4.1-C /usr/local/
進入到config目錄,編輯 server.properties 組態檔
# 每個broker節點的id,集群架構下要唯一
broker.id = 0
# kafka對外提供的ip和埠號
#listeners=PLAINTEXT://:9092 # 這是系統默認的
# 這里的ip要使用內網的,也就是ifconfig查看出來的
listeners=PLAINTEXT://內網ip:9092
# 外部代理地址,比如java代碼使用客戶端操作連接kafka服務器
advertised.listeners=PLAINTEXT://外網ip:9092
# 訊息存盤的目錄
log.dirs=/usr/local/kafka_2.12-2.4.1/data
# zookeeper地址
zookeeper.connect=localhost:2181
# 訊息保留的時間,單位為小時,默認為7天
log.retention.hours=168
啟動kafka:
./bin/kafka-server-start.sh config/server.properties &
#或者
./bin/kafka-server-start.sh -daemon config/server.properties
查看是否啟動成功
jps或者 ps -ef | grep kafka
Kafka核心組件
Broker
一個kafka節點就是一個broker,一個broker或者多個broker可以組成一個集群,(1臺機器也是集群)
Topic
訊息的主題,發送到kafka的每條訊息都需要指定一個主題,
Producer
訊息的生產者,向Broker發送訊息的客戶端,
Consumer
訊息的消費者,從Broker消費訊息的客戶端,
ConsumerGroup
每個Consumer都可以指定一個ConsumerGroup,一條訊息可以被不同的ConsumerGroup的Consumer進行消費,
但是一個ConsumerGroup中只有一個Consumer能夠消費該訊息,
Partition
物理概念,一個topic可以可以分為多個partition,每個partition內部訊息是有序的,
命令列操作kafka
操作topic
# 把元資訊資料存盤到zk中,真正佇列的資料還是存盤在kafka的broker中
# --partitions為磁區數量,默認也是1個磁區 --topic test為指定topic的名稱
./bin/kafka‐topics.sh ‐‐create ‐‐zookeeper localhost:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test
# 查詢有哪些topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
# 洗掉一個topic
./bin/kafka‐topics.sh ‐‐delete ‐‐topic test ‐‐zookeeper localhost:2181
# 查看topic的情況
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
使用producer生產者發送一條訊息
# 這里的ip為內網ip
./bin/kafka-console-producer.sh --broker-list ip:9092 --topic test
出現下面這個界面表示生產者啟動成功

可直接輸入要發送的內容,
使用consumer消費者消費一條訊息
# 這里的ip為內網ip,該命令只能消費該消費者啟動成功之后生產者發送的訊息
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
# 消費生產者所有的訊息,只需要在上面的命令加上 --from-beginning
# kafka消費完之前的訊息不會立馬洗掉,還會在磁盤的檔案里面存在,默認保留一周,這是與傳統訊息中間件不同之一
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic test
# 消費多個主題的訊息,加上 --whitelist "topic1|topic2",每個主題中間用管道符 相隔
./bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server ip:9092 ‐‐whitelist "test|test‐2"
每個消費者會記錄上一次消費訊息的偏移量,等下一次啟動消費的時候從上一次的偏移量的下一條訊息開始消費,
consumer-group
一個消費組可以有多個消費者,
單播訊息
# 通過 --consumer-property group.id=testGroup 來進行指定消費者組名
./bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server ip:9092 ‐‐consumer‐property group.id=testGroup ‐‐topic test
使用該命令打開兩個視窗,只能被其中一個視窗的消費者消費,也就是說只能被同一個消費組下面的某一個消費者消費,類似于Queue模式,
多播訊息
# 通過 --consumer-property group.id=testGroup 來進行指定消費者組名
./bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server ip:9092 ‐‐consumer‐property group.id=testGroup2 ‐‐topic test
創建兩個不同的消費組名稱即可,也就是再創建一個testGroup2的消費組,每個消費組都可以消費到訊息,類似于發布訂閱的模式,
查看所有消費組
./bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --list
查看消費組的偏移量
./bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --describe --group testGroup
- GROUP:消費組名稱
- TOPIC:主題名稱
- PARTITION:磁區名稱,默認1個磁區
- CURRENT-OFFSET:當前已消費的偏移量
- LOG-END-OFFSET:訊息末尾的偏移量
- LAG:剩余未消費的訊息數量
- CONSUMER-ID:消費者的id
- HOST:主機地址
- CLIENT-ID:

partition
創建多個磁區的topic
##### --partitions 后面的引數為磁區的數量
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1
- 創建完成之后查看該topic的情況,可以看到PartitionCount為2,也就是上面我們指定的磁區數量

- 查看我們配置的log.dir檔案夾目錄下面,可以看到每個topic下面的磁區對應一個檔案夾

- 進入到test-0,可以看到當前磁區的檔案內容,以
.log檔案結尾的檔案就是訊息日志存盤的檔案

擴容topic的磁區數量
./bin/kafka‐topics.sh ‐alter ‐‐partitions 3 ‐‐zookeeper localhost:2181 ‐‐topic test
# 重新查看該topic的情況
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
擴容主題的磁區數量并重新查看主題情況,可以看到磁區已經是3個了,

Kafka集群
這里使用單機搭建kafka集群,只需拷貝兩份 server.properties 組態檔即可,在原先的kafka上面增加兩個節點即可,
節點2的 server.properties 組態檔:
# 每個broker節點的id,集群架構下要唯一
broker.id = 1
# kafka對外提供的ip和埠號
#listeners=PLAINTEXT://:9092 # 這是系統默認的
# 這里的ip要使用內網的,也就是ifconfig查看出來的
listeners=PLAINTEXT://ip:9093
# 外部代理地址,比如java代碼使用客戶端操作連接kafka服務器
advertised.listeners=PLAINTEXT://外網ip:9093
# 訊息存盤的目錄
log.dirs=/usr/local/kafka_2.12-2.4.1/data2
# zookeeper地址
zookeeper.connect=localhost:2181
節點3的 server.properties 組態檔:
# 每個broker節點的id,集群架構下要唯一
broker.id = 2
# kafka對外提供的ip和埠號
#listeners=PLAINTEXT://:9092 # 這是系統默認的
# 這里的ip要使用內網的,也就是ifconfig查看出來的
listeners=PLAINTEXT://ip:9094
# 外部代理地址,比如java代碼使用客戶端操作連接kafka服務器
advertised.listeners=PLAINTEXT://外網ip:9094
# 訊息存盤的目錄
log.dirs=/usr/local/kafka_2.12-2.4.1/data3
# zookeeper地址
zookeeper.connect=localhost:2181
主要更改 broker.id、listeners、log.dirs 這三個屬性,
kafka的集群是通過zookeer的地址去判斷的,zookeeper的地址一樣,后面的節點都會水平擴容到原先的集群上面去,
然后分別啟動這里兩個節點,帶上這兩個組態檔,
啟動完成之后創建一個新的topic
# --replication-factor 3 指定副本為3個,每個磁區都分別對應3個副本
# --partitions 2 磁區為2個
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replication-topic
可以看到兩個磁區分散在兩個broker節點上面,防止其中一個節點掛掉了,另外一個節點可以繼續提供作業,
這3個副本有一個leader和兩個follower,Learder:0表示這三個副本的主節點所在的broker的id,

- Partition:磁區
- Leader:該partition的Learder節點所在的broker節點的id
- Replicas:該partition在哪幾個broker上備份著,不管是不是leader都會顯示出來
- Isr:以存活著的節點的broker.id,這里只顯示已備份該partition的節點的broker.id
停掉broker.id為1的節點,再來查看該topic的情況,此時磁區1的Leader已經發生了變化,重新選舉了brokder.id為2的作為了leader,Isr也沒有之前的broker.id為1那臺機器,Replicas副本位于哪幾臺機器上面是不會變化的,

一個磁區的訊息可以被不同的消費組的某一個消費者消費,一個消費者可以消費不同磁區的訊息,
kafka在同一個partition內可以保證訊息的消費順序,不能在多個partition中保證總的訊息消費順序,每個partition會維護自己的offset,
如果需要保證總體上的順序,只能將partition設定為,同時消費組里面設定1個consumer,這樣性能會低,所以kafka的順序消費很少用,
一個消費組中的消費者不能比磁區數量多,否則多出來的消費者會消費不到訊息,
Java 使用 Kafka
1.原生api方式
pom依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
生產者
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
// 如果當前連接的kafka配置了集群,只需要注冊一個就可以,他會自動把當前集群里面所有的節點注冊進來,
// 一般情況下為了高可用,還是在這里多注冊兩個節點,以防某一個節點掛掉了,其他的也注冊不上,
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092,ip:9093,ip:9094");
// key 序列化
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 宕機的情況下重試幾次,默認為3此
// properties.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
// 每次重試間隔時間,單位毫秒
// properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,"100");
//生產者每次批量發送訊息的大小,默認16kb,一定要配合下面的 linger.ms 引數使用
// properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10");
// 生產者快取資料到一定時間發送,默認時間為0毫秒(單位毫秒),kafka作者建議8-10毫秒
// properties.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");
// 壓縮的型別,一共四種型別(none、gzip、snappy、lz4),一般情況下用第三個,使用壓縮的會降低性能
// properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
//client.id,此引數也可以不配置,代表訊息從哪里來(在發出請求時傳遞給服務器的id字串,
// 這樣做的目的是通過允許在服務器端請求日志中包含邏輯應用程式名稱,從而能夠跟蹤ip/埠之外的請求源,)
// properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "");
// 在阻塞之前,客戶機在單個連接上發送的未確認請求的最大數目,請注意,如果此設定設定為大于1且發送失敗,則由于重試(即,如果啟用重試)
// properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "");
// 自定義磁區管理器(磁區就是存盤資料所在的檔案夾)
// properties.setProperty("partitioner.class",MyPartitioner.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for (int i = 11; i <= 20; i++) {
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic2", 0,"key" + i,"hello world" + i);
// 未指定磁區,使用 hash(key) % partition 來選擇磁區,核心代碼 `Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions`
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-replication-topic","key" + i, "hello world" + i);
// 使用的并發編程 `Future` ,異步發送,拿結果還是同步的,
// 也就是訊息發送不成功,這里一直會阻塞
Future<RecordMetadata> send = kafkaProducer.send(record);
// 異步回呼拿結果,假如下面有其他業務邏輯可以先行處理,比如扣減庫存,
/*kafkaProducer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
}
});*/
// TODO 扣減庫存的邏輯
// 拿到發送者的相關資訊
RecordMetadata recordMetadata = send.get();
String sendTargetTopic = recordMetadata.topic();
long sendTargetOffset = recordMetadata.offset();
int sendTargetPartition = recordMetadata.partition();
System.out.println(
"發送的主題:" + sendTargetTopic + ", 偏移量:" + sendTargetOffset + ", 磁區:" + sendTargetPartition);
}
kafkaProducer.flush();
// 優雅關閉,關閉之后kafka會有個心跳機制,默認10秒,
// 如果還沒有連接上就認為當前生產者宕機了,然后重新進行磁區負載均衡
kafkaProducer.close();
}
}
消費者
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092,ip:9093,ip");
// 反序列化器
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 反序列化器
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 關閉自動提交,默認值為true,true表示自動提交,也就是消費過的訊息不再重復消費,false會一直重復消費,
// 配置true的話下一次消費會從最后消費的下一個偏移量去進行消費,
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 自動提交的間隔時間
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 定義消費者群組
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "1111");
// 最小拉取訊息的大小,默認1位元組,一般配合下面的時間引數一起使用
// properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"1");
// 最久等待資料時間
// properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,"500");
//最大拉取訊息的大小,一般也是配合時間使用,默認50MB(50 * 1024 * 1024)
// properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "50");
// 從每個磁區里面所能讀取到的最大位元組數,默認為1MB(1 * 1024 * 1024)
// properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,"1");
// 消費者定時發送心跳/多久沒有發送心跳kafka判斷消費者死亡,默認值為10秒(10000ms,單位毫秒),一般配合下面引數使用
// properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
// 發送心跳的間隔時間,默認3秒(3000ms,單位也是毫秒)
// properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
String topicName = "my-replication-topic";
// 系結主題,注意:subscribe和assign方法只能是用一個
// kafkaConsumer.subscribe(Collections.singletonList(topicName));
// 指定磁區消費
// kafkaConsumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
// 從頭開始消費,對應from-beginning命令
// kafkaConsumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
// kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition(topicName, 0)));
// 指定offset消費
// kafkaConsumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
// kafkaConsumer.seek(new TopicPartition(topicName, 0), 10);
// 消費1小時之前的資料
List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
long fetchDateTime = new Date().getTime() - 60 * 60 * 1000;
Map<TopicPartition, Long> partitionLongMap = new HashMap<>(10);
for (PartitionInfo partitionInfo : partitions) {
partitionLongMap.put(new TopicPartition(topicName, partitionInfo.partition()), fetchDateTime);
}
// 根據時間戳找磁區的偏移量,并從該偏移量往后面去進行消費
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(partitionLongMap);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
if (topicPartition == null || offsetAndTimestamp == null) {
continue;
}
// 得到磁區
int partition = topicPartition.partition();
// 得到偏移量
long offset = offsetAndTimestamp.offset();
System.out.println("磁區:" + partition + ",偏移量:" + offset);
// 最侄訓是根據磁區和偏移量消費訊息
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
}
try {
//拉取訊息
while (true) {
// 引數為每間隔多久拉取一次
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
//偏移量是根據同一個磁區里面的偏移量遞增的
System.out.println(
"訊息所在磁區:" + consumerRecord.partition() + ",訊息的偏移量:" + consumerRecord.offset() + ",key:"
+ consumerRecord.key() + ",value:" + consumerRecord.value());
}
// 同步提交偏移量,當前執行緒阻塞,直至訊息提交完成之后處理后面的業務邏輯,
consumer.commitSync();
// 異步提交偏移量(可能會有訊息丟失的情況),也就是不管當前消費的訊息是否提交了offset,都不會阻塞,可以繼續處理后面的業務邏輯,
/*kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (null != exception) {
System.out.println("error offset:" + offsets);
System.out.println("error stackTrace:" + exception.getStackTrace());
}
}
});*/
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
2.Spring Boot方式
pom依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tqz</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.properties配置
spring.application.name=kafka-service
### 服務端地址
spring.kafka.bootstrap-servers=ip:9092,ip:9093,ip:9094
### 生產者配置
### 等leader寫成功,不要等follower寫成功就可以發送下一條訊息
spring.kafka.producer.acks=1
#重試次數
spring.kafka.producer.retries=3
### kafka會從本地緩沖區取資料,批量發送到broker,設定批量發送的大小,默認值是16384,即16kb,也就是說一個batch滿了16kb就發送出去,
spring.kafka.producer.batch-size=16384
### 生產者可用于緩沖等待發送到服務器的記錄的總記憶體位元組數,設定了該緩沖區,訊息會先快取到本地,可以提高訊息發送的性能,默認值是32MB,即 33554432
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
### 消費者配置
### 消費者組名
spring.kafka.consumer.group-id=myGroup
### 是否自動提交,true表示自動提交,false關閉自動提交,會一直重復消費,
spring.kafka.consumer.enable-auto-commit=false
### 當Kafka中沒有初始偏移量或者服務器上不再存在當前偏移量時該怎么辦,默認值為latest,表示自動將偏移重置為最新的偏移量,可選的值為latest, earliest, none
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=manual_immediate
# RECORD 當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
# BATCH 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后提交
# TIME 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交
# COUNT 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
# TIME 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
# COUNT_TIME TIME或COUNT有一個條件滿足時提交
# MANUAL 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后, 手動呼叫Acknowledgment.acknowledge()后提交
# MANUAL_IMMEDIATE 手動呼叫Acknowledgment.acknowledge()后立即提交
生產者
/**
* <p>發送訊息的控制器
*
* @author tianqingzhao
* @since 2021/10/28 13:50
*/
@RestController
public class ProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC_NAME = "my-replication-topic";
@RequestMapping("send")
public long send(String key, String data) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(TOPIC_NAME, 0, key, data);
SendResult<String, String> sendResult = result.get();
RecordMetadata metadata = sendResult.getRecordMetadata();
String msg = "主題:" + metadata.topic() + ",磁區:" + metadata.partition() + ",偏移量:" + metadata.offset();
System.out.println(msg);
return System.currentTimeMillis();
}
}
消費者
/**
* <p>kafka消費者
*
* @author tianqingzhao
* @since 2021/10/28 13:59
*/
@Component
public class KafkaConsumer {
/**
* 一個消費者可以接收多個topic
*
* @param record 資料
* @param ack 確認機制
*/
@KafkaListener(groupId = "myGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = "0"),
@TopicPartition(topic = "my-replication-topic", partitions = {"0", "1"})})
public void kafkaConsumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
String key = record.key();
String value = record.value();
System.out.println(key + "===" + value);
// 手動提交offset
ack.acknowledge();
}
}
Kafka日志存盤
.index索引檔案,kafka每次往磁區發送4kb(默認值,可配置)訊息就會記錄一下當前訊息的offset到index檔案,
如果要查找某一條訊息的offset就會現在這個索引檔案里面找,再去log檔案里面查找訊息,
.log檔案日志資訊,存盤offset和訊息體,
.timeindex是訊息發送的時間索引檔案,每次往磁區發送4kb的大小同時也會往該檔案里面記錄一下當前的時間戳與offset,根據時間
查找offset的就會現在這個檔案里面查找,

Partition副本選舉leader機制
controller控制器感知磁區leader的broker節點掛掉了,controller會從ISR串列里面選舉第一個broker節點的id作為leader,
因為第一個broker是最先放進ISR,他的資料可能是最全的,
可通過 unclean.leader.election.enable=true 來配置的話會從Replicas里面選取一臺作為leader,該值默認為false,
副本進入ISR串列的條件:
-
副本節點不能產生磁區,必須與zk保持連接以及跟leader保持連接
-
副本能復制leader上的所有寫操作,并且不能落后太多,與leader副本同步滯后的副本,是由
replica.lag.time.max.ms配置決定的,超過這個時間都沒有跟leader同步過的一次的副本會被移出ISR串列)
消費者記錄訊息的offset記錄機制
每個consumer會定期將自己消費的offset發送給kafka的broker內部的topic,也就是在 log.dir 里面配置的檔案夾地址
里面有 __consumer_offsets-數值,
key是consumerGroupId+topic+磁區號,value是當前offset的值,kafka會定期清理topic里面的訊息,只保留最后一條記錄,
比如有兩個消費者,其中一個消費者掛掉了,另外一個消費者頂上,就是根據key值得到的offset往后面加1進行消費,
這個主題默認會有50個磁區,可通過 offsets.topic.num.partitions 來進行設定,
分配到哪個磁區的計算公式:
hash(consumerGroupId) % __consumer_offsets主題的磁區數
消費者Rebalance機制
rebalance是指消費者數量、磁區數量發生了變化或者消費者訂閱了更多的topic,kafka會重新分配消費者消費磁區的關系,
比如消費者掛掉了或者動態擴容磁區,
rebalance只針對subsribe這種不指定磁區的有效,如果通過assign指定了磁區,不會進行rebalance,
rebalance程序中消費者無法進行消費,如果節點數過多,避免高峰期rebalance,
Rebalance策略
range
假設有5個磁區,三個消費者,再新增一個磁區或者消費者掛了一個,所有的磁區都會重新分配消費者,相當于初始化,
round-robin
輪循分配,6個磁區,兩個消費者,第一個消費者0、2、4,第二個消費者1、3、5,
sticky
原先的磁區已分配的消費者不會再變動,只會變動掛掉的消費者的磁區或者新增的磁區,
官方默認range策略,
Rebalance程序
1.1、消費者在啟動的時候會先找到GroupCoordinator所在的節點,
1.2、kafka的集群在其中一個節點上面選舉出來一個GroupCoordinator,這里每個消費組都會選出自己的GroupCoordinator,
1.3、將選舉出來的GroupCoordinator發送給broker節點,
2.1、此時可能還有其他消費者發送假如groupCoordinator的請求,
2.2、由GroupCoordinator選舉出來消費者所在消費組里面的找到一個消費者作為leaderCoordinator,
哪個消費者先向GroupCoordinator發送請求誰就是leaderCoordinator,不同的消費組都會有一個leaderCoordinator,
3.1、leaderCoordinator用來指定消費組的磁區方案,
3.2、把制定的磁區方案發送給GroupCoordinator,
4.1.然后GroupCoordinator同步給該消費組里面其他的消費者消費磁區方案,

Kafka-Manager管理界面
安裝檔案:https://www.cnblogs.com/dadonggg/p/8205302.html
解壓kafka-manager安裝包,解壓命令: unzip -d /usr/local/kafka-manager-1.3.3.7.zip
進入到解壓目錄里面,更改 conf 檔案夾下面的 application.conf 檔案:
# 更改zookeeper的連接地址
kafka-manager.zkhosts="localhost:2181"
啟動kafka-manager:
./bin/kafka-manager &
默認埠為9000,登上來之后先添加一個集群,

這里的集群名稱隨便起,然后配置zookeeper的連接地址,zookeeper多個的話用英文逗號分隔

添加完之后回去點擊剛剛我們添加的集群名稱就可以看到我們的topic和borker的個數了,
以及可以點進去每個Topics或者Brokers進行操作,

Kafka容量規劃
以系統日志檔案記錄來說,每條記錄1kb,日均10億點擊量的記錄,每個磁區設定3個副本,也就是30億kb的資料大小,
kafka日志記錄默認保留7天,也就是20TB的資料,如果設定3個磁區,每臺服務器的的磁盤空間最小7個TB,
所以針對流量大的網站,一定要給服務器的磁盤空間保留足夠的大小,
因為kafka是順序讀寫的,而SSD固態硬碟只是針對隨機讀寫快,機械硬碟對順序讀寫比較快,所以從成本來說買機械硬碟即可,
kafka底層使用了大量的并發編程,所以機器的CPU核數也要盡量高一點,
Kafka線上問題
1.訊息丟失情況
發送端
acks
- 0:客戶端只管發,不管服務端是否接收到都會繼續發送下一條訊息,可能會存在丟失訊息的情況,海量日志可以使用,
- 1:客戶端等待服務端的leader副本寫成功了之后就可以發送下一條訊息,不需要等待follower寫成功,
- -1/all:客戶端需要等待服務端的所有副本都寫成功之后才能發送下一條訊息,一般適用于金融業務,
消費端
如果消費端配置的自動提交,可能一次性拉取到500條訊息,但是還沒消費完就宕機了,下次就再也poll不到了,
2.訊息重復
發送端
發送端配置了重試機制,如果broker已經接收到了客戶端發送的訊息,但是在向發送端發送ack的時候網路出現了抖動,
導致發送端超時沒有收到收到ack,此時發送端可能會進行重發,
消費端
消費端配置了自動提交機制,第一次拉取到了訊息,但是還沒來得及提交服務就掛了,下一次還會消費到同樣的資料,
此時可以使用冪等性來進行處理,比如redis的SETNX,
3.訊息順序
如果配置了確認機制都寫成功的話就一定能保證訊息的順序消費問題,因為kafka是有序消費的,
但是如果配置了重試機制,kafka不會等待之前那一條訊息發送成功再去發送下一條訊息,比如三條訊息1、2、3,
第一條掛掉了,這時候可能就變成了2、3、1,
還有多個磁區的情況,消費端也無法保證訊息的順序消費,可以使用 CountDownLatch 來處理,但是這種情況性能可能不高,
可繼續優化為后臺開啟一些記憶體佇列,比如根據訊息的某種型別(訂單、商品、用戶)存到不同的記憶體佇列里面,
后臺開啟多個執行緒去進行消費,
4.訊息積壓
發送端發送訊息太快,消費者處理太慢,就可能會導致broker有大量訊息,可以讓某些消費者一直把訊息發給其他更多的磁區,
相當于進行轉發,因為kafka每個磁區只能被一個消費者消費,所以在原來的基礎上增加消費者已經沒有用,
可以在后面更多的磁區里面一一對應消費者,
可能還有有一些訊息導致我們程式上出現bug,針對這種情況可以搞一個死信佇列,
5.延遲佇列
RockerMQ是支持延遲發送的,但是Kafka不支持,比如要做一個訂單30分鐘未支付就支付超時,可以做一個延遲佇列,
每個訊息帶上一個時間,消費的時候跟當前時間進行對比,如果沒有到約定的時間,就重新扔到磁區里面去并把offset給帶上,
后面的訊息也就不用再比了,
6.Kafka事務
kafka只能保證本地事務,不能保證分布式的,類似于mysql的事務,下面的demo是來自官網的,分別是初始化、開啟、提交、回滾,
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
// 初始化
producer.initTransactions();
try {
// 開啟
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
// 提交
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
// 回滾
producer.abortTransaction();
}
producer.close();
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/350859.html
標籤:其他
