我正在嘗試使用 MassTransit 狀態機製作一個完整可用的示例,用來編排彼此完全解耦的服務,但遇到了以下例外:"The Step1FinishedEvent event is not handled during the ProcessingStartedState state for the ArcStateMachine state machine"。在除錯過程中可以看到,由消費者處理的訊息似乎太晚才觸發狀態機中對應的事件。
我的定義:
/// <summary>
/// Saga instance persisted for each run tracked by <c>ArcStateMachine</c>.
/// Extra BSON elements found in the stored document are ignored on load.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Identifier that correlates incoming messages with this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the state the instance is currently in (bound via <c>InstanceState</c>).</summary>
    public string CurrentState { get; set; }

    /// <summary>Version number required by <see cref="ISagaVersion"/>.</summary>
    public int Version { get; set; }

    /// <summary>Activation id copied from the message that triggered the latest transition.</summary>
    public Guid ActivationId { get; set; }
}
/// <summary>
/// Registers, once per process, <c>ActivationId</c> as the correlation id of every
/// message contract exchanged with the state machine.
/// </summary>
public static class MessageContracts
{
    // Set after the first successful registration so later calls become no-ops.
    static bool _initialized;

    /// <summary>
    /// Configures the send topology for all message contracts. Calls made after the
    /// first completed one return without doing anything.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(x => x.ActivationId);
            GlobalTopology.Send.UseCorrelationId<StartStep1Message>(x => x.ActivationId);
            GlobalTopology.Send.UseCorrelationId<Step1FinishedMessage>(x => x.ActivationId);
            GlobalTopology.Send.UseCorrelationId<StartStep2Message>(x => x.ActivationId);
            GlobalTopology.Send.UseCorrelationId<Step2FinishedMessage>(x => x.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(x => x.ActivationId);

            _initialized = true;
        }
    }
}
/// <summary>
/// Consumer definition for <see cref="Step1Consumer"/>. Its receive endpoint discards
/// faulted messages so they are not routed to the RabbitMQ <c>_error</c> queue.
/// </summary>
public class Step1ConsumerDefinition : ConsumerDefinition<Step1Consumer>
{
    /// <summary>
    /// Applies the discard-faulted-messages policy to the endpoint hosting the consumer.
    /// </summary>
    /// <param name="endpointConfigurator">Receive endpoint the consumer is attached to.</param>
    /// <param name="consumerConfigurator">Consumer-level configurator (not used here).</param>
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<Step1Consumer> consumerConfigurator)
    {
        endpointConfigurator.DiscardFaultedMessages();
    }
}
我的 2 個實際處理消費者:
/// <summary>
/// Performs the (simulated) step 1 work for a <c>StartStep1Message</c> and publishes
/// a <c>Step1FinishedMessage</c> carrying the same activation id when done.
/// </summary>
public class Step1Consumer : IConsumer<StartStep1Message>
{
    readonly ILogger<Step1Consumer> _Logger;

    // Duration of the simulated work, in seconds.
    private readonly int _DelaySeconds = 5;

    public Step1Consumer(ILogger<Step1Consumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>
    /// Logs the start, waits for the configured delay, logs completion and publishes
    /// the step 1 finished message.
    /// </summary>
    /// <param name="context">Consume context holding the start-step-1 message.</param>
    public async Task Consume(ConsumeContext<StartStep1Message> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Step 1 started: {activationId}");

        await Task.Delay(_DelaySeconds * 1000);

        // Fixed: this previously logged "Step 2 finished" from the step 1 consumer.
        _Logger.LogInformation($"Step 1 finished: {activationId}");
        await context.Publish<Step1FinishedMessage>(new { ActivationId = activationId });
    }
}
/// <summary>
/// Performs the (simulated) step 2 work for a <c>StartStep2Message</c> and publishes
/// a <c>Step2FinishedMessage</c> carrying the same activation id when done.
/// </summary>
public class Step2Consumer : IConsumer<StartStep2Message>
{
    readonly ILogger<Step2Consumer> _Logger;

    // Duration of the simulated work, in seconds.
    private readonly int _DelaySeconds = 1;

