主頁 > .NET開發 > .Net Core微服務入門全紀錄(六)——EventBus-事件總線

.Net Core微服務入門全紀錄(六)——EventBus-事件總線

2020-09-11 17:23:01 .NET開發

Tips:本篇已加入系列文章閱讀目錄,可點擊查看更多相關文章,

前言

上一篇【.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)】中已經完成了Ocelot + Consul的搭建,這一篇簡單說一下EventBus,

EventBus-事件總線

  • 首先,什么是事件總線呢?

貼一段參考:

事件總線是對觀察者(發布-訂閱)模式的一種實作,它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種解耦的目的,

如果沒有接觸過EventBus,可能不太好理解,其實EventBus在客戶端開發中應用非常廣泛(android,ios,web前端等),用于多個組件(或者界面)之間的相互通信,懂的人都懂,,,

  • 那么,我們為什么要用EventBus呢?

就拿當前的專案舉例,我們有一個訂單服務,一個產品服務,客戶端有一個下單功能,當用戶下單時,呼叫訂單服務的下單介面,那么下單介面需要呼叫產品服務的減庫存介面,這涉及到服務與服務之間的呼叫,那么服務之間又怎么呼叫呢?直接RESTAPI?或者效率更高的gRPC?可能這兩者各有各的使用場景,但是他們都存在一個服務之間的耦合問題,或者難以做到異步呼叫,

試想一下:假設我們下單時呼叫訂單服務,訂單服務需要呼叫產品服務,產品服務又要呼叫物流服務,物流服務再去呼叫xx服務 等等,,,如果每個服務處理時間需要2s,不使用異步的話,那這種體驗可想而知,

如果使用EventBus的話,那么訂單服務只需要向EventBus發一個“下單事件”就可以了,產品服務會訂閱“下單事件”,當產品服務收到下單事件時,自己去減庫存就好了,這樣就避免了兩個服務之間直接呼叫的耦合性,并且真正做到了異步呼叫,

既然涉及到多個服務之間的異步呼叫,那么就不得不提分布式事務,分布式事務并不是微服務獨有的問題,而是所有的分布式系統都會存在的問題,
關于分布式事務,可以查一下“CAP原則”和“BASE理論”了解更多,當今的分布式系統更多的會追求事務的最終一致性,

下面使用國人開發的優秀專案“CAP”,來演示一下EventBus的基本使用,之所以使用“CAP”是因為它既能解決分布式系統的最終一致性,同時又是一個EventBus,它具備EventBus的所有功能!
作者介紹:https://www.cnblogs.com/savorboard/p/cap.html

CAP使用

  • 環境準備

在Docker中準備一下需要的環境,首先是資料庫,資料庫我使用PostgreSQL,用別的也行,CAP支持:SqlServer,MySql,PostgreSql,MongoDB,
關于在Docker中運行PostgreSQL可以看我的另一篇博客:https://www.cnblogs.com/xhznl/p/13155054.html

然后是MQ,這里我使用RabbitMQ,Kafka也可以,
Docker運行RabbitMQ:

docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management

默認用戶:guest,密碼:guest

環境準備就完成了,Docker就是這么方便,,,

  • 代碼修改:

為了模擬以上業務,需要修改大量代碼,下面代碼如有遺漏的直接去github找,

NuGet安裝:

Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL

CAP相關:

DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql

Order.API/Controllers/OrdersController.cs增加下單介面:

[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{
    private readonly ILogger<OrdersController> _logger;
    private readonly IConfiguration _configuration;
    private readonly ICapPublisher _capBus;
    private readonly OrderContext _context;

    public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context)
    {
        _logger = logger;
        _configuration = configuration;
        _capBus = capPublisher;
        _context = context;
    }

    [HttpGet]
    public IActionResult Get()
    {
        string result = $"【訂單服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
            $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
        return Ok(result);
    }

    /// <summary>
    /// 下單 發布下單事件
    /// </summary>
    /// <param name="order"></param>
    /// <returns></returns>
    [Route("Create")]
    [HttpPost]
    public async Task<IActionResult> CreateOrder(Models.Order order)
    {
        using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //業務代碼
            order.CreateTime = DateTime.Now;
            _context.Orders.Add(order);

            var r = await _context.SaveChangesAsync() > 0;

            if (r)
            {
                //發布下單事件
                await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });
                return Ok();
            }
            return BadRequest();
        }

    }

}

Order.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下單事件訊息
/// </summary>
public class CreateOrderMessageDto
{
    /// <summary>
    /// 產品ID
    /// </summary>
    public int ProductID { get; set; }

    /// <summary>
    /// 購買數量
    /// </summary>
    public int Count { get; set; }
}

Order.API/Models/Order.cs訂單物體類:

public class Order
{
    [Key]
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public int ID { get; set; }

    /// <summary>
    /// 下單時間
    /// </summary>
    [Required]
    public DateTime CreateTime { get; set; }

    /// <summary>
    /// 產品ID
    /// </summary>
    [Required]
    public int ProductID { get; set; }

    /// <summary>
    /// 購買數量
    /// </summary>
    [Required]
    public int Count { get; set; }
}

Order.API/Models/OrderContext.cs資料庫Context:

public class OrderContext : DbContext
{
    public OrderContext(DbContextOptions<OrderContext> options)
       : base(options)
    {

    }

    public DbSet<Order> Orders { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {

    }
}

Order.API/appsettings.json增加資料庫連接字串:

"ConnectionStrings": {
  "OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}

Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));

    //CAP
    services.AddCap(x =>
    {
        x.UseEntityFramework<OrderContext>();

        x.UseRabbitMQ("host.docker.internal");
    });
}


