Topic
事件被組織并持久地存盤在Topic中,Topic類似于檔案系統中的檔案夾,事件就是該檔案夾中的檔案,Kafka中的Topic始終是多生產者和多訂閱者:一個Topic可以有零個、一個或多個生產者向其寫入事件,也可以有零個、一個或多個消費者訂閱這些事件,Topic中的事件可以根據需要隨時讀取,與傳統的訊息傳遞系統不同,事件在使用后不會被洗掉,相反,可以通過每個Topic的配置來定義Kafka應該保留事件的時間,之后舊事件將被丟棄,Kafka的性能在資料大小方面實際上是恒定的,因此長時間存盤資料是非常好的,
Partition
Topic是磁區的,這意味著一個Topic可以分布在多個Kafka節點上,資料的這種分布式放置對于可伸縮性非常重要,因為它允許客戶端應用程式同時從Kafka節點讀取和寫入資料,將新事件發布到Topic時,它實際上會appended到Topic的一個Partition中,具有相同事件key的事件將寫入同一Partition,Kafka保證給定Topic的Partition的任何使用者都將始終以與寫入時完全相同的順序讀取該磁區的事件,
Replication
為了使資料具有容錯性和高可用性,每個Topic都可以有多個Replication,以便始終有多個Kafka節點具有資料副本,以防出現問題,常見的生產設定是replicationFactor為3,即始終有三個資料副本,此Replication在Topic的Partition級別執行,
Kafka在指定數量(通過replicationFactor)的服務器上復制每個Topic的Partition,這允許在集群中的某些服務器發生故障時進行自動故障轉移,以便在出現故障時服務仍然可用,Replication的單位是Topic的Partition,在非故障條件下,Kafka中的每個Partition都有一個leader和零個或多個follower,replicationFactor是復制副本(包括leader)的總數,所有讀和寫操作都將轉到Partition的leader上,通常,有比Kafka節點多得多的Partition,并且這些Partition的leader在Kafka節點之間均勻分布,follower上的資料需要與leader的資料同步,所有資料都具有相同的偏移量和順序(當然,在任何給定時間,leader的資料末尾可能有一些尚未復制的資料),follower會像普通Kafka消費者一樣使用來自leader者的訊息,并將其應用到自己的資料中,如下圖所示,三個Kafka節點上有兩個Topic(Topic 0和Topic 1),Topic 0有兩個Partition并且replicationFactor為3(紅色的Partition為leader),Topic 1有三個Partition,replicationFactor也為3(紅色的Partition為leader),

API
添加依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
這里使用的kafka-clients版本和博主之前部署的Kafka版本一致:
- Kafka:部署Kafka
client
操作Topic的客戶端通過AdminClient抽象類來創建,原始碼如下:
package org.apache.kafka.clients.admin;
import java.util.Map;
import java.util.Properties;
public abstract class AdminClient implements Admin {
/**
* 使用給定的配置創建一個新的Admin
* props:Admin的配置
* 回傳KafkaAdminClient實體
*/
public static AdminClient create(Properties props) {
return (AdminClient) Admin.create(props);
}
/**
* 多載方法
* 使用給定的配置創建一個新的Admin
* props:Admin的配置
* 回傳KafkaAdminClient實體
*/
public static AdminClient create(Map<String, Object> conf) {
return (AdminClient) Admin.create(conf);
}
}
實際上會回傳一個KafkaAdminClient實體(KafkaAdminClient類是AdminClient抽象類的子類),KafkaAdminClient類的方法比較多,其中private方法服務于public方法(提供給用戶的服務),

KafkaAdminClient類提供的public方法是對Admin介面的實作,

create
創建一個新的Topic,
package com.kaven.kafka.admin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class Admin {
private static final AdminClient adminClient = Admin.getAdminClient();
public static void main(String[] args) throws InterruptedException, ExecutionException {
Admin admin = new Admin();
admin.createTopic();
Thread.sleep(100000);
}
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
}
創建AdminClient(簡單使用,配置BOOTSTRAP_SERVERS_CONFIG就可以了):
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
創建Topic(傳入一個NewTopic實體,并且給該NewTopic實體配置name、numPartitions、replicationFactor):
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
提供的方法大都是異步編程模式的,這些基礎知識就不介紹了,輸出如下圖所示:

