主頁 >  其他 > Kafka 2.8.0 JAVA API基本使用

Kafka 2.8.0 JAVA API基本使用

2021-08-16 10:11:10 其他

以下測驗皆在windows下進行,請根據自己情況酌情配置kafka zookeeper等環境

本人使用的是jdk11,代碼中可能存在jdk9的新特性,使用jdk9以前的jdk的朋友請自行轉換

kafka環境變數等暫時略過

1.java匯入依賴

        <!--匯入kafka依賴-->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.0</version>
        </dependency>

kafka Producer

匯入相關依賴后,創建測驗類ProducerDemo;

  • 創建生產者物件

    使用KafkaProducer 創建kafka生產者物件,這時可以發現kafka不允許我們使用空構造來創建物件;

    那么我們就選用傳入properties的方式創建kafka生產者

    創建生產者的時候,跟控制臺命令一樣,我們需要指定集群名稱以及序列化器,而這些相關設定都會存盤在我們的組態檔中;

    kafka給我們提供了ProducerConfig類,并在其中已經給我們提前準備好了我們所需要的key,在向properties中put鍵值時,可以直接使用producerConfig的靜態常量作為key;并傳入相應value

  • 向kafka中發送資訊

    使用kafkaProducer向kafka中發送資訊,可以使用其提供的send()方法 ;使用時可以看到其需要傳入ProducerRecord以及一個可選的Callback

    ProducerRecord: 即為每條資料所封裝成的物件

    CallBack:可選;獲取函式的回呼

  • close()

    在真實生產環境中,我們可能不需要手動呼叫close方法關閉kafkaProducer,但是目前的測驗階段,如果不使用close關閉,可能會導致發送的資訊在設定等待的時間內,不會被真正的發送;

    流在關閉的時候會對資料進行回收操作

