目錄章節:
1.pom.xml匯入kafka依賴包;
2.kafka普通生產者實作方式;
3.kafka帶回呼函式的生產者;
4.生產者自定義磁區;
4.1使用自定義磁區
1.pom.xml匯入kafka依賴包:
<!--kafka依賴-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
PS:kafkaProducer發送資料流程及ACK、重復消費與資料丟失問題:
1.Kafka 的 Producer 發送訊息采用的是 異步發送的方式,在訊息發送的程序中,涉及到了兩個執行緒 ——main 執行緒和Sender執行緒,以及 一個執行緒共享變數 ——RecordAccumulator,main 執行緒將訊息發送給 RecordAccumulator,Sender 執行緒不斷從 RecordAccumulator 中拉取 訊息發送到 Kafka broker, 2.異步和ack并不沖突,生產者一直發送資料,不等應答,如果某條資料遲遲沒有應答,生產者會再發一次;
3.acks: -1 代表所有處于isr串列中的follower partition都會同步寫入訊息成功 0 代表訊息只要發送出去就行,其他不管 1 代表發送訊息到leader partition寫入成功就可以;
4.重復消費與資料丟失:
說明: 已經消費的資料對于kafka來說,會將消費組里面的o?set值進行修改,那什么時候進行修改了?是在資料消費 完成之后,比如在控制臺列印完后自動提交;
提交程序:是通過kafka將o?set進行移動到下個message所處的o?set的位置,拿到資料后,存盤到hbase中或者mysql中,如果hbase或者mysql在這個時候連接不上,就會拋出例外,如果在處理資料的時候已經進行了提交,
那么kafka上的o?set值已經進行了修改了,但是hbase或者mysql中沒有資料,這個時候就會出現資料丟失,什么時候提交o?set值?在Consumer將資料處理完成之后,再來進行o?set的修改提交,默認情況下o?set是 自動提交,
需要修改為手動提交o?set值,如果在處理代碼中正常處理了,但是在提交o?set請求的時候,沒有連接到kafka或者出現了故障,那么該次修 改o?set的請求是失敗的,那么下次在進行讀取同一個磁區中的資料時,會從已經處理掉的o?set值再進行處理一 次,
那么在hbase中或者mysql中就會產生兩條一樣的資料,也就是資料重復,
PS:資料來源:
/**
* 獲取資料庫資料
* @param
* @return
* @throws SQLException
*/
public static List<KafKaMyImage> getKafKaMyImages() throws SQLException {
List<KafKaMyImage> kafKaMyImages=new ArrayList<>();
KafKaMyImage kafKaMyImage=null;
String sql="select id,loginip,updatetime,username,loginaddr from adminlogin";
Connection conection = SingleJavaJDBC.getConection();
PreparedStatement preparedStatement = conection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
kafKaMyImage=new KafKaMyImage(Integer.parseInt(resultSet.getString("id")),
resultSet.getString("loginip"),
resultSet.getString("updatetime"),
resultSet.getString("username"),
resultSet.getString("loginaddr"));
kafKaMyImages.add(kafKaMyImage);
}
// SingleJavaJDBC.close(resultSet,preparedStatement,conection);
return kafKaMyImages;
}
}
2.kafka普通生產者實作方式:
public void producerOne() {
2 Properties props = new Properties();
3 // Kafka服務端的主機名和埠號
4 props.put("bootstrap.servers", "hadoop01:9092");
5 // 所有副本都必須應答后再發送
6 props.put("acks", "all");
7 // 發送失敗后,再重復發送的次數
8 props.put("retries", 0);
9 // 一批訊息處理大小
10 props.put("batch.size", 16384);
11 // 請求時間間隔
12 props.put("linger.ms", 1);
13 // 發送快取區記憶體大小
14 props.put("buffer.memory", 33554432);
15 // key序列化
16 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
17 // value序列化
18 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
19 //2.定義kafka生產者
20 Producer<String, String> producer = new KafkaProducer<>(props);
21 //3.發送訊息
22 for (int i = 0; i < 5; i++) {
23 //top,指定磁區,資料
24 //("second",0,key,"");指定磁區
25 //("second",key,"");指定key,根據key磁區
26 //("second","");不指定,隨機磁區,輪詢
27 producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
28 }
29 producer.close();
30 }
3.kafka帶回呼函式的生產者:
/**
* 創建生產者帶回呼函式02
* @throws SQLException
*/
public static void producerThree() throws SQLException{
//step1 配置引數,這些跟優化kafka性能有關系
Properties props=new Properties();
// props.put("partitioner.class","com.comment.kafka.demo.producer.MyPartitioner");
//1 連接broker
props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
//2 key和value序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3 acks
// -1 代表所有處于isr串列中的follower partition都會同步寫入訊息成功
// 0 代表訊息只要發送出去就行,其他不管
// 1 代表發送訊息到leader partition寫入成功就可以
props.put("acks","-1");
//4 重試次數
props.put("retries",3);//大部分問題,設定這個就可以解決,生產環境可以設定多些 5-10次
// 5 隔多久重試一次
props.put("retry.backoff.ms",2000);
//6 如果要提升kafka的吞吐量,可以指定壓縮型別,如lz4
props.put("compression.type","none");
//7 緩沖區大小,默認是32M
props.put("buffer.size",33554432);
//8 一個批次batch的大小,默認是16k,需要根據一條訊息的大小去調整
props.put("batch.size",323840);//設定為32k
//9 如果一個batch沒滿,達到如下的時間也會發送出去
props.put("linger.ms",200);
//10 一條訊息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯
props.put("max.request.size",1048576);
//11 一條訊息發送出去后,多久還沒收到回應,就認為是超時
props.put("request.timeout.ms",5000);
//step2 創建生產者物件
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
//step3 使用訊息的封裝形式,注意value一般是json格式
List<KafKaMyImage> kafKaMyImages = getKafKaMyImages();
for (int i = 0; i < kafKaMyImages.size(); i++) {
//step4 呼叫生產者物件的send方法發送訊息,有異步和同步兩種選擇
//1 異步發送,一般使用異步,發送后會執行一個回呼函式
//top,指定磁區,資料
KafKaMyImage kafKaMyImage = kafKaMyImages.get(i);
JSONObject jsonObject = JSONObject.fromObject(kafKaMyImage);
producer.send(new ProducerRecord<String, String>("topicC","0",jsonObject.toString()), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//判斷是否有例外
if(exception==null){
System.out.println("訊息發送到磁區"+metadata.partition()+"成功");
}else{
System.out.println("訊息發送失敗");
// TODO 可以寫入到redis,或mysql
}
}
});
}
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//2 同步發送,需要等待一條訊息發送完成,才能發送下一條訊息
//RecordMetadata recordMetadata = https://www.cnblogs.com/zhuzhu-you/p/producer.send(record).get();
//System.out.println("發送到的磁區是:"+recordMetadata.partition());
//step5 關閉連接
producer.close();
}
4.生產者自定義磁區:
Kafka自定義磁區需要實作Partitioner類,這里實作的是根據某個欄位的值把資料寫入相應磁區
package com.comment.kafka.demo.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
/**
* @className: MyPartitioner
* @description: TODO 類描述
* @author: 東林
* @date: 2022/2/26
**/
public class MyPartitioner implements Partitioner {
/**
* 主要重寫這個方法,假設有topic country三個磁區,producer將key為china、usa和korea的訊息分開存盤到不同的磁區,否則都放到0號磁區
* @param topic 要使用自定義磁區的topic
* @param key 訊息key
* @param keyBytes 訊息key序列化位元組陣列
* @param value 訊息value
* @param valueBytes 訊息value序列化位元組陣列
* @param cluster 集群元資訊
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partitions=0;
String keyStr=(String) key;
//獲取磁區資訊
List<PartitionInfo> partitionInfoList=cluster.availablePartitionsForTopic(topic);
//獲取當前topic的磁區數
int partitionInfoListSize=partitionInfoList.size();
//判斷是否有三個磁區
if(partitionInfoListSize==3){
switch (Integer.parseInt(keyStr)){
case 1:
partitions=0;
break;
case 0:
partitions=1;
break;
default:
partitions=2;
break;
}
}
//回傳磁區序號
return partitions;
}
@Override
public void close() {}
/**
* 檔案加載時
* @param map
*/
@Override
public void configure(Map<String, ?> map) {}
}
4.1使用自定義磁區
public static void producerPartition() throws SQLException {
//step1 配置引數,這些跟優化kafka性能有關系
Properties props=new Properties();
//1 連接broker
props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
//2 key和value序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3 acks
// -1 代表所有處于isr串列中的follower partition都會同步寫入訊息成功
// 0 代表訊息只要發送出去就行,其他不管
// 1 代表發送訊息到leader partition寫入成功就可以
props.put("acks","-1");
//4 重試次數
props.put("retries",3);//大部分問題,設定這個就可以解決,生產環境可以設定多些 5-10次
// 5 隔多久重試一次
props.put("retry.backoff.ms",2000);
//6 如果要提升kafka的吞吐量,可以指定壓縮型別,如lz4
props.put("compression.type","none");
//7 緩沖區大小,默認是32M
props.put("buffer.size",33554432);
//8 一個批次batch的大小,默認是16k,需要根據一條訊息的大小去調整
props.put("batch.size",323840);//設定為32k
//9 如果一個batch沒滿,達到如下的時間也會發送出去
props.put("linger.ms",200);
//10 一條訊息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯
props.put("max.request.size",1048576);
//11 一條訊息發送出去后,多久還沒收到回應,就認為是超時
props.put("request.timeout.ms",5000);
//12 使用自定義磁區器
props.put("partitioner.class","com.comment.kafka.demo.producer.MyPartitioner");
//step2 創建生產者物件
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
//step3 使用訊息的封裝形式,注意value一般是json格式
List<KafKaMyImage> kafKaMyImages = getKafKaMyImages();
for (int i = 0; i < kafKaMyImages.size(); i++) {
//step4 呼叫生產者物件的send方法發送訊息,有異步和同步兩種選擇
//1 異步發送,一般使用異步,發送后會執行一個回呼函式
//top,指定磁區,資料
KafKaMyImage kafKaMyImage = kafKaMyImages.get(i);
JSONObject jsonObject = JSONObject.fromObject(kafKaMyImage);
producer.send(new ProducerRecord<String, String>("topicD",kafKaMyImages.get(i).getIsdel(),jsonObject.toString()), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//判斷是否有例外
if(exception==null){
System.out.println("訊息發送到磁區"+metadata.partition()+"成功");
}else{
System.out.println("訊息發送失敗");
// TODO 可以寫入到redis,或mysql
}
}
});
}
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//2 同步發送,需要等待一條訊息發送完成,才能發送下一條訊息
//RecordMetadata recordMetadata = https://www.cnblogs.com/zhuzhu-you/p/producer.send(record).get();
//System.out.println("發送到的磁區是:"+recordMetadata.partition());
producer.flush();
//step5 關閉連接
producer.close();
}
本文來自博客園,作者:zhuzhu&you,轉載請注明原文鏈接:https://www.cnblogs.com/zhuzhu-you/p/15948155.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/435325.html
標籤:Java
下一篇:阿里云視頻點播
