主頁 >  其他 > 分布式中間件之Kafka

分布式中間件之Kafka

2021-11-07 07:52:54 其他

安裝kafka

下載地址

? https://kafka.apache.org/documentation

解壓

tar -zxvf kafka_2.12-2.4.1-C /usr/local/

進入到config目錄,編輯 server.properties 組態檔

# 每個broker節點的id,集群架構下要唯一
broker.id = 0
# kafka對外提供的ip和埠號
#listeners=PLAINTEXT://:9092 # 這是系統默認的
# 這里的ip要使用內網的,也就是ifconfig查看出來的
listeners=PLAINTEXT://內網ip:9092
# 外部代理地址,比如java代碼使用客戶端操作連接kafka服務器
advertised.listeners=PLAINTEXT://外網ip:9092
# 訊息存盤的目錄
log.dirs=/usr/local/kafka_2.12-2.4.1/data
# zookeeper地址
zookeeper.connect=localhost:2181
# 訊息保留的時間,單位為小時,默認為7天
log.retention.hours=168

啟動kafka:

./bin/kafka-server-start.sh config/server.properties &
#或者
./bin/kafka-server-start.sh -daemon config/server.properties  

查看是否啟動成功

jps或者 ps -ef | grep kafka

Kafka核心組件

Broker

一個kafka節點就是一個broker,一個broker或者多個broker可以組成一個集群,(1臺機器也是集群)

Topic

訊息的主題,發送到kafka的每條訊息都需要指定一個主題,

Producer

訊息的生產者,向Broker發送訊息的客戶端,

Consumer

訊息的消費者,從Broker消費訊息的客戶端,

ConsumerGroup

每個Consumer都可以指定一個ConsumerGroup,一條訊息可以被不同的ConsumerGroup的Consumer進行消費,

但是一個ConsumerGroup中只有一個Consumer能夠消費該訊息,

Partition

物理概念,一個topic可以可以分為多個partition,每個partition內部訊息是有序的,

命令列操作kafka

操作topic

# 把元資訊資料存盤到zk中,真正佇列的資料還是存盤在kafka的broker中 
# --partitions為磁區數量,默認也是1個磁區 --topic test為指定topic的名稱
./bin/kafka‐topics.sh ‐‐create ‐‐zookeeper localhost:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test
# 查詢有哪些topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
# 洗掉一個topic
./bin/kafka‐topics.sh ‐‐delete ‐‐topic test ‐‐zookeeper localhost:2181
# 查看topic的情況
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

使用producer生產者發送一條訊息

# 這里的ip為內網ip
./bin/kafka-console-producer.sh --broker-list ip:9092 --topic test

出現下面這個界面表示生產者啟動成功
在這里插入圖片描述
可直接輸入要發送的內容,

使用consumer消費者消費一條訊息

# 這里的ip為內網ip,該命令只能消費該消費者啟動成功之后生產者發送的訊息
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
# 消費生產者所有的訊息,只需要在上面的命令加上 --from-beginning
# kafka消費完之前的訊息不會立馬洗掉,還會在磁盤的檔案里面存在,默認保留一周,這是與傳統訊息中間件不同之一
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic test
# 消費多個主題的訊息,加上 --whitelist "topic1|topic2",每個主題中間用管道符 相隔
 ./bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server ip:9092 ‐‐whitelist "test|test‐2"

每個消費者會記錄上一次消費訊息的偏移量,等下一次啟動消費的時候從上一次的偏移量的下一條訊息開始消費,

consumer-group

一個消費組可以有多個消費者,

單播訊息
# 通過 --consumer-property group.id=testGroup 來進行指定消費者組名
./bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server ip:9092 ‐‐consumer‐property group.id=testGroup ‐‐topic test

使用該命令打開兩個視窗,只能被其中一個視窗的消費者消費,也就是說只能被同一個消費組下面的某一個消費者消費,類似于Queue模式,

