Kafka介紹和部署應用
kafka介紹
kafka是一個高吞吐量的分布式訂閱訊息系統,可以處理消費者在網站中的所有動作流資料,像hadoop一樣的日志資料和離線分析系統,但是又要求實時處理的限制,kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的訊息處理,也是為了通過集群來提供實時的訊息,
kafka的特性
-
通過O(1)的磁盤資料提供訊息的持久化,這種結構對于即使資料量為TB級的訊息存盤也能夠保持長時間的穩定性能,
-
高吞吐量:即使是非常普通的硬體Kafka也能支持每秒數百萬計的訊息,
-
支持通過Kafka服務器和消費機集群來區分訊息,
-
這次Hadoop并行資料加載,
流媒體平臺三個關鍵功能
-
發布和訂閱記錄流,類似于訊息佇列或企業訊息傳遞系統
-
以容錯的持久方式存盤記錄流
-
記錄發生時處理流
kafka的應用場景
- 構建可在系統或應用程式之間可靠獲取的實時流資料管道
- 構建轉換回應資料的實時流應用程式
例如:
-
日志收集:可以用kafka收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer(消費者)
-
訊息系統:解耦生產者和消費者、快取訊息等
-
用戶活動跟蹤:kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動訊息可以被各個服務器發布到kafka的topic中,然后消費者通過訂閱這些topic來做實時的監控分析,亦可保存到資料庫,
-
運營指標:kafka也經常用來記錄運營監控資料,包括收集各種分布式應用的資料,生產各種操作的集中反饋,比如報警和報告
-
流式處理:比如spark streaming和storm,
kafka的應用原理
1、Kafka作為一個集群運行在一個或者多個可跨多個資料中心的服務器上
2、Kafka集群以稱為** topics主題**的類別存盤記錄流
3、每條記錄都包含一個鍵,一個值和一個時間戳
kafka的核心
-
Producer API(生產者API)允許應用程式發布記錄流至一個或多個kafka的topics(主題),
-
Consumer API(消費者API)允許應用程式訂閱一個或多個topics(主題),并處理所產生的對他們記錄的資料流,
-
Streams API(流API)允許應用程式充當流處理器,從一個或多個topics(主題)消耗的輸入流,并產生一個輸出流至一個或多個輸出的topics(主題),有效的變換所述的輸入流,以輸出流,
-
Connector API(連接器API)允許構建和運行kafka topics(主題)連接到現有的應用程式或資料系統中重用生產者或消費者,例如,關系型資料庫的連接器可能捕獲對表的每個更改,
在Kafka中,客戶端和服務器之間的通信是通過簡單,高性能,語言無關的TCP協議完成的,Kafka提供Java客戶端,但是客戶端提供多語言版本,
Kafka的消費模式
kafka的消費模式主要有兩種,一種是一對一的消費,也就是點對點的通信,即一個發送一個接收,第二種為一對多消費,即一個訊息發送到訊息佇列,消費者根據訊息佇列的訂閱拉取訊息消費,
-
一對一消費:訊息生產者發布訊息到佇列中,通知消費者從佇列中拉取訊息進行消費,`訊息被消費之后則洗掉`,Queue支持多個消費者,但對于一條訊息而言,只有一個消費者可以消費,即一條訊息只能被一個消費者消費,
-
一對多消費:這種模式稱為發布/訂閱模式,即利用Topic存盤訊息,訊息生產者將訊息發布到Topic中,同時有多個消費者訂閱此Topic,消費者可以從中消費訊息,注意發布到Topic中的訊息會被多個消費者消費,`消費者消費資料之后,資料不會被清除`,Kafka會默認保存一段時間,然后再洗掉,

