Kafka是一種高吞吐量的分布式發布訂閱訊息系統,有如下特性:
- 通過O的磁盤資料結構提供訊息的持久化,這種結構對于即使數以TB的訊息存盤也能夠保持長時間的穩定性能,
- 高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬 [2] 的訊息,
- 支持通過Kafka服務器和消費機集群來磁區訊息,
- 支持Hadoop并行資料加載,
Kafka通過官網發布了最新版本2.3.0
相關術語介紹
- Broker
Kafka集群包含一個或多個服務器,這種服務器被稱為broker - Topic
每條發布到Kafka集群的訊息都有一個類別,這個類別被稱為Topic,(物理上不同Topic的訊息分開存盤,邏輯上一個Topic的訊息雖然保存于一個或多個broker上但用戶只需指定訊息的Topic即可生產或消費資料而不必關心資料存于何處) - Partition
Partition是物理上的概念,每個Topic包含一個或多個Partition. - Producer
負責發布訊息到Kafka broker - Consumer
訊息消費者,向Kafka broker讀取訊息的客戶端, - Consumer Group
每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group),
在這里我們用了一個第三方庫叫Confluent.kafka,在nuget上搜索一下就出來了,感謝原作者,
新建一個 .net core類別庫專案
安裝第三方依賴庫,如下圖所示:

