主頁 > .NET開發 > .Net Core 集成 Kafka

.Net Core 集成 Kafka

2021-04-27 13:46:12 .NET開發

最近維護的一個系統并發有點高,所以想引入一個訊息佇列來進行削峰,考察了一些產品,最終決定使用kafka來當做訊息佇列,以下是關于kafka的一些知識的整理筆記,

kafka

kafka 是分布式流式平臺,它由linkedin開發,后貢獻給了Apache開源組織并成為頂級開源專案,它可以應用在高并發場景下的日志系統,也可以當作訊息佇列來使用,也可以當作訊息服務對系統進行解耦,

流處理平臺有以下三種特性:

  1. 可以讓你發布和訂閱流式的記錄,這一方面與訊息佇列或者企業訊息系統類似,
  2. 可以儲存流式的記錄,并且有較好的容錯性,
  3. 可以在流式記錄產生時就進行處理,

一般它可以應用于兩個場景:

  1. 構造實時流資料管道,它可以在系統或應用之間可靠地獲取資料, (相當于message queue)
  2. 構建實時流式應用程式,對這些流資料進行轉換或者影響, (就是流處理,通過kafka stream topic和topic之間內部進行變化)

broker

kafka中的每個節點即每個服務器就是一個broker ,

topic

kafka中的topic是一個分類的概念,表示一類訊息,生產者在生產訊息的時候需要指定topic,消費者在消費訊息的時候也需要指定topic,

partition

partition是磁區的概念,kafka的一個topic可以有多個partition,每個partition會分散到不同的broker上,起到負載均衡的作用,生產者的訊息會通過演算法均勻的分散在各個partition上,

consumer group

kafka的消費者有個組的概念,一個partition可以被多consumer group訂閱,每個訊息會廣播到每一個group中,但是每個訊息只會被group中的一個consumer消費,相當于每個group,一個partition只能有一個consumer訂閱,所以group中的consumer數量不可以超過topic中partition的數量,并且訊息的消費的順序在每個partition中是保證有序的,但是在多個partition之間是不保證的,因為consumer的消費速度是有快慢的,
所以如果要用kafka實作嚴格的訊息佇列點對點模式那么我們可以設定一個partition并且設定一個consumer,如果對訊息消費的順序不是那么敏感,那么可以設定多個partition來并行消費訊息,提高吞吐量,

安裝kafka

為了能體驗下kafka,我們還是要實際安裝一下kafka,畢竟空想是沒有用的,現在有了docker,安裝起來也是相當滴簡單,我們只需要定義好docker-compose的yml就行了,

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

我們在yml里定義2個service:

  1. zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它,
  2. kafka ,kafka的定義有幾個地方要注意的,
  • depends_on:zookeeper 指定kafka依賴zookeeper這個service,當啟動kafka的時候自動會啟動zookeeper,
  • KAFKA_ADVERTISED_HOST_NAME 這里要指定宿主機的ip
  • KAFKA_CREATE_TOPICS 這個變數只是的默認創建的topic,"test:3:1"代表創建一個名為test的topic并且創建3個磁區1個復制,

定義好這些之后我們只需要使用docker-compose命令運行它:

sudo docker-compose up -d

.net 操作 kafka

安裝好kafka的docker環境之后,下面演示下如何使用.net操作kafka,進行訊息的生產與消費,

生產者

        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World Producer!");

            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                ClientId = Dns.GetHostName(),
            };


            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                string topic = "test";
                for (int i = 0; i < 100; i++)
                {
                    var msg = "message " + i;
                    Console.WriteLine($"Send message:   value {msg}");
                    var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = https://www.cnblogs.com/kklldog/archive/2021/04/27/msg });
                    Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
                    Thread.Sleep(500);
                }
            }

            Console.ReadLine();

        }

新建一個控制臺專案,從nuget安裝kafka的官方client,

Install-Package Confluent.Kafka

代碼非常簡單,使用ProducerBuilder構造一個producer,然后呼叫ProduceAsync方法發送訊息,
其中需要注意的是如果你的場景并發非常之高,官方檔案推薦的方法是Produce而不是ProduceAsync,這是一個比較迷的地方,按常理使用ProduceAsync應該比使用同步方法Produce能獲得更高的并發才對,但是檔案確確實實說高并發場景請使用Produce,可能是為了避免ProduceAsync結果回傳的時候異步執行緒背景關系切換造成的性能開銷,
原文:

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.

消費者

        static void Main(string[] args)
        {
            Console.WriteLine("Hello World kafka consumer !");

            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var cancel = false;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var topic = "test";
                consumer.Subscribe(topic);

                while (!cancel)
                {
                    var consumeResult = consumer.Consume(CancellationToken.None);

                    Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                }

                consumer.Close();
            }
        }

消費者的演示代碼同樣很簡單,我們需要指定groupId,然后訂閱topic,使用ConsumerBuilder構造一個consumer,然后呼叫Consume方法進行消費就可以,
注意:
這里默認是自動commit消費,你也可以根據情況手動提交commit,

運行一下


我們運行一個生產者行程,按照500ms的速度生產訊息,運行三個consumer進行消費,可以看到訊息被均勻的推送到三個consumer上去,

總結

以上簡單的介紹了kafka的背景、安裝方法、使用場景,還簡單演示了如何使用.net來操作kafka,它可以當作流式計算平臺來使用,也可以當作傳統的訊息佇列使用,它當前非常流行,網上的資料也多如牛毛,官方也提供了簡單易用的.net sdk ,為.net 平臺集成kafka提供了便利,

關注我的公眾號一起玩轉技術

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/280597.html

標籤:.NET技术

上一篇:記一次 .NET WPF布草管理系統 掛死分析

下一篇:[Abp vNext 原始碼分析] - 18. 單元測驗

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more