文章目錄
- 生產者的基本實作
- 生產者同步發送訊息
- 生產者異步發送訊息
- 生產者中ack的配置
- 關于訊息發送的緩沖區
- kafka知識點目錄
生產者的基本實作
- 引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
- 具體實作
package com.qf.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.設定引數
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
//把發送的key從字串序列化為位元組陣列
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//把發送訊息value從字串序列化為位元組陣列
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//2.創建?產訊息的客戶端,傳?引數
Producer<String,String> producer = new KafkaProducer<String,
String>(props);
//3.創建訊息
//key:作?是決定了往哪個磁區上發,value:具體要發送的訊息內容
ProducerRecord<String,String> producerRecord = new ProducerRecord<>
(TOPIC_NAME,"mykeyvalue","hellokafka");
//4.發送訊息,得到訊息發送的元資料并輸出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式發送訊息結果:" + "topic-" +
metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
生產者同步發送訊息

如果?產者發送訊息沒有收到ack,?產者會阻塞,阻塞到3s的時間,如果還沒有收到訊息,會進?重試,重試的次數3次,
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步?式發送訊息結果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
生產者異步發送訊息

異步發送,生產者發送完訊息后就可以執行之后的業務,broker在收到訊息后異步呼叫生產者提供的callback回呼方法,
//5.異步發送訊息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發送訊息失敗:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("異步?式發送訊息結果:" + "topic-" +
metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
生產者中ack的配置
在同步發送的前提下,?產者在獲得集群回傳的ack之前會?直阻塞,那么集群什么時候回傳
ack呢?此時ack有3個配置:
-
ack = 0 kafka-cluster不需要任何的broker收到訊息,就立即回傳ack給生產者,最容易丟訊息,但效率是最?的,
-
ack=1(默認): 多副本之間的leader已經收到訊息,并把訊息寫?到本地的log中,才會回傳ack給生產者,性能和安全性是最均衡的,
-
ack=-1/all,??有默認的配置min.insync.replicas=2(默認為1,推薦配置大于等于2), 此時就需要leader和?個follower同步完后,才會回傳ack給生產者(此時集群中有2個broker已完成資料的接收),這種方式最安全,但性能最差,

下面是關于ack和重試(如果沒有收到ack,就開啟重試)的配置
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
發送失敗會重試,默認重試間隔100ms,重試能保證訊息發送的可靠性,但是也可能造
成訊息重復發送,?如?絡抖動,所以需要在
接收者那邊做好訊息接收的冪等性處理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重試間隔設定
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
關于訊息發送的緩沖區

- kafka默認會創建?個訊息緩沖區,用來存放要發送的訊息,緩沖區是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- kafka本地執行緒會去緩沖區中?次拉16k的資料,發送到broker,
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果執行緒拉不到16k的資料,間隔10ms也會將已拉到的資料發到broker,
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
kafka知識點目錄
1.Linux環境部署kafka
2.Win10環境部署kafka
3.docker部署kafka
4.kafka的簡單使用
5.kafka訊息的細節
6.kafka主題和磁區的概念
7.kafka集群操作
8.kafka生產者實作細節
9.kafka消費者實作細節
10.kafka集群中的controller、rebalance、HW
11.kafka中的優化問題
12.Kafka-eagle監控平臺
13.kafka錯誤匯總
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/344925.html
標籤:其他
