—— 目錄 ——
- 0. 前言
- 1. 安裝、啟動與關閉
- ① 下載與配置
- ② 啟動與關閉
- 2. 基礎使用
- ① 主題(Topic)的增刪查
- ② 模擬發布訂閱
- 3. Java API 使用(SpringBoot)
0. 前言
本博客使用的版本是 0.11.0.3
各版本的區別可以自行了解,大致可選 0.11.x 1.x 2.x 三種版本
① 使用的集群是 Docker 搭建的,可參見:
【Docker x Hadoop】使用 Docker 搭建 Hadoop 集群(從零開始保姆級)
② 由于 Docker 搭建的集群有局限性,需要頻繁為 Docker 容器動態添加埠映射(下邊也會用到),可參見:
【Docker之軌跡】為正在運行中的容器動態添加埠映射(使用 iptables,附洗掉 iptables 規則)
③ 集群之間的分發腳本 xsync 參照尚硅谷,可參見:
【Linux之軌跡】Linux 各種實用小功能合集(持續補充)
④ 該版本的 kafka 需要用到 zookeeper,簡單的搭建流程可參見:
【Zookeeper之軌跡】Zookeeper 入門使用(集群使用 Docker 模擬)
⑤ 由于 Kafka 需要占用較多記憶體,必要的話需要使用 swap 交換磁區(相當于虛擬記憶體),可參見:
【Linux之軌跡】1核2G 記憶體不夠怎么辦?Swap 交換磁區解決記憶體不足問題
1. 安裝、啟動與關閉
① 下載與配置
下載地址:http://kafka.apache.org/downloads