新建一個SUPKafkaTopicConsumer類
這是用來創建并初始化消費者,接下來看看這個類里面包含了什么,
- 首先宣告一個委托,用來接收訂閱訊息
public delegate void OnReceivedHandle(object data);
初始化消費者,建構式中傳入kafka地址,以及要訂閱的組groupId,另外注入了log4net記錄日志資訊,
init()方法用來初始化,新建一個消費者,具體代碼如下,
public class SUPKafkaTopicConsumer<TKey, TValue>
{
private IConsumer<TKey, TValue> consumer;
private SUPLogger logger_;
private string BootStrapServer;
private string GroupId;
public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
{
BootStrapServer = bootStrapServer;
GroupId = groupId;
logger_ = logger;
}
public bool Init()
{
try
{
var conf = new ConsumerConfig
{
GroupId = GroupId,
BootstrapServers = BootStrapServer,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false // 設定非自動偏移,業務邏輯完成后手動處理偏移,防止資料丟失
};
consumer = new ConsumerBuilder<TKey, TValue>(conf)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
return true;
}
catch (Exception ex)
{
throw;
}
}
- 定義回呼事件,用以處理用戶自定義方法,
public event OnReceivedHandle onReceivedHandle;
- 定義一個訂閱的方法,傳入topic,以及是否需要提交偏移量,
其實看init()方法中我把EnableAutoCommit=false,取消了自動提交,讓應用程式決定何時提交 偏移量,為什么這么做呢?
自動提交雖然方便,但是也有一些弊端,自動提交的弊端是通過間隔時間, 一般是默認5s提交時間間隔,在最近一次提交之后的 3s發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取訊息,這個時候偏移量已經落后 了 3s,所以在這 3s 內到達的訊息會被重復處理,可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復訊息的時間窗,不過這種情況是無也完全避免的 ,
大部分開發者通過控制偏移量提交時間來消除丟失訊息的可能性,井在發生再均衡時減少 重復訊息的數量,消費者 API提供了另一種提交偏移量的方式 , 開發者可以在必要的時候 提交當前偏移盤,而不是基于時間間隔,
public void Subscribe(string topic, bool isCommit)
{
try
{
if (consumer != null)
{
consumer.Subscribe(topic);
while (true)
{
var consume = consumer.Consume();
if (onReceivedHandle != null)
{
onReceivedHandle(consume);
if (isCommit)
{
consumer.Commit(consume);
}
}
}
}
}
catch (Exception ex)
{
//consumer.Close();
throw ex;
}
}
- 取消訂閱
public void UnSubscribe()
{
if (consumer != null)
{
consumer.Unsubscribe();
}
}
新建生產者類
- 首先定義了ISUPKafkaProducer<Tkey, TValue>介面,包含四個方法
public interface ISUPKafkaProducer<Tkey,TValue>
{
ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
ISendResult AsyncSend(Tkey key, TValue value,string topic);
ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
}
- 介面的實作,初始化程序類似消費者
internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
{
private IProducer<Tkey, TValue> producer;
private SUPLogger logger_;
private string m_bootStrapServer;
public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null)
{
m_bootStrapServer = bootStrapServer;
logger_ = logger;
}
public bool Init()
{
try
{
var config = new ProducerConfig
{
BootstrapServers = m_bootStrapServer
};
producer = new ProducerBuilder<Tkey, TValue>(config)
.SetErrorHandler((producer, error) =>
{
logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}",
m_bootStrapServer, error.Code, error.Reason));
})
.SetLogHandler((producer, msg) =>
{
logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}",
m_bootStrapServer, msg.Name, msg.Message));
})
.Build();
return true;
}
catch (Exception ex)
{
throw ex;
}
}
實作繼承至ISUPKafkaProducer<Tkey, TValue>的方法
public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
{
try
{
if (producer != null)
{
var message = new Message<Tkey, TValue>
{
Value = https://www.cnblogs.com/zhanwei103/p/value,
Key = key
};
producer.Produce(topic, message, sendCallBack);
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產者");
}
}
catch (Exception ex)
{
throw ex;
}
}
public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action> sendCallBack = null)
{
try
{
if (producer != null)
{
var message = new Message
{
Value = value,
Key = key
};
producer.Produce(topicPartition, message, sendCallBack);
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產者");
}
}
catch (Exception ex)
{
throw ex;
}
}
public ISendResult AsyncSend(Tkey key, TValue value,string topic)
{
try
{
if (producer != null)
{
var message = new Message
{
Value = value,
Key = key
};
var deliveryReport = producer.ProduceAsync(topic, message);
deliveryReport.ContinueWith(task =>
{
Console.WriteLine("Producer: " + producer.Name + "/r/nTopic: " + topic + "/r/nPartition: " + task.Result.Partition + "/r/nOffset: " + task.Result.Offset);
});
producer.Flush(TimeSpan.FromSeconds(10));
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產者");
}
}
catch (Exception ex)
{
throw ex;
}
}
public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
{
try
{
if (producer != null)
{
var message = new Message
{
Value = value,
Key = key
};
var deliveryReport = producer.ProduceAsync(topicPartition, message);
deliveryReport.ContinueWith(task =>
{
Console.WriteLine("Producer: " + producer.Name + "/r/nTopic: " + topicPartition.Topic + "/r/nPartition: " + task.Result.Partition + "/r/nOffset: " + task.Result.Offset);
});
producer.Flush(TimeSpan.FromSeconds(10));
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產者");
}
}
catch (Exception ex)
{
throw ex;
}
}
新建一個SUPKafkaMessageCenter類
這個類是對外開放的,我們利用這個類來管理生產者和消費者,看下代碼非常簡單,
public static class SUPKafkaMessageCenter<Tkey, TValue>
{
private static SUPLogger logger = null;
static SUPKafkaMessageCenter()
{
SUPLoggerManager.Configure();
logger = new SUPLogger("KafkaCenter");
}
/// <summary>
/// 創建生產者
/// </summary>
/// <param name="bootstrapServer"></param>
/// <param name="topicName"></param>
/// <returns></returns>
public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
{
if (string.IsNullOrEmpty(bootstrapServer))
{
return null;
}
var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
if (!producer.Init())
{
return null;
}
return producer;
}
/// <summary>
/// 創建消費者
/// </summary>
/// <param name="bootstrapServer"></param>
/// <param name="groupId"></param>
/// <returns></returns>
public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group")
{
if (string.IsNullOrEmpty(bootstrapServer))
{
return null;
}
var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger);
if (!consumer.Init())
{
return null;
}
return consumer;
}
測驗
新建一個測驗的控制臺程式,呼叫代碼如下
- 消費者
var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");
//系結接收資訊,回呼函式
consumer.onReceivedHandle += CallBack;
var topics = new List<string>();
topics.Add("kafka-default-topic");
topics.Add("test");
//訂閱主題
consumer.Subscribe(topics, false);
- 生產者
ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");
kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});
除了上面寫的這些方法,其實對于kafka還有很多功能,比如topic的增刪改查,我把它認為是管理類的,這里就不貼代碼了,
有興趣的朋友可以進gitee上下載來看看,https://gitee.com/zhanwei103/Kafka.Net

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/40681.html
標籤:.NET Core
上一篇:串口資料處理分包處理