list
獲取Topic串列,
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.
listTopics(new ListTopicsOptions().listInternal(true));
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
get方法會等待future完成,然后回傳其結果,輸出如下圖所示:

通過下面這個配置,可以獲取到Kafka內置的Topic,
new ListTopicsOptions().listInternal(true)
默認是不會獲取到Kafka內置的Topic,
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}

delete
洗掉Topic,
public void deleteTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
deleteTopicsResult.topicNameValues().forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
輸出如下圖所示:

現在再獲取Topic的串列,輸出如下圖所示(洗掉的Topic已經不在了):

describe
獲取Topic的描述,
public void describeTopic() {
Map<String, KafkaFuture<TopicDescription>> values =
adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
for (String name : values.keySet()) {
values.get(name).whenComplete((describe, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
System.out.println(describe);
});
}
}
輸出如下圖所示:

輸出符合預期,因為創建該Topic的配置為:
new NewTopic("new-topic-kaven", 1, (short) 1)
config
獲取Topic的配置,
public void describeTopicConfig() throws ExecutionException, InterruptedException {
DescribeConfigsResult describeConfigsResult = adminClient
.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
describeConfigsResult.all().get().forEach(((configResource, config) -> {
System.out.println(configResource);
System.out.println(config);
}));
}
輸出如下圖所示:

describeConfigs方法很顯然還可以獲取其他資源的配置(通過指定資源的型別),
public enum Type {
BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
...
}
alter
增量更新Topic的配置,
public void incrementalAlterConfig() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
voidKafkaFuture.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(configResource);
latch.countDown();
});
}));
latch.await();
}
輸出如下圖所示:

很顯然incrementalAlterConfigs方法也可以增量更新其他資源的配置(通過指定資源的型別),
ConfigResource定義需要修改配置的資源,Collection<AlterConfigOp>定義該資源具體的配置修改操作,
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
configEntry定義資源需要修改的配置條目,operationType定義修改操作的型別,
public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
this.configEntry = configEntry;
this.opType = operationType;
}
修改操作的型別,
public enum OpType {
/**
* 設定配置條目的值
*/
SET((byte) 0),
/**
* 將配置條目恢復為默認值(可能為空)
*/
DELETE((byte) 1),
/**
* 僅適用于串列型別的配置條目
* 將指定的值添加到配置條目的當前值
* 如果尚未設定配置值,則添加到默認值
*/
APPEND((byte) 2),
/**
* 僅適用于串列型別的配置條目
* 從配置條目的當前值中洗掉指定的值
* 洗掉當前不在配置條目中的值是合法的
* 從當前配置值中洗掉所有條目會留下一個空串列,并且不會恢復為條目的默認值
*/
SUBTRACT((byte) 3);
...
}
資源的配置條目,包含配置名稱、值等,
public class ConfigEntry {
private final String name;
private final String value;
private final ConfigSource source;
private final boolean isSensitive;
private final boolean isReadOnly;
private final List<ConfigSynonym> synonyms;
private final ConfigType type;
private final String documentation;
...
}
在獲取Topic配置的輸出中也可以發現這些配置條目,

很顯然,這里修改名稱為new-topic-kaven的Topic的compression.type配置條目(壓縮型別),
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
compression.type配置條目的默認值為producer(意味著保留生產者設定的原始壓縮編解碼器),和上面的圖也對應,博主將該配置條目修改成了gzip,

再來獲取該Topic的配置,如下圖所示(很顯然配置修改成功了):

Kafka的Topic概念與API介紹就到這里,如果博主有說錯的地方或者大家有不同的見解,歡迎大家評論補充,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/401590.html
標籤:其他
上一篇:Flink使用local模式執行Flink程式,并且開啟Flink的webUI
下一篇:B站瘋傳24小時刪