上傳壓縮包到服務器集群任意一臺,解壓,然后使用 xsync 分發到集群中的各個容器,再
tar -zxvf kafka_2.11-0.11.0.3.tgz
xsync kafka_2.11-0.11.0.3
rm -rf kafka_2.11-0.11.0.3.tgz
# 這一步改名可選
mv kafka_2.11-0.11.0.3 kafka_0.11.0.3
然后進行配置
1) 在 kafka_0.11.0.3 目錄下新建檔案夾 data 作為臨時存放資料的檔案夾
mkdir data
2) 然后進入 config 目錄修改 server.properties 檔案
vim server.properties
下面是對組態檔的修改
1) 首先修改 broker.id,每臺服務器需要唯一,下邊是我的配置
`hadoop001:broker.id=1
hadoop002:broker.id=2
hadoop003:broker.id=3`
2) 接著設定主題允許被洗掉(將注釋打開)
`delete.topic.enable=true`
3) 修改資料暫存的目錄,默認存放在 tmp 中會被定時洗掉,需修改為我們剛剛新建的 data 檔案夾路徑
名字看著像日志檔案輸出的地方,但實際存放的是真實資料的
`log.dirs=/xxx/kafka_0.11.0.3/data`
4) 修改 zookeeper 連接資訊,以下是我的配置供參考(用逗號隔開)
`zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181`
5) 然后將組態檔分發到各臺服務器上(data 檔案夾也順帶分發了)
`xsync data/ config/server.properties`
6) 最后修改每一臺服務器的 broker.id,就完事了
② 啟動與關閉
首先確保三臺服務器的 zookeeper 都已經啟動,然后執行:
bin/kafka-server-start.sh config/server.properties
| 》》》 出現問題 Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; eory' (errno=12) 原因是記憶體不足,查了我的服務器,發現 2G 記憶體剩下不到 1G 然后在 kafka-server-start.sh 中發現了這個東西 ![]() 引數意思為: -Xmx Java Heap 最大值,默認值為物理記憶體的 1/4 -Xms java Heap 初始值,Server 端 JVM 最好將 -Xms 和 -Xmx 設為相同值 里邊要求 Kafka 啟動是必須有 1G 空閑的記憶體 很明顯我沒有,況且 2G 記憶體要啟動 3 臺顯然也不合理 所以這里我們把它改小一點,就 256M 應該可以,修改: 再次啟動,就可以了(后邊填坑:分發完畢后,記得將 broker.id 修改呀) 此外,如果記憶體實在不足,可以考慮使用 swap 交換磁區作為虛擬記憶體使用 《《《 問題解決 |
最后上邊的程式啟動后,都會阻塞視窗,可以加 -daemon 使其以守護行程運行
bin/kafka-server-start.sh -daemon config/server.properties
關閉同樣得加上組態檔,如下:
bin/kafka-server-stop.sh config/server.properties
2. 基礎使用
① 主題(Topic)的增刪查
1) 創建主題
這里指定了 partition 磁區數,replication-factor 副本數,下邊會詳細說明
bin/kafka-topics.sh --zookeeper hadoop001:2181 --create --topic first --partitions 2 --replication-factor 2
2) 查看主題
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
3) 查看具體主題詳情
bin/kafka-topics.sh --zookeeper hadoop001:2181 --describe --topic first
4) 洗掉主題(需要前邊配置有改為 true 才能洗掉成功)
bin/kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic first
5) 修改主題
bin/kafka-topics.sh --zookeeper hadoop001:2181 --alter --topic first-1 --partition 3
② 模擬發布訂閱
1) 在 hadoop001 中,用控制臺模擬生產訊息
bin/kafka-console-producer.sh --topic first --broker-list hadoop001:9092
2) 在 hadoop002 和 hadoop003 中,用控制臺模擬訂閱訊息,--from-beginning 指從頭開始消費資訊
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic first [--from-beginning]
舊版:bin/kafka-console-consumer.sh --zookeeper hadoop001:2181 --topic first
| 中途出現了一些問題:使用 --bootstrap-server 無法消費訊息? 但 zookeeper 可以,且當 --bootstrap-server 磁區為 1 時也可以,其他情況都不行 后來發現是我修改完 server.properties 進行同步分發,將 broker.id 都變成 1 了忘記改回來 所以:broker.id 相同,會出現無法消費訊息的情況 |
3. Java API 使用(SpringBoot)
| 前置問題: 這里是使用 Docker 搭建集群的通病,如果是使用虛擬機搭建的話,則可以跳過 但是注意,這里面將三臺服務器的 9092 埠分別改成了 19092 19093 19094 當使用 API 訪問 Docker 搭建的集群時,最大的問題就是埠問題了 要想讓外網訪問到 Docker 容器,就必須設定埠映射,然后外網通過訪問主機中已經映射的埠,再由主機找到對應的 Docker 容器 但現在我們有 3 個容器(服務器),則有 3 個 9092 埠,很明顯一個主機的 9092 埠不能同時映射三個埠 所以就有了如下的修改:將主機的 19092 19093 19094 分別映射到 hadoop001:19092 hadoop002:19093 hadoop003:19094 這樣外網通過訪問這三個已經映射的埠,就能成功訪問到容器啦 這里由于我們的容器已經啟動,無法再通過 Docker 進行埠映射,所以我們采用的是動態埠映射,如下: 如果有興趣了解為 Docker 動態添加埠映射的,文章前言里邊的對應鏈接哦 |
準備作業完成,接下來進入正軌
① 首先是導包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.6</version>
</dependency>
② 然后進行配置
spring:
kafka:
bootstrap-servers: <主機地址>:19092,<主機地址>:19093,<主機地址>:19094
producer:
bootstrap-servers: <主機地址>:19092
③ 然后是簡單測驗
/**
* @author : Ice'Clean
* @date : 2021-10-24
*/
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@GetMapping("/send/{msg}")
public void send(@PathVariable String msg) {
kafkaTemplate.send("first", msg);
}
@KafkaListener(id = "webListener", topics = "first")
public void listen(String msg) {
System.out.println("收到訊息:" + msg);
}
}
這樣呼叫 send 介面發送訊息,在控制臺就能看到對應輸出啦
④ API 工具類
以下為簡單的工具類:
/**
* @author : Ice'Clean
* @date : 2021-10-25
*/
@Component
public class KafkaUtils {
/** Kafka 操作客戶端 */
private static AdminClient adminClient;
/** Kafka 生產模者板 */
private static KafkaTemplate<Object, Object> kafkaTemplate;
@Autowired
public void setKafkaProperties(KafkaProperties kafkaProperties) {
// 通過注入的配置,創建客戶端
adminClient = AdminClient.create(kafkaProperties.buildAdminProperties());
}
@Autowired
public void setKafkaTemplate(KafkaTemplate<Object, Object> kafkaTemplate) {
// 注入生產者模板
KafkaUtils.kafkaTemplate = kafkaTemplate;
}
/**
* 生產者發送訊息
* @param topic 訊息所屬主題
* @param message 訊息內容
*/
public static void send(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(success -> {
System.out.println("訊息:" + topic + " [" + message + "] 發送成功");
}, failure -> {
System.out.println("訊息:" + topic + " [" + message + "] 發送失敗");
System.out.println("原因:" + failure.getMessage());
});
}
/**
* 創建主題
* @param topicName 主題名稱
* @param numPartitions 主題磁區數
* @param replicationFactor 主題副本數
*/
public static void createTopic(String topicName, int numPartitions, short replicationFactor) {
adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, replicationFactor)));
}
/**
* 洗掉主題
* @param topicName 主題名稱
*/
public static void deleteTopic(String topicName) {
adminClient.deleteTopics(Collections.singleton(topicName));
}
/**
* 獲取主題詳情
* @param topicName 主題名稱
*/
public static Map<String, TopicDescription> getTopicDetail(String topicName) throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton(topicName));
return result.all().get();
}
}
一石二石,一箭雙箭(IceClean)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/340684.html
標籤:其他