    public Step2Consumer(ILogger<Step2Consumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>
    /// Logs the start, waits for the configured delay, logs completion and publishes
    /// the step 2 finished message.
    /// </summary>
    /// <param name="context">Consume context holding the start-step-2 message.</param>
    public async Task Consume(ConsumeContext<StartStep2Message> context)
    {
        var id = context.Message.ActivationId;
        var delayMilliseconds = _DelaySeconds * 1000;

        _Logger.LogInformation($"Step 2 started {id}");
        await Task.Delay(delayMilliseconds);
        _Logger.LogInformation($"Step 2 finished {id}");

        await context.Publish<Step2FinishedMessage>(new { ActivationId = id });
    }
}
我還有 2 個輔助消費者來編排不同服務之間的訊息轉換,以解耦它們并檢測所有處理的完成:
/// <summary>
/// Orchestrates the hand-over between processing steps so the step consumers do not
/// need to know about each other: each handled message results in publishing the
/// message that starts the next stage, carrying the same activation id.
/// </summary>
public class TransitionConsumer :
    IConsumer<StartProcessingMessage>,
    IConsumer<Step1FinishedMessage>,
    IConsumer<Step2FinishedMessage>
{
    // The unused _DelaySeconds field was removed: nothing in this consumer waits.
    readonly ILogger<TransitionConsumer> _Logger;

    public TransitionConsumer(ILogger<TransitionConsumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Processing started: publishes <c>StartStep1Message</c>.</summary>
    /// <param name="context">Consume context holding the start-processing message.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Started to Step 1: {activationId}");
        await context.Publish<StartStep1Message>(new { ActivationId = activationId });
    }

    /// <summary>Step 1 finished: publishes <c>StartStep2Message</c>.</summary>
    /// <param name="context">Consume context holding the step-1-finished message.</param>
    public async Task Consume(ConsumeContext<Step1FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Step 1 to Step 2: {activationId}");
        await context.Publish<StartStep2Message>(new { ActivationId = activationId });
    }

    /// <summary>Step 2 finished: publishes <c>ProcessingFinishedMessage</c>.</summary>
    /// <param name="context">Consume context holding the step-2-finished message.</param>
    public async Task Consume(ConsumeContext<Step2FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Step 2 to Completion: {activationId}");
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}
/// <summary>
/// Terminal consumer of the flow: logs that processing finished for an activation id.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    readonly ILogger<ProcessingFinishedConsumer> _Logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Writes the completion log entry; no further message is published.</summary>
    /// <param name="context">Consume context holding the processing-finished message.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        var finishedId = context.Message.ActivationId;
        _Logger.LogInformation($"Finish {finishedId}");

        // Nothing asynchronous to do; the await keeps the method a regular async method.
        await Task.CompletedTask;
    }
}
以及一個 Fault<> 消費者,用來處理可能來自 Step1Consumer 和 Step2Consumer 的所有故障:
/// <summary>
/// Handles <c>Fault&lt;T&gt;</c> messages for the step start messages and logs which
/// step failed, for which activation id, and why.
/// </summary>
public class FaultConsumer :
    IConsumer<Fault<StartStep1Message>>,
    IConsumer<Fault<StartStep2Message>>
{
    readonly ILogger<FaultConsumer> _Logger;

    public FaultConsumer(ILogger<FaultConsumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Logs a failure of step 1.</summary>
    /// <param name="context">Consume context holding the fault for the start-step-1 message.</param>
    public Task Consume(ConsumeContext<Fault<StartStep1Message>> context)
    {
        LogError("Step 1", context.Message.Message.ActivationId, context.Message.Exceptions);
        return Task.CompletedTask;
    }

    /// <summary>Logs a failure of step 2.</summary>
    /// <param name="context">Consume context holding the fault for the start-step-2 message.</param>
    public Task Consume(ConsumeContext<Fault<StartStep2Message>> context)
    {
        LogError("Step 2", context.Message.Message.ActivationId, context.Message.Exceptions);
        return Task.CompletedTask;
    }

    // Writes one error-level entry listing all exception messages of the fault.
    // The work is purely synchronous, so this is no longer an 'async' method with
    // nothing to await (which only produced compiler warning CS1998).
    private void LogError(string step, Guid activationId, ExceptionInfo[] exceptions)
    {
        // A fault without exception details must not crash the fault handler itself.
        var errorMessages = string.Join(
            ", ",
            (exceptions ?? Array.Empty<ExceptionInfo>()).Select(e => e.Message));

        // Failures are logged at Error level so they are not filtered out or
        // overlooked among informational entries.
        _Logger.LogError($"{step} failed for {activationId}, cause: {errorMessages}");
    }
}
這是狀態機定義:
/// <summary>
/// State machine that tracks an <c>ArcProcess</c> through a strictly linear flow:
/// Initial -> ProcessingStarted -> Step1Started -> Step1Finished -> Step2Started
/// -> Step2Finished -> Final. Each state reacts to exactly one event, and every
/// transition copies the message's activation id onto the instance.
/// </summary>
public class ArcStateMachine : MassTransitStateMachine<ArcProcess>
{
    // Message correlation is registered once, before any instance is built.
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    public ArcStateMachine()
    {
        // The current state is stored on the instance's CurrentState property.
        InstanceState(x => x.CurrentState);

        // Initial --StartProcessingEvent--> ProcessingStartedState
        Initially(
            When(StartProcessingEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        // ProcessingStartedState --Step1StartedEvent--> Step1StartedState
        During(ProcessingStartedState,
            When(Step1StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1StartedState));

        // Step1StartedState --Step1FinishedEvent--> Step1FinishedState
        During(Step1StartedState,
            When(Step1FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1FinishedState));

        // Step1FinishedState --Step2StartedEvent--> Step2StartedState
        During(Step1FinishedState,
            When(Step2StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2StartedState));

        // Step2StartedState --Step2FinishedEvent--> Step2FinishedState
        During(Step2StartedState,
            When(Step2FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2FinishedState));

        // Step2FinishedState --ProcessingFinishedEvent--> Final
        During(Step2FinishedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }

    // States, in the order the flow is expected to pass through them.
    public State ProcessingStartedState { get; }
    public State Step1StartedState { get; }
    public State Step1FinishedState { get; }
    public State Step2StartedState { get; }
    public State Step2FinishedState { get; }

    // Events, one per message contract observed by the state machine.
    public Event<StartProcessingMessage> StartProcessingEvent { get; }
    public Event<StartStep1Message> Step1StartedEvent { get; }
    public Event<Step1FinishedMessage> Step1FinishedEvent { get; }
    public Event<StartStep2Message> Step2StartedEvent { get; }
    public Event<Step2FinishedMessage> Step2FinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}
以及 MassTransit 的設定:
// Registers MassTransit (saga, consumers, RabbitMQ transport), the bus hosted service
// and Swagger. Everything below runs only when RABBIT_MQ_HOST is configured.
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
services.AddMassTransit(cnf =>
{
// Connection string of the MongoDB instance used to store saga state.
var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];
// Saga state machine: named endpoint, instances stored in the
// "WorkflowState" collection of the "mongo" database.
cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
.Endpoint(e => e.Name = BusConstants.SagaQueue)
.MongoDbRepository(connectionString, r =>
{
r.DatabaseName = "mongo";
r.CollectionName = "WorkflowState";
});
// Consumers; only Step1Consumer is registered with a consumer definition.
cnf.AddConsumer(typeof(TransitionConsumer));
cnf.AddConsumer(typeof(Step1Consumer), typeof(Step1ConsumerDefinition));
cnf.AddConsumer(typeof(Step2Consumer));
cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
cnf.AddConsumer(typeof(FaultConsumer));
//cnf.AddMessageScheduler(schedulerEndpoint);
cnf.UsingRabbitMq((context, cfg) =>
{
// NOTE(review): broker credentials are hard-coded as guest/guest — consider
// reading them from configuration.
cfg.Host(new Uri(rabbitHost), hst =>
{
hst.Username("guest");
hst.Password("guest");
});
//cfg.UseMessageScheduler(schedulerEndpoint);
// Creates receive endpoints for the saga and consumers registered above.
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
// NOTE(review): Swagger is registered inside the RabbitMQ check, so it is skipped
// when no host is configured — confirm this is intended.
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
});
}
So actually the message flow should be as follows:
- A controller publishes the StartProcessingMessage, which is consumed by TransitionConsumer, which in turn publishes the StartStep1Message.
- Step1Consumer gets the message, does its job and publishes the Step1FinishedMessage.
- TransitionConsumer gets the message and publishes the StartStep2Message.
- Step2Consumer gets the message, does its job and publishes the Step2FinishedMessage.
- TransitionConsumer gets the message and publishes the ProcessingFinishedMessage, which is consumed by the ProcessingFinishedConsumer.
In this scenario both Step1Consumer and Step2Consumer do not know about the existence of the other, and the transitions between steps are orchestrated solely by the TransitionConsumer. All this is done while the state machine tracks each and every message and passes through all the respective states.
問題從一開始就出現了:我認為 TransitionConsumer 在 ArcStateMachine 開始處理 StartProcessingEvent(該事件會使狀態機進入 ProcessingStartedState)之前,就已經發布了 StartStep1Message。這一切導致狀態機卡住。結果是,當 Step1FinishedEvent 到達時,狀態機並不處於 Step1StartedState,而這個狀態本應由 StartStep1Message 所觸發的 Step1StartedEvent 進入。
我該如何解決這個問題?
uj5u.com熱心網友回復:
您應該為您的狀態機創建一個Saga 定義,以便您可以配置訊息重試和記憶體發件箱。
在該定義中,將重試/發件箱直接添加到接收端點,如下所示。
// Retry a message whose handling failed on this receive endpoint: up to 3 retries at a
// fixed interval of 1000 (presumably milliseconds — confirm against the Interval overload).
endpointConfigurator.UseMessageRetry(r => r.Interval(3,1000));
// Enable the in-memory outbox on the endpoint. NOTE(review): this is expected to hold
// messages published while handling a message until that handling completes — confirm
// against the MassTransit documentation.
endpointConfigurator.UseInMemoryOutbox();
這應該可以解決 saga 中的並發問題:消費者可能已經收到訊息並產生了事件,而該事件在 saga 尚未處理完「觸發該消費者命令的那個事件」之前,就已經被分派到 saga。是的,就是這麼快。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/343107.html
