diff --git a/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs b/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs new file mode 100644 index 000000000..3872fbbd0 --- /dev/null +++ b/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs @@ -0,0 +1,96 @@ +using Humanizer; +using IntegrationTests; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Shouldly; +using Wolverine; +using Wolverine.EntityFrameworkCore; +using Wolverine.SqlServer; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace EfCoreTests.Bugs; + +public class Bug_1846_duplicate_execution_of_scheduled_jobs +{ + private readonly ITestOutputHelper _output; + + public Bug_1846_duplicate_execution_of_scheduled_jobs(ITestOutputHelper output) + { + _output = output ?? throw new ArgumentNullException(nameof(output)); + } + + [Fact] + public async Task should_not_double_execute() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddSingleton( + new Wolverine.ComplianceTests.OutputLoggerProvider(_output)); + + opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "scheduled"); + opts.Durability.MessageIdentity = MessageIdentity.IdAndDestination; + + opts.Durability.ScheduledJobPollingTime = TimeSpan.FromMilliseconds(300); + opts.Durability.KeepAfterMessageHandling = TimeSpan.FromMinutes(5); + + opts.Policies.UseDurableLocalQueues(); + opts.Policies.AutoApplyTransactions(); + opts.UseEntityFrameworkCoreTransactions(); + + opts.Services.AddDbContextWithWolverineIntegration(x => + x.UseSqlServer(Servers.SqlServerConnectionString)); + }).StartAsync(); + + var session = await host.TrackActivity() + .WaitForMessageToBeReceivedAt(host) + .Timeout(1.Minutes()) + .DoNotAssertOnExceptionsDetected() + .InvokeMessageAndWaitAsync(new Msg0(Guid.NewGuid())); + + if (session.AllExceptions().Any()) + { + throw new AggregateException(session.AllExceptions()); + } + + session.FindSingleTrackedMessageOfType(); + session.FindSingleTrackedMessageOfType(MessageEventType.MessageSucceeded); + } +} + +public sealed record Msg0(Guid MsgId); +public sealed record MsgA(Guid MsgId); +public sealed record MsgB(Guid MsgId, int Count); + +public static class TestHandler +{ + private static int CounterA; + private static int CounterB; + + public static MsgA Handle(Msg0 msg) + { + return new(msg.MsgId); + } + + public static async Task> Handle(MsgA msg, ILogger logger, CleanDbContext dbContext) + { + var now = DateTimeOffset.UtcNow.AddMinutes(-1); + await Task.Delay(1); + logger.LogInformation("Recv message A with ID {Id}", msg.MsgId); + return new MsgB(msg.MsgId, Interlocked.Increment(ref CounterA)).ScheduledAt(now); + } + + public static async Task Handle(MsgB msg, ILogger logger, CleanDbContext dbContext) + { + var value = Interlocked.Increment(ref CounterB); + if (value != msg.Count) + throw new NotSupportedException($"Count mismatch (expecting {value}, got {msg.Count}), event evaluated twice"); + + logger.LogInformation("Recv message B with ID {Id}: count: {Count}", msg.MsgId, msg.Count); + + await Task.Delay(1000); + } +} \ No newline at end of file diff --git a/src/Persistence/EfCoreTests/Bugs/OutputLoggerProvider.cs b/src/Persistence/EfCoreTests/Bugs/OutputLoggerProvider.cs new file mode 100644 index 000000000..d0815bc7b --- /dev/null +++ b/src/Persistence/EfCoreTests/Bugs/OutputLoggerProvider.cs @@ -0,0 +1,24 @@ +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace EfCoreTests.Bugs; + +public class OutputLoggerProvider : ILoggerProvider +{ + private readonly ITestOutputHelper _output; + + public OutputLoggerProvider(ITestOutputHelper output) + { + _output = output; + } + + + public void Dispose() + { + } + + public ILogger CreateLogger(string categoryName) + { + return new XUnitLogger(_output, categoryName); + } +} \ No newline at end of file diff --git a/src/Persistence/EfCoreTests/Bugs/XUnitLogger.cs b/src/Persistence/EfCoreTests/Bugs/XUnitLogger.cs new file mode 100644 index 000000000..c7b6cfce3 --- /dev/null +++ b/src/Persistence/EfCoreTests/Bugs/XUnitLogger.cs @@ -0,0 +1,77 @@ +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace EfCoreTests.Bugs; + +public class XUnitLogger : ILogger +{ + private readonly string _categoryName; + + private readonly List _ignoredStrings = new() + { + "Declared", + "Successfully processed message" + }; + + private readonly ITestOutputHelper _testOutputHelper; + + public XUnitLogger(ITestOutputHelper testOutputHelper, string categoryName) + { + _testOutputHelper = testOutputHelper; + _categoryName = categoryName; + } + + public bool IsEnabled(LogLevel logLevel) + { + return logLevel != LogLevel.None; + } + + public IDisposable BeginScope(TState state) + { + return new Disposable(); + } + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, + Func formatter) + { + if (exception is DivideByZeroException) + { + return; + } + + if (exception is BadImageFormatException) + { + return; + } + + if (_categoryName == "Wolverine.Transports.Sending.BufferedSendingAgent" && + logLevel == LogLevel.Information) return; + if (_categoryName == "Wolverine.Runtime.WolverineRuntime" && + logLevel == LogLevel.Information) return; + if (_categoryName == "Microsoft.Hosting.Lifetime" && + logLevel == LogLevel.Information) return; + if (_categoryName == "Wolverine.Transports.ListeningAgent" && + logLevel == LogLevel.Information) return; + if (_categoryName == "JasperFx.Resources.ResourceSetupHostService" && + logLevel == LogLevel.Information) return; + if (_categoryName == "Wolverine.Configuration.HandlerDiscovery" && + logLevel == LogLevel.Information) return; + + var text = formatter(state, exception); + if (_ignoredStrings.Any(x => text.Contains(x))) return; + + _testOutputHelper.WriteLine($"{_categoryName}/{logLevel}: {text}"); + + if (exception != null) + { + _testOutputHelper.WriteLine(exception.ToString()); + } + } + + public class Disposable : IDisposable + { + public void Dispose() + { + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Envelope.cs b/src/Wolverine/Envelope.cs index ec0dbd111..7cd383a15 100644 --- a/src/Wolverine/Envelope.cs +++ b/src/Wolverine/Envelope.cs @@ -425,6 +425,10 @@ public override int GetHashCode() /// public bool IsScheduledForLater(DateTimeOffset utcNow) { + // Doesn't matter, if it's been scheduled and persisted, it has + // to be scheduled + if (Status == EnvelopeStatus.Scheduled) return true; + return ScheduledTime.HasValue && ScheduledTime.Value > utcNow; }