多播訊息
# 通過 --consumer-property group.id=testGroup 來進行指定消費者組名
./bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server ip:9092 ‐‐consumer‐property group.id=testGroup2 ‐‐topic test

創建兩個不同的消費組名稱即可,也就是再創建一個testGroup2的消費組,每個消費組都可以消費到訊息,類似于發布訂閱的模式,

查看所有消費組
./bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --list
查看消費組的偏移量
./bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --describe --group testGroup
  • GROUP:消費組名稱
  • TOPIC:主題名稱
  • PARTITION:磁區名稱,默認1個磁區
  • CURRENT-OFFSET:當前已消費的偏移量
  • LOG-END-OFFSET:訊息末尾的偏移量
  • LAG:剩余未消費的訊息數量
  • CONSUMER-ID:消費者的id
  • HOST:主機地址
  • CLIENT-ID:
    在這里插入圖片描述

partition

創建多個磁區的topic
##### --partitions 后面的引數為磁區的數量
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1
  • 創建完成之后查看該topic的情況,可以看到PartitionCount為2,也就是上面我們指定的磁區數量
    在這里插入圖片描述
  • 查看我們配置的log.dir檔案夾目錄下面,可以看到每個topic下面的磁區對應一個檔案夾
    在這里插入圖片描述
  • 進入到test-0,可以看到當前磁區的檔案內容,以 .log 檔案結尾的檔案就是訊息日志存盤的檔案
    在這里插入圖片描述
擴容topic的磁區數量
./bin/kafka‐topics.sh ‐alter ‐‐partitions 3 ‐‐zookeeper localhost:2181 ‐‐topic test
# 重新查看該topic的情況
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

擴容主題的磁區數量并重新查看主題情況,可以看到磁區已經是3個了,
在這里插入圖片描述

Kafka集群

這里使用單機搭建kafka集群,只需拷貝兩份 server.properties 組態檔即可,在原先的kafka上面增加兩個節點即可,

節點2的 server.properties 組態檔:

# 每個broker節點的id,集群架構下要唯一
broker.id = 1
# kafka對外提供的ip和埠號
#listeners=PLAINTEXT://:9092 # 這是系統默認的
# 這里的ip要使用內網的,也就是ifconfig查看出來的
listeners=PLAINTEXT://ip:9093
# 外部代理地址,比如java代碼使用客戶端操作連接kafka服務器
advertised.listeners=PLAINTEXT://外網ip:9093
# 訊息存盤的目錄
log.dirs=/usr/local/kafka_2.12-2.4.1/data2
# zookeeper地址
zookeeper.connect=localhost:2181

節點3的 server.properties 組態檔:

# 每個broker節點的id,集群架構下要唯一
broker.id = 2
# kafka對外提供的ip和埠號
#listeners=PLAINTEXT://:9092 # 這是系統默認的
# 這里的ip要使用內網的,也就是ifconfig查看出來的
listeners=PLAINTEXT://ip:9094
# 外部代理地址,比如java代碼使用客戶端操作連接kafka服務器
advertised.listeners=PLAINTEXT://外網ip:9094
# 訊息存盤的目錄
log.dirs=/usr/local/kafka_2.12-2.4.1/data3
# zookeeper地址
zookeeper.connect=localhost:2181

主要更改 broker.id、listeners、log.dirs 這三個屬性,

kafka的集群是通過zookeer的地址去判斷的,zookeeper的地址一樣,后面的節點都會水平擴容到原先的集群上面去,

然后分別啟動這里兩個節點,帶上這兩個組態檔,

啟動完成之后創建一個新的topic

# --replication-factor 3 指定副本為3個,每個磁區都分別對應3個副本
# --partitions 2 磁區為2個
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replication-topic

可以看到兩個磁區分散在兩個broker節點上面,防止其中一個節點掛掉了,另外一個節點可以繼續提供作業,

