diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/ConditionPoller.cs b/src/Transports/Redis/Wolverine.Redis.Tests/ConditionPoller.cs new file mode 100644 index 000000000..f8653247f --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/ConditionPoller.cs @@ -0,0 +1,23 @@ +using Xunit.Abstractions; + +namespace Wolverine.Redis.Tests; + +public class ConditionPoller(ITestOutputHelper output, int maxRetries, TimeSpan retryDelay) +{ + public async ValueTask WaitForAsync(string message, Func> condition) + { + var i = 0; + while (!await condition() && i < maxRetries) + { + i++; + output.WriteLine("{0} Waiting for condition: {1} (total {2}ms)", + DateTime.UtcNow, + message, + i * retryDelay.TotalMilliseconds); + await Task.Delay(retryDelay); + } + } + + public ValueTask WaitForAsync(string message, Func condition) => + WaitForAsync(message, () => ValueTask.FromResult(condition())); +} diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/EndToEndRetryTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/EndToEndRetryTests.cs index d7a3133b7..f69340c0d 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/EndToEndRetryTests.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/EndToEndRetryTests.cs @@ -13,180 +13,131 @@ namespace Wolverine.Redis.Tests; [Collection("EndToEndRetryTests")] -public class EndToEndRetryTests +public class EndToEndRetryTests(ITestOutputHelper output): IAsyncLifetime { - private readonly ITestOutputHelper _output; - - public EndToEndRetryTests(ITestOutputHelper output) - { - _output = output; - } - - [Fact] - public async Task message_with_retry_policy_saves_to_redis_and_retries() - { - var streamKey = $"e2e-retry-{Guid.NewGuid():N}"; - - using var host = await Host.CreateDefaultBuilder() + private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 500.Milliseconds()); + private RedisStreamEndpoint _endpoint = null!; + private IDatabase _database = null!; + private string _scheduledKey = null!; + private string _streamKey = null!; + private IHost _host = null!; + private E2ERetryTracker _tracker = null!; + + public async Task InitializeAsync() + { + _streamKey = $"e2e-retry-{Guid.NewGuid():N}"; + + _tracker = new E2ERetryTracker(); + _host = await Host.CreateDefaultBuilder() .UseWolverine(opts => { opts.ServiceName = "E2ERetryTestService"; - + // Configure fast polling for test opts.Durability.ScheduledJobFirstExecution = 100.Milliseconds(); opts.Durability.ScheduledJobPollingTime = 200.Milliseconds(); - + opts.UseRedisTransport("localhost:6379").AutoProvision(); - + // Configure routing to our test stream (without SendInline to ensure durable processing) - opts.PublishMessage().ToRedisStream(streamKey); - - // Listen to the stream - it will be Durable mode by default for Redis streams - opts.ListenToRedisStream(streamKey, "e2e-retry-group") + opts.PublishMessage().ToRedisStream(_streamKey); + + opts.ListenToRedisStream(_streamKey, "e2e-retry-group") + .UseDurableInbox() // Use Durable endpoint (for Redis streams BufferedInMemory is default) .StartFromBeginning(); - + // Configure a retry policy opts.Policies.OnException() .ScheduleRetry(1.Seconds()); - + // Register the handler opts.Discovery.IncludeType(); - - opts.Services.AddSingleton(); + + opts.Services.AddSingleton(_tracker); }).StartAsync(); - var runtime = host.Services.GetRequiredService(); + var runtime = _host.Services.GetRequiredService(); var transport = runtime.Options.Transports.GetOrCreate(); - var endpoint = transport.StreamEndpoint(streamKey); - var database = transport.GetDatabase(database: endpoint.DatabaseId); - var scheduledKey = endpoint.ScheduledMessagesKey; - - // Verify endpoint implements IDatabaseBackedEndpoint - _output.WriteLine($"Endpoint Mode: {endpoint.Mode}"); - _output.WriteLine($"Is IDatabaseBackedEndpoint: {endpoint is IDatabaseBackedEndpoint}"); - endpoint.ShouldBeAssignableTo("Endpoint should implement IDatabaseBackedEndpoint"); - - // Clear the scheduled set - await database.KeyDeleteAsync(scheduledKey); - await database.KeyDeleteAsync(streamKey); - - var tracker = host.Services.GetRequiredService(); - tracker.FailCount = 1; // Fail once, then succeed - // Wait for listener to fully initialize - await Task.Delay(500); + _endpoint = transport.StreamEndpoint(_streamKey); + _database = transport.GetDatabase(database: _endpoint.DatabaseId); + _scheduledKey = _endpoint.ScheduledMessagesKey; + await DeleteDatabaseKeys(); + } + + public async Task DisposeAsync() + { + await DeleteDatabaseKeys(); + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task message_with_retry_policy_saves_to_redis_and_retries() + { + _endpoint.Mode.ShouldBe(EndpointMode.Durable, "Endpoint should be in Durable mode for retries to work"); + _endpoint.ShouldBeAssignableTo("Endpoint should implement IDatabaseBackedEndpoint"); - // Check if there are messages in the stream initially - var initialStreamLength = await database.StreamLengthAsync(streamKey); - _output.WriteLine($"Initial stream length: {initialStreamLength}"); + _tracker.FailCount = 1; // Fail once, then succeed // Send a message that will fail - var bus = host.MessageBus(); - var command = new E2EFailingCommand(Guid.NewGuid().ToString()); - - _output.WriteLine($"Sending command: {command.Id}"); - await bus.PublishAsync(command); + var bus = _host.MessageBus(); + var message = new E2EFailingCommand(Guid.NewGuid().ToString()); - // Wait a moment for the message to be sent to Redis - await Task.Delay(300); + output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, message); + await bus.PublishAsync(message); // Check if the message was actually sent to the stream - var streamLength = await database.StreamLengthAsync(streamKey); - _output.WriteLine($"Stream length after publish: {streamLength}"); - - // Wait for initial processing and failure - await Task.Delay(1500); - - // Check if handler was called - _output.WriteLine($"Handler attempt count: {tracker.AttemptCount}"); - - if (tracker.AttemptCount == 0) - { - _output.WriteLine("⚠ Handler was NEVER called!"); - _output.WriteLine(" Possible issues:"); - _output.WriteLine($" - Message not sent to Redis stream? (stream length: {streamLength})"); - _output.WriteLine(" - Listener not processing messages?"); - _output.WriteLine(" - Handler not registered or discovered?"); - _output.WriteLine(" - Message routing issue?"); - } - - tracker.AttemptCount.ShouldBeGreaterThan(0, "Handler should have been called at least once"); + await _poller.WaitForAsync("message is sent", () => _tracker.AttemptCount == 1); + var streamLength = await _database.StreamLengthAsync(_streamKey); + _tracker.AttemptCount.ShouldBe(1, "Handler should have been called once"); // Verify the message is in the scheduled set (waiting for retry) - var scheduledCount = await database.SortedSetLengthAsync(scheduledKey); - _output.WriteLine($"Scheduled messages count: {scheduledCount}"); - - if (scheduledCount > 0) + long scheduledCount = 0; + await _poller.WaitForAsync("message is in the scheduled set for retry", async () => { - _output.WriteLine("✓ Message was saved to Redis sorted set for retry"); - - // Verify the score is in the future (retry delay) - var entries = await database.SortedSetRangeByScoreWithScoresAsync(scheduledKey); - if (entries.Length > 0) - { - var retryTime = DateTimeOffset.FromUnixTimeMilliseconds((long)entries[0].Score); - var now = DateTimeOffset.UtcNow; - _output.WriteLine($"Retry scheduled for: {retryTime}"); - _output.WriteLine($"Current time: {now}"); - - retryTime.ShouldBeGreaterThan(now.AddMilliseconds(-500), "Retry should be scheduled in the future"); - } - - // Wait for retry to be processed - _output.WriteLine("Waiting for retry to be processed..."); - await Task.Delay(2500); - - // Verify the handler was called again (retry) - tracker.AttemptCount.ShouldBe(2, "Handler should be called twice: initial attempt + retry"); - - // Verify the scheduled set is now empty - var finalScheduledCount = await database.SortedSetLengthAsync(scheduledKey); - finalScheduledCount.ShouldBe(0, "Message should have been removed from scheduled set after retry"); - - _output.WriteLine($"✓ Message successfully retried from Redis"); - } - else - { - _output.WriteLine("⚠ Message was NOT saved to Redis sorted set"); - _output.WriteLine(" This could mean:"); - _output.WriteLine(" 1. The message succeeded on first try (tracker.FailCount might not be working)"); - _output.WriteLine(" 2. DurableReceiver is not using IDatabaseBackedEndpoint.ScheduleRetryAsync"); - _output.WriteLine(" 3. The endpoint mode is not Durable"); - - // Let's check if the message succeeded (which would mean no retry was needed) - if (tracker.AttemptCount == 1 && !tracker.LastFailed) - { - _output.WriteLine(" Message succeeded on first attempt - no retry needed"); - } - } - - _output.WriteLine($"Final state:"); - _output.WriteLine($" Total attempts: {tracker.AttemptCount}"); - _output.WriteLine($" Last failed: {tracker.LastFailed}"); - } + scheduledCount = await _database.SortedSetLengthAsync(_scheduledKey); + return scheduledCount == 1; + }); + scheduledCount.ShouldBe(1, "Message should be in the scheduled set for retry"); + + // Verify the score is in the future (retry delay) + var entries = await _database.SortedSetRangeByScoreWithScoresAsync(_scheduledKey); + entries.Length.ShouldBe(1, "There should be one entry in the scheduled set"); + var retryTime = DateTimeOffset.FromUnixTimeMilliseconds((long)entries[0].Score); + retryTime.ShouldBeGreaterThan(DateTimeOffset.UtcNow.AddMilliseconds(-500), "Retry should be scheduled in the future"); + + // Verify the handler was called again (retry) + await _poller.WaitForAsync("handler is called again for retry", () => _tracker.AttemptCount == 2); + _tracker.AttemptCount.ShouldBe(2, "Handler should be called twice: initial attempt + retry"); + _tracker.LastFailed.ShouldBeFalse(); + + // Verify the scheduled set is now empty + var finalScheduledCount = await _database.SortedSetLengthAsync(_scheduledKey); + finalScheduledCount.ShouldBe(0, "Message should have been removed from scheduled set after retry"); + } + + private async Task DeleteDatabaseKeys() + { + await _database.KeyDeleteAsync(_scheduledKey); + await _database.KeyDeleteAsync(_streamKey); + } } public record E2EFailingCommand(string Id); -public class E2EFailingCommandHandler -{ - private readonly E2ERetryTracker _tracker; - - public E2EFailingCommandHandler(E2ERetryTracker tracker) - { - _tracker = tracker; - } - +public class E2EFailingCommandHandler(E2ERetryTracker tracker) +{ public void Handle(E2EFailingCommand command) { - var attempt = _tracker.RecordAttempt(); - - if (attempt <= _tracker.FailCount) + var attempt = tracker.RecordAttempt(); + + if (attempt <= tracker.FailCount) { throw new InvalidOperationException($"Intentional failure on attempt {attempt} for command {command.Id}"); - } - - // Success + } + + // Success } } @@ -219,4 +170,3 @@ public int RecordAttempt() } } } - diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj b/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj index 3019a5a61..bd16991a5 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj +++ b/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj @@ -10,7 +10,7 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -23,7 +23,7 @@ - + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs b/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs index 035eb94d8..8de253a27 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs @@ -2,125 +2,99 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Shouldly; -using Wolverine.Configuration; +using System.Collections.Concurrent; using Wolverine.RateLimiting; using Xunit; +using Xunit.Abstractions; namespace Wolverine.Redis.Tests; -public class rate_limiting_end_to_end +public class rate_limiting_end_to_end(ITestOutputHelper output) : IAsyncLifetime { - [Fact] - public async Task rate_limited_messages_are_delayed_with_native_scheduling() - { + private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds()); + private readonly RateLimit _limit = new(1, 2.Seconds()); + private IHost _host = null!; + private RedisRateLimitTracker _tracker = null!; + + public async Task InitializeAsync() + { var streamKey = $"rate-limit-{Guid.NewGuid():N}"; - var groupName = $"rate-limit-group-{Guid.NewGuid():N}"; - var window = 2.Seconds(); - var limit = new RateLimit(1, window); - var tracker = new RedisRateLimitTracker(expectedCount: 2); + var groupName = $"rate-limit-group-{Guid.NewGuid():N}"; + + _tracker = new RedisRateLimitTracker(output); var endpointUri = new Uri($"redis://stream/0/{streamKey}"); - using var host = await Host.CreateDefaultBuilder() + _host = await Host.CreateDefaultBuilder() .UseWolverine(opts => { opts.ApplicationAssembly = typeof(rate_limiting_end_to_end).Assembly; - opts.Services.AddSingleton(tracker); + opts.Services.AddSingleton(_tracker); opts.UseRedisTransport("localhost:6379").AutoProvision(); opts.PublishAllMessages().ToRedisStream(streamKey); - opts.ListenToRedisStream(streamKey, groupName).StartFromBeginning(); - - opts.RateLimitEndpoint(endpointUri, limit); - }).StartAsync(); - - await Task.Delay(250.Milliseconds()); - await waitForNextBucketStartAsync(limit); - - var bus = host.MessageBus(); - await bus.PublishAsync(new RedisRateLimitedMessage(Guid.NewGuid().ToString())); - await bus.PublishAsync(new RedisRateLimitedMessage(Guid.NewGuid().ToString())); - - await tracker.WaitForHandledAsync(15.Seconds()); - - var handled = tracker.HandledTimes; - handled.Count.ShouldBeGreaterThanOrEqualTo(2); - - var firstBucket = RateLimitBucket.For(limit, handled[0]); - var secondBucket = RateLimitBucket.For(limit, handled[1]); - firstBucket.WindowStart.ShouldNotBe(secondBucket.WindowStart); - } - - private static async Task waitForNextBucketStartAsync(RateLimit limit) + opts.ListenToRedisStream(streamKey, groupName) + .UseDurableInbox() + .StartFromBeginning(); + + opts.RateLimitEndpoint(endpointUri, _limit); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task rate_limited_messages_are_delayed_with_native_scheduling() { - var now = DateTimeOffset.UtcNow; - var bucket = RateLimitBucket.For(limit, now); - var delay = bucket.WindowEnd - now + 50.Milliseconds(); - if (delay < TimeSpan.Zero) - { - delay = 50.Milliseconds(); - } - - await Task.Delay(delay); - } + RedisRateLimitedMessage[] messages = [ + new RedisRateLimitedMessage("A"), + new RedisRateLimitedMessage("B") + ]; + var bus = _host.MessageBus(); + + output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]); + await bus.PublishAsync(messages[0]); + output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[1]); + await bus.PublishAsync(messages[1]); + + await _poller.WaitForAsync("handled at least 2 messages", + () => _tracker.HandledMessages.Count >= 2); + + var handled = _tracker.HandledMessages; + handled.Select(x => x.Message).ShouldBe(messages, ignoreOrder: true); + + var buckets = handled.Select(x => RateLimitBucket.For(_limit, x.TimeStamp)).ToArray(); + buckets[0].WindowStart.ShouldBeLessThan(buckets[1].WindowStart); + } } public record RedisRateLimitedMessage(string Id); -public class RedisRateLimitedMessageHandler +public class RedisRateLimitedMessageHandler(RedisRateLimitTracker tracker) { - private readonly RedisRateLimitTracker _tracker; - - public RedisRateLimitedMessageHandler(RedisRateLimitTracker tracker) - { - _tracker = tracker; - } + private readonly RedisRateLimitTracker _tracker = tracker; public Task Handle(RedisRateLimitedMessage message, CancellationToken cancellationToken) { - _tracker.RecordHandled(); + _tracker.RecordHandled(message); return Task.CompletedTask; } } -public class RedisRateLimitTracker +public class RedisRateLimitTracker(ITestOutputHelper output) { - private readonly int _expectedCount; - private readonly TaskCompletionSource _completion = - new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly List _handledTimes = []; - private readonly object _lock = new(); - - public RedisRateLimitTracker(int expectedCount) - { - _expectedCount = expectedCount; - } + private readonly ITestOutputHelper output = output; + private readonly ConcurrentQueue<(DateTime, RedisRateLimitedMessage)> _handledMessages = []; - public IReadOnlyList HandledTimes - { - get - { - lock (_lock) - { - return _handledTimes.ToList(); - } - } - } - - public void RecordHandled() - { - lock (_lock) - { - _handledTimes.Add(DateTimeOffset.UtcNow); - if (_handledTimes.Count >= _expectedCount) - { - _completion.TrySetResult(true); - } - } - } + public IReadOnlyCollection<(DateTime TimeStamp, RedisRateLimitedMessage Message)> HandledMessages => _handledMessages; - public async Task WaitForHandledAsync(TimeSpan timeout) + public void RecordHandled(RedisRateLimitedMessage message) { - using var cts = new CancellationTokenSource(timeout); - await _completion.Task.WaitAsync(cts.Token); + var now = DateTime.UtcNow; + _handledMessages.Enqueue((now, message)); + output.WriteLine("{0} Handled message {1}", now, message); } }