什么是發布-訂閱
發布訂閱是一種眾所周知并被廣泛使用的訊息傳送模式,常用在微服務架構的服務間通信,高并發削峰等情況,但是不同的訊息中間件之間存在細微的差異,專案使用不同的產品需要實作不同的實作類,雖然是明智的決策,但必須撰寫和維護抽象及其基礎實作, 此方法需要復雜、重復且容易出錯的自定義代碼,
Dapr為了解決這種問題,提供開箱即用的訊息傳送抽象和實作,封裝在 Dapr 構建基塊中,業務系統只需呼叫跟據Dapr的要求實作訂閱發布即可,
作業原理

Dapr 發布&訂閱構建基塊提供了一個與平臺無關的 API 框架來發送和接收訊息,
服務將訊息發布到指定主題, 業務服務訂閱主題以使用訊息,
服務在 Dapr sidecar 上呼叫 pub/sub API, 然后,sidecar 呼叫預定義 Dapr pub/sub 組件,
任何編程平臺都可以使用 Dapr 本機 API 通過 HTTP 或 gRPC 呼叫構建基塊, 若要發布訊息,請進行以下 API 呼叫:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
上述呼叫中有幾個特定于 Dapr 的 URL 段:
<dapr-port>提供 Dapr sidecar 偵聽的埠號,<pub-sub-name>提供所選 Dapr pub/sub 組件的名稱,<topic>提供訊息發布到的主題的名稱,
設定發布訂閱組件
Dapr 為 Pub/Sub 提供很多支持的組件,例如 Redis 和 Kafka 等,支持組件詳見 鏈接
在win10上的自承載的Dapr中,默認在 %UserProfile%\.dapr\components\pubsub.yaml 中使用redis作為了pub/sub組件,dapr run一個app時,使用默認組件作為pub/sub組件
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub spec: type: pubsub.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: ""
訂閱主題
Dapr 允許兩種方法訂閱主題:
- 宣告式,其中定義在外部檔案中,
- 編程方式,訂閱在用戶代碼中定義
1.宣告式訂閱
在默認組件目錄 %UserProfile%\.dapr\components\pubsub.yaml 中新建subscription.yaml檔案,并寫入以下內容
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: myevent-subscription spec: topic: test_topic route: /TestPubSub pubsubname: pubsub scopes: - frontend
上面的示例顯示了 test_topic主題的事件訂閱,使用組件 pubsub,
route告訴 Dapr 將所有主題訊息發送到應用程式中的/TestPubSub端點,scopes為 FrontEnd啟用訂閱
現在需要在FrontEnd專案中定義介面TestSub,在FrontEnd中新建TestPubSubController
using Dapr.Client; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using System.IO; using System.Text; using System.Threading.Tasks; namespace FrontEnd.Controllers { [Route("[controller]")] [ApiController] public class TestPubSubController : ControllerBase { private readonly ILogger<TestPubSubController> _logger; private readonly DaprClient _daprClient; public TestPubSubController(ILogger<TestPubSubController> logger, DaprClient daprClient) { _logger = logger; _daprClient = daprClient; } [HttpPost] public ActionResult Post() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); return Ok(content); } [HttpGet("pub")] public async Task<ActionResult> PubAsync() { var data = https://www.cnblogs.com/chenyishi/p/new WeatherForecast(); await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data); return Ok(); } } }
需要在Startup的Configure中開啟重復讀取Body才能讀取到資料
app.Use((context, next) => { context.Request.EnableBuffering(); return next(); });
啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll
呼叫 pub API發布訊息

查看訂閱情況,訂閱訊息消費成功

2.編程式訂閱
為了防止宣告式訂閱的影響,需要先把目錄<%UserProfile%\.dapr\components\pubsub.yaml>中subscription.yaml檔案洗掉
TestPubSubController新增Api Sub
[Topic("pubsub", "test_topic")] [HttpPost("sub")] public async Task<ActionResult> Sub() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); _logger.LogInformation("testsub" + content); return Ok(content); }
在Startup的Configure方法中新增中間件
public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { // ... app.UseCloudEvents(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); // ... }); }
啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll
呼叫API發布訊息

查看訂閱情況,訂閱訊息消費成功

通過DapreCLI同樣可以發布訊息
dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'
查看訂閱情況,訂閱訊息消費成功
![]()
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/302644.html
標籤:.NET Core
