Skip to content

Commit

Permalink
Attempting to recreate the out of order publishing, but did "fix" the…
Browse files Browse the repository at this point in the history
… execution tracking. Closes GH-468
  • Loading branch information
jeremydmiller committed Jul 14, 2023
1 parent 388bcd7 commit 76f40af
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Diagnostics;
using IntegrationTests;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Oakton.Resources;
using TestingSupport;
using Wolverine.Marten;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.RabbitMQ.Tests.Bugs;

public class Bug_475_durable_outbox_sending_out_of_order
{
[Fact]
public async Task try_messages()
{
var queueName = RabbitTesting.NextQueueName();

var tracker = new OrderTracker();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddSingleton(tracker);

opts.Services.AddMarten(Servers.PostgresConnectionString)
.IntegrateWithWolverine();

opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();

opts.PublishAllMessages().ToRabbitQueue(queueName).SendInline();
opts.ListenToRabbitQueue(queueName).Sequential();

opts.Policies.UseDurableInboxOnAllListeners();
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
}).StartAsync();

await host.ResetResourceState();

Func<IMessageBus, Task> publishing = async bus =>
{
await bus.PublishAsync(new OrderedMessage(1));
await bus.PublishAsync(new OrderedMessage(2));
await bus.PublishAsync(new OrderedMessage(3));
await bus.PublishAsync(new OrderedMessage(4));
await bus.PublishAsync(new OrderedMessage(5));
await bus.PublishAsync(new OrderedMessage(6));
};

await host.TrackActivity().IncludeExternalTransports().ExecuteAndWaitAsync(publishing);

tracker.Encountered.ShouldHaveTheSameElementsAs(1, 2,3 ,4,5,6);
}
}

public static class OrderedMessageHandler
{
public static void Handle(OrderedMessage message, OrderTracker tracker)
{
tracker.Encountered.Add(message.Order);
}
}

public record OrderedMessage(int Order);

public class OrderTracker
{
public OrderTracker()
{
Debug.WriteLine("foo");
}

public List<int> Encountered { get; } = new();
}
22 changes: 18 additions & 4 deletions src/Wolverine/Tracking/TrackedSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ internal class TrackedSession : ITrackedSession

private readonly TaskCompletionSource<TrackingStatus> _source;

private bool _executionComplete;

private readonly Stopwatch _stopwatch = new();

private TrackingStatus _status = TrackingStatus.Active;
Expand All @@ -32,6 +34,8 @@ public TrackedSession(IHost host)
_source = new TaskCompletionSource<TrackingStatus>();
_primaryLogger = host.GetRuntime();
}



public TimeSpan Timeout { get; set; } = 5.Seconds();

Expand Down Expand Up @@ -224,6 +228,7 @@ public async Task ExecuteAndTrackAsync()
await using var scope = _primaryHost.Services.As<IContainer>().GetNestedContainer();
var context = scope.GetInstance<IMessageContext>();
await Execution(context).WaitAsync(Timeout);
_executionComplete = true;
}
catch (TimeoutException)
{
Expand All @@ -239,10 +244,17 @@ public async Task ExecuteAndTrackAsync()
cleanUp();
throw;
}

startTimeoutTracking();

await _source.Task;

// This is for race conditions if the activity manages to finish really fast
if (IsCompleted())
{
Status = TrackingStatus.Completed;
}
else
{
startTimeoutTracking();
await _source.Task;
}

cleanUp();

Expand Down Expand Up @@ -334,6 +346,8 @@ public void Record(MessageEventType eventType, Envelope envelope, string? servic

public bool IsCompleted()
{
if (!_executionComplete) return false;

if (_conditions.Any(x => x.IsCompleted()))
{
return true;
Expand Down

0 comments on commit 76f40af

Please sign in to comment.