這3個副本有一個leader和兩個follower,Learder:0表示這三個副本的主節點所在的broker的id,
在這里插入圖片描述

  • Partition:磁區
  • Leader:該partition的Learder節點所在的broker節點的id
  • Replicas:該partition在哪幾個broker上備份著,不管是不是leader都會顯示出來
  • Isr:以存活著的節點的broker.id,這里只顯示已備份該partition的節點的broker.id

停掉broker.id為1的節點,再來查看該topic的情況,此時磁區1的Leader已經發生了變化,重新選舉了brokder.id為2的作為了leader,Isr也沒有之前的broker.id為1那臺機器,Replicas副本位于哪幾臺機器上面是不會變化的,
在這里插入圖片描述
一個磁區的訊息可以被不同的消費組的某一個消費者消費,一個消費者可以消費不同磁區的訊息,

kafka在同一個partition內可以保證訊息的消費順序,不能在多個partition中保證總的訊息消費順序,每個partition會維護自己的offset,

如果需要保證總體上的順序,只能將partition設定為,同時消費組里面設定1個consumer,這樣性能會低,所以kafka的順序消費很少用,

一個消費組中的消費者不能比磁區數量多,否則多出來的消費者會消費不到訊息,

Java 使用 Kafka

1.原生api方式

pom依賴
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
生產者
public class Producer {
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // 如果當前連接的kafka配置了集群,只需要注冊一個就可以,他會自動把當前集群里面所有的節點注冊進來,
        // 一般情況下為了高可用,還是在這里多注冊兩個節點,以防某一個節點掛掉了,其他的也注冊不上,
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092,ip:9093,ip:9094");
        
        // key 序列化
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // 宕機的情況下重試幾次,默認為3此
        //        properties.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
        // 每次重試間隔時間,單位毫秒
        //        properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,"100");
        //生產者每次批量發送訊息的大小,默認16kb,一定要配合下面的 linger.ms 引數使用
        //        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10");
        // 生產者快取資料到一定時間發送,默認時間為0毫秒(單位毫秒),kafka作者建議8-10毫秒
        //        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");
        // 壓縮的型別,一共四種型別(none、gzip、snappy、lz4),一般情況下用第三個,使用壓縮的會降低性能
        //        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //client.id,此引數也可以不配置,代表訊息從哪里來(在發出請求時傳遞給服務器的id字串,
        // 這樣做的目的是通過允許在服務器端請求日志中包含邏輯應用程式名稱,從而能夠跟蹤ip/埠之外的請求源,)
        //        properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "");
        // 在阻塞之前,客戶機在單個連接上發送的未確認請求的最大數目,請注意,如果此設定設定為大于1且發送失敗,則由于重試(即,如果啟用重試)
        //        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "");
        
        // 自定義磁區管理器(磁區就是存盤資料所在的檔案夾)
        //        properties.setProperty("partitioner.class",MyPartitioner.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        for (int i = 11; i <= 20; i++) {
//                        ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic2", 0,"key" + i,"hello world" + i);
            
            // 未指定磁區,使用 hash(key) % partition 來選擇磁區,核心代碼 `Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions`
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-replication-topic","key" + i, "hello world" + i);
            // 使用的并發編程 `Future` ,異步發送,拿結果還是同步的,
            // 也就是訊息發送不成功,這里一直會阻塞
            Future<RecordMetadata> send = kafkaProducer.send(record);
            
            // 異步回呼拿結果,假如下面有其他業務邏輯可以先行處理,比如扣減庫存,
        /*kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
            
            }
        });*/
            // TODO 扣減庫存的邏輯
            
            // 拿到發送者的相關資訊
            RecordMetadata recordMetadata = send.get();
            String sendTargetTopic = recordMetadata.topic();
            long sendTargetOffset = recordMetadata.offset();
            int sendTargetPartition = recordMetadata.partition();
            
