Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/Transports/Redis/Wolverine.Redis.Tests/ConditionPoller.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Xunit.Abstractions;

namespace Wolverine.Redis.Tests;

public class ConditionPoller(ITestOutputHelper output, int maxRetries, TimeSpan retryDelay)
{
public async ValueTask WaitForAsync(string message, Func<ValueTask<bool>> condition)
{
var i = 0;
while (!await condition() && i < maxRetries)
{
i++;
output.WriteLine("{0} Waiting for condition: {1} (total {2}ms)",
DateTime.UtcNow,
message,
i * retryDelay.TotalMilliseconds);
await Task.Delay(retryDelay);
}
}

public ValueTask WaitForAsync(string message, Func<bool> condition) =>
WaitForAsync(message, () => ValueTask.FromResult(condition()));
}
228 changes: 89 additions & 139 deletions src/Transports/Redis/Wolverine.Redis.Tests/EndToEndRetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,180 +13,131 @@
namespace Wolverine.Redis.Tests;

[Collection("EndToEndRetryTests")]
public class EndToEndRetryTests
public class EndToEndRetryTests(ITestOutputHelper output): IAsyncLifetime
{
private readonly ITestOutputHelper _output;

public EndToEndRetryTests(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task message_with_retry_policy_saves_to_redis_and_retries()
{
var streamKey = $"e2e-retry-{Guid.NewGuid():N}";

using var host = await Host.CreateDefaultBuilder()
private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 500.Milliseconds());
private RedisStreamEndpoint _endpoint = null!;
private IDatabase _database = null!;
private string _scheduledKey = null!;
private string _streamKey = null!;
private IHost _host = null!;
private E2ERetryTracker _tracker = null!;

public async Task InitializeAsync()
{
_streamKey = $"e2e-retry-{Guid.NewGuid():N}";

_tracker = new E2ERetryTracker();
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "E2ERetryTestService";

// Configure fast polling for test
opts.Durability.ScheduledJobFirstExecution = 100.Milliseconds();
opts.Durability.ScheduledJobPollingTime = 200.Milliseconds();

opts.UseRedisTransport("localhost:6379").AutoProvision();

// Configure routing to our test stream (without SendInline to ensure durable processing)
opts.PublishMessage<E2EFailingCommand>().ToRedisStream(streamKey);
// Listen to the stream - it will be Durable mode by default for Redis streams
opts.ListenToRedisStream(streamKey, "e2e-retry-group")
opts.PublishMessage<E2EFailingCommand>().ToRedisStream(_streamKey);

opts.ListenToRedisStream(_streamKey, "e2e-retry-group")
.UseDurableInbox() // Use Durable endpoint (for Redis streams BufferedInMemory is default)
.StartFromBeginning();

// Configure a retry policy
opts.Policies.OnException<InvalidOperationException>()
.ScheduleRetry(1.Seconds());

// Register the handler
opts.Discovery.IncludeType<E2EFailingCommandHandler>();
opts.Services.AddSingleton<E2ERetryTracker>();

opts.Services.AddSingleton(_tracker);
}).StartAsync();

var runtime = host.Services.GetRequiredService<IWolverineRuntime>();
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();
var transport = runtime.Options.Transports.GetOrCreate<RedisTransport>();
var endpoint = transport.StreamEndpoint(streamKey);
var database = transport.GetDatabase(database: endpoint.DatabaseId);
var scheduledKey = endpoint.ScheduledMessagesKey;

// Verify endpoint implements IDatabaseBackedEndpoint
_output.WriteLine($"Endpoint Mode: {endpoint.Mode}");
_output.WriteLine($"Is IDatabaseBackedEndpoint: {endpoint is IDatabaseBackedEndpoint}");
endpoint.ShouldBeAssignableTo<IDatabaseBackedEndpoint>("Endpoint should implement IDatabaseBackedEndpoint");

// Clear the scheduled set
await database.KeyDeleteAsync(scheduledKey);
await database.KeyDeleteAsync(streamKey);

var tracker = host.Services.GetRequiredService<E2ERetryTracker>();
tracker.FailCount = 1; // Fail once, then succeed

// Wait for listener to fully initialize
await Task.Delay(500);
_endpoint = transport.StreamEndpoint(_streamKey);
_database = transport.GetDatabase(database: _endpoint.DatabaseId);
_scheduledKey = _endpoint.ScheduledMessagesKey;
await DeleteDatabaseKeys();
}