/**
 * 描述:kafkaProducer生產者
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/10 23:14    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ProducerPartitionerDemo01 {
    public static void main(String[] args) {
        
        Properties properties = new Properties();
        //自行修改為對應的集群地址 kafka默認為9092,此處我沒有更改
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");   
        //需要傳入序列化器的全類名,kafka需要通過反射全類名去獲取序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
?
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        for (int i = 0; i < 10; i++) {
            //使用callBack收集回呼資訊,使用了lamdba運算式
            kafkaProducer.send(new ProducerRecord("此處使用自己存在的主題","value"),((metadata, exception) ->{
                if(exception==null){
                    System.out.println("沒有錯誤,資料添加成功");
                }
            } ));
        }
        //關閉
        kafkaProducer.close();
    }
}

自定義磁區器

如果想要自己根據業務需求撰寫自定義的磁區規則,可以自定義磁區器;

說到自定義,就勢必需要去實作某個介面或者繼承某個類

這里, 我們需要實作的是kafka給我們的Partitioner介面,實作后重寫方法

/**
 * 描述: 自定義磁區器
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/10 22:24    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class MyPartitioner implements Partitioner {
?
    /**
     * 撰寫磁區規則
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //根據業務需求撰寫磁區規則
        return 0;
    }
?
    @Override
    public void close() {
?
    }
?
    /**
     * 讀取配置資訊
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {
?
    }
}

在撰寫規則時可以參考Kafka對Partitioner的默認實作 DefaultPartitioner

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        //如果key也不存在,則會對可用磁區進行輪詢
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        //如果沒有指定磁區,且存在key值,則會根據key的hash進行取模來選擇磁區
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

實作同步發送

正常情況下kafka生產者發送資訊采用的是異步發送的方式,主執行緒將資訊發送給共享變數 RecordAccumulator ,Sender執行緒不同的從共享變數中拉取資料發送到broker上;

實作邏輯

在兩個執行緒其中一個執行的時候去阻塞另一個執行緒,實作串行

我們可以發現kafkaProducer的send()方法是存在回傳值 Future 的;

而我們知道,當future物件呼叫get()方法時,不僅會獲得當前執行緒回呼的物件,還會阻塞當前執行緒

我們便使用這個方法來實作同步發送

同步發送的使用場景相對較少,我們可以使用同步發送來確保區內有序,即當上一條資訊發送后,未接收到ack之前,阻塞發送執行緒,不繼續發送,從而實作有序


消費者API

撰寫消費者api的邏輯與生產者十分的相像,使用kafka提供的 KafkaConsumer 來創建消費者物件

并在組態檔中傳遞對應資訊,可以使用ConsumerConfig中的靜態屬性充當key值

/**
 * 描述:kafka消費者
 *
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/11 0:21    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ConsumerTest {
?
    public static void main(String[] args) {
        Properties properties = new Properties();
        //連接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //生產者需要指定序列化器,那么消費者就需要指定對應的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
?
        //自動提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //自動提交的延遲,提交的是消費者的offset
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
?
        //消費者組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroup01");
?
        //創建消費者
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        //訂閱主題
        //此處可以添加多個集群
        //可以看到這里沒有回傳值,也就是說,這里只是單純的指定了主題,如果想獲取主題中的資訊,需要使用別的方法
        kafkaConsumer.subscribe(List.of("你的主題"));
?
?
        while (true){
            //獲取的型別與Producer類似,不過為ConsumerRecords類,想要得到單個資料,需要遍歷輸出
            //新版本建議使用傳入Duration的方式,直接傳入毫秒數的方式以過時
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ZERO);
            consumerRecords.forEach(stringStringConsumerRecord -> {
                //可以看到,使用consumerRecord去呼叫方法的時候,可以獲取到Key,所以key并不只是用來劃分磁區之用,如果沒有指定key,會輸出null
                System.out.println(stringStringConsumerRecord.key()+":"+stringStringConsumerRecord.value()+"    :"+stringStringConsumerRecord.offset());
            });
        }
        //consumer進行訂閱拉去資訊的時候不需要手動關閉,因為順序執行完畢后,jvm會關閉;所以可以使用一個while回圈來持續消費
    }
}

啟動消費者,會發現我們可以連接到對應的主題,但是不會獲取到先前已經存在的資訊;

在我們使用控制臺呼叫消費者時,如果我們想獲取該磁區已經存在的資訊,我們可以使用 --from-beginning指令將offset放到最前端從頭獲取;

java api中也是一樣;

kafka中 命令列能做的事情,在組態檔中應該都有相關的配置

我們進入ConsumerConfig,可以查看到其已經給我們提供了 AUTO_OFFSET_RESET_CONFIG這個屬性;

根據下方的doc描述,該屬性默認值為lastest,這也是我們為什么在不設定的時候會無法獲取已存在資訊的原因,我們可以手動在組態檔中傳入

earierlast

此處注意這個指定的生效條件:

  • 只有當當前消費者/消費者組第一次消費(即還沒有offset時),或當前的offset在這個server中不存在時,指令才會生效

    這里解釋一下為什么會不存在,kafka的資料默認時7天清空一次,如果我們拿著已經清空的資料的offset去尋找資料,就會出現offset在server中不存在的現象,此時AUTO_OFFSET_RESET_CONFIG就會生效

關于offset的手動提交

我們為什么需要手動提交? 自動提交無法保證準確的提交時機

  • 如果設定的提交延時過短,會丟是資料

  • 如果設定的延時過長,會導致資料重復

1.在組態檔中關閉自動提交

既然我們需要手動提交,則必然需要在組態檔中將自動提交置為false

ENABLE_AUTO_COMMIT_CONFIG,<----將它改成false

  1. 在消費結束后進行手動提交

    使用consumer的 commitSync() 同步提交,或commitAsync() 異步提交

    • commitSyn:

      相比于異步提交,因為其提交offset時自帶失敗重試的機制,相對更加可靠

    • commitAsync:

      同步提交相對可靠,但是會阻塞當先執行緒,影響吞吐量;

      在大多數情況下,我們會選用異步提交的方式

自定義存盤offset

手動提交雖然可以解決丟是資料的問題,但是仍然會存在資料重復的現象;

kafka也早已考慮到這種情況,所以允許我們自定義存盤offset的規則,(比如我們可以和MySQL的寫入操作進行事務系結...)

但是相對于自定義磁區器,自定義存盤offset要相對麻煩一些;在0.9版本之后,kafka會將offset暫存在kafka內置的一個主題中,想要去維護一個offset,就需要考慮到消費者的Rebalance問題

即,如果當前消費者所消費的磁區掛掉了,消費者需要轉移到另一磁區去消費,此時的offset需要定位到這個磁區最近提交的offset

為此,我們需要實作kafka提供的ConsumerRebalanceListener

/**
 * 描述: 自定義存盤Offset
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/11 23:26    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ConsumerConfigOffset {
    //創建一個Map在暫存當前offset
    private static Map<TopicPartition,Long> currentOffset = new ConcurrentHashMap<>();
    
    public static void main(String[] args) {
        //組態檔較為冗長,我寫了個工具類進行配置,相關配置內容已經提到過,就不再贅述
        PropertiesUtils propertiesUtils = new PropertiesUtils();
        Properties properties = propertiesUtils.ConsumerProperties("localhost:9092", "bambiOffset", "false", "100", 1);
?
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
?
        //在此處創建ConsumerRebalanceListener類
        kafkaConsumer.subscribe(List.of("solo1"), new ConsumerRebalanceListener() {
            //在Rebalance之前呼叫
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }
?
            //在Rebalance之后呼叫
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                partitions.forEach(partition -> {
                    //定位到磁區中最近的offset,繼續消費
                    kafkaConsumer.seek(partition,getOffset(partition));
                });
            }
        });
?
        while (true){
            ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
            poll.forEach(consumerRecord ->{
                System.out.printf("offset =  %d %n",consumerRecord.offset());
                System.out.printf("key = %s %n",consumerRecord.key());
                System.out.printf("value = %s %n",consumerRecord.value());
?
                //將下標快取到offset中
                currentOffset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),consumerRecord.offset());
            });
            commitOffset(currentOffset);
        }
    }
?
    /**
     * 提交當前offset
     * @param currentOffset
     */
    private static void commitOffset(Map<TopicPartition , Long> currentOffset){
        //處理異步提交的業務邏輯
    }
?
    //獲取當前磁區的offset
    private static long getOffset(TopicPartition partition){
        return 0;
    }
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/293899.html

標籤:其他

上一篇:Flink 內核原理與實作-入門

下一篇:Elasticsearch 分詞器的使用與IK分詞器安裝

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more