這是我第一次使用 Kafka,我計劃在 .net 中使用 kafka
我想知道我是否可以在生成事件時將 JSON 作為訊息發送
我正在關注教程:https : //developer.confluent.io/get-started/dotnet/#build-producer
此外,有沒有辦法將該值映射到模型,以便值/json 結構始終系結到該模型
例如:如果我希望我的 json 值是
{
"customerName":"anything",
"eventType":"one-of-three-enums",
"columnsChanged": "string value or something"
}
我能找到的大多數例子都是這樣的:
using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;
class Producer {
static void Main(string[] args)
{
if (args.Length != 1) {
Console.WriteLine("Please provide the configuration file path as a command line argument");
}
IConfiguration configuration = new ConfigurationBuilder()
.AddIniFile(args[0])
.Build();
const string topic = "purchases";
string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };
using (var producer = new ProducerBuilder<string, string>(
configuration.AsEnumerable()).Build())
{
var numProduced = 0;
const int numMessages = 10;
for (int i = 0; i < numMessages; i)
{
Random rnd = new Random();
var user = users[rnd.Next(users.Length)];
var item = items[rnd.Next(items.Length)];
producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError) {
Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
}
else {
Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
numProduced = 1;
}
});
}
producer.Flush(TimeSpan.FromSeconds(10));
Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
}
}
}
我希望該專案是 json 結構中的一個類。
uj5u.com熱心網友回復:
想知道我是否可以在生成事件時將 JSON 作為訊息發送
是的。Kafka 存盤位元組并使用序列化程式轉換位元組。在構建 Producer 時,您可以選擇呼叫SetValueSerializer.
一些內置的序列化器可以在 - https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs找到
您需要自己撰寫來處理任何 JSON 模型型別。
將 Utf8Serializer 用于字串時,您需要從模型類中預序列化物件,然后將其作為值發送。在您的示例中,您將替換var item為一些序列化物件。
如何在 .NET 中將 C# 物件轉換為 JSON 字串?
使用模型類時,您的資料通常是強型別的,直到您開始手動撰寫 JSON 或使用 Dictionary 型別。如果您需要外部訊息驗證,Confluent Schema Registry 就是一個支持 JSONSchema 的示例,并且JsonSerializerfromconfluent-dotnet-kafka專案支持這一點。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/364411.html
下一篇:使用多個實體時的EF核心并發
