最近維護的一個系統并發有點高,所以想引入一個訊息佇列來進行削峰,考察了一些產品,最終決定使用kafka來當做訊息佇列,以下是關于kafka的一些知識的整理筆記,
kafka
kafka 是分布式流式平臺,它由linkedin開發,后貢獻給了Apache開源組織并成為頂級開源專案,它可以應用在高并發場景下的日志系統,也可以當作訊息佇列來使用,也可以當作訊息服務對系統進行解耦,

流處理平臺有以下三種特性:
- 可以讓你發布和訂閱流式的記錄,這一方面與訊息佇列或者企業訊息系統類似,
- 可以儲存流式的記錄,并且有較好的容錯性,
- 可以在流式記錄產生時就進行處理,
一般它可以應用于兩個場景:
- 構造實時流資料管道,它可以在系統或應用之間可靠地獲取資料, (相當于message queue)
- 構建實時流式應用程式,對這些流資料進行轉換或者影響, (就是流處理,通過kafka stream topic和topic之間內部進行變化)
broker
kafka中的每個節點即每個服務器就是一個broker ,
topic
kafka中的topic是一個分類的概念,表示一類訊息,生產者在生產訊息的時候需要指定topic,消費者在消費訊息的時候也需要指定topic,
partition
partition是磁區的概念,kafka的一個topic可以有多個partition,每個partition會分散到不同的broker上,起到負載均衡的作用,生產者的訊息會通過演算法均勻的分散在各個partition上,
consumer group
kafka的消費者有個組的概念,一個partition可以被多consumer group訂閱,每個訊息會廣播到每一個group中,但是每個訊息只會被group中的一個consumer消費,相當于每個group,一個partition只能有一個consumer訂閱,所以group中的consumer數量不可以超過topic中partition的數量,并且訊息的消費的順序在每個partition中是保證有序的,但是在多個partition之間是不保證的,因為consumer的消費速度是有快慢的,
所以如果要用kafka實作嚴格的訊息佇列點對點模式那么我們可以設定一個partition并且設定一個consumer,如果對訊息消費的順序不是那么敏感,那么可以設定多個partition來并行消費訊息,提高吞吐量,

安裝kafka
為了能體驗下kafka,我們還是要實際安裝一下kafka,畢竟空想是沒有用的,現在有了docker,安裝起來也是相當滴簡單,我們只需要定義好docker-compose的yml就行了,
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117
KAFKA_CREATE_TOPICS: "test:3:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
我們在yml里定義2個service:
- zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它,
- kafka ,kafka的定義有幾個地方要注意的,
- depends_on:zookeeper 指定kafka依賴zookeeper這個service,當啟動kafka的時候自動會啟動zookeeper,
- KAFKA_ADVERTISED_HOST_NAME 這里要指定宿主機的ip
- KAFKA_CREATE_TOPICS 這個變數只是的默認創建的topic,"test:3:1"代表創建一個名為test的topic并且創建3個磁區1個復制,
定義好這些之后我們只需要使用docker-compose命令運行它:
sudo docker-compose up -d
.net 操作 kafka
安裝好kafka的docker環境之后,下面演示下如何使用.net操作kafka,進行訊息的生產與消費,
生產者
static async Task Main(string[] args)
{
Console.WriteLine("Hello World Producer!");
var config = new ProducerConfig
{
BootstrapServers = "192.168.0.117:9092",
ClientId = Dns.GetHostName(),
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
string topic = "test";
for (int i = 0; i < 100; i++)
{
var msg = "message " + i;
Console.WriteLine($"Send message: value {msg}");
var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = https://www.cnblogs.com/kklldog/p/msg });
Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
Thread.Sleep(500);
}
}
Console.ReadLine();
}
新建一個控制臺專案,從nuget安裝kafka的官方client,
Install-Package Confluent.Kafka
代碼非常簡單,使用ProducerBuilder構造一個producer,然后呼叫ProduceAsync方法發送訊息,
其中需要注意的是如果你的場景并發非常之高,官方檔案推薦的方法是Produce而不是ProduceAsync,這是一個比較迷的地方,按常理使用ProduceAsync應該比使用同步方法Produce能獲得更高的并發才對,但是檔案確確實實說高并發場景請使用Produce,可能是為了避免ProduceAsync結果回傳的時候異步執行緒背景關系切換造成的性能開銷,
原文:
There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.
消費者
static void Main(string[] args)
{
Console.WriteLine("Hello World kafka consumer !");
var config = new ConsumerConfig
{
BootstrapServers = "192.168.0.117:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var cancel = false;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var topic = "test";
consumer.Subscribe(topic);
while (!cancel)
{
var consumeResult = consumer.Consume(CancellationToken.None);
Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
}
consumer.Close();
}
}
消費者的演示代碼同樣很簡單,我們需要指定groupId,然后訂閱topic,使用ConsumerBuilder構造一個consumer,然后呼叫Consume方法進行消費就可以,
注意:
這里默認是自動commit消費,你也可以根據情況手動提交commit,
運行一下

我們運行一個生產者行程,按照500ms的速度生產訊息,運行三個consumer進行消費,可以看到訊息被均勻的推送到三個consumer上去,
總結
以上簡單的介紹了kafka的背景、安裝方法、使用場景,還簡單演示了如何使用.net來操作kafka,它可以當作流式計算平臺來使用,也可以當作傳統的訊息佇列使用,它當前非常流行,網上的資料也多如牛毛,官方也提供了簡單易用的.net sdk ,為.net 平臺集成kafka提供了便利,
關注我的公眾號一起玩轉技術

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/280595.html
標籤:.NET Core
