diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/process_inline_compliance.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/process_inline_compliance.cs new file mode 100644 index 000000000..3232daa53 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/process_inline_compliance.cs @@ -0,0 +1,64 @@ +using IntegrationTests; +using JasperFx.Core; +using Shouldly; +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Configuration; +using Wolverine.Postgresql; +using Wolverine.RabbitMQ.Internal; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + + +public class ProcessInlineFixture : TransportComplianceFixture, IAsyncLifetime +{ + public ProcessInlineFixture() : base($"rabbitmq://queue/inline1".ToUri()) + { + } + + public async Task InitializeAsync() + { + OutboundAddress = $"rabbitmq://queue/inline1".ToUri(); + + await SenderIs(opts => + { + var listener = $"listener{RabbitTesting.Number}"; + + opts.Durability.Mode = DurabilityMode.Solo; + + opts.UseRabbitMq() + .AutoProvision() + .AutoPurgeOnStartup() + .DisableDeadLetterQueueing() + .DeclareQueue("quorum1").ConfigureListeners(l => l.ProcessInline()); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "inline_sender"); + + opts.ListenToRabbitQueue("inline2").TelemetryEnabled(false); + }); + + await ReceiverIs(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.UseRabbitMq() + .DisableDeadLetterQueueing() + .ConfigureListeners(l => l.ProcessInline()); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "inline_receiver"); + + opts.ListenToRabbitQueue("inline1").TelemetryEnabled(false); + }); + } + + public async Task DisposeAsync() + { + await DisposeAsync(); + } +} + +public class process_inline_compliance : TransportCompliance +{ + +} diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 82df33cfe..3ac2f6013 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -153,7 +153,7 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception) throw new InvalidOperationException("No Envelope is active for this context"); } - if (_channel is ISupportDeadLetterQueue c && c.NativeDeadLetterQueueEnabled) + if (_channel is ISupportDeadLetterQueue { NativeDeadLetterQueueEnabled: true } c) { if (Envelope.Batch != null) { @@ -182,6 +182,9 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception) // If persistable, persist await Storage.Inbox.MoveToDeadLetterStorageAsync(Envelope, exception); } + + // If this is Inline + await _channel.CompleteAsync(Envelope); } public Task RetryExecutionNowAsync()