diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_475_durable_outbox_sending_out_of_order.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_475_durable_outbox_sending_out_of_order.cs new file mode 100644 index 000000000..155594eae --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_475_durable_outbox_sending_out_of_order.cs @@ -0,0 +1,76 @@ +using System.Diagnostics; +using IntegrationTests; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Oakton.Resources; +using TestingSupport; +using Wolverine.Marten; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +public class Bug_475_durable_outbox_sending_out_of_order +{ + [Fact] + public async Task try_messages() + { + var queueName = RabbitTesting.NextQueueName(); + + var tracker = new OrderTracker(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddSingleton(tracker); + + opts.Services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine(); + + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishAllMessages().ToRabbitQueue(queueName).SendInline(); + opts.ListenToRabbitQueue(queueName).Sequential(); + + opts.Policies.UseDurableInboxOnAllListeners(); + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + }).StartAsync(); + + await host.ResetResourceState(); + + Func publishing = async bus => + { + await bus.PublishAsync(new OrderedMessage(1)); + await bus.PublishAsync(new OrderedMessage(2)); + await bus.PublishAsync(new OrderedMessage(3)); + await bus.PublishAsync(new OrderedMessage(4)); + await bus.PublishAsync(new OrderedMessage(5)); + await bus.PublishAsync(new OrderedMessage(6)); + }; + + await host.TrackActivity().IncludeExternalTransports().ExecuteAndWaitAsync(publishing); + + tracker.Encountered.ShouldHaveTheSameElementsAs(1, 2,3 ,4,5,6); + } +} + +public static class OrderedMessageHandler +{ + public static void Handle(OrderedMessage message, OrderTracker tracker) + { + tracker.Encountered.Add(message.Order); + } +} + +public record OrderedMessage(int Order); + +public class OrderTracker +{ + public OrderTracker() + { + Debug.WriteLine("foo"); + } + + public List Encountered { get; } = new(); +} \ No newline at end of file diff --git a/src/Wolverine/Tracking/TrackedSession.cs b/src/Wolverine/Tracking/TrackedSession.cs index c0847d93b..e0ae715b3 100644 --- a/src/Wolverine/Tracking/TrackedSession.cs +++ b/src/Wolverine/Tracking/TrackedSession.cs @@ -22,6 +22,8 @@ internal class TrackedSession : ITrackedSession private readonly TaskCompletionSource _source; + private bool _executionComplete; + private readonly Stopwatch _stopwatch = new(); private TrackingStatus _status = TrackingStatus.Active; @@ -32,6 +34,8 @@ public TrackedSession(IHost host) _source = new TaskCompletionSource(); _primaryLogger = host.GetRuntime(); } + + public TimeSpan Timeout { get; set; } = 5.Seconds(); @@ -224,6 +228,7 @@ public async Task ExecuteAndTrackAsync() await using var scope = _primaryHost.Services.As().GetNestedContainer(); var context = scope.GetInstance(); await Execution(context).WaitAsync(Timeout); + _executionComplete = true; } catch (TimeoutException) { @@ -239,10 +244,17 @@ public async Task ExecuteAndTrackAsync() cleanUp(); throw; } - - startTimeoutTracking(); - - await _source.Task; + + // This is for race conditions if the activity manages to finish really fast + if (IsCompleted()) + { + Status = TrackingStatus.Completed; + } + else + { + startTimeoutTracking(); + await _source.Task; + } cleanUp(); @@ -334,6 +346,8 @@ public void Record(MessageEventType eventType, Envelope envelope, string? servic public bool IsCompleted() { + if (!_executionComplete) return false; + if (_conditions.Any(x => x.IsCompleted())) { return true;