Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<NoWarn>1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618</NoWarn>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>4.12.1</Version>
<Version>4.12.2</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.Postgresql;
using Xunit;

namespace PersistenceTests.Bugs;

public class schedule_execution_outside_of_message_handler
{
[Fact]
public async Task try_it_out()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "wolverine");
opts.Policies.UseDurableLocalQueues();
}).StartAsync();


var bus = host.MessageBus();
await bus.ScheduleAsync(new MyGuy("Hey"), 10.Minutes());


await Task.Delay(1.Minutes());
}
}

public record MyGuy(string Name);

public static class MyGuyHandler
{
public static void Handle(MyGuy guy) => Debug.WriteLine("Got my guy " + guy.Name);
}
12 changes: 7 additions & 5 deletions src/Wolverine/Tracking/TrackedSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void AssertCondition(string message, Func<bool> condition)
public Task<ITrackedSession> PlayScheduledMessagesAsync(TimeSpan timeout)
{
var serviceName = _primaryHost.GetRuntime().Options.ServiceName;
var recordsInOrder = _envelopes.SelectMany(x => x.Records).Where(x => x.MessageEventType == MessageEventType.Scheduled).ToArray();
var recordsInOrder = _envelopes.SelectMany(x => x.Records).Where(x => x.MessageEventType == MessageEventType.Scheduled || x.Envelope.Status == EnvelopeStatus.Scheduled || x.WasScheduled).ToArray();
var records = recordsInOrder.Where(x => x.ServiceName == serviceName).ToArray();
if (!records.Any())
{
Expand All @@ -181,15 +181,17 @@ public Task<ITrackedSession> PlayScheduledMessagesAsync(TimeSpan timeout)

internal async Task ReplayAll(IMessageContext context, EnvelopeRecord[] records)
{
foreach (var record in records)
var envelopes = records.Select(x => x.Envelope).Distinct().ToArray();

foreach (var envelope in envelopes)
{
if (record.Envelope.Destination.Scheme == TransportConstants.Local)
if (envelope.Destination.Scheme == TransportConstants.Local)
{
await context.InvokeAsync(record.Envelope.Message);
await context.InvokeAsync(envelope.Message);
}
else
{
await context.EndpointFor(record.Envelope.Destination).SendAsync(record.Envelope.Message);
await context.EndpointFor(envelope.Destination).SendAsync(envelope.Message);
}
}
}
Expand Down