我正試圖使用RabbitMQ佇列連接兩個服務。第一個服務將值推送到佇列中,第二個服務檢索它并進行處理。一切都很好,但是當第二個服務試圖處理作業時,它拋出了例外。佇列專案停留在JobAttempt佇列中,沒有任何資訊,消費者服務重新嘗試處理作業,但每次都拋出相同的例外。
Exception
fail: MassTransit.ReceiveTransport[0]
S-FAULT rabbitmq: /localhost/JobAttempt f0cb0000-1616-902e-edb0-08d97cd26cf9 MassTransit. 合同.JobService.JobStatusCheckRequested
失敗。MassTransit.ReceiveTransport[0]
T-FAULT rabbitmq: /localhost/JobAttempt f0cb0000-1616-902e8129-08d97cca994f
System.Threading.Tasks.TaskCanceledException。一個任務被取消了。
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContext`2.Delete(SagaConsumeContext`1 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>> 。 Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)。
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<>G__SendAsync|1>d.MoveNext()
--- 前面的堆疊跟蹤結束 ---
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()。
--- 前面的堆疊跟蹤結束 ---
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
在GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
在GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
在MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
在MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass24_0.<<HandleBasicDeliver>b__0>d.MoveNext()
生產者啟動:
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter()。
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService()。
消費者啟動:
services.AddMassTransit(x => /span>
{
x.AddDelayedMessageScheduler()。
x.AddConsumer<LoanRequestJobConsumer>(cfg=>)
{
cfg。 Options<JobOptions< LoanRequestBroker>>(options =>
{
options.SetJobTimeout(TimeSpan.FromMinutes(5) )。)
options.SetConcurrentJobLimit(10)。
});
});
x.SetKebabCaseEndpointNameFormatter()。
x.UsingRabbitMq((context, cfg) =>/span>
{
cfg.UseDelayedMessageScheduler()。
cfg.ServiceInstance(instance =>)
{
instance.ConfigureJobServiceEndpoints(js =>)
{
js.SagaPartitionCount = 1;
js.FinalizeCompleted = true;
});
cfg.ReceiveEndpoint("loan-request-processing", e =>
{
e.ConfigureConsumer<LoanRequestJobConsumer>(context)。
});
instance.ConfigureEndpoints(context)。
});
});
});
services.AddMassTransitHostedService()。
Job consumer
public class LoanRequestJobConsumer : IJobConsumer<LoanRequestBroker>
{
private readonly ILogger<LoanRequestJobConsumer> _logger;
private readonly ILoanProcessingService _processingService。
public LoanRequestJobConsumer()
ILogger<LoanRequestJobConsumer> logger,
ILoanProcessingService processingService)。
{
_logger = logger;
_processingService = processingService。
}
public async Task Run(JobContext< LoanRequestBroker> context)
{
_logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: 開始處理貸款請求 id = {context.Job.Id}")。
var processingInfo = new LoanProcessingInfo
{
狀態 = TaskStatus.InProgress,
LoanRequest = context.Job.Adapt<LoanRequest>()
};
processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo)。
processingInfo = await _processingService.ProcessAsync(processingInfo);
processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo)。
_logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: 結束處理貸款請求id = {context.Job.Id}"
$"
結果。{JsonConvert.SerializeObject(processingInfo)}")。)
}
我如何將專案推送到佇列中
var endpoint = await _sendEndpointProvider。 GetSendEndpoint(_brokerEndpoints.LoanProcessingQueue) 。
await endpoint.Send(loanRequest.Adapt<LoanRequestBroker>())。
uj5u.com熱心網友回復:
如果我不得不猜測,在沒有任何其他錯誤日志細節的情況下,我會認為延遲交換插件沒有在 RabbitMQ 上安裝/啟用。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/332276.html
標籤:
