diff --git a/Directory.Build.props b/Directory.Build.props index f98c95e79..87d60370b 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ 1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618 true enable - 4.12.1 + 4.12.2 $(PackageProjectUrl) true true diff --git a/src/Persistence/PersistenceTests/Bugs/schedule_execution_outside_of_message_handler.cs b/src/Persistence/PersistenceTests/Bugs/schedule_execution_outside_of_message_handler.cs new file mode 100644 index 000000000..78ce5bb4a --- /dev/null +++ b/src/Persistence/PersistenceTests/Bugs/schedule_execution_outside_of_message_handler.cs @@ -0,0 +1,37 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.Postgresql; +using Xunit; + +namespace PersistenceTests.Bugs; + +public class schedule_execution_outside_of_message_handler +{ + [Fact] + public async Task try_it_out() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "wolverine"); + opts.Policies.UseDurableLocalQueues(); + }).StartAsync(); + + + var bus = host.MessageBus(); + await bus.ScheduleAsync(new MyGuy("Hey"), 10.Minutes()); + + + await Task.Delay(1.Minutes()); + } +} + +public record MyGuy(string Name); + +public static class MyGuyHandler +{ + public static void Handle(MyGuy guy) => Debug.WriteLine("Got my guy " + guy.Name); +} \ No newline at end of file diff --git a/src/Wolverine/Tracking/TrackedSession.cs b/src/Wolverine/Tracking/TrackedSession.cs index 5d4c11ab5..1e83f9826 100644 --- a/src/Wolverine/Tracking/TrackedSession.cs +++ b/src/Wolverine/Tracking/TrackedSession.cs @@ -161,7 +161,7 @@ public void AssertCondition(string message, Func condition) public Task PlayScheduledMessagesAsync(TimeSpan timeout) { var serviceName = _primaryHost.GetRuntime().Options.ServiceName; - var recordsInOrder = _envelopes.SelectMany(x => x.Records).Where(x => x.MessageEventType == MessageEventType.Scheduled).ToArray(); + var recordsInOrder = _envelopes.SelectMany(x => x.Records).Where(x => x.MessageEventType == MessageEventType.Scheduled || x.Envelope.Status == EnvelopeStatus.Scheduled || x.WasScheduled).ToArray(); var records = recordsInOrder.Where(x => x.ServiceName == serviceName).ToArray(); if (!records.Any()) { @@ -181,15 +181,17 @@ public Task PlayScheduledMessagesAsync(TimeSpan timeout) internal async Task ReplayAll(IMessageContext context, EnvelopeRecord[] records) { - foreach (var record in records) + var envelopes = records.Select(x => x.Envelope).Distinct().ToArray(); + + foreach (var envelope in envelopes) { - if (record.Envelope.Destination.Scheme == TransportConstants.Local) + if (envelope.Destination.Scheme == TransportConstants.Local) { - await context.InvokeAsync(record.Envelope.Message); + await context.InvokeAsync(envelope.Message); } else { - await context.EndpointFor(record.Envelope.Destination).SendAsync(record.Envelope.Message); + await context.EndpointFor(envelope.Destination).SendAsync(envelope.Message); } } }