            System.out.println(
                    "發送的主題:" + sendTargetTopic + ", 偏移量:" + sendTargetOffset + ", 磁區:" + sendTargetPartition);
        }
        
        kafkaProducer.flush();
        // 優雅關閉,關閉之后kafka會有個心跳機制,默認10秒,
        // 如果還沒有連接上就認為當前生產者宕機了,然后重新進行磁區負載均衡
        kafkaProducer.close();
    }
}
消費者
public class Consumer {
    
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092,ip:9093,ip");
        // 反序列化器
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 反序列化器
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 關閉自動提交,默認值為true,true表示自動提交,也就是消費過的訊息不再重復消費,false會一直重復消費,
        // 配置true的話下一次消費會從最后消費的下一個偏移量去進行消費,
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 自動提交的間隔時間
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        
        // 定義消費者群組
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "1111");
        
        // 最小拉取訊息的大小,默認1位元組,一般配合下面的時間引數一起使用
        //        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"1");
        // 最久等待資料時間
        //        properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,"500");
        
        //最大拉取訊息的大小,一般也是配合時間使用,默認50MB(50 * 1024 * 1024)
        //        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "50");
        // 從每個磁區里面所能讀取到的最大位元組數,默認為1MB(1 * 1024 * 1024)
        //        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,"1");
        // 消費者定時發送心跳/多久沒有發送心跳kafka判斷消費者死亡,默認值為10秒(10000ms,單位毫秒),一般配合下面引數使用
        //        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        // 發送心跳的間隔時間,默認3秒(3000ms,單位也是毫秒)
        //        properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        
        String topicName = "my-replication-topic";
        // 系結主題,注意:subscribe和assign方法只能是用一個
//        kafkaConsumer.subscribe(Collections.singletonList(topicName));
        // 指定磁區消費
//        kafkaConsumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
        
        // 從頭開始消費,對應from-beginning命令
//        kafkaConsumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
//        kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition(topicName, 0)));
        
        // 指定offset消費
//        kafkaConsumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
//        kafkaConsumer.seek(new TopicPartition(topicName, 0), 10);
        
        // 消費1小時之前的資料
        List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
        long fetchDateTime = new Date().getTime() - 60 * 60 * 1000;
        Map<TopicPartition, Long> partitionLongMap = new HashMap<>(10);
        for (PartitionInfo partitionInfo : partitions) {
            partitionLongMap.put(new TopicPartition(topicName, partitionInfo.partition()), fetchDateTime);
        }
    
        // 根據時間戳找磁區的偏移量,并從該偏移量往后面去進行消費
        Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(partitionLongMap);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
            if (topicPartition == null || offsetAndTimestamp == null) {
                continue;
            }
        
            // 得到磁區
            int partition = topicPartition.partition();
            // 得到偏移量
            long offset = offsetAndTimestamp.offset();
            System.out.println("磁區:" + partition + ",偏移量:" + offset);
    
            // 最侄訓是根據磁區和偏移量消費訊息
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seek(topicPartition, offset);
        }
        
        
        try {
            //拉取訊息
            while (true) {
                // 引數為每間隔多久拉取一次
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    //偏移量是根據同一個磁區里面的偏移量遞增的
                    System.out.println(
                            "訊息所在磁區:" + consumerRecord.partition() + ",訊息的偏移量:" + consumerRecord.offset() + ",key:"
                                    + consumerRecord.key() + ",value:" + consumerRecord.value());
                }
                
                // 同步提交偏移量,當前執行緒阻塞,直至訊息提交完成之后處理后面的業務邏輯,
                consumer.commitSync();
    
                // 異步提交偏移量(可能會有訊息丟失的情況),也就是不管當前消費的訊息是否提交了offset,都不會阻塞,可以繼續處理后面的業務邏輯,
                /*kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (null != exception) {
                            System.out.println("error offset:" + offsets);
                            System.out.println("error stackTrace:" + exception.getStackTrace());
                        }
                    }
                });*/
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
        
    }
}