public async Task DisposeAsync()
{
await DeleteDatabaseKeys();
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task message_with_retry_policy_saves_to_redis_and_retries()
{
_endpoint.Mode.ShouldBe(EndpointMode.Durable, "Endpoint should be in Durable mode for retries to work");
_endpoint.ShouldBeAssignableTo<IDatabaseBackedEndpoint>("Endpoint should implement IDatabaseBackedEndpoint");

// Check if there are messages in the stream initially
var initialStreamLength = await database.StreamLengthAsync(streamKey);
_output.WriteLine($"Initial stream length: {initialStreamLength}");
_tracker.FailCount = 1; // Fail once, then succeed

// Send a message that will fail
var bus = host.MessageBus();
var command = new E2EFailingCommand(Guid.NewGuid().ToString());

_output.WriteLine($"Sending command: {command.Id}");
await bus.PublishAsync(command);
var bus = _host.MessageBus();
var message = new E2EFailingCommand(Guid.NewGuid().ToString());

// Wait a moment for the message to be sent to Redis
await Task.Delay(300);
output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, message);
await bus.PublishAsync(message);

// Check if the message was actually sent to the stream
var streamLength = await database.StreamLengthAsync(streamKey);
_output.WriteLine($"Stream length after publish: {streamLength}");

// Wait for initial processing and failure
await Task.Delay(1500);

// Check if handler was called
_output.WriteLine($"Handler attempt count: {tracker.AttemptCount}");

if (tracker.AttemptCount == 0)
{
_output.WriteLine("⚠ Handler was NEVER called!");
_output.WriteLine(" Possible issues:");
_output.WriteLine($" - Message not sent to Redis stream? (stream length: {streamLength})");
_output.WriteLine(" - Listener not processing messages?");
_output.WriteLine(" - Handler not registered or discovered?");
_output.WriteLine(" - Message routing issue?");
}

tracker.AttemptCount.ShouldBeGreaterThan(0, "Handler should have been called at least once");
await _poller.WaitForAsync("message is sent", () => _tracker.AttemptCount == 1);
var streamLength = await _database.StreamLengthAsync(_streamKey);
_tracker.AttemptCount.ShouldBe(1, "Handler should have been called once");

// Verify the message is in the scheduled set (waiting for retry)
var scheduledCount = await database.SortedSetLengthAsync(scheduledKey);
_output.WriteLine($"Scheduled messages count: {scheduledCount}");

if (scheduledCount > 0)
long scheduledCount = 0;
await _poller.WaitForAsync("message is in the scheduled set for retry", async () =>
{
_output.WriteLine("✓ Message was saved to Redis sorted set for retry");

// Verify the score is in the future (retry delay)
var entries = await database.SortedSetRangeByScoreWithScoresAsync(scheduledKey);
if (entries.Length > 0)
{
var retryTime = DateTimeOffset.FromUnixTimeMilliseconds((long)entries[0].Score);
var now = DateTimeOffset.UtcNow;
_output.WriteLine($"Retry scheduled for: {retryTime}");
_output.WriteLine($"Current time: {now}");

retryTime.ShouldBeGreaterThan(now.AddMilliseconds(-500), "Retry should be scheduled in the future");
}

// Wait for retry to be processed
_output.WriteLine("Waiting for retry to be processed...");
await Task.Delay(2500);

// Verify the handler was called again (retry)
tracker.AttemptCount.ShouldBe(2, "Handler should be called twice: initial attempt + retry");

// Verify the scheduled set is now empty
var finalScheduledCount = await database.SortedSetLengthAsync(scheduledKey);
finalScheduledCount.ShouldBe(0, "Message should have been removed from scheduled set after retry");

_output.WriteLine($"✓ Message successfully retried from Redis");
}
else
{
_output.WriteLine("⚠ Message was NOT saved to Redis sorted set");
_output.WriteLine(" This could mean:");
_output.WriteLine(" 1. The message succeeded on first try (tracker.FailCount might not be working)");
_output.WriteLine(" 2. DurableReceiver is not using IDatabaseBackedEndpoint.ScheduleRetryAsync");
_output.WriteLine(" 3. The endpoint mode is not Durable");

// Let's check if the message succeeded (which would mean no retry was needed)
if (tracker.AttemptCount == 1 && !tracker.LastFailed)
{
_output.WriteLine(" Message succeeded on first attempt - no retry needed");
}
}

_output.WriteLine($"Final state:");
_output.WriteLine($" Total attempts: {tracker.AttemptCount}");
_output.WriteLine($" Last failed: {tracker.LastFailed}");
}
scheduledCount = await _database.SortedSetLengthAsync(_scheduledKey);
return scheduledCount == 1;
});
scheduledCount.ShouldBe(1, "Message should be in the scheduled set for retry");

// Verify the score is in the future (retry delay)
var entries = await _database.SortedSetRangeByScoreWithScoresAsync(_scheduledKey);
entries.Length.ShouldBe(1, "There should be one entry in the scheduled set");
var retryTime = DateTimeOffset.FromUnixTimeMilliseconds((long)entries[0].Score);
retryTime.ShouldBeGreaterThan(DateTimeOffset.UtcNow.AddMilliseconds(-500), "Retry should be scheduled in the future");

// Verify the handler was called again (retry)
await _poller.WaitForAsync("handler is called again for retry", () => _tracker.AttemptCount == 2);
_tracker.AttemptCount.ShouldBe(2, "Handler should be called twice: initial attempt + retry");
_tracker.LastFailed.ShouldBeFalse();

// Verify the scheduled set is now empty
var finalScheduledCount = await _database.SortedSetLengthAsync(_scheduledKey);
finalScheduledCount.ShouldBe(0, "Message should have been removed from scheduled set after retry");
}

private async Task DeleteDatabaseKeys()
{
await _database.KeyDeleteAsync(_scheduledKey);
await _database.KeyDeleteAsync(_streamKey);
}
}

public record E2EFailingCommand(string Id);

public class E2EFailingCommandHandler
{
private readonly E2ERetryTracker _tracker;

public E2EFailingCommandHandler(E2ERetryTracker tracker)
{
_tracker = tracker;
}

public class E2EFailingCommandHandler(E2ERetryTracker tracker)
{
public void Handle(E2EFailingCommand command)
{
var attempt = _tracker.RecordAttempt();
if (attempt <= _tracker.FailCount)
var attempt = tracker.RecordAttempt();
if (attempt <= tracker.FailCount)
{
throw new InvalidOperationException($"Intentional failure on attempt {attempt} for command {command.Id}");
}
// Success
}
// Success
}
}

Expand Down Expand Up @@ -219,4 +170,3 @@ public int RecordAttempt()
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="NSubstitute" />
<PackageReference Include="Shouldly" />
<PackageReference Include="xunit"/>
<PackageReference Include="xunit" />
<PackageReference Include="xunit.runner.visualstudio">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand All @@ -23,7 +23,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj" />
<ProjectReference Include="..\Wolverine.Redis\Wolverine.Redis.csproj"/>
<ProjectReference Include="..\Wolverine.Redis\Wolverine.Redis.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading
Loading