以上是訂單服務的修改,

Product.API/Controllers/ProductsController.cs增加減庫存介面:

[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
    private readonly ILogger<ProductsController> _logger;
    private readonly IConfiguration _configuration;
    private readonly ICapPublisher _capBus;
    private readonly ProductContext _context;

    public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context)
    {
        _logger = logger;
        _configuration = configuration;
        _capBus = capPublisher;
        _context = context;
    }

    [HttpGet]
    public IActionResult Get()
    {
        string result = $"【產品服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
            $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
        return Ok(result);
    }

    /// <summary>
    /// 減庫存 訂閱下單事件
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [NonAction]
    [CapSubscribe("order.services.createorder")]
    public async Task ReduceStock(CreateOrderMessageDto message)
    {
        //業務代碼
        var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);
        product.Stock -= message.Count;

        await _context.SaveChangesAsync();
    }

}

Product.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下單事件訊息
/// </summary>
public class CreateOrderMessageDto
{
    /// <summary>
    /// 產品ID
    /// </summary>
    public int ProductID { get; set; }

    /// <summary>
    /// 購買數量
    /// </summary>
    public int Count { get; set; }
}

Product.API/Models/Product.cs產品物體類:

public class Product
{
    [Key]
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public int ID { get; set; }

    /// <summary>
    /// 產品名稱
    /// </summary>
    [Required]
    [Column(TypeName = "VARCHAR(16)")]
    public string Name { get; set; }

    /// <summary>
    /// 庫存
    /// </summary>
    [Required]
    public int Stock { get; set; }
}

Product.API/Models/ProductContext.cs資料庫Context:

public class ProductContext : DbContext
{
    public ProductContext(DbContextOptions<ProductContext> options)
       : base(options)
    {

    }

    public DbSet<Product> Products { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        //初始化種子資料
        modelBuilder.Entity<Product>().HasData(new Product
        {
            ID = 1,
            Name = "產品1",
            Stock = 100
        },
        new Product
        {
            ID = 2,
            Name = "產品2",
            Stock = 100
        });
    }
}

Product.API/appsettings.json增加資料庫連接字串:

"ConnectionStrings": {
  "ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}

Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));

    //CAP
    services.AddCap(x =>
    {
        x.UseEntityFramework<ProductContext>();

        x.UseRabbitMQ("host.docker.internal");
    });
}


以上是產品服務的修改,

訂單服務和產品服務的修改到此就完成了,看著修改很多,其實功能很簡單,就是各自增加了自己的資料庫表,然后訂單服務增加了下單介面,下單介面會發出“下單事件”,產品服務增加了減庫存介面,減庫存介面會訂閱“下單事件”,然后客戶端呼叫下單介面下單時,產品服務會減去相應的庫存,功能就這么簡單,

關于EF資料庫遷移之類的基本使用就不介紹了,使用Docker重新構建鏡像,運行訂單服務,產品服務:

docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"

docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"

最后 Ocelot.APIGateway/ocelot.json 增加一條路由配置:

好了,進行到這里,整個環境就有點復雜了,確保我們的PostgreSQL,RabbitMQ,Consul,Gateway,服務實體都正常運行,

服務實體運行成功后,資料庫應該是這樣的:




產品表種子資料:

cap.published表和cap.received表是由CAP自動生成的,它內部是使用本地訊息表+MQ來實作異步確保,

運行測驗

這次使用Postman作為客戶端呼叫下單介面(9070是之前的Ocelot網關埠):

訂單庫published表:

訂單庫order表:

產品庫received表:

產品庫product表:

再試一下:

OK,完成,雖然功能很簡單,但是我們實作了服務的解耦,異步呼叫,和最終一致性,

總結

注意,上面的例子純粹是為了說明EventBus的使用,實際中的下單流程絕對不會這么做的!希望大家不要較真,,,

可能有人會說如果下單成功,但是庫存不足導致減庫存失敗了怎么辦,是不是要回滾訂單表的資料?如果產生這種想法,說明還沒有真正理解最終一致性的思想,首先下單前肯定會檢查一下庫存數量,既然允許下單那么必然是庫存充足的,這里的事務是指:訂單保存到資料庫,和下單事件保存到cap.published表(保存到cap.published表理論上就能夠發送到MQ)這兩件事情,要么一同成功,要么一同失敗,如果這個事務成功,那么就可以認為這個業務流程是成功的,至于產品服務的減庫存是否成功那就是產品服務的事情了(理論上也應該是成功的,因為訊息已經確保發到了MQ,產品服務必然會收到訊息),CAP也提供了失敗重試,和失敗回呼機制,

如果非要資料回滾也是能實作的,CAP的ICapPublisher.Publish方法提供一個callbackName引數,當減庫存時,可以觸發這個回呼,其本質也是通過發布訂閱完成,這是不推薦的做法,就不詳細說了,有興趣自己研究一下,
另外,CAP無法保證訊息不重復,實際使用中需要自己考慮一下訊息的重復過濾和冪等性,

這一篇內容有點多,不知道有沒有表達清楚,有問題歡迎評論交流,如有不對之處還望大家指出,

下一篇計劃寫一下授權認證相關的內容,

代碼放在:https://github.com/xiajingren/NetCoreMicroserviceDemo

未完待續...

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/6426.html

標籤:.NET Core

上一篇:Task.Result跟 Task.GetAwaiter.GetResult()相同嗎?怎么選?

下一篇:Magicodes.IE在.NET Core中通過請求頭匯出多種格式檔案

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more