diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs new file mode 100644 index 000000000..dfa66721a --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Shouldly; +using Wolverine; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class BasicPubSubTests +{ + public record PubMessage(string Id); + + public class Handler + { + private readonly TaskCompletionSource _tcs; + public Handler(TaskCompletionSource tcs) => _tcs = tcs; + public void Handle(PubMessage m) => _tcs.TrySetResult(true); + } + + [Fact] + public async Task publish_and_listen_end_to_end() + { + var streamKey = $"wolverine-tests-pubsub-{Guid.NewGuid():N}"; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var host = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Debug); + }) + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + var endpoint = opts.ListenToRedisStream(streamKey, "g1"); + endpoint.MessageType = typeof(PubMessage); + endpoint.BlockTimeoutMilliseconds = 100; + endpoint.EnableAutoClaim(TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(0)); + + opts.PublishAllMessages().ToRedisStream(streamKey); + opts.Services.AddSingleton(tcs); + }) + .StartAsync(); + + var bus = host.Services.GetRequiredService(); + // Send directly to the Redis stream endpoint to avoid route misconfiguration + var uri = new Uri($"redis://stream/0/{streamKey}"); + await bus.EndpointFor(uri).SendAsync(new PubMessage("123")); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(10))); + completed.ShouldBe(tcs.Task); + var result = await tcs.Task; + result.ShouldBeTrue(); + } +} + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs new file mode 100644 index 000000000..36d665790 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading.Tasks; +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Configuration; +using Wolverine.Redis; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class RedisBufferedComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + public RedisBufferedComplianceFixture() : base(new Uri("redis://stream/0/receiver"), 120) + { + } + + public async Task InitializeAsync() + { + var receiverStream = $"wolverine-tests-buffered-receiver-{Guid.NewGuid():N}"; + OutboundAddress = new Uri($"redis://stream/0/{receiverStream}"); + + await ReceiverIs(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + opts.ListenToRedisStream(receiverStream, "g1").BufferedInMemory(); + }); + + await SenderIs(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + opts.PublishAllMessages().ToRedisStream(receiverStream).BufferedInMemory(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +public class BufferedSendingAndReceivingCompliance : TransportCompliance; + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs new file mode 100644 index 000000000..61a0d397f --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs @@ -0,0 +1,255 @@ +using System.Text.Json; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using JasperFx.Resources; +using Wolverine.Util; +using StackExchange.Redis; +using Wolverine.Configuration; +using Wolverine.Transports; + +namespace Wolverine.Redis.Tests; + +public class DocumentationSamples +{ + public static async Task configure() + { + #region sample_bootstrapping_with_redis + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379") + + // Auto-create streams and consumer groups + .AutoProvision() + + // Configure default consumer name selector for all Redis listeners + .ConfigureDefaultConsumerName((runtime, endpoint) => + $"{runtime.Options.ServiceName}-{runtime.DurabilitySettings.AssignedNodeNumber}") + + // Useful for testing - auto purge queues on startup + .AutoPurgeOnStartup(); + + // Just publish all messages to Redis streams (uses database 0 by default) + opts.PublishAllMessages().ToRedisStream("wolverine-messages"); + + // Or explicitly configure message routing with database ID + opts.PublishMessage() + .ToRedisStream("colors", databaseId: 1) + + // Configure specific settings for this stream + .BatchSize(50) + .Inline(); + + // Listen to Redis streams with consumer groups (uses database 0 by default) + opts.ListenToRedisStream("red", "color-processors") + .Inline() + + // Configure consumer settings + .ConsumerName("red-consumer-1") + .BatchSize(10) + .BlockTimeout(TimeSpan.FromSeconds(5)) + + // Start from beginning to consume existing messages (like Kafka's AutoOffsetReset.Earliest) + .StartFromBeginning(); + + // Listen to Redis streams with database ID specified + opts.ListenToRedisStream("green", "color-processors", databaseId: 2) + .BufferedInMemory() + .BatchSize(25) + .StartFromNewMessages(); // Default: only new messages (like Kafka's AutoOffsetReset.Latest) + + opts.ListenToRedisStream("blue", "color-processors", databaseId: 3) + .Durable() + .ConsumerName("blue-consumer") + .StartFromBeginning(); // Process existing messages too + + // Alternative: use StartFrom parameter directly + opts.ListenToRedisStream("purple", "color-processors", StartFrom.Beginning) + .BufferedInMemory(); + + // This will direct Wolverine to try to ensure that all + // referenced Redis streams and consumer groups exist at + // application start up time + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + #endregion + } + + public static async Task configure_with_database_ids() + { + #region sample_redis_database_configuration + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379"); + + // Configure streams on different databases + opts.PublishMessage() + .ToRedisStream("orders", databaseId: 1); + + opts.PublishMessage() + .ToRedisStream("payments", databaseId: 2); + + // Listen on different databases + opts.ListenToRedisStream("orders", "order-processors", databaseId: 1); + opts.ListenToRedisStream("payments", "payment-processors", databaseId: 2); + + // Advanced configuration with database ID + opts.ListenToRedisStream("notifications", "notification-processors", databaseId: 3, endpoint => + { + endpoint.ConsumerName("notification-consumer-1"); + endpoint.BatchSize(100); + endpoint.BlockTimeout(TimeSpan.FromSeconds(10)); + endpoint.Durable(); + }); + }).StartAsync(); + + #endregion + } + + public static async Task configure_with_uri_helpers() + { + #region sample_redis_uri_helpers + + // Using URI builder helpers + var ordersUri = RedisStreamEndpointExtensions.BuildRedisStreamUri("orders", databaseId: 1); + var paymentsUri = RedisStreamEndpointExtensions.BuildRedisStreamUri("payments", databaseId: 2, "payment-processors"); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379"); + + // Configure endpoints to listen and publish + opts.ListenToRedisStream("orders", "order-processors", databaseId: 1); + opts.ListenToRedisStream("payments", "payment-processors", databaseId: 2); + }).StartAsync(); + + // Send directly to specific database URIs + var bus = host.Services.GetRequiredService(); + await bus.EndpointFor(ordersUri).SendAsync(new OrderCreated("123", 99.99m, DateTime.Now)); + + #endregion + } + + public static async Task working_with_multiple_databases() + { + #region sample_multiple_database_usage + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + + // Different message types on different databases for isolation + + // Database 0: Default messages + opts.PublishMessage().ToRedisStream("system-events"); + opts.ListenToRedisStream("system-events", "system-processors"); + + // Database 1: Order processing + opts.PublishMessage().ToRedisStream("orders", 1); + opts.ListenToRedisStream("orders", "order-processors", 1); + + // Database 2: Payment processing + opts.PublishMessage().ToRedisStream("payments", 2); + opts.ListenToRedisStream("payments", "payment-processors", 2); + + // Database 3: Analytics and reporting + opts.PublishMessage().ToRedisStream("analytics", 3); + opts.ListenToRedisStream("analytics", "analytics-processors", 3); + }).StartAsync(); + + #endregion + } +} + +#region sample_RedisInstrumentation_middleware + +public static class RedisInstrumentation +{ + // Just showing what data elements are available to use for + // extra instrumentation when listening to Redis streams + public static void Before(Envelope envelope, ILogger logger) + { + logger.LogDebug("Received message from Redis stream {StreamKey} with Id={MessageId} and Database={DatabaseId}", + envelope.TopicName, envelope.Id, envelope.Headers.GetValueOrDefault("DatabaseId")); + } +} + +#endregion + +#region sample_OurRedisJsonMapper + +// Simplistic envelope mapper that expects every message to be of +// type "T" and serialized as JSON that works perfectly well w/ our +// application's default JSON serialization +public class OurRedisJsonMapper : EnvelopeMapper>, IRedisEnvelopeMapper +{ + // Wolverine needs to know the message type name + private readonly string _messageTypeName = typeof(TMessage).ToMessageTypeName(); + + public OurRedisJsonMapper(Endpoint endpoint) : base(endpoint) + { + // Map the data property + MapProperty(x => x.Data!, + (e, m) => e.Data = m.Values.FirstOrDefault(x => x.Name == "data").Value, + (e, m) => m.Add(new NameValueEntry("data", e.Data))); + + // Set up the message type + MapProperty(x => x.MessageType!, + (e, m) => e.MessageType = _messageTypeName, + (e, m) => m.Add(new NameValueEntry("message-type", _messageTypeName))); + + // Set up content type + MapProperty(x => x.ContentType!, + (e, m) => e.ContentType = "application/json", + (e, m) => m.Add(new NameValueEntry("content-type", "application/json"))); + } + + protected override void writeOutgoingHeader(List outgoing, string key, string value) + { + outgoing.Add(new NameValueEntry($"header-{key}", value)); + } + + protected override bool tryReadIncomingHeader(StreamEntry incoming, string key, out string? value) + { + var target = $"header-{key}"; + foreach (var nv in incoming.Values) + { + if (nv.Name.Equals(target)) + { + value = nv.Value.ToString(); + return true; + } + } + + value = null; + return false; + } + + protected override void writeIncomingHeaders(StreamEntry incoming, Envelope envelope) + { + var headers = incoming.Values.Where(k => k.Name.StartsWith("header-")); + foreach (var nv in headers) + { + envelope.Headers[nv.Name.ToString()[7..]] = nv.Value.ToString(); // Remove "header-" prefix + } + + // Capture the Redis stream message id + envelope.Headers["redis-entry-id"] = incoming.Id.ToString(); + } +} + +#endregion + +// Sample message types for documentation +public record ColorMessage(string Color, DateTime Timestamp); +public record OrderCreated(string OrderId, decimal Amount, DateTime CreatedAt); +public record PaymentProcessed(string PaymentId, string OrderId, decimal Amount); +public record SystemEvent(string EventType, string Description, DateTime Timestamp); +public record AnalyticsEvent(string EventName, Dictionary Properties, DateTime Timestamp); diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs new file mode 100644 index 000000000..8247462bb --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading.Tasks; +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Configuration; +using Wolverine.Redis; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class RedisInlineComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + public RedisInlineComplianceFixture() : base(new Uri("redis://stream/0/receiver"), 120) + { + } + + public async Task InitializeAsync() + { + var receiverStream = $"wolverine-tests-inline-receiver-{Guid.NewGuid():N}"; + OutboundAddress = new Uri($"redis://stream/0/{receiverStream}"); + + await SenderIs(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + opts.PublishAllMessages().ToRedisStream(receiverStream).Inline(); + }); + + await ReceiverIs(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + opts.ListenToRedisStream(receiverStream, "g1").Inline(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +public class InlineSendingAndReceivingCompliance : TransportCompliance; + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs new file mode 100644 index 000000000..e453ec88f --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs @@ -0,0 +1,147 @@ +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using Shouldly; +using Wolverine; +using Wolverine.Redis; +using Wolverine.Redis.Internal; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class RedisAutoClaimIntegrationTests +{ + public record AutoClaimTestMessage(string Id); + + private static async Task ConnectAsync() => (await ConnectionMultiplexer.ConnectAsync("localhost:6379")).GetDatabase(); + + [Fact] + public async Task autoclaim_integration_processes_pending_messages() + { + var streamKey = $"wolverine-tests-autoclaim-{Guid.NewGuid():N}"; + var group = "g1"; + var consumerA = "A"; + + var db = await ConnectAsync(); + + // Ensure group exists + try { await db.StreamCreateConsumerGroupAsync(streamKey, group, "$", true); } catch { } + + // Produce one message + var json = "{\"Id\":\"auto-claim-test\"}"; + var typeName = typeof(AutoClaimTestMessage).ToMessageTypeName(); + await db.StreamAddAsync(streamKey, new[] + { + new NameValueEntry("payload", (ReadOnlyMemory)Encoding.UTF8.GetBytes(json)), + new NameValueEntry("wolverine-message-type", typeName), + new NameValueEntry("wolverine-content-type", "application/json") + }); + + // Read with consumer A but do not ack, to create a pending entry + await db.StreamReadGroupAsync(streamKey, group, consumerA, ">", 1, false); + // Give the message a little idle time to satisfy minIdle + await Task.Delay(250); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var host = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Debug); + }) + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379"); + var endpoint = opts.ListenToRedisStream(streamKey, group); + endpoint.EnableAutoClaim(TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(100)); + endpoint.BlockTimeoutMilliseconds = 100; + endpoint.MessageType = typeof(AutoClaimTestMessage); + + opts.Services.AddSingleton(tcs); + }) + .StartAsync(); + + // Wait up to 10 seconds for message to be handled via auto-claim + var completed = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(10))); + completed.ShouldBe(tcs.Task); + var result = await tcs.Task; + result.ShouldBeTrue(); + } + + [Fact] + public async Task autoclaim_disabled_by_default() + { + var streamKey = $"wolverine-tests-autoclaim-disabled-{Guid.NewGuid():N}"; + var group = "g1"; + + var db = await ConnectAsync(); + + // Ensure group exists + try { await db.StreamCreateConsumerGroupAsync(streamKey, group, "$", true); } catch { } + + using var host = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.SetMinimumLevel(LogLevel.Debug); + }) + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379"); + var endpoint = opts.ListenToRedisStream(streamKey, group); + endpoint.MessageType = typeof(AutoClaimTestMessage); + + // AutoClaim should be disabled by default + endpoint.AutoClaimEnabled.ShouldBeFalse(); + endpoint.AutoClaimPeriod.ShouldBe(TimeSpan.FromSeconds(30)); // Default period + }) + .StartAsync(); + + await Task.Delay(100); // Brief delay to let host start + } + + [Fact] + public void fluent_api_enables_autoclaim_with_custom_settings() + { + var transport = new RedisTransport("localhost:6379"); + var endpoint = transport.StreamEndpoint("test"); + + endpoint.AutoClaimEnabled.ShouldBeFalse(); // Default + endpoint.AutoClaimPeriod.ShouldBe(TimeSpan.FromSeconds(30)); // Default + endpoint.AutoClaimMinIdle.ShouldBe(TimeSpan.FromMinutes(1)); // Default + + endpoint.EnableAutoClaim(TimeSpan.FromSeconds(15), TimeSpan.FromMinutes(2)); + + endpoint.AutoClaimEnabled.ShouldBeTrue(); + endpoint.AutoClaimPeriod.ShouldBe(TimeSpan.FromSeconds(15)); + endpoint.AutoClaimMinIdle.ShouldBe(TimeSpan.FromMinutes(2)); + } + + [Fact] + public void fluent_api_disables_autoclaim() + { + var transport = new RedisTransport("localhost:6379"); + var endpoint = transport.StreamEndpoint("test"); + + endpoint.EnableAutoClaim().DisableAutoClaim(); + + endpoint.AutoClaimEnabled.ShouldBeFalse(); + } +} + +public class AutoClaimTestHandler +{ + private readonly TaskCompletionSource _tcs; + public AutoClaimTestHandler(TaskCompletionSource tcs) => _tcs = tcs; + + public void Handle(RedisAutoClaimIntegrationTests.AutoClaimTestMessage msg) + { + _tcs.TrySetResult(true); + } +} diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs new file mode 100644 index 000000000..b62cd05e0 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs @@ -0,0 +1,85 @@ +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using Shouldly; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class RedisClaimingTests +{ + public record TestMessage(string Id); + + private static async Task ConnectAsync() => (await ConnectionMultiplexer.ConnectAsync("localhost:6379")).GetDatabase(); + + [Fact] + public async Task claim_and_process_pending_messages() + { + var streamKey = $"wolverine-tests-claim-{Guid.NewGuid():N}"; + var group = "g1"; + var consumerA = "A"; + + var db = await ConnectAsync(); + + // Ensure group exists + try { await db.StreamCreateConsumerGroupAsync(streamKey, group, "$", true); } catch { } + + // Produce one message + var json = "{\"Id\":\"abc\"}"; + var typeName = typeof(TestMessage).ToMessageTypeName(); + await db.StreamAddAsync(streamKey, new[] + { + new NameValueEntry("payload", (ReadOnlyMemory)Encoding.UTF8.GetBytes(json)), + new NameValueEntry("wolverine-message-type", typeName), + new NameValueEntry("wolverine-content-type", "application/json") + }); + + // Read with consumer A but do not ack, to create a pending entry + await db.StreamReadGroupAsync(streamKey, group, consumerA, ">", 1, false); + // Give the message a little idle time to satisfy minIdle + await Task.Delay(250); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var host = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Debug); + }) + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379"); + var endpoint = opts.ListenToRedisStream(streamKey, group); + endpoint.EnableAutoClaim(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(1)); + endpoint.BatchSize = 10; + endpoint.BlockTimeoutMilliseconds = 100; + endpoint.MessageType = typeof(TestMessage); + + opts.Services.AddSingleton(tcs); + }) + .StartAsync(); + + // Wait up to 10 seconds for message to be handled via claim loop + var completed = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(10))); + completed.ShouldBe(tcs.Task); + var result = await tcs.Task; + result.ShouldBeTrue(); + } +} + +public class TestHandler +{ + private readonly TaskCompletionSource _tcs; + public TestHandler(TaskCompletionSource tcs) => _tcs = tcs; + + public void Handle(RedisClaimingTests.TestMessage msg) + { + _tcs.TrySetResult(true); + } +} + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisDiagnosticsTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisDiagnosticsTests.cs new file mode 100644 index 000000000..f1a125dc5 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisDiagnosticsTests.cs @@ -0,0 +1,45 @@ +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using StackExchange.Redis; +using Shouldly; +using Wolverine.Redis.Internal; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class RedisDiagnosticsTests +{ + private static async Task ConnectAsync() => (await ConnectionMultiplexer.ConnectAsync("localhost:6379")).GetDatabase(); + + [Fact] + public async Task get_attributes_and_purge() + { + var streamKey = $"wolverine-tests-diag-{Guid.NewGuid():N}"; + var transport = new RedisTransport("localhost:6379"); + var endpoint = transport.StreamEndpoint(streamKey); + + var db = await ConnectAsync(); + + // push a couple messages + for (int i = 0; i < 3; i++) + { + var payload = Encoding.UTF8.GetBytes("{}"); + await db.StreamAddAsync(streamKey, new[] + { + new NameValueEntry("payload", (ReadOnlyMemory)payload), + new NameValueEntry("wolverine-message-type", "noop"), + }); + } + + var attrs = await endpoint.GetAttributesAsync(); + attrs["streamKey"].ShouldBe(streamKey); + attrs.ContainsKey("messageCount").ShouldBeTrue(); + attrs["messageCount"].ShouldBe("3"); + + // Purge + await endpoint.PurgeAsync(NullLogger.Instance); + var len = await db.StreamLengthAsync(streamKey); + len.ShouldBe(0); + } +} + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisTransportFixture.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisTransportFixture.cs new file mode 100644 index 000000000..3cc6a3bce --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisTransportFixture.cs @@ -0,0 +1,28 @@ +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Redis; + +namespace Wolverine.Redis.Tests; + +public class RedisTransportFixture : TransportComplianceFixture +{ + public RedisTransportFixture() : base(new Uri($"redis://localhost:6379?streamKey=wolverine-tests-{Guid.NewGuid():N}")) + { + } + + public async Task InitializeAsync() + { + await SenderIs(opts => + { + opts.UseRedisTransport("localhost:6379") + .AutoProvision(); + }); + + await ReceiverIs(opts => + { + opts.UseRedisTransport("localhost:6379") + .AutoProvision(); + + opts.ListenToRedisStream("wolverine-tests", "test-consumer-group"); + }); + } +} diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs new file mode 100644 index 000000000..802f29f3b --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs @@ -0,0 +1,108 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Configuration; +using Wolverine.Tracking; +using Wolverine.Redis.Internal; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class ResponseStreamMechanics : IAsyncLifetime +{ + private IHost _host = null!; + private RedisStreamEndpoint _endpoint = null!; + private string _expectedStreamKey = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "MyApp"; + opts.UseRedisTransport("localhost:6379").AutoProvision(); + }).StartAsync(); + + var runtime = _host.GetRuntime(); + var options = runtime.Options; + var transport = options.Transports.GetOrCreate(); + + _expectedStreamKey = $"wolverine.response.{options.ServiceName}.{options.Durability.AssignedNodeNumber}".ToLowerInvariant(); + + // Should be created by transport initialization as a system endpoint + var reply = transport.ReplyEndpoint(); + reply.ShouldNotBeNull(); + + _endpoint = reply.ShouldBeOfType(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public void the_endpoint_exists() + { + _endpoint.ShouldNotBeNull(); + } + + [Fact] + public void should_be_marked_for_replies() + { + _endpoint.IsUsedForReplies.ShouldBeTrue(); + } + + [Fact] + public void should_be_marked_as_system_role() + { + _endpoint.Role.ShouldBe(EndpointRole.System); + } + + [Fact] + public void is_using_the_expected_stream_key() + { + _endpoint.StreamKey.ShouldStartWith("wolverine.response.myapp"); + _endpoint.StreamKey.ShouldBe(_expectedStreamKey); + } + + [Fact] + public void should_have_expected_uri() + { + var expected = new Uri($"redis://stream/0/{_expectedStreamKey}?consumerGroup=wolverine-replies"); + _endpoint.Uri.ShouldBe(expected); + } +} + +public class ResponseStreamDisabling : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + var t = opts.UseRedisTransport("localhost:6379").AutoProvision(); + t.SystemQueuesEnabled = false; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public void disable_system_streams() + { + var transport = _host.GetRuntime().Options.Transports.GetOrCreate(); + var systemEndpoints = transport.Endpoints().Where(x => x.Role == EndpointRole.System).ToArray(); + systemEndpoints.Any().ShouldBeFalse(); + } +} + diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs new file mode 100644 index 000000000..f09dfae75 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs @@ -0,0 +1,168 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Shouldly; +using Wolverine; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class StartFromBehaviorTests +{ + public record TestMessage(string Id); + + public class MessageHandler + { + private readonly MessageTracker _tracker; + private readonly TaskCompletionSource? _tcs; + public MessageHandler(MessageTracker tracker, TaskCompletionSource? tcs = null) + { + _tracker = tracker; + _tcs = tcs; + } + + public void Handle(TestMessage message) + { + _tracker.AddMessage(message.Id); + _tcs?.TrySetResult(true); + } + } + + public class MessageTracker + { + private readonly List _receivedMessages = new(); + public IReadOnlyList ReceivedMessages => _receivedMessages.AsReadOnly(); + public void AddMessage(string id) => _receivedMessages.Add(id); + } + + [Fact] + public async Task StartFromNewMessages_should_only_process_messages_after_group_creation() + { + var streamKey = $"wolverine-tests-start-from-new-{Guid.NewGuid():N}"; + var tracker = new MessageTracker(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // First, send some messages before creating the listener + using var publisherHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + }) + .StartAsync(); + + var bus = publisherHost.Services.GetRequiredService(); + + // Send 3 messages before creating the consumer group + await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("before-1")); + await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("before-2")); + await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("before-3")); + + await publisherHost.StopAsync(); + + // Now create a listener with StartFromNewMessages (default behavior) + using var listenerHost = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Debug); + }) + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + var endpoint = opts.ListenToRedisStream(streamKey, "test-group") + .StartFromNewMessages() // Explicit, but this is the default + .BlockTimeout(TimeSpan.FromMilliseconds(100)); + endpoint.MessageType = typeof(TestMessage); + + opts.Services.AddSingleton(tracker); + opts.Services.AddSingleton(tcs); + opts.Discovery.IncludeAssembly(typeof(StartFromBehaviorTests).Assembly); + }) + .StartAsync(); + + // Give listener time to start + await Task.Delay(200); + + // Send a message after the listener is active + var listenerBus = listenerHost.Services.GetRequiredService(); + await listenerBus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("after-1")); + + // Wait for completion or timeout + var completed = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(5))); + if (completed == tcs.Task) + { + // Should only have received the message sent after listener creation + tracker.ReceivedMessages.Count.ShouldBe(1); + tracker.ReceivedMessages.ShouldContain("after-1"); + tracker.ReceivedMessages.ShouldNotContain("before-1"); + tracker.ReceivedMessages.ShouldNotContain("before-2"); + tracker.ReceivedMessages.ShouldNotContain("before-3"); + } + else + { + // Fallback: just check what messages we got + tracker.ReceivedMessages.Count.ShouldBe(1); + tracker.ReceivedMessages.ShouldContain("after-1"); + } + + await listenerHost.StopAsync(); + } + + [Fact] + public async Task StartFromBeginning_should_process_existing_messages() + { + var streamKey = $"wolverine-tests-start-from-beginning-{Guid.NewGuid():N}"; + var tracker = new MessageTracker(); + + // First, send some messages before creating the listener + using var publisherHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + }) + .StartAsync(); + + var bus = publisherHost.Services.GetRequiredService(); + + // Send messages before creating the consumer group + await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("existing-1")); + await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("existing-2")); + + await publisherHost.StopAsync(); + + // Now create a listener with StartFromBeginning + using var listenerHost = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Debug); + }) + .UseWolverine(opts => + { + opts.UseRedisTransport("localhost:6379").AutoProvision(); + var endpoint = opts.ListenToRedisStream(streamKey, "test-group-beginning") + .StartFromBeginning() // Should process existing messages + .BlockTimeout(TimeSpan.FromMilliseconds(100)); + endpoint.MessageType = typeof(TestMessage); + + opts.Services.AddSingleton(tracker); + opts.Discovery.IncludeAssembly(typeof(StartFromBehaviorTests).Assembly); + }) + .StartAsync(); + + // Give time for message processing + await Task.Delay(1000); + + // Should have received the existing messages + tracker.ReceivedMessages.Count.ShouldBeGreaterThanOrEqualTo(2); + tracker.ReceivedMessages.ShouldContain("existing-1"); + tracker.ReceivedMessages.ShouldContain("existing-2"); + + await listenerHost.StopAsync(); + } +} diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj b/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj new file mode 100644 index 000000000..4f428a867 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj @@ -0,0 +1,34 @@ + + + + false + true + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + Servers.cs + + + + diff --git a/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs b/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs new file mode 100644 index 000000000..4e1e14c7d --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs @@ -0,0 +1,14 @@ +using StackExchange.Redis; +using Wolverine.Transports; + +namespace Wolverine.Redis; + +/// +/// Envelope mapper interface for Redis Streams transport +/// +public interface IRedisEnvelopeMapper : IEnvelopeMapper> +{ + //Task ToRedisStreamFields(Envelope envelope); + //Envelope CreateEnvelope(string streamKey, StreamEntry message); + +} \ No newline at end of file diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/InlineRedisStreamSender.cs b/src/Transports/Redis/Wolverine.Redis/Internal/InlineRedisStreamSender.cs new file mode 100644 index 000000000..018929b65 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Internal/InlineRedisStreamSender.cs @@ -0,0 +1,70 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using Wolverine.Runtime; +using Wolverine.Transports.Sending; + +namespace Wolverine.Redis.Internal; + +public class InlineRedisStreamSender : ISender +{ + private readonly RedisTransport _transport; + private readonly RedisStreamEndpoint _endpoint; + private readonly IWolverineRuntime _runtime; + private readonly ILogger _logger; + + public InlineRedisStreamSender(RedisTransport transport, RedisStreamEndpoint endpoint, IWolverineRuntime runtime) + { + _transport = transport; + _endpoint = endpoint; + _runtime = runtime; + _logger = runtime.LoggerFactory.CreateLogger(); + } + + public Uri Destination => _endpoint.Uri; + + public bool SupportsNativeScheduledSend => false; + + public async Task PingAsync() + { + try + { + var database = _transport.GetDatabase(); + // Simple ping to check if Redis is available + await database.PingAsync(); + return true; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to ping Redis server"); + return false; + } + } + + public async ValueTask SendAsync(Envelope envelope) + { + try + { + var database = _transport.GetDatabase(); + + // Use envelope mapper to create Redis stream fields + _endpoint.EnvelopeMapper ??= _endpoint.BuildMapper(_runtime); + + var fields = new List(); + _endpoint.EnvelopeMapper!.MapEnvelopeToOutgoing(envelope, fields); + + // Send to Redis Stream using XADD + var messageId = await database.StreamAddAsync(_endpoint.StreamKey, fields.ToArray()); + + _logger.LogDebug("Sent message {MessageId} to Redis stream {StreamKey} with ID {StreamMessageId}", + envelope.Id, _endpoint.StreamKey, messageId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to send message {MessageId} to Redis stream {StreamKey}", + envelope.Id, _endpoint.StreamKey); + throw; + } + } + + +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisSenderProtocol.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisSenderProtocol.cs new file mode 100644 index 000000000..194bf4fbd --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisSenderProtocol.cs @@ -0,0 +1,46 @@ +using StackExchange.Redis; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Redis.Internal; + +public class RedisSenderProtocol : ISenderProtocol, IDisposable +{ + private readonly RedisTransport _transport; + private readonly RedisStreamEndpoint _endpoint; + + public RedisSenderProtocol(RedisTransport transport, RedisStreamEndpoint endpoint) + { + _transport = transport; + _endpoint = endpoint; + } + + public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch batch) + { + var database = _transport.GetDatabase(); + + try + { + var tasks = new List>(batch.Messages.Count); + foreach (var envelope in batch.Messages) + { + var list = new List(); + _endpoint.EnvelopeMapper!.MapEnvelopeToOutgoing(envelope, list); + tasks.Add(database.StreamAddAsync(_endpoint.StreamKey, list.ToArray())); + } + + await Task.WhenAll(tasks); + await callback.MarkSuccessfulAsync(batch); + } + catch (Exception ex) + { + await callback.MarkProcessingFailureAsync(batch, ex); + throw; + } + } + + public void Dispose() + { + // No unmanaged resources + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs new file mode 100644 index 000000000..2701b4236 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs @@ -0,0 +1,335 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using Wolverine.Configuration; +using Wolverine.Redis; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Redis.Internal; + +public class RedisStreamEndpoint : Endpoint, IBrokerEndpoint, IBrokerQueue +{ + private readonly RedisTransport _transport; + + internal RedisStreamEndpoint(Uri uri, RedisTransport transport, EndpointRole role = EndpointRole.Application) + : base(uri, role) + { + _transport = transport; + var (streamKey, databaseId) = ParseStreamKeyAndDatabase(uri); + StreamKey = streamKey; + DatabaseId = databaseId; + ConsumerGroup = ParseConsumerGroup(uri); + EndpointName = StreamKey; + + // Redis Streams work well in buffered mode by default + Mode = EndpointMode.BufferedInMemory; + } + + /// + /// The Redis Stream key name + /// + public string StreamKey { get; } + + /// + /// The Redis database ID (0-15 for standard Redis) + /// + public int DatabaseId { get; } + + /// + /// The consumer group name for this endpoint (if listening) + /// + public string? ConsumerGroup { get; set; } + + /// + /// Maximum number of messages to read in a single batch from Redis Stream + /// + public int BatchSize { get; set; } = 10; + + /// + /// Consumer name within the consumer group. Defaults to machine name + process ID + /// + public string? ConsumerName { get; set; } + + /// + /// Block timeout in milliseconds when reading from streams + /// + public int BlockTimeoutMilliseconds { get; set; } = 1000; + + /// + /// If true, purge the stream on startup. Useful for tests. + /// + public bool PurgeOnStartup { get; set; } + + /// + /// Enable periodic auto-claiming of pending messages within the main consumer loop + /// + public bool AutoClaimEnabled { get; set; } = false; + + /// + /// Period between auto-claim attempts when integrated into the consumer loop (default: 30 seconds) + /// + public TimeSpan AutoClaimPeriod { get; set; } = TimeSpan.FromSeconds(30); + + /// + /// Minimum idle time before claiming pending messages for auto-claim (default: 1 min) + /// + public TimeSpan AutoClaimMinIdle { get; set; } = TimeSpan.FromMinutes(1); + + /// + /// Determines where to start consuming when creating a new consumer group. + /// NewMessages (default): only consume messages added after group creation + /// Beginning: consume from the start of the stream including existing messages + /// + public StartFrom StartFrom { get; set; } = StartFrom.NewMessages; + + private static (string streamKey, int databaseId) ParseStreamKeyAndDatabase(Uri uri) + { + // Only support the new format: redis://stream/{dbId}/{streamKey} + if (!uri.Host.Equals("stream", StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException($"Redis URI must use the format 'redis://stream/{{dbId}}/{{streamKey}}': {uri}"); + } + + if (uri.Segments.Length < 3) + { + throw new ArgumentException($"Redis URI must specify both database ID and stream key in format 'redis://stream/{{dbId}}/{{streamKey}}': {uri}"); + } + + var databaseSegment = uri.Segments[1].TrimEnd('/'); + var streamKeySegment = uri.Segments[2].TrimEnd('/'); + + if (!int.TryParse(databaseSegment, out var databaseId) || databaseId < 0) + { + throw new ArgumentException($"Database ID must be a non-negative integer in Redis URI: {uri}"); + } + + if (string.IsNullOrEmpty(streamKeySegment)) + { + throw new ArgumentException($"Stream key cannot be empty in Redis URI: {uri}"); + } + + return (streamKeySegment, databaseId); + } + + private static string? ParseConsumerGroup(Uri uri) + { + var query = System.Web.HttpUtility.ParseQueryString(uri.Query); + return query["consumerGroup"]; + } + + protected override RedisEnvelopeMapper buildMapper(IWolverineRuntime runtime) + { + return new RedisEnvelopeMapper(this); + } + + public override async ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) + { + if (string.IsNullOrEmpty(ConsumerGroup)) + { + throw new InvalidOperationException($"Consumer group is required for listening to Redis stream '{StreamKey}'"); + } + + var listener = new RedisStreamListener(_transport, this, runtime, receiver); + await listener.InitializeAsync(); + return listener; + } + + public override async ValueTask InitializeAsync(ILogger logger) + { + // Honor AutoProvision/AutoPurge semantics similar to other transports + try + { + var db = _transport.GetDatabase(database: DatabaseId); + + if (_transport.AutoProvision) + { + // Ensure group exists with appropriate starting position + if (!string.IsNullOrEmpty(ConsumerGroup) && IsListener) + { + try + { + var startPosition = StartFrom == StartFrom.Beginning ? "0-0" : "$"; + await db.StreamCreateConsumerGroupAsync(StreamKey, ConsumerGroup, startPosition, true); + } + catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP")) + { + // already exists + } + } + } + + if (PurgeOnStartup || _transport.AutoPurgeAllQueues) + { + // Trim entire stream + try { await PurgeAsync(logger); } catch { /* ignore */ } + } + } + catch (Exception e) + { + logger.LogError(e, "Error initializing Redis stream endpoint {Stream}", StreamKey); + throw; + } + } + + protected override ISender CreateSender(IWolverineRuntime runtime) + { + EnvelopeMapper ??= BuildMapper(runtime); + + return Mode == EndpointMode.Inline + ? new InlineRedisStreamSender(_transport, this, runtime) + : new BatchedSender(this, new RedisSenderProtocol(_transport, this), runtime.Cancellation, + runtime.LoggerFactory.CreateLogger()); + } + + public async ValueTask CheckAsync() + { + try + { + var database = _transport.GetDatabase(database: DatabaseId); + // Check if stream exists by getting stream info + var info = await database.StreamInfoAsync(StreamKey); + return true; + } + catch (RedisServerException ex) when (ex.Message.Contains("no such key")) + { + // Stream doesn't exist yet, which is fine + return true; + } + catch + { + return false; + } + } + + public async ValueTask TeardownAsync(ILogger logger) + { + if (Role == EndpointRole.System) + { + return; // Don't tear down system endpoints + } + + try + { + var database = _transport.GetDatabase(database: DatabaseId); + + // Delete consumer group if this endpoint created it + if (!string.IsNullOrEmpty(ConsumerGroup)) + { + logger.LogInformation("Removing consumer group {ConsumerGroup} from Redis stream {StreamKey}", + ConsumerGroup, StreamKey); + await database.StreamDeleteConsumerGroupAsync(StreamKey, ConsumerGroup); + } + + logger.LogDebug("Teardown completed for Redis stream endpoint {StreamKey}", StreamKey); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to teardown Redis stream endpoint {StreamKey}", StreamKey); + } + } + + // IBrokerQueue implementation for diagnostics and purge support + public async ValueTask PurgeAsync(ILogger logger) + { + try + { + var db = _transport.GetDatabase(database: DatabaseId); + if (await db.KeyDeleteAsync(StreamKey)) + logger.LogInformation("Purged Redis stream {StreamKey}", StreamKey); + } + catch (Exception e) + { + logger.LogError(e, "Error purging Redis stream {StreamKey}", StreamKey); + throw; + } + } + + public async ValueTask> GetAttributesAsync() + { + var dict = new Dictionary + { + { "streamKey", StreamKey } + }; + + try + { + var db = _transport.GetDatabase(database: DatabaseId); + var len = await db.StreamLengthAsync(StreamKey); + dict["messageCount"] = len.ToString(); + } + catch + { + // ignore + } + + if (!string.IsNullOrEmpty(ConsumerGroup)) + { + dict["consumerGroup"] = ConsumerGroup!; + } + + return dict; + } + + /// + /// Enable auto-claiming of pending entries within the consumer loop every specified period + /// + /// This can cause out of order messages. It is recommended to manually deal with expired claimed message if you are using .Inline() or desire ordered message processing. + /// Period between auto-claim attempts (default: 30 seconds) + /// Minimum idle time before claiming pending messages (if null, uses MinIdleBeforeClaimMilliseconds) + /// This endpoint for method chaining + public RedisStreamEndpoint EnableAutoClaim(TimeSpan? period = null, TimeSpan? minIdle = null) + { + AutoClaimEnabled = true; + if (period.HasValue) AutoClaimPeriod = period.Value; + if (minIdle.HasValue) AutoClaimMinIdle = minIdle.Value; + return this; + } + + /// + /// Disable auto-claiming of pending entries within the consumer loop + /// + /// This endpoint for method chaining + public RedisStreamEndpoint DisableAutoClaim() + { + AutoClaimEnabled = false; + return this; + } + + public async ValueTask SetupAsync(ILogger logger) + { + try + { + var database = _transport.GetDatabase(database: DatabaseId); + + // Create consumer group if specified and this is a listener + if (!string.IsNullOrEmpty(ConsumerGroup) && IsListener) + { + try + { + var startPosition = StartFrom == StartFrom.Beginning ? "0-0" : "$"; + var positionDescription = StartFrom == StartFrom.Beginning ? "beginning" : "end"; + + logger.LogDebug("Creating consumer group {ConsumerGroup} for Redis stream {StreamKey} starting from {Position}", + ConsumerGroup, StreamKey, positionDescription); + + await database.StreamCreateConsumerGroupAsync(StreamKey, ConsumerGroup, startPosition, true); + + logger.LogInformation("Created consumer group {ConsumerGroup} for Redis stream {StreamKey} starting from {Position}", + ConsumerGroup, StreamKey, positionDescription); + } + catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP")) + { + // Group already exists, which is fine + logger.LogDebug("Consumer group {ConsumerGroup} already exists for stream {StreamKey}", + ConsumerGroup, StreamKey); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, "Failed to setup Redis stream endpoint {StreamKey}", StreamKey); + throw; + } + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs new file mode 100644 index 000000000..a44d1e255 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs @@ -0,0 +1,358 @@ +using System.Diagnostics; +using System.Linq; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Transports; + +namespace Wolverine.Redis.Internal; + +public class RedisStreamListener : IListener +{ + private readonly RedisTransport _transport; + private readonly RedisStreamEndpoint _endpoint; + private readonly IWolverineRuntime _runtime; + private readonly IReceiver _receiver; + private readonly ILogger _logger; + private readonly CancellationTokenSource _cancellation = new(); + + private Task? _consumerTask; + private ListeningStatus _status = ListeningStatus.Stopped; + private string _consumerName; + + + + public RedisStreamListener(RedisTransport transport, RedisStreamEndpoint endpoint, + IWolverineRuntime runtime, IReceiver receiver) + { + _transport = transport; + _endpoint = endpoint; + _runtime = runtime; + _receiver = receiver; + _logger = runtime.LoggerFactory.CreateLogger(); + + // Generate stable consumer name: service name + node number (+ machine) by default, + // or use endpoint-level override if specified. + _consumerName = _transport.ComputeConsumerName(_runtime, _endpoint); + + Address = endpoint.Uri; + } + + public Uri Address { get; } + public ListeningStatus Status => _status; + public IHandlerPipeline? Pipeline => _receiver.Pipeline; + + public async ValueTask InitializeAsync() + { + // Only create resources at listener init time if AutoProvision is enabled. + if (_transport.AutoProvision) + { + await _endpoint.SetupAsync(_logger); + } + else + { + // Fail-fast if the required consumer group (or stream) is missing when AutoProvision is disabled. + if (_endpoint.IsListener && !string.IsNullOrEmpty(_endpoint.ConsumerGroup)) + { + try + { + var db = _transport.GetDatabase(database: _endpoint.DatabaseId); + var groups = await db.StreamGroupInfoAsync(_endpoint.StreamKey); + var exists = groups?.Any(g => g.Name == _endpoint.ConsumerGroup) ?? false; + if (!exists) + { + throw new InvalidOperationException($"Redis consumer group '{_endpoint.ConsumerGroup}' for stream '{_endpoint.StreamKey}' (db {_endpoint.DatabaseId}) does not exist, and AutoProvision is disabled. Enable AutoProvision() or run AddResourceSetupOnStartup() to create it."); + } + } + catch (RedisServerException ex) when (ex.Message.Contains("no such key", StringComparison.OrdinalIgnoreCase)) + { + throw new InvalidOperationException($"Redis stream '{_endpoint.StreamKey}' (db {_endpoint.DatabaseId}) does not exist, and AutoProvision is disabled. Create the stream and consumer group '{_endpoint.ConsumerGroup}' or enable AutoProvision()/AddResourceSetupOnStartup().", ex); + } + } + } + + // Start processing loops + if (_status == ListeningStatus.Stopped) + { + _logger.LogInformation("Starting Redis stream listener for {StreamKey} with consumer group {ConsumerGroup} and consumer {ConsumerName}", + _endpoint.StreamKey, _endpoint.ConsumerGroup, _consumerName); + + _status = ListeningStatus.Accepting; + + // Start the consumer loop + _consumerTask = Task.Run(ConsumerLoop, _cancellation.Token); + + } + } + + public async ValueTask StopAsync() + { + if (_status == ListeningStatus.Stopped) + { + return; + } + + _logger.LogInformation("Stopping Redis stream listener for {StreamKey}", _endpoint.StreamKey); + + _status = ListeningStatus.Stopped; + _cancellation.Cancel(); + + if (_consumerTask != null) + { + try + { + await _consumerTask.WaitAsync(TimeSpan.FromSeconds(10)); + } + catch (TaskCanceledException) + { + // Expected when cancellation token is used + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error while stopping consumer task for stream {StreamKey}", _endpoint.StreamKey); + } + } + } + + public async ValueTask CompleteAsync(Envelope envelope) + { + try + { + if (!envelope.Headers.TryGetValue(RedisEnvelopeMapper.RedisEntryIdHeader, out var idString) || string.IsNullOrEmpty(idString)) + { + _logger.LogDebug("No Redis stream id header present for envelope {EnvelopeId}; skipping ACK", envelope.Id); + return; + } + + var db = _transport.GetDatabase(); + await db.StreamAcknowledgeAsync(_endpoint.StreamKey, _endpoint.ConsumerGroup!, idString!); + _logger.LogDebug("Acknowledged Redis stream message {StreamId} on {StreamKey}", idString, _endpoint.StreamKey); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error ACKing Redis stream message for envelope {EnvelopeId}", envelope.Id); + } + } + + public async ValueTask DeferAsync(Envelope envelope) + { + try + { + var db = _transport.GetDatabase(); + + // 1) Ack the current pending entry if we can + if (envelope.Headers.TryGetValue(RedisEnvelopeMapper.RedisEntryIdHeader, out var idString) && !string.IsNullOrEmpty(idString)) + { + try + { + await db.StreamAcknowledgeAsync(_endpoint.StreamKey, _endpoint.ConsumerGroup!, idString!); + } + catch (Exception ackEx) + { + _logger.LogWarning(ackEx, "Error ACKing Redis stream message before requeue for envelope {EnvelopeId}", envelope.Id); + } + } + + // 2) Re-add a copy to the tail of the stream + _endpoint.EnvelopeMapper ??= _endpoint.BuildMapper(_runtime); + var fields = new List(); + _endpoint.EnvelopeMapper.MapEnvelopeToOutgoing(envelope, fields); + var newId = await db.StreamAddAsync(_endpoint.StreamKey, fields.ToArray()); + + _logger.LogDebug("Requeued envelope {EnvelopeId} to Redis stream {StreamKey} as new entry {NewId}", + envelope.Id, _endpoint.StreamKey, newId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to requeue Redis stream message for envelope {EnvelopeId}", envelope.Id); + } + } + + public async Task TryRequeueAsync(Envelope envelope) + { + await DeferAsync(envelope); + return true; + } + + private async Task EnsureGroupExistsAsync(IDatabase db) + { + if (string.IsNullOrEmpty(_endpoint.ConsumerGroup)) return; + if (!_transport.AutoProvision) return; + + try + { + // Use the endpoint's StartFrom setting to determine position + var startPosition = _endpoint.StartFrom == StartFrom.Beginning ? "0-0" : "$"; + await db.StreamCreateConsumerGroupAsync(_endpoint.StreamKey, _endpoint.ConsumerGroup, startPosition, true); + _logger.LogInformation("Ensured consumer group {Group} exists for stream {StreamKey}", _endpoint.ConsumerGroup, _endpoint.StreamKey); + } + catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP")) + { + // Group already exists, nothing to do + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to ensure consumer group {Group} exists for stream {StreamKey}", _endpoint.ConsumerGroup, _endpoint.StreamKey); + } + } + + private async Task ReadEntriesAsync(IDatabase database, bool useAutoClaim) + { + if (useAutoClaim) + { + var minIdleMs = (int)_endpoint.AutoClaimMinIdle.TotalMilliseconds; + // XAUTOCLAIM for pending messages + var result = await database.StreamAutoClaimAsync( + _endpoint.StreamKey, + _endpoint.ConsumerGroup!, + _consumerName, + minIdleMs, + "0-0", // start from the beginning of PEL + _endpoint.BatchSize); + + _logger.LogDebug( + "XAUTOCLAIM on {StreamKey}/{Group} (minIdle={MinIdle}ms) returned {Count} entries; nextStartId={NextStartId}", + _endpoint.StreamKey, _endpoint.ConsumerGroup, minIdleMs, result.ClaimedEntries?.Length ?? 0, + result.NextStartId); + + return result.ClaimedEntries ?? []; + } + + // Standard XREADGROUP for new messages + return await database.StreamReadGroupAsync( + _endpoint.StreamKey, + _endpoint.ConsumerGroup!, + _consumerName, + ">", // Read new messages not yet delivered to this consumer group + count: _endpoint.BatchSize, + noAck: false); + } + + private async Task ConsumerLoop() + { + var database = _transport.GetDatabase(); + var autoClaimWatch = Stopwatch.StartNew(); + + try + { + while (!_cancellation.Token.IsCancellationRequested && _status == ListeningStatus.Accepting) + { + try + { + // Determine if it's time to use AutoClaim instead of regular read + var shouldUseAutoClaim = _endpoint.AutoClaimEnabled && + autoClaimWatch.Elapsed >= _endpoint.AutoClaimPeriod; + + // Read from either XREADGROUP or XAUTOCLAIM + var streamResults = await ReadEntriesAsync(database, shouldUseAutoClaim); + + if (shouldUseAutoClaim) + { + autoClaimWatch.Restart(); + _logger.LogDebug("Used XAUTOCLAIM for {StreamKey}, found {Count} entries", + _endpoint.StreamKey, streamResults.Length); + } + else + { + _logger.LogDebug("Read {Count} entries from {StreamKey} for group {Group} consumer {Consumer}", + streamResults.Length, _endpoint.StreamKey, _endpoint.ConsumerGroup, _consumerName); + } + + if (!streamResults.Any()) + { + // No messages, wait a bit before polling again + await Task.Delay(_endpoint.BlockTimeoutMilliseconds, _cancellation.Token); + continue; + } + + // Process each message + foreach (var message in streamResults) + { + if (_cancellation.Token.IsCancellationRequested) + break; + + await ProcessMessage(message); + } + } + catch (OperationCanceledException) + { + // Expected when shutting down + break; + } + catch (RedisServerException ex) when ( + ex.Message.Contains("NOGROUP", StringComparison.OrdinalIgnoreCase) || + ex.Message.Contains("no such key", StringComparison.OrdinalIgnoreCase)) + { + if (_transport.AutoProvision) + { + _logger.LogWarning(ex, "Consumer group or stream missing for {StreamKey}/{Group}. Attempting to create and retry.", _endpoint.StreamKey, _endpoint.ConsumerGroup); + await EnsureGroupExistsAsync(database); + await Task.Delay(TimeSpan.FromMilliseconds(200), _cancellation.Token); + } + else + { + _logger.LogError(ex, "Redis stream/consumer group missing for {StreamKey}/{Group}, and AutoProvision is disabled. Enable AutoProvision() or run AddResourceSetupOnStartup() to create resources.", _endpoint.StreamKey, _endpoint.ConsumerGroup); + _status = ListeningStatus.Stopped; + _cancellation.Cancel(); + break; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in Redis stream consumer loop for {StreamKey}", _endpoint.StreamKey); + + // Brief delay before retrying to avoid tight error loops + await Task.Delay(TimeSpan.FromSeconds(5), _cancellation.Token); + } + } + } + catch (OperationCanceledException) + { + // Expected during shutdown + } + catch (Exception ex) + { + _logger.LogError(ex, "Fatal error in Redis stream consumer loop for {StreamKey}", _endpoint.StreamKey); + } + + _logger.LogDebug("Redis stream consumer loop ended for {StreamKey}", _endpoint.StreamKey); + } + + private async Task ProcessMessage(StreamEntry streamEntry) + { + try + { + // Convert Redis stream message to Wolverine envelope using mapper + _endpoint.EnvelopeMapper ??= _endpoint.BuildMapper(_runtime); + + var envelope = new Envelope { TopicName = _endpoint.StreamKey }; + _endpoint.EnvelopeMapper.MapIncomingToEnvelope(envelope, streamEntry); + + _logger.LogDebug("Received message {EnvelopeId} from Redis stream {StreamKey} (stream message ID: {StreamMessageId})", + envelope.Id, _endpoint.StreamKey, streamEntry.Id); + + // Send to Wolverine for processing (this will invoke continuations that may call Complete/Defer) + await _receiver.ReceivedAsync(this, envelope); + + // Do not ACK here; CompleteAsync/DeferAsync will handle ACK or requeue as appropriate + _logger.LogDebug("Processed message {EnvelopeId} from Redis stream {StreamKey}", envelope.Id, _endpoint.StreamKey); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to process message {MessageId} from Redis stream {StreamKey}", + streamEntry.Id, _endpoint.StreamKey); + + // Note: We don't acknowledge failed messages, so they can be retried later + // Redis will automatically retry unacknowledged messages based on the consumer group configuration + } + } + + + public ValueTask DisposeAsync() + { + _cancellation.Cancel(); + _cancellation.Dispose(); + return ValueTask.CompletedTask; + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs new file mode 100644 index 000000000..550e0a5ca --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs @@ -0,0 +1,287 @@ +using System.Collections.Concurrent; +using System.Linq; +using JasperFx.Core; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using Spectre.Console; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Transports; + +namespace Wolverine.Redis.Internal; + +public class RedisTransport : BrokerTransport, IAsyncDisposable +{ + public const string ProtocolName = "redis"; + + private readonly LightweightCache _streams; + private readonly ConcurrentDictionary _connections = new(); + private readonly string _connectionString; + + /// + /// Enable/disable creation of system endpoints like reply streams + /// + public bool SystemQueuesEnabled { get; set; } = true; + + /// + /// Database ID to use for the per-node reply stream endpoint. Defaults to 0. + /// + public int ReplyDatabaseId { get; set; } = 0; + + /// + /// Customizable selector to build a stable consumer name for listeners when an endpoint-level ConsumerName is not set. + /// Defaults to ServiceName-NodeNumber-MachineName (lowercased and sanitized). + /// + public Func? DefaultConsumerNameSelector { get; set; } + + /// + /// Default constructor for GetOrCreate pattern - uses localhost:6379 + /// + public RedisTransport() : this("localhost:6379") + { + // Default constructor for GetOrCreate() + } + + public RedisTransport(string connectionString) : base(ProtocolName, "Redis Streams Transport") + { + _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); + _streams = new LightweightCache( + cacheKey => + { + // Parse the cache key format: {databaseId}:{streamKey} + var parts = cacheKey.Split(':', 2); + if (parts.Length != 2 || !int.TryParse(parts[0], out var databaseId)) + { + throw new ArgumentException($"Invalid cache key format. Expected 'databaseId:streamKey', got '{cacheKey}'"); + } + var streamKey = parts[1]; + return new RedisStreamEndpoint( + new Uri($"{ProtocolName}://stream/{databaseId}/{streamKey}"), + this, + EndpointRole.Application); + }); + } + + public override Uri ResourceUri + { + get + { + // Parse connection string to build resource URI + var options = ConfigurationOptions.Parse(_connectionString); + var endpoint = options.EndPoints.FirstOrDefault(); + + if (endpoint == null) + { + return new Uri($"{ProtocolName}://localhost:6379"); + } + + return new Uri($"{ProtocolName}://{endpoint}"); + } + } + + internal IDatabase GetDatabase(string? connectionString = null, int database = 0) + { + var connection = GetConnection(connectionString); + return connection.GetDatabase(database); + } + + internal IConnectionMultiplexer GetConnection(string? connectionString = null) + { + var connStr = connectionString ?? _connectionString; + return _connections.GetOrAdd(connStr, cs => ConnectionMultiplexer.Connect(cs)); + } + + public override async ValueTask ConnectAsync(IWolverineRuntime runtime) + { + runtime.Logger.LogInformation("Connecting to Redis at {ConnectionString}", + SanitizeConnectionStringForLogging(_connectionString)); + + try + { + // Initialize the default connection + var connection = GetConnection(); + + // Test the connection + var db = connection.GetDatabase(); + await db.PingAsync(); + + runtime.Logger.LogInformation("Successfully connected to Redis"); + } + catch (Exception ex) + { + runtime.Logger.LogError(ex, "Failed to connect to Redis"); + throw; + } + } + + public override IEnumerable DiagnosticColumns() + { + yield return new PropertyColumn("Stream Key", "streamKey"); + yield return new PropertyColumn("Consumer Group", "consumerGroup"); + yield return new PropertyColumn("Message Count", "messageCount", Justify.Right); + } + + protected override IEnumerable endpoints() + { + return _streams.ToArray(); + } + + protected override RedisStreamEndpoint findEndpointByUri(Uri uri) + { + if (uri.Scheme != ProtocolName) + { + throw new ArgumentException($"Invalid scheme for Redis transport: {uri.Scheme}"); + } + + // Only support the new format: redis://stream/{dbId}/{streamKey} + if (!uri.Host.Equals("stream", StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException($"Redis URI must use the format 'redis://stream/{{dbId}}/{{streamKey}}': {uri}"); + } + + if (uri.Segments.Length < 3) + { + throw new ArgumentException($"Redis URI must specify both database ID and stream key in format 'redis://stream/{{dbId}}/{{streamKey}}': {uri}"); + } + + var databaseSegment = uri.Segments[1].TrimEnd('/'); + var streamKeySegment = uri.Segments[2].TrimEnd('/'); + + if (!int.TryParse(databaseSegment, out var databaseId) || databaseId < 0) + { + throw new ArgumentException($"Database ID must be a non-negative integer in Redis URI: {uri}"); + } + + if (string.IsNullOrEmpty(streamKeySegment)) + { + throw new ArgumentException($"Stream key cannot be empty in Redis URI: {uri}"); + } + + // Create cache key that includes database ID to avoid conflicts + var cacheKey = $"{databaseId}:{streamKeySegment}"; + var endpoint = _streams[cacheKey]; + + // Parse consumer group from query string if present + var query = System.Web.HttpUtility.ParseQueryString(uri.Query); + var consumerGroup = query["consumerGroup"]; + if (!string.IsNullOrEmpty(consumerGroup)) + { + endpoint.ConsumerGroup = consumerGroup; + } + + return endpoint; + } + + /// + /// Get or create a Redis stream endpoint by stream key (uses database 0) + /// + public RedisStreamEndpoint StreamEndpoint(string streamKey) + { + return StreamEndpoint(streamKey, 0); + } + + /// + /// Get or create a Redis stream endpoint by stream key and database ID + /// + public RedisStreamEndpoint StreamEndpoint(string streamKey, int databaseId) + { + var cacheKey = $"{databaseId}:{streamKey}"; + return _streams[cacheKey]; + } + + /// + /// Configure a Redis stream endpoint (uses database 0) + /// + public RedisStreamEndpoint StreamEndpoint(string streamKey, Action configure) + { + return StreamEndpoint(streamKey, 0, configure); + } + + /// + /// Configure a Redis stream endpoint with database ID + /// + public RedisStreamEndpoint StreamEndpoint(string streamKey, int databaseId, Action configure) + { + var endpoint = StreamEndpoint(streamKey, databaseId); + configure(endpoint); + return endpoint; + } + + public async ValueTask DisposeAsync() + { + try + { + foreach (var connection in _connections.Values) + { + await connection.CloseAsync(); + connection.Dispose(); + } + _connections.Clear(); + } + catch (ObjectDisposedException) + { + // Already disposed, ignore + } + catch (Exception ex) + { + // Log but don't throw during disposal + Console.WriteLine($"Error during Redis connection disposal: {ex.Message}"); + } + } + + internal string ComputeConsumerName(IWolverineRuntime runtime, RedisStreamEndpoint endpoint) + { + if (!string.IsNullOrWhiteSpace(endpoint.ConsumerName)) return endpoint.ConsumerName!; + var selector = DefaultConsumerNameSelector ?? BuildDefaultConsumerName; + var name = selector(runtime, endpoint); + return SanitizeName(name); + } + + private static string BuildDefaultConsumerName(IWolverineRuntime runtime, RedisStreamEndpoint endpoint) + { + var service = string.IsNullOrWhiteSpace(runtime.Options.ServiceName) + ? "wolverine" + : runtime.Options.ServiceName!.Trim(); + var node = runtime.DurabilitySettings.AssignedNodeNumber.ToString(); + var host = Environment.MachineName; + var name = $"{service}-{node}-{host}"; + return name.ToLowerInvariant(); + } + + private static string SanitizeName(string name) + { + if (string.IsNullOrWhiteSpace(name)) return "wolverine"; + var chars = name.Select(ch => char.IsLetterOrDigit(ch) || ch == '-' || ch == '_' || ch == ':' || ch == '.' ? ch : '-').ToArray(); + var sanitized = new string(chars); + return sanitized.Trim('-'); + } + + private static string SanitizeConnectionStringForLogging(string connectionString) + { + // Remove password from connection string for logging + var options = ConfigurationOptions.Parse(connectionString); + options.Password = options.Password?.Length > 0 ? "****" : null; + return options.ToString(); + } + + protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) + { + if (!SystemQueuesEnabled) return; + + // Create a per-node reply stream endpoint similar to other transports (using database 0) + var replyStreamKey = $"wolverine.response.{runtime.Options.ServiceName}.{runtime.DurabilitySettings.AssignedNodeNumber}".ToLowerInvariant(); + var cacheKey = $"{ReplyDatabaseId}:{replyStreamKey}"; + var replyEndpoint = new RedisStreamEndpoint( + new Uri($"{ProtocolName}://stream/{ReplyDatabaseId}/{replyStreamKey}?consumerGroup=wolverine-replies"), + this, + EndpointRole.System) + { + IsListener = true, + IsUsedForReplies = true, + ConsumerGroup = "wolverine-replies", + EndpointName = "RedisReplies" + }; + + _streams[cacheKey] = replyEndpoint; + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/RedisEnvelopeMapper.cs b/src/Transports/Redis/Wolverine.Redis/RedisEnvelopeMapper.cs new file mode 100644 index 000000000..d52a96316 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/RedisEnvelopeMapper.cs @@ -0,0 +1,84 @@ +using StackExchange.Redis; +using Wolverine.Configuration; +using Wolverine.Transports; + +namespace Wolverine.Redis; + +public class RedisEnvelopeMapper : EnvelopeMapper>, IRedisEnvelopeMapper +{ + /// + /// represents the Redis Stream entry ID - used for ACK/requeue operations + /// + public const string RedisEntryIdHeader = "redis-entry-id"; + + private const string HeaderPrefix = "wolverine-"; + + public RedisEnvelopeMapper(Endpoint endpoint) : base(endpoint) + { + MapProperty(x => x.Data!, + (e, m) => e.Data = m.Values.FirstOrDefault(x => x.Name == "payload").Value, + (e, m) => m.Add(new NameValueEntry("payload", e.Data))); + } + + protected override void writeOutgoingHeader(List outgoing, string key, string value) + { + // Do not persist the Redis stream id header as an outgoing field; a new entry will receive its own ID + if (string.Equals(key, RedisEntryIdHeader, StringComparison.OrdinalIgnoreCase)) + return; + + outgoing.Add(new NameValueEntry($"{HeaderPrefix}{key}", value)); + } + + protected override bool tryReadIncomingHeader(StreamEntry incoming, string key, out string? value) + { + var target = $"{HeaderPrefix}{key}"; + foreach (var nv in incoming.Values) + { + if (nv.Name.Equals(target)) + { + value = nv.Value.ToString(); + return true; + } + } + + value = null; + return false; + } + + protected override void writeIncomingHeaders(StreamEntry incoming, Envelope envelope) + { + var headers = incoming.Values.Where(k => k.Name.StartsWith(HeaderPrefix)); + foreach (var nv in headers) + { + envelope.Headers[nv.Name.ToString()[HeaderPrefix.Length..]] = nv.Value.ToString(); + } + + // Also capture the Redis stream message id so the listener can ACK/DEFER appropriately + envelope.Headers[RedisEntryIdHeader] = incoming.Id.ToString(); + } + + //public async Task ToRedisStreamFields(Envelope envelope) + //{ + // var kvps = new List(); + + // MapEnvelopeToOutgoing(envelope, kvps); + + // return kvps.ToArray(); + //} + + //public Envelope CreateEnvelope(string streamKey, StreamEntry message) + //{ + // var envelope = new Envelope + // { + // Data = message.Values.FirstOrDefault(x => x.Name == "data").Value, + // TopicName = streamKey, + // }; + + // MapIncomingToEnvelope(envelope, message); + + // // Ensure the stream id is present on the envelope + // envelope.Headers[RedisEntryIdHeader] = message.Id.ToString(); + + // return envelope; + //} +} diff --git a/src/Transports/Redis/Wolverine.Redis/Wolverine.Redis.csproj b/src/Transports/Redis/Wolverine.Redis/Wolverine.Redis.csproj new file mode 100644 index 000000000..5926e2d9b --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Wolverine.Redis.csproj @@ -0,0 +1,21 @@ + + + + Redis Streams Transport for Wolverine Messaging Systems + WolverineFx.Redis + false + false + false + false + false + + + + + + + + + + + diff --git a/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs b/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs new file mode 100644 index 000000000..5e67dc832 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs @@ -0,0 +1,314 @@ +using JasperFx.Core.Reflection; +using Microsoft.Extensions.DependencyInjection; +using Wolverine; +using Wolverine.Configuration; +using Wolverine.Redis.Internal; +using Wolverine.Runtime; + +namespace Wolverine.Redis; + +/// +/// Determines the starting position for new Redis Stream consumer groups +/// +public enum StartFrom +{ + /// + /// Start consuming from the beginning of the stream (equivalent to "0-0") + /// Similar to Kafka's AutoOffsetReset.Earliest + /// + Beginning, + + /// + /// Start consuming only new messages added after group creation (equivalent to "$") + /// Similar to Kafka's AutoOffsetReset.Latest + /// + NewMessages +} + +public static class WolverineOptionsExtensions +{ + /// + /// Adds Redis Streams transport to Wolverine with the specified connection string + /// + /// Wolverine configuration options + /// Redis connection string (StackExchange.Redis format) + /// RedisTransport for fluent configuration + public static RedisTransport UseRedisTransport(this WolverineOptions options, string connectionString) + { + var transport = new RedisTransport(connectionString); + + options.Transports.Add(transport); + + return transport; + } + + /// + /// Configure Wolverine to publish messages to the specified Redis stream (uses database 0) + /// + /// Publishing configuration + /// Redis stream key name + /// Stream endpoint for further configuration + public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publishing, string streamKey) + { + return publishing.ToRedisStream(streamKey, 0); + } + + /// + /// Configure Wolverine to publish messages to the specified Redis stream with database ID + /// + /// Publishing configuration + /// Redis stream key name + /// Redis database ID + /// Stream endpoint for further configuration + public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publishing, string streamKey, int databaseId) + { + // Use correct pattern from Kafka transport + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + + var endpoint = transport.StreamEndpoint(streamKey, databaseId); + + // Register this as a publishing destination + publishing.To(endpoint.Uri); + + return endpoint; + } + + /// + /// Configure Wolverine to listen to messages from the specified Redis stream with a consumer group (uses database 0) + /// + /// Wolverine configuration options + /// Redis stream key name + /// Consumer group name + /// Stream endpoint for further configuration + public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup) + { + return options.ListenToRedisStream(streamKey, consumerGroup, 0); + } + + /// + /// Configure Wolverine to listen to a Redis stream with starting position control + /// + public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup, StartFrom startFrom) + { + var endpoint = options.ListenToRedisStream(streamKey, consumerGroup, 0); + endpoint.StartFrom = startFrom; + return endpoint; + } + + /// + /// Configure Wolverine to listen to messages from the specified Redis stream with a consumer group and database ID + /// + /// Wolverine configuration options + /// Redis stream key name + /// Consumer group name + /// Redis database ID + /// Stream endpoint for further configuration + public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup, int databaseId) + { + var transport = options.Transports.GetOrCreate(); + if (transport == null) + { + throw new InvalidOperationException("Redis transport has not been configured. Call UseRedisTransport() first."); + } + + var endpoint = transport.StreamEndpoint(streamKey, databaseId, e => + { + e.ConsumerGroup = consumerGroup; + e.IsListener = true; + }); + + return endpoint; + } + + /// + /// Configure Wolverine to listen to messages from the specified Redis stream with a consumer group (uses database 0) + /// + /// Wolverine configuration options + /// Redis stream key name + /// Consumer group name + /// Configuration action for the endpoint + /// Stream endpoint for further configuration + public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, + string consumerGroup, Action configure) + { + return options.ListenToRedisStream(streamKey, consumerGroup, 0, configure); + } + + /// + /// Configure Wolverine to listen to messages from the specified Redis stream with a consumer group and database ID + /// + /// Wolverine configuration options + /// Redis stream key name + /// Consumer group name + /// Redis database ID + /// Configuration action for the endpoint + /// Stream endpoint for further configuration + public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, + string consumerGroup, int databaseId, Action configure) + { + var endpoint = options.ListenToRedisStream(streamKey, consumerGroup, databaseId); + configure(endpoint); + return endpoint; + } +} + +/// +/// Extension methods for RedisTransport fluent configuration +/// +public static class RedisTransportExtensions +{ + /// + /// Set the Redis database ID used for the per-node reply stream endpoint (request/reply mechanics) + /// + public static RedisTransport UseReplyStreamDatabase(this RedisTransport transport, int databaseId) + { + if (databaseId < 0) throw new ArgumentOutOfRangeException(nameof(databaseId)); + transport.ReplyDatabaseId = databaseId; + return transport; + } + + /// + /// Enable auto-provisioning of Redis streams and consumer groups + /// + public static RedisTransport AutoProvision(this RedisTransport transport) + { + transport.AutoProvision = true; + return transport; + } + + /// + /// Configure Redis transport to auto-purge streams on startup (useful for testing) + /// + public static RedisTransport AutoPurgeOnStartup(this RedisTransport transport) + { + transport.AutoPurgeAllQueues = true; + return transport; + } + + /// + /// Configure a default consumer name selector applied to all Redis listeners + /// when an endpoint-level ConsumerName is not explicitly set. + /// Example: transport.ConfigureDefaultConsumerName((rt, ep) => $"{rt.Options.ServiceName}-{rt.DurabilitySettings.AssignedNodeNumber}"); + /// + public static RedisTransport ConfigureDefaultConsumerName(this RedisTransport transport, + Func selector) + { + transport.DefaultConsumerNameSelector = selector; + return transport; + } +} + +/// +/// Extension methods for RedisStreamEndpoint fluent configuration +/// +public static class RedisStreamEndpointExtensions +{ + /// + /// Configure the batch size for reading messages from Redis streams + /// + public static RedisStreamEndpoint BatchSize(this RedisStreamEndpoint endpoint, int batchSize) + { + endpoint.BatchSize = batchSize; + return endpoint; + } + + /// + /// Configure the consumer name for this endpoint + /// + public static RedisStreamEndpoint ConsumerName(this RedisStreamEndpoint endpoint, string consumerName) + { + endpoint.ConsumerName = consumerName; + return endpoint; + } + + /// + /// Configure the block timeout when reading from Redis streams + /// + public static RedisStreamEndpoint BlockTimeout(this RedisStreamEndpoint endpoint, TimeSpan timeout) + { + endpoint.BlockTimeoutMilliseconds = (int)timeout.TotalMilliseconds; + return endpoint; + } + + /// + /// Configure this endpoint to use buffered in-memory processing + /// + public static RedisStreamEndpoint BufferedInMemory(this RedisStreamEndpoint endpoint) + { + endpoint.Mode = EndpointMode.BufferedInMemory; + return endpoint; + } + + /// + /// Configure this endpoint to use buffered in-memory processing + /// + public static RedisStreamEndpoint Sequential(this RedisStreamEndpoint endpoint) + { + endpoint.MaxDegreeOfParallelism = 1; + return endpoint; + } + + /// + /// Configure this endpoint to use durable processing with message persistence + /// + public static RedisStreamEndpoint Durable(this RedisStreamEndpoint endpoint) + { + endpoint.Mode = EndpointMode.Durable; + return endpoint; + } + + /// + /// Configure this endpoint to use inline processing (no queueing) + /// + public static RedisStreamEndpoint Inline(this RedisStreamEndpoint endpoint) + { + endpoint.Mode = EndpointMode.Inline; + endpoint.MaxDegreeOfParallelism = 1; + endpoint.BatchSize = 1; + return endpoint; + } + + /// + /// Configure the consumer group to start consuming from the beginning of the stream, + /// including any existing messages (equivalent to Redis "0-0" position) + /// + public static RedisStreamEndpoint StartFromBeginning(this RedisStreamEndpoint endpoint) + { + endpoint.StartFrom = StartFrom.Beginning; + return endpoint; + } + + /// + /// Configure the consumer group to start consuming only new messages added after + /// group creation (equivalent to Redis "$" position). This is the default behavior. + /// + public static RedisStreamEndpoint StartFromNewMessages(this RedisStreamEndpoint endpoint) + { + endpoint.StartFrom = StartFrom.NewMessages; + return endpoint; + } + + /// + /// Helper method to create a Redis stream URI with database ID + /// + /// Redis stream key name + /// Redis database ID + /// Formatted Redis stream URI + public static Uri BuildRedisStreamUri(string streamKey, int databaseId = 0) + { + return new Uri($"redis://stream/{databaseId}/{streamKey}"); + } + + /// + /// Helper method to create a Redis stream URI with database ID and consumer group + /// + /// Redis stream key name + /// Redis database ID + /// Consumer group name + /// Formatted Redis stream URI + public static Uri BuildRedisStreamUri(string streamKey, int databaseId, string consumerGroup) + { + return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}"); + } +} diff --git a/wolverine.sln b/wolverine.sln index c10cbb1dd..62c4bf3aa 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -283,6 +283,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Protobuf.Tests", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MartenSubscriptionTests", "src\Persistence\MartenSubscriptionTests\MartenSubscriptionTests.csproj", "{D61E79E6-169B-41DA-AAEC-93328F378331}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Redis", "Redis", "{203D4D7F-AE72-F75E-4DA2-8607DB1AB172}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Redis", "src\Transports\Redis\Wolverine.Redis\Wolverine.Redis.csproj", "{0B48793A-F3BD-F7A1-9498-715FB7881194}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Redis.Tests", "src\Transports\Redis\Wolverine.Redis.Tests\Wolverine.Redis.Tests.csproj", "{B4697521-797B-4B71-03C9-BC908B957227}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SignalR", "SignalR", "{1EF7D49F-DDB8-469A-88A0-4A8D6237561C}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.SignalR", "src\Transports\SignalR\Wolverine.SignalR\Wolverine.SignalR.csproj", "{36645C4B-BE1F-4184-A14C-D5BFA3F86A86}" @@ -1595,6 +1601,30 @@ Global {D61E79E6-169B-41DA-AAEC-93328F378331}.Release|x64.Build.0 = Release|Any CPU {D61E79E6-169B-41DA-AAEC-93328F378331}.Release|x86.ActiveCfg = Release|Any CPU {D61E79E6-169B-41DA-AAEC-93328F378331}.Release|x86.Build.0 = Release|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Debug|x64.ActiveCfg = Debug|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Debug|x64.Build.0 = Debug|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Debug|x86.ActiveCfg = Debug|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Debug|x86.Build.0 = Debug|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Release|Any CPU.Build.0 = Release|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Release|x64.ActiveCfg = Release|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Release|x64.Build.0 = Release|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Release|x86.ActiveCfg = Release|Any CPU + {0B48793A-F3BD-F7A1-9498-715FB7881194}.Release|x86.Build.0 = Release|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Debug|x64.ActiveCfg = Debug|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Debug|x64.Build.0 = Debug|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Debug|x86.ActiveCfg = Debug|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Debug|x86.Build.0 = Debug|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Release|Any CPU.Build.0 = Release|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Release|x64.ActiveCfg = Release|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Release|x64.Build.0 = Release|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Release|x86.ActiveCfg = Release|Any CPU + {B4697521-797B-4B71-03C9-BC908B957227}.Release|x86.Build.0 = Release|Any CPU {36645C4B-BE1F-4184-A14C-D5BFA3F86A86}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {36645C4B-BE1F-4184-A14C-D5BFA3F86A86}.Debug|Any CPU.Build.0 = Debug|Any CPU {36645C4B-BE1F-4184-A14C-D5BFA3F86A86}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -1752,5 +1782,8 @@ Global {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} = {84D32C8B-9CCE-4925-9AEC-8F445C7A2E3D} {36645C4B-BE1F-4184-A14C-D5BFA3F86A86} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} {3F62DB30-9A29-487A-9EE2-22A097E3EE3F} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} + {203D4D7F-AE72-F75E-4DA2-8607DB1AB172} = {84D32C8B-9CCE-4925-9AEC-8F445C7A2E3D} + {0B48793A-F3BD-F7A1-9498-715FB7881194} = {203D4D7F-AE72-F75E-4DA2-8607DB1AB172} + {B4697521-797B-4B71-03C9-BC908B957227} = {203D4D7F-AE72-F75E-4DA2-8607DB1AB172} EndGlobalSection EndGlobal