2.Spring Boot方式

pom依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tqz</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>
    
    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
application.properties配置
spring.application.name=kafka-service

### 服務端地址
spring.kafka.bootstrap-servers=ip:9092,ip:9093,ip:9094

### 生產者配置
### 等leader寫成功,不要等follower寫成功就可以發送下一條訊息
spring.kafka.producer.acks=1
#重試次數
spring.kafka.producer.retries=3
### kafka會從本地緩沖區取資料,批量發送到broker,設定批量發送的大小,默認值是16384,即16kb,也就是說一個batch滿了16kb就發送出去,
spring.kafka.producer.batch-size=16384
### 生產者可用于緩沖等待發送到服務器的記錄的總記憶體位元組數,設定了該緩沖區,訊息會先快取到本地,可以提高訊息發送的性能,默認值是32MB,即 33554432
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### 消費者配置
### 消費者組名
spring.kafka.consumer.group-id=myGroup
### 是否自動提交,true表示自動提交,false關閉自動提交,會一直重復消費,
spring.kafka.consumer.enable-auto-commit=false
### 當Kafka中沒有初始偏移量或者服務器上不再存在當前偏移量時該怎么辦,默認值為latest,表示自動將偏移重置為最新的偏移量,可選的值為latest, earliest, none
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.listener.ack-mode=manual_immediate
# RECORD 當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
# BATCH 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后提交
# TIME 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交
# COUNT 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
# TIME 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
# COUNT_TIME TIME或COUNT有一個條件滿足時提交
# MANUAL 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后, 手動呼叫Acknowledgment.acknowledge()后提交
# MANUAL_IMMEDIATE 手動呼叫Acknowledgment.acknowledge()后立即提交
生產者
/**
 * <p>發送訊息的控制器
 *
 * @author tianqingzhao
 * @since 2021/10/28 13:50
 */
@RestController
public class ProducerController {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    private static final String TOPIC_NAME = "my-replication-topic";
    
    @RequestMapping("send")
    public long send(String key, String data) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(TOPIC_NAME, 0, key, data);
        
        SendResult<String, String> sendResult = result.get();
        RecordMetadata metadata = sendResult.getRecordMetadata();
        
        String msg = "主題:" + metadata.topic() + ",磁區:" + metadata.partition() + ",偏移量:" + metadata.offset();
        System.out.println(msg);
        
        return System.currentTimeMillis();
    }
    
}
消費者
/**
 * <p>kafka消費者
 *
 * @author tianqingzhao
 * @since 2021/10/28 13:59
 */
@Component
public class KafkaConsumer {
    
    /**
     * 一個消費者可以接收多個topic
     *
     * @param record 資料
     * @param ack    確認機制
     */
    @KafkaListener(groupId = "myGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = "0"),
            @TopicPartition(topic = "my-replication-topic", partitions = {"0", "1"})})
    public void kafkaConsumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String key = record.key();
        String value = record.value();
        System.out.println(key + "===" + value);
        
        // 手動提交offset
        ack.acknowledge();
    }
}

Kafka日志存盤

.index索引檔案,kafka每次往磁區發送4kb(默認值,可配置)訊息就會記錄一下當前訊息的offset到index檔案,

如果要查找某一條訊息的offset就會現在這個索引檔案里面找,再去log檔案里面查找訊息,

.log檔案日志資訊,存盤offset和訊息體,

.timeindex是訊息發送的時間索引檔案,每次往磁區發送4kb的大小同時也會往該檔案里面記錄一下當前的時間戳與offset,根據時間

查找offset的就會現在這個檔案里面查找,
在這里插入圖片描述

Partition副本選舉leader機制

controller控制器感知磁區leader的broker節點掛掉了,controller會從ISR串列里面選舉第一個broker節點的id作為leader,

因為第一個broker是最先放進ISR,他的資料可能是最全的,

可通過 unclean.leader.election.enable=true 來配置的話會從Replicas里面選取一臺作為leader,該值默認為false,

副本進入ISR串列的條件:

  • 副本節點不能產生磁區,必須與zk保持連接以及跟leader保持連接

  • 副本能復制leader上的所有寫操作,并且不能落后太多,與leader副本同步滯后的副本,是由

    replica.lag.time.max.ms 配置決定的,超過這個時間都沒有跟leader同步過的一次的副本會被移出ISR串列)

消費者記錄訊息的offset記錄機制

每個consumer會定期將自己消費的offset發送給kafka的broker內部的topic,也就是在 log.dir 里面配置的檔案夾地址

里面有 __consumer_offsets-數值,

key是consumerGroupId+topic+磁區號,value是當前offset的值,kafka會定期清理topic里面的訊息,只保留最后一條記錄,

比如有兩個消費者,其中一個消費者掛掉了,另外一個消費者頂上,就是根據key值得到的offset往后面加1進行消費,

這個主題默認會有50個磁區,可通過 offsets.topic.num.partitions 來進行設定,

分配到哪個磁區的計算公式:

hash(consumerGroupId) % __consumer_offsets主題的磁區數

消費者Rebalance機制

rebalance是指消費者數量、磁區數量發生了變化或者消費者訂閱了更多的topic,kafka會重新分配消費者消費磁區的關系,

比如消費者掛掉了或者動態擴容磁區,

rebalance只針對subsribe這種不指定磁區的有效,如果通過assign指定了磁區,不會進行rebalance,

rebalance程序中消費者無法進行消費,如果節點數過多,避免高峰期rebalance,

Rebalance策略

range

假設有5個磁區,三個消費者,再新增一個磁區或者消費者掛了一個,所有的磁區都會重新分配消費者,相當于初始化,

round-robin

輪循分配,6個磁區,兩個消費者,第一個消費者0、2、4,第二個消費者1、3、5,

sticky

原先的磁區已分配的消費者不會再變動,只會變動掛掉的消費者的磁區或者新增的磁區,

官方默認range策略,

Rebalance程序

1.1、消費者在啟動的時候會先找到GroupCoordinator所在的節點,

1.2、kafka的集群在其中一個節點上面選舉出來一個GroupCoordinator,這里每個消費組都會選出自己的GroupCoordinator,

1.3、將選舉出來的GroupCoordinator發送給broker節點,

2.1、此時可能還有其他消費者發送假如groupCoordinator的請求,

2.2、由GroupCoordinator選舉出來消費者所在消費組里面的找到一個消費者作為leaderCoordinator,

哪個消費者先向GroupCoordinator發送請求誰就是leaderCoordinator,不同的消費組都會有一個leaderCoordinator,

3.1、leaderCoordinator用來指定消費組的磁區方案,

3.2、把制定的磁區方案發送給GroupCoordinator,

4.1.然后GroupCoordinator同步給該消費組里面其他的消費者消費磁區方案,
在這里插入圖片描述

Kafka-Manager管理界面

安裝檔案:https://www.cnblogs.com/dadonggg/p/8205302.html

解壓kafka-manager安裝包,解壓命令: unzip -d /usr/local/kafka-manager-1.3.3.7.zip

進入到解壓目錄里面,更改 conf 檔案夾下面的 application.conf 檔案:

# 更改zookeeper的連接地址
kafka-manager.zkhosts="localhost:2181"

啟動kafka-manager:

./bin/kafka-manager &

默認埠為9000,登上來之后先添加一個集群,
在這里插入圖片描述
這里的集群名稱隨便起,然后配置zookeeper的連接地址,zookeeper多個的話用英文逗號分隔
在這里插入圖片描述
添加完之后回去點擊剛剛我們添加的集群名稱就可以看到我們的topic和borker的個數了,

以及可以點進去每個Topics或者Brokers進行操作,
在這里插入圖片描述

