云服務器(Linux)安裝部署Kafka
前期準備
kafka的安裝需要依賴于jdk,需要在服務器上提前安裝好該環境,這里使用用jdk1.8,
下載安裝包
官網地址:
較新的版本已自帶Zookeeper,無需額外下載,這里使用3.2.0做演示,

注意要下載Binary downloads標簽下的tgz包,Source download標簽下的包為原始碼,無法直接運行,需要編譯,
上載安裝包到云服務器
使用ssh連接工具將kafka_2.12-3.2.0.tgz這個包上傳到云服務器上的一個目錄,

打開命令列,進入到放有壓縮包的目錄,執行
tar -zxvf kafka_2.12-3.2.0.tgz
配置kafka
然后使用cd命令進入到/kafka_2.12-3.2.0/config/下,使用
vi server.properties
編輯組態檔,

洗掉listeners和advertised前方的#號,改成如下配置:
listeners=PLAINTEXT://云服務器內網ip:9092(本地訪問用本地ip)
# 如果要提供外網訪問則必須配置此項
advertised.listeners=PLAINTEXT://云服務器公網ip:9092(若要遠程訪問需配置此項為云服務器的公網ip)
# zookeeper連接地址,集群配置格式為ip:port,ip:port,ip:port
zookeeper.connect=云服務器公網ip:2181
開放云服務器埠
在云服務器控制臺內進入安全組頁面,添加兩條新的入站規則,tcp/9092和tcp/2181
開放linux防火墻埠
先查看使用的防火墻型別iptables/firewalld
iptables操作命令
1.打開/關閉/重啟防火墻
開啟防火墻(重啟后永久生效):chkconfig iptables on
關閉防火墻(重啟后永久生效):chkconfig iptables off
開啟防火墻(即時生效,重啟后失效):service iptables start
關閉防火墻(即時生效,重啟后失效):service iptables stop
重啟防火墻:service iptables restartd
2.查看打開的埠
/etc/init.d/iptables status
3.開啟埠
iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存并重啟防火墻
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
Centos7默認安裝了firewalld,如果沒有安裝的話,可以使用 yum install firewalld firewalld-config進行安裝,
操作指令如下:
1.啟動防火墻
systemctl start firewalld
2.禁用防火墻
systemctl stop firewalld
3.設定開機啟動
systemctl enable firewalld
4.停止并禁用開機啟動
sytemctl disable firewalld
5.重啟防火墻
firewall-cmd --reload
6.查看狀態
systemctl status firewalld或者 firewall-cmd --state
7.在指定區域打開埠(記得重啟防火墻)
firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)
打開tcp/9092和tcp/2181這兩個埠后,重啟防火墻,并查看開放的埠確實生效,
啟動kafka服務
cd命令進入kafka_2.12-3.2.0目錄下,執行
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動zookeeper,不加-daemon方便排除啟動錯誤,新建一個shell視窗,進入該目錄再執行
bin/kafka-server-start.sh config/server.properties
啟動kafka,若列印日志未報錯,若未出現error日志,說明啟動成功,
測驗單機連通性
查詢kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因為kafka使用zookeeper作為配置中心,一些topic資訊需要查詢該kafka對應的zookeeper
創建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
開啟生產者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
開啟消費者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test
Springboot連接kafak
在pom.xml檔案中引入kafka依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
在application.yml組態檔中配置kafka
server:
port: 8080
spring:
kafka:
bootstrap-servers: 云服務器外網ip地址:9092
producer: # 生產者
retries: 3 # 設定大于0的值,則客戶端會將發送失敗的記錄重新發送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定訊息key和訊息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
# RECORD
# 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后提交
# BATCH
# 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交
# TIME
# 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
# COUNT
# TIME | COUNT 有一個條件滿足時提交
# COUNT_TIME
# 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后, 手動呼叫Acknowledgment.acknowledge()后提交
# MANUAL
# 手動呼叫Acknowledgment.acknowledge()后立即提交,一般使用這種
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
生產者
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "test-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String send(@RequestParam("msg") String msg) {
kafkaTemplate.send(TOPIC_NAME, "key", msg);
return String.format("訊息 %s 發送成功!", msg);
}
}
消費者
@Component
public class DemoConsumer {
/**
* @param record record
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同組下的消費者個數,就是并發消費數,必須小于等于磁區總數
*/
@KafkaListener(topics = "test-topic", groupId = "testGroup1")
public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = https://www.cnblogs.com/ndchao/archive/2022/11/14/record.value();
System.out.println("testGroup1 message: " + value);
System.out.println("testGroup1 record: " + record);
//手動提交offset,一般是提交一個banch,冪等性防止重復訊息
// === 每條消費完確認性能不好!
ack.acknowledge();
}
//配置多個消費組
@KafkaListener(topics = "test--topic", groupId = "testGroup2")
public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = https://www.cnblogs.com/ndchao/archive/2022/11/14/record.value();
System.out.println("testGroup2 message: " + value);
System.out.println("testGroup2 record: " + record);
//手動提交offset
ack.acknowledge();
}
}
使用swagger測驗發送訊息

控制臺列印訊息

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/533509.html
標籤:其他
上一篇:Tomcat 執行緒池學習總結