Kafka基礎結構
Kafka像其他MQ一樣,也有自己的基礎架構,主要存在生產者Producer、Kafka集群的Broker、消費者Consumer,注冊訊息Zookeeper等,
zookeeper(Linux)部署與應用
-
Producer:訊息生產者,向kafka發送訊息的角色,
-
Consumer:訊息消費者,即從Kafka中拉取訊息消費的客戶端,
-
Consumer Group:消費者組,消費者組則是一組中存在多個消費者,消費者消費Broker中當前Topic的不同磁區中的訊息,消費者組之間互不影響,所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者,某一個磁區中的訊息只能夠一個消費者組中的一個消費者所消費,
-
Broker:代理,經紀人,一臺kafka服務器就是一個Broker,一個集群由多個Broker組成,一個Broker可以容納多個Topic,
-
Topic:主題,可以理解成一個佇列,生產者和消費者都是面向一個Topic,
-
Partition:磁區,為了實作拓展性,一個非常大的Topic可以分布到多個Broker上,一個Topic可以分為多個Partition,每個Partition是一個有序的佇列(磁區內部有序,但不能保證全域有序)
-
Replica:副本Replication,為保證集群中某個節點發生故障,節點上的Partition資料不丟失,Kafka可以正常作業,Kafka提供了副本機制,一個Topic的每個磁區有若干個副本,一個Leader和多個Follwer,
-
Leader:每個磁區多個副本的主角色,生產者發送資料物件,以及消費者消費資料的物件都是Leader,
-
Follower:每個磁區多個副本的從角色,實時的從Leader中同步資料,保持和Leader資料的同步,Leader發生故障的時候,某個Follwer會成為新的Leader,
上述一個Topic會產生多個磁區Partition,磁區中區分為Leader和Follwer,訊息一般發送到Lerder,Follwer通過資料的同步與Leader保持同步,消費的話也是再Leader中發生消費,如果多個消費者,則分別消費Leader和各個Follwer中的訊息,當Leader發生故障的時候,某個Follwer會成為主節點,此時會對齊訊息的偏移量,
訊息佇列的特性
耦合的狀態表示當你實作某個功能的時候,是直接接入當前介面,而利用訊息佇列,可以將相應的訊息發送到訊息佇列,這樣的話,如果介面出了問題,也不會影響到當前的功能,
傳統呼叫方式:A -呼叫-> B
中間件:A-發送->kafka->訂閱->B
異步處理,代替了之前的同步處理,異步處理不需要讓流程走完就回傳結果,可以將訊息發送到訊息佇列中,然后回傳結果,剩下的讓其他業務處理介面從訊息佇列中拉取消費處理即可,
流量削峰,高流量的時候,使用訊息佇列作為中間件可以將流量的高峰保存在訊息佇列中,從而防止了系統的高請求,減輕服務器的請求處理壓力,
kafka的部署應用(單機環境安裝,基于Linux)
kafka需要依賴java環境運行,kafka的安裝包可以在這里下載:
kafka和zookeeper的安裝包(Liunx)
版本是:kafka_2.13-3.0.0
將包下載到相關的目錄,這里新建了一個kafka的檔案夾,然后解壓到指定目錄中;
cd /kafka
tar -zxvf kafka_2.13-3.0.0.tgz` 重命名為:`mv kafka_2.13-3.0.0 kafka_2.13
配置日志:進入到kafka/kafka_2.13中,創建日志目錄logs;
cd /kafka/kafka_2.13
mkdir logs
修改kafka組態檔;
進入到目錄
/kafka/kafka_2.13/config
修改server.properties檔案
編輯相應的引數vim server.properties
broker.id=0
port=9092
host.name=127.0.0.1 # 服務器ip地址,修改為自己的服務器ip
log.dirs=/kafka/kafka_2.13/logs # 日志存放路徑,上面創建的目錄
zookeeper.connect=localhost:2181 #zookeeper地址和埠,單機配置部署,localhost:2181
然后保存退出
kafka需要基于Zookeeper服務使用,因此需要安裝zookeeper環境先`;(詳見zookeeper部署與應用)
注釋去掉,`listeners=PLAINTEXT://:9092
注釋去掉,把`advertised.listeners`值改為`PLAINTEXT://host_ip:9092(改成服務器ip)
啟動kafka
Kafka支持內置的Zookeeper和參考外部的Zookeeper,這里使用遠程服務器的Zookeeper,
先啟動zookeeper
進入bin目錄,啟動:
./kafka-server-start.sh /config/server.properties &
應用整合及使用(基于SpringBoot、SpringCloud、Eruka)
Springboot中使用kafka
pom檔案依賴
<!--kafka依賴-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置yml資訊
spring:
# kafka配置資訊
kafka:
bootstrap-servers: 192.168.115.79:9092 # 指定kafka服務地址 集群用逗號隔開
producer:
retries: 1 # 發生錯誤后,訊息重發次數,
#當有多個訊息需要被發送到同一個磁區時,生產者會把它們放在同一個批次里,該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算,
batch-size: 16384
buffer-memory: 33554432 # 設定生產者記憶體緩沖區的大小,
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 鍵的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式
# =0:生產者在成功寫入訊息之前不會等待任何來自服務器的回應,
# =1:只要集群的首領節點收到訊息,生產者就會收到一個來自服務器成功回應,
# =all:只有當所有參與復制的節點全部收到訊息時,生產者才會收到一個來自服務器的成功回應,
acks: 1
consumer:
# 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的型別為Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1s
# 該屬性指定了消費者在讀取一個沒有偏移量的磁區或者偏移量無效的情況下該作何處理:
# latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取資料(在消費者啟動之后生成的記錄)
# earliest :在偏移量無效的情況下,消費者將從起始位置讀取磁區的記錄
auto-offset-reset: earliest
# 是否自動提交偏移量,默認值是true,為了避免出現重復資料和資料丟失,可以把它設定為false,然后手動提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在偵聽器容器中運行的執行緒數,
concurrency: 5
#listner負責ack,每呼叫一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
封裝生產者_KafkaProducer
/**
* kafka生產者_配置
*/
@Component
@Slf4j
public class KafkaProducer {
/**
* kafka使用模板
*/
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
/**
* 自定義topic
*/
public static final String TOPIC_TEST1 = "topic_test1";
public static final String TOPIC_TEST2 = "topic_test2";
/**
* 自定義組
*/
public static final String TOPIC_GROUP1 = "topic_group1";
public static final String TOPIC_GROUP2 = "topic_group2";
/**
* 生產訊息_發送
*/
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("準備發送訊息為:{}",obj2String);
// 發送訊息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST1,obj);
// 監聽訊息加入佇列結果回傳
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
/**
* 發送失敗處理
* @param throwable
*/
@Override
public void onFailure(Throwable throwable) {
log.info(TOPIC_TEST1 + " - 生產者 發送訊息失敗:" + throwable.getMessage());
}
/**
* 發送成功處理
* @param stringObjectSendResult
*/
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
// 發送成功處理
log.info(TOPIC_TEST1 + " - 生產者 發送訊息成功:" + stringObjectSendResult.toString());
}
});
}
}
封裝消費者_KafkaConsumer
/**
* kafka消費者_配置_topic
* 實作topic監聽
*/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = KafkaProducer.TOPIC_TEST1,groupId = KafkaProducer.TOPIC_GROUP1)
public void topic_test1(ConsumerRecord<?,?> record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消費了:Topic:" + topic + ",Message" + msg);
ack.acknowledge();
}
}
@KafkaListener(topics = KafkaProducer.TOPIC_TEST2,groupId = KafkaProducer.TOPIC_GROUP2)
public void topic_test2(ConsumerRecord<?,?> record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test2 消費了:Topic:" + topic + ",Message" + msg);
ack.acknowledge();
}
}
}
測驗使用_KafkaController
/**
* kafka介面測驗
*/
@RestController
@Slf4j
@RequestMapping("kafka")
@Api(value = "測驗kafka介面",tags = "測驗kafka介面實作")
public class KafkaController {
/**
* 注入生產者
*/
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping("send")
@Transactional(rollbackFor = Exception.class)
public void send() {
kafkaProducer.send("這是 kafka 的測驗 topic 資料");
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379502.html
標籤:其他