Kafka容量規劃

以系統日志檔案記錄來說,每條記錄1kb,日均10億點擊量的記錄,每個磁區設定3個副本,也就是30億kb的資料大小,

kafka日志記錄默認保留7天,也就是20TB的資料,如果設定3個磁區,每臺服務器的的磁盤空間最小7個TB,

所以針對流量大的網站,一定要給服務器的磁盤空間保留足夠的大小,

因為kafka是順序讀寫的,而SSD固態硬碟只是針對隨機讀寫快,機械硬碟對順序讀寫比較快,所以從成本來說買機械硬碟即可,

kafka底層使用了大量的并發編程,所以機器的CPU核數也要盡量高一點,

Kafka線上問題

1.訊息丟失情況

發送端

acks

  • 0:客戶端只管發,不管服務端是否接收到都會繼續發送下一條訊息,可能會存在丟失訊息的情況,海量日志可以使用,
  • 1:客戶端等待服務端的leader副本寫成功了之后就可以發送下一條訊息,不需要等待follower寫成功,
  • -1/all:客戶端需要等待服務端的所有副本都寫成功之后才能發送下一條訊息,一般適用于金融業務,
消費端

如果消費端配置的自動提交,可能一次性拉取到500條訊息,但是還沒消費完就宕機了,下次就再也poll不到了,

2.訊息重復

發送端

發送端配置了重試機制,如果broker已經接收到了客戶端發送的訊息,但是在向發送端發送ack的時候網路出現了抖動,

導致發送端超時沒有收到收到ack,此時發送端可能會進行重發,

消費端

消費端配置了自動提交機制,第一次拉取到了訊息,但是還沒來得及提交服務就掛了,下一次還會消費到同樣的資料,

此時可以使用冪等性來進行處理,比如redis的SETNX,

3.訊息順序

如果配置了確認機制都寫成功的話就一定能保證訊息的順序消費問題,因為kafka是有序消費的,

但是如果配置了重試機制,kafka不會等待之前那一條訊息發送成功再去發送下一條訊息,比如三條訊息1、2、3,

第一條掛掉了,這時候可能就變成了2、3、1,

還有多個磁區的情況,消費端也無法保證訊息的順序消費,可以使用 CountDownLatch 來處理,但是這種情況性能可能不高,

可繼續優化為后臺開啟一些記憶體佇列,比如根據訊息的某種型別(訂單、商品、用戶)存到不同的記憶體佇列里面,

后臺開啟多個執行緒去進行消費,

4.訊息積壓

發送端發送訊息太快,消費者處理太慢,就可能會導致broker有大量訊息,可以讓某些消費者一直把訊息發給其他更多的磁區,

相當于進行轉發,因為kafka每個磁區只能被一個消費者消費,所以在原來的基礎上增加消費者已經沒有用,

可以在后面更多的磁區里面一一對應消費者,

可能還有有一些訊息導致我們程式上出現bug,針對這種情況可以搞一個死信佇列,

5.延遲佇列

RockerMQ是支持延遲發送的,但是Kafka不支持,比如要做一個訂單30分鐘未支付就支付超時,可以做一個延遲佇列,

每個訊息帶上一個時間,消費的時候跟當前時間進行對比,如果沒有到約定的時間,就重新扔到磁區里面去并把offset給帶上,

后面的訊息也就不用再比了,

6.Kafka事務

kafka只能保證本地事務,不能保證分布式的,類似于mysql的事務,下面的demo是來自官網的,分別是初始化、開啟、提交、回滾,

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 // 初始化
 producer.initTransactions();

 try {
     // 開啟
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     // 提交
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     // 回滾
     producer.abortTransaction();
 }
 producer.close();

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/350859.html

標籤:其他

上一篇:RabbitMq(四)延時佇列,訂單過期,取消支付場景

下一篇:本地運行Hadoop WordCount程式練習

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more