使用java代碼進行遠程k
文章目錄
- 使用java代碼進行遠程k
- afka集群的生產者寫訊息(Produce)操作
- 1、修改kafka配置
- 2、搭建maven并撰寫java代碼
- (1)在pom檔案中映入kafka相關依賴
- (2)撰寫java代碼
- (3)運行代碼
- (4)遠程服務端驗證是否produce成功,
afka集群的生產者寫訊息(Produce)操作
本文面向的讀者是:通過xsehll等工具在遠程服務器上操作kafka集群正確,此時想使用java代碼操作遠程kafka集群的讀者,因為其中還是有不少坑,希望可以幫到跟我有相同的困擾的同學,
本人使用的kafka版本為2.8.0,使用版本在kafka2.x的本文均有效,
1、修改kafka配置
組態檔在 你的kafka解壓目錄下的config/server.properties中
broker.id=1
listeners=PLAINTEXT://ranYanQiang:9092
advertised.listeners=PLAINTEXT://公網ip:9092 #這里為了防止哪位同學來我的服務器上搗亂,就隱藏我的公網ip
log.dirs=/usr/local/kafka-cluster/kafka-1/log
zookeeper.connect=ranYanQiang:2181,ranYanQiang:2182,17ranYanQiang:2183 #我這里搭建的是偽集群,但是真集群與偽集群差別不大,也就是真集群以ip區分服務,偽集群以埠區分服務,
其中除了advertised.listeners之外,都是kafka集群的基本配置,不必多說,這里想強調的是listeners和advertised.listeners,容易看出,前者和后者均配置了ip加埠號,其中前者為內網ip后者為公網ip,kafka集群通過內網ip進行通信,外部程式可以通過公網ip訪問遠程服務器,
遠程linux服務器的的公網ip與內網ip的查看
公網ip:使用ssh連接遠程服務器的ip,如果是在你的xshell工具中應該在左下角
內網ip:打開你的
etc/hosts組態檔,可以看到兩個ip,一個是localhost,另一個即你的主機名且為內網ip,通常我們不使用127.0.0.0這種形式的ip名,而是使用組態檔的別名,(可以在云服務臺進行主機名的更改)
etc/hosts組態檔:[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-tWZ8jvdv-1629374363266)(C:\Users\1\AppData\Roaming\Typora\typora-user-images\image-20210819192430639.png)]
上述配置需要配置你集群中的所有kafka組態檔,并重啟kafka集群,通過jps命令發現我們的三臺kafka已經重啟成功

配置的坑:
如果將如下配置寫成了公網ip,將會導致你的啟動kafka后會幾秒鐘之內kafka服務掛掉(廢了我不少時間),
listeners=PLAINTEXT://ranYanQiang:9092
2、搭建maven并撰寫java代碼
相信大家都是會搭maven專案的,我們這里只搭建一個最簡單的maven專案即可,即搭建時不需要引入任何依賴,
(1)在pom檔案中映入kafka相關依賴
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- 我的版本為2.13-2.80,前面為scale版本,后面為kafka版本,-->
<version>2.8.0</version>
</dependency>
<!-- 想要獲取kafka的例外資訊需要引入slf4j包-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
(2)撰寫java代碼
在java檔案夾下新建一個包,在其中撰寫kafka消費者的java測驗代碼
該代碼來自B站的尚硅谷Kafka教程(kafka框架快速入門),我的kafka就是在這里學習的,
該代碼的功能:
連接遠程服務器上的kafka集群,并向second主題中寫入second-0 到second-9這失調資料
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class CustomProducer {
public static void main(String[] args) {
Properties props = new Properties();
// kafka 集群, broker-list
props.put("bootstrap.servers", "119.23.53.2:9092");
//可用ProducerConfig.ACKS_CONFIG 代替 "acks"
//props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put("acks", "all");
// 重試次數
props.put("retries", 1);
// 批次大小
props.put("batch.size", 16384);
// 等待時間
props.put("linger.ms", 1);
// RecordAccumulator 緩沖區大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("second", "second-" + Integer.toString(i),
"second-" + Integer.toString(i)));
}
producer.close();
}
}
(3)運行代碼
程式能在短時間結束即為生產者生產訊息成功,
(4)遠程服務端驗證是否produce成功,
進入某一kafka的檔案夾中,運行以下消費者命令查看second主題的訊息,
bin/kafka-console-consumer.sh --topic second --bootstrap-server ranYanQiang:9093 --from-beginning
結果如下:

我們發現我們回圈寫入的 second-0 到 second-9 寫入成功!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295119.html
標籤:其他
