Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using Humanizer;
using IntegrationTests;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine;
using Wolverine.EntityFrameworkCore;
using Wolverine.SqlServer;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace EfCoreTests.Bugs;

public class Bug_1846_duplicate_execution_of_scheduled_jobs
{
private readonly ITestOutputHelper _output;

public Bug_1846_duplicate_execution_of_scheduled_jobs(ITestOutputHelper output)
{
_output = output ?? throw new ArgumentNullException(nameof(output));
}

[Fact]
public async Task should_not_double_execute()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddSingleton<ILoggerProvider>(
new Wolverine.ComplianceTests.OutputLoggerProvider(_output));

opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "scheduled");
opts.Durability.MessageIdentity = MessageIdentity.IdAndDestination;

opts.Durability.ScheduledJobPollingTime = TimeSpan.FromMilliseconds(300);
opts.Durability.KeepAfterMessageHandling = TimeSpan.FromMinutes(5);

opts.Policies.UseDurableLocalQueues();
opts.Policies.AutoApplyTransactions();
opts.UseEntityFrameworkCoreTransactions();

opts.Services.AddDbContextWithWolverineIntegration<CleanDbContext>(x =>
x.UseSqlServer(Servers.SqlServerConnectionString));
}).StartAsync();

var session = await host.TrackActivity()
.WaitForMessageToBeReceivedAt<MsgB>(host)
.Timeout(1.Minutes())
.DoNotAssertOnExceptionsDetected()
.InvokeMessageAndWaitAsync(new Msg0(Guid.NewGuid()));

if (session.AllExceptions().Any())
{
throw new AggregateException(session.AllExceptions());
}

session.FindSingleTrackedMessageOfType<MsgA>();
session.FindSingleTrackedMessageOfType<MsgB>(MessageEventType.MessageSucceeded);
}
}

public sealed record Msg0(Guid MsgId);
public sealed record MsgA(Guid MsgId);
public sealed record MsgB(Guid MsgId, int Count);

public static class TestHandler
{
private static int CounterA;
private static int CounterB;

public static MsgA Handle(Msg0 msg)
{
return new(msg.MsgId);
}

public static async Task<ScheduledMessage<MsgB>> Handle(MsgA msg, ILogger logger, CleanDbContext dbContext)
{
var now = DateTimeOffset.UtcNow.AddMinutes(-1);
await Task.Delay(1);
logger.LogInformation("Recv message A with ID {Id}", msg.MsgId);
return new MsgB(msg.MsgId, Interlocked.Increment(ref CounterA)).ScheduledAt(now);
}

public static async Task Handle(MsgB msg, ILogger logger, CleanDbContext dbContext)
{
var value = Interlocked.Increment(ref CounterB);
if (value != msg.Count)
throw new NotSupportedException($"Count mismatch (expecting {value}, got {msg.Count}), event evaluated twice");

logger.LogInformation("Recv message B with ID {Id}: count: {Count}", msg.MsgId, msg.Count);

await Task.Delay(1000);
}
}
24 changes: 24 additions & 0 deletions src/Persistence/EfCoreTests/Bugs/OutputLoggerProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;

namespace EfCoreTests.Bugs;

public class OutputLoggerProvider : ILoggerProvider
{
private readonly ITestOutputHelper _output;

public OutputLoggerProvider(ITestOutputHelper output)
{
_output = output;
}


public void Dispose()
{
}

public ILogger CreateLogger(string categoryName)
{
return new XUnitLogger(_output, categoryName);
}
}
77 changes: 77 additions & 0 deletions src/Persistence/EfCoreTests/Bugs/XUnitLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;

namespace EfCoreTests.Bugs;

public class XUnitLogger : ILogger
{
private readonly string _categoryName;

private readonly List<string> _ignoredStrings = new()
{
"Declared",
"Successfully processed message"
};

private readonly ITestOutputHelper _testOutputHelper;

public XUnitLogger(ITestOutputHelper testOutputHelper, string categoryName)
{
_testOutputHelper = testOutputHelper;
_categoryName = categoryName;
}

public bool IsEnabled(LogLevel logLevel)
{
return logLevel != LogLevel.None;
}

public IDisposable BeginScope<TState>(TState state)
{
return new Disposable();
}

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception,
Func<TState, Exception, string> formatter)
{
if (exception is DivideByZeroException)
{
return;
}

if (exception is BadImageFormatException)
{
return;
}

if (_categoryName == "Wolverine.Transports.Sending.BufferedSendingAgent" &&
logLevel == LogLevel.Information) return;
if (_categoryName == "Wolverine.Runtime.WolverineRuntime" &&
logLevel == LogLevel.Information) return;
if (_categoryName == "Microsoft.Hosting.Lifetime" &&
logLevel == LogLevel.Information) return;
if (_categoryName == "Wolverine.Transports.ListeningAgent" &&
logLevel == LogLevel.Information) return;
if (_categoryName == "JasperFx.Resources.ResourceSetupHostService" &&
logLevel == LogLevel.Information) return;
if (_categoryName == "Wolverine.Configuration.HandlerDiscovery" &&
logLevel == LogLevel.Information) return;

var text = formatter(state, exception);
if (_ignoredStrings.Any(x => text.Contains(x))) return;

_testOutputHelper.WriteLine($"{_categoryName}/{logLevel}: {text}");

if (exception != null)
{
_testOutputHelper.WriteLine(exception.ToString());
}
}

public class Disposable : IDisposable
{
public void Dispose()
{
}
}
}
4 changes: 4 additions & 0 deletions src/Wolverine/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ public override int GetHashCode()
/// <returns></returns>
public bool IsScheduledForLater(DateTimeOffset utcNow)
{
// Doesn't matter, if it's been scheduled and persisted, it has
// to be scheduled
if (Status == EnvelopeStatus.Scheduled) return true;

return ScheduledTime.HasValue && ScheduledTime.Value > utcNow;
}

Expand Down
Loading