diff --git a/build/build.cs b/build/build.cs
index a20c4885a..f989d07fe 100644
--- a/build/build.cs
+++ b/build/build.cs
@@ -309,10 +309,13 @@ class Build : NukeBuild
Solution.Extensions.Wolverine_FluentValidation,
Solution.Extensions.Wolverine_MemoryPack,
Solution.Extensions.Wolverine_MessagePack,
+ Solution.Extensions.Wolverine_Protobuf,
Solution.Http.Wolverine_Http,
Solution.Http.Wolverine_Http_FluentValidation,
Solution.Http.Wolverine_Http_Marten,
- Solution.Testing.Wolverine_ComplianceTests
+ Solution.Testing.Wolverine_ComplianceTests,
+ Solution.Transports.Redis.Wolverine_Redis,
+ Solution.Transports.SignalR.Wolverine_SignalR
};
foreach (var project in nugetProjects)
diff --git a/docker-compose.yml b/docker-compose.yml
index 2964e32a2..ee60faec3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -86,4 +86,10 @@ services:
ports:
- "8082:8082"
- "9092:9092"
- - "9101:9101"
\ No newline at end of file
+ - "9101:9101"
+
+ redis-server:
+ image: "redis:alpine"
+ command: redis-server
+ ports:
+ - "6379:6379"
\ No newline at end of file
diff --git a/docs/guide/messaging/redis.md b/docs/guide/messaging/redis.md
deleted file mode 100644
index ae4325788..000000000
--- a/docs/guide/messaging/redis.md
+++ /dev/null
@@ -1 +0,0 @@
-# Using Redis
diff --git a/docs/guide/messaging/transports/redis.md b/docs/guide/messaging/transports/redis.md
new file mode 100644
index 000000000..7713a8e4e
--- /dev/null
+++ b/docs/guide/messaging/transports/redis.md
@@ -0,0 +1,218 @@
+# Using Redis
+
+## Installing
+
+To use [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/) as a messaging transport for Wolverine,
+first install the `WolverineFx.Redis` Nuget package to your application. Behind the scenes, the `Wolverine.Redis` library
+is using the [StackExchange.Redis](https://github.com/StackExchange/StackExchange.Redis) library.
+
+```bash
+dotnet add WolverineFx.Redis
+```
+
+## Using as Message Transport
+
+To connect to Redis and configure listeners and senders, use this syntax:
+
+
+
+```cs
+using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ opts.UseRedisTransport("localhost:6379")
+
+ // Auto-create streams and consumer groups
+ .AutoProvision()
+
+ // Configure default consumer name selector for all Redis listeners
+ .ConfigureDefaultConsumerName((runtime, endpoint) =>
+ $"{runtime.Options.ServiceName}-{runtime.DurabilitySettings.AssignedNodeNumber}")
+
+ // Useful for testing - auto purge queues on startup
+ .AutoPurgeOnStartup();
+
+ // Just publish all messages to Redis streams (uses database 0 by default)
+ opts.PublishAllMessages().ToRedisStream("wolverine-messages");
+
+ // Or explicitly configure message routing with database ID
+ opts.PublishMessage()
+ .ToRedisStream("colors", databaseId: 1)
+
+ // Configure specific settings for this stream
+ .BatchSize(50)
+ .SendInline();
+
+ // Listen to Redis streams with consumer groups (uses database 0 by default)
+ opts.ListenToRedisStream("red", "color-processors")
+ .ProcessInline()
+
+ // Configure consumer settings
+ .ConsumerName("red-consumer-1")
+ .BatchSize(10)
+ .BlockTimeout(TimeSpan.FromSeconds(5))
+
+ // Start from beginning to consume existing messages (like Kafka's AutoOffsetReset.Earliest)
+ .StartFromBeginning();
+
+ // Listen to Redis streams with database ID specified
+ opts.ListenToRedisStream("green", "color-processors", databaseId: 2)
+ .BufferedInMemory()
+ .BatchSize(25)
+ .StartFromNewMessages(); // Default: only new messages (like Kafka's AutoOffsetReset.Latest)
+
+ opts.ListenToRedisStream("blue", "color-processors", databaseId: 3)
+ .UseDurableInbox()
+ .ConsumerName("blue-consumer")
+ .StartFromBeginning(); // Process existing messages too
+
+ // Alternative: use StartFrom parameter directly
+ opts.ListenToRedisStream("purple", "color-processors", StartFrom.Beginning)
+ .BufferedInMemory();
+
+ // This will direct Wolverine to try to ensure that all
+ // referenced Redis streams and consumer groups exist at
+ // application start up time
+ opts.Services.AddResourceSetupOnStartup();
+ }).StartAsync();
+```
+snippet source | anchor
+
+
+If you need to control the database id within Redis, you have these options:
+
+
+
+```cs
+using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ opts.UseRedisTransport("localhost:6379");
+
+ // Configure streams on different databases
+ opts.PublishMessage()
+ .ToRedisStream("orders", databaseId: 1);
+
+ opts.PublishMessage()
+ .ToRedisStream("payments", databaseId: 2);
+
+ // Listen on different databases
+ opts.ListenToRedisStream("orders", "order-processors", databaseId: 1);
+ opts.ListenToRedisStream("payments", "payment-processors", databaseId: 2);
+
+ // Advanced configuration with database ID
+ opts.ListenToRedisStream("notifications", "notification-processors", databaseId: 3)
+ .ConsumerName("notification-consumer-1")
+ .BatchSize(100)
+ .BlockTimeout(10.Seconds())
+ .UseDurableInbox();
+ }).StartAsync();
+```
+snippet source | anchor
+
+
+To work with multiple databases in one application, see this sample:
+
+
+
+```cs
+using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ opts.UseRedisTransport("localhost:6379").AutoProvision();
+
+ // Different message types on different databases for isolation
+
+ // Database 0: Default messages
+ opts.PublishMessage().ToRedisStream("system-events");
+ opts.ListenToRedisStream("system-events", "system-processors");
+
+ // Database 1: Order processing
+ opts.PublishMessage().ToRedisStream("orders", 1);
+ opts.ListenToRedisStream("orders", "order-processors", 1);
+
+ // Database 2: Payment processing
+ opts.PublishMessage().ToRedisStream("payments", 2);
+ opts.ListenToRedisStream("payments", "payment-processors", 2);
+
+ // Database 3: Analytics and reporting
+ opts.PublishMessage().ToRedisStream("analytics", 3);
+ opts.ListenToRedisStream("analytics", "analytics-processors", 3);
+ }).StartAsync();
+```
+snippet source | anchor
+
+
+## Interoperability
+
+First, see the [tutorial on interoperability with Wolverine](/tutorials/interop) for general guidance.
+
+Next, the Redis transport supports interoperability through the `IRedisEnvelopeMapper` interface. If necessary, you
+can build your own version of this mapper interface like the following:
+
+
+
+```cs
+// Simplistic envelope mapper that expects every message to be of
+// type "T" and serialized as JSON that works perfectly well w/ our
+// application's default JSON serialization
+public class OurRedisJsonMapper : EnvelopeMapper>, IRedisEnvelopeMapper
+{
+ // Wolverine needs to know the message type name
+ private readonly string _messageTypeName = typeof(TMessage).ToMessageTypeName();
+
+ public OurRedisJsonMapper(Endpoint endpoint) : base(endpoint)
+ {
+ // Map the data property
+ MapProperty(x => x.Data!,
+ (e, m) => e.Data = m.Values.FirstOrDefault(x => x.Name == "data").Value,
+ (e, m) => m.Add(new NameValueEntry("data", e.Data)));
+
+ // Set up the message type
+ MapProperty(x => x.MessageType!,
+ (e, m) => e.MessageType = _messageTypeName,
+ (e, m) => m.Add(new NameValueEntry("message-type", _messageTypeName)));
+
+ // Set up content type
+ MapProperty(x => x.ContentType!,
+ (e, m) => e.ContentType = "application/json",
+ (e, m) => m.Add(new NameValueEntry("content-type", "application/json")));
+ }
+
+ protected override void writeOutgoingHeader(List outgoing, string key, string value)
+ {
+ outgoing.Add(new NameValueEntry($"header-{key}", value));
+ }
+
+ protected override bool tryReadIncomingHeader(StreamEntry incoming, string key, out string? value)
+ {
+ var target = $"header-{key}";
+ foreach (var nv in incoming.Values)
+ {
+ if (nv.Name.Equals(target))
+ {
+ value = nv.Value.ToString();
+ return true;
+ }
+ }
+
+ value = null;
+ return false;
+ }
+
+ protected override void writeIncomingHeaders(StreamEntry incoming, Envelope envelope)
+ {
+ var headers = incoming.Values.Where(k => k.Name.StartsWith("header-"));
+ foreach (var nv in headers)
+ {
+ envelope.Headers[nv.Name.ToString()[7..]] = nv.Value.ToString(); // Remove "header-" prefix
+ }
+
+ // Capture the Redis stream message id
+ envelope.Headers["redis-entry-id"] = incoming.Id.ToString();
+ }
+}
+```
+snippet source | anchor
+
+
diff --git a/docs/guide/messaging/transports/signalr.md b/docs/guide/messaging/transports/signalr.md
index 96d4912c9..c22edb785 100644
--- a/docs/guide/messaging/transports/signalr.md
+++ b/docs/guide/messaging/transports/signalr.md
@@ -6,6 +6,10 @@ The SignalR transport has been requested several times, but finally got built sp
team has heavily dog-fooded this feature.
:::
+::: tip
+Much of the sample code is taken from a runnable sample application in the Wolverine codebase called [WolverineChat](https://github.com/JasperFx/wolverine/tree/main/src/Samples/WolverineChat).
+:::
+
The [SignalR library](https://dotnet.microsoft.com/en-us/apps/aspnet/signalr) from Microsoft isn't hard to use from Wolverine for simplistic WebSockets
or Server Side Events usage , but what if you want a server side
application to exchange any number of different messages between a browser (or other WebSocket client because that's
@@ -168,7 +172,7 @@ builder.UseWolverine(opts =>
});
});
```
-snippet source | anchor
+snippet source | anchor
## Interacting with the Server from the Browser
@@ -290,7 +294,7 @@ public static class RequestSumHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
In the next section we'll learn a bit more about working with SignalR groups.
@@ -301,12 +305,45 @@ One of the powerful features of SignalR is being able to work with [groups of co
The SignalR transport currently has some simple support for managing and publishing to groups. Let's say you have
these web socket messages in your system:
-snippet: sample_messages_related_to_signalr_groups
+
+
+```cs
+public record EnrollMe(string GroupName) : WebSocketMessage;
+
+public record KickMeOut(string GroupName) : WebSocketMessage;
+
+public record BroadCastToGroup(string GroupName, string Message) : WebSocketMessage;
+```
+snippet source | anchor
+
The following code is a set of simplistic message handlers that handle these messages with some SignalR connection
group mechanics:
-snippet: sample_group_mechanics_with_signalr
+
+
+```cs
+// Declaring that you need the connection that originated
+// this message to be added to the named SignalR client group
+public static AddConnectionToGroup Handle(EnrollMe msg)
+ => new(msg.GroupName);
+
+// Declaring that you need the connection that originated this
+// message to be removed from the named SignalR client group
+public static RemoveConnectionToGroup Handle(KickMeOut msg)
+ => new(msg.GroupName);
+
+// The message wrapper here sends the raw message to
+// the named SignalR client group
+public static SignalRMessage Handle(BroadCastToGroup msg)
+ => new Information(msg.Message)
+ // This extension method wraps the "real" message
+ // with an envelope that routes this original message
+ // to the named group
+ .ToWebSocketGroup(msg.GroupName);
+```
+snippet source | anchor
+
In the code above:
@@ -330,22 +367,119 @@ send and receive messages from the main Wolverine SignalR transport and is 100%
If you wanted to use the SignalR client as a "real" messaging transport, you could do that like this sample:
-snippet: sample_bootstrap_signalr_client_for_realsies
+
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.UseWolverine(opts =>
+{
+ // this would need to be an absolute Url to where SignalR is
+ // hosted on your application and include the exact route where
+ // the WolverineHub is listening
+ var url = builder.Configuration.GetValue("signalr.url");
+ opts.UseClientToSignalR(url);
+
+ // Setting this up to publish any messages implementing
+ // the WebSocketMessage marker interface with the SignalR
+ // client
+ opts.Publish(x =>
+ {
+ x.MessagesImplementing();
+ x.ToSignalRWithClient(url);
+ });
+});
+```
+snippet source | anchor
+
Or a little more simply, if you are just using this for test automation, you would need to give it the port number where
your SignalR hosting service is running on the local computer:
-snippet: sample_bootstrap_signalr_client_for_local
+
+
+```cs
+// Ostensibly, *something* in your test harness would
+// be telling you the port number of the real application
+int port = 5555;
+
+using var clientHost = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ // Just so you know it's possible, you can override
+ // the relative url of the SignalR WolverineHub route
+ // in the hosting application
+ opts.UseClientToSignalR(port, "/api/messages");
+
+ // Setting this up to publish any messages implementing
+ // the WebSocketMessage marker interface with the SignalR
+ // client
+ opts.Publish(x =>
+ {
+ x.MessagesImplementing();
+ x.ToSignalRWithClient(port);
+ });
+ }).StartAsync();
+```
+snippet source | anchor
+
To make this a little more concrete, here's a little bit of the test harness setup we used to test the Wolverine.SignalR
transport:
-snippet: sample_signalr_client_test_harness_setup
+
+
+```cs
+public abstract class WebSocketTestContext : IAsyncLifetime
+{
+ protected WebApplication theWebApp;
+ private readonly int Port = PortFinder.GetAvailablePort();
+ protected readonly Uri clientUri;
+
+ private readonly List _clientHosts = new();
+
+ public WebSocketTestContext()
+ {
+ clientUri = new Uri($"http://localhost:{Port}/messages");
+ }
+
+ public async Task InitializeAsync()
+ {
+ var builder = WebApplication.CreateBuilder();
+
+ builder.WebHost.ConfigureKestrel(opts =>
+ {
+ opts.ListenLocalhost(Port);
+ });
+```
+snippet source | anchor
+
In the same test harness class, we bootstrap new `IHost` instances with the SignalR Client to mimic browser client
communication like this:
-snippet: sample_bootstrapping_signalr_client_in_test
+
+
+```cs
+var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ opts.ServiceName = serviceName;
+
+ opts.UseClientToSignalR(Port);
+
+ opts.PublishMessage().ToSignalRWithClient(Port);
+
+ opts.PublishMessage().ToSignalRWithClient(Port);
+
+ opts.Publish(x =>
+ {
+ x.MessagesImplementing();
+ x.ToSignalRWithClient(Port);
+ });
+ }).StartAsync();
+```
+snippet source | anchor
+
The key point here is that we stood up the service using a port number for Kestrel, then stood up `IHost` instances for
a Wolverine application using the SignalR Client using the same port number for easy connectivity.
@@ -356,7 +490,34 @@ for any messaging. In the sample test below, we're utilizing the [tracked sessio
message from the `IHost` hosting the SignalR Client transport and expect it to be successfully handled in the `IHost`
for our actual SignalR server:
-snippet: sample_end_to_end_test_with_signalr
+
+
+```cs
+[Fact]
+public async Task receive_message_from_a_client()
+{
+ // This is an IHost that has the SignalR Client
+ // transport configured to connect to a SignalR
+ // server in the "theWebApp" IHost
+ using var client = await StartClientHost();
+
+ var tracked = await client
+ .TrackActivity()
+ .IncludeExternalTransports()
+ .AlsoTrack(theWebApp)
+ .Timeout(10.Seconds())
+ .ExecuteAndWaitAsync(c => c.SendViaSignalRClient(clientUri, new ToSecond("Hollywood Brown")));
+
+ var record = tracked.Received.SingleRecord();
+ record.ServiceName.ShouldBe("Server");
+ record.Envelope.Destination.ShouldBe(new Uri("signalr://wolverine"));
+ record.Message.ShouldBeOfType()
+ .Name.ShouldBe("Hollywood Brown");
+
+}
+```
+snippet source | anchor
+
*Conveniently enough as I write this documentation today using existing test code, Hollywood Brown had a huge
game last night. Go Chiefs!*
diff --git a/docs/tutorials/interop.md b/docs/tutorials/interop.md
index 88f6e9bba..f4d6db9a9 100644
--- a/docs/tutorials/interop.md
+++ b/docs/tutorials/interop.md
@@ -79,7 +79,8 @@ common interoperability scenarios:
| [Amazon SNS](/guide/messaging/transports/sns) | [ISnsEnvelopeMapper](/guide/messaging/transports/sns.html#interoperability) | MassTransit, NServiceBus, CloudEvents, Raw Json |
| [Kafka](/guide/messaging/transports/kafka) | [IKafkaEnvelopeMapper](/guide/messaging/transports/kafka.html#interoperability) | CloudEvents, Raw Json |
| [Apache Pulsar](/guide/messaging/transports/pulsar) | [IPulsarEnvelopeMapper](/guide/messaging/transports/pulsar.html#interoperability) | CloudEvents |
-| [MQTT](/guide/messaging/transports/mqtt) | [IMqttEnvelopeMapper](/guide/messaging/transports/mqtt.html#interoperability)] | CloudEvents |
+| [MQTT](/guide/messaging/transports/mqtt) | [IMqttEnvelopeMapper](/guide/messaging/transports/mqtt.html#interoperability)] | CloudEvents |
+| [Redis](/guide/messaging/transports/redis) | [IRedisEnvelopeMapper](/guide/messaging/transports/redis.html#interoperability) | CloudEvents |
## Writing a Custom Envelope Mapper
diff --git a/src/Samples/WolverineChat/README.md b/src/Samples/WolverineChat/README.md
new file mode 100644
index 000000000..f01a2e9b5
--- /dev/null
+++ b/src/Samples/WolverineChat/README.md
@@ -0,0 +1,4 @@
+# WolverineChat
+
+This is a very small sample application used for the Wolverine documentation for its SignalR transport. To run the application,
+just press F5 in the browser of your choice or `dotnet run`. There are no Docker dependencies.
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs
index dfa66721a..ff3d0518d 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/BasicPubSubTests.cs
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
+using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -36,9 +37,13 @@ public async Task publish_and_listen_end_to_end()
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
- var endpoint = opts.ListenToRedisStream(streamKey, "g1");
- endpoint.MessageType = typeof(PubMessage);
- endpoint.BlockTimeoutMilliseconds = 100;
+ var endpoint = opts.ListenToRedisStream(streamKey, "g1")
+ .DefaultIncomingMessage()
+ .BlockTimeout(100.Milliseconds())
+ .EnableAutoClaim()
+ .DefaultIncomingMessage()
+ .BlockTimeout(100.Milliseconds());
+
endpoint.EnableAutoClaim(TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(0));
opts.PublishAllMessages().ToRedisStream(streamKey);
@@ -46,7 +51,7 @@ public async Task publish_and_listen_end_to_end()
})
.StartAsync();
- var bus = host.Services.GetRequiredService();
+ var bus = host.MessageBus();
// Send directly to the Redis stream endpoint to avoid route misconfiguration
var uri = new Uri($"redis://stream/0/{streamKey}");
await bus.EndpointFor(uri).SendAsync(new PubMessage("123"));
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs
index 36d665790..32955caa4 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/BufferedComplianceTests.cs
@@ -21,7 +21,7 @@ public async Task InitializeAsync()
await ReceiverIs(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
- opts.ListenToRedisStream(receiverStream, "g1").BufferedInMemory();
+ opts.ListenToRedisStream(receiverStream, "g1").BufferedInMemory().StartFromBeginning();
});
await SenderIs(opts =>
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs
index 61a0d397f..325b0cf41 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs
@@ -1,4 +1,5 @@
using System.Text.Json;
+using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
@@ -6,6 +7,7 @@
using Wolverine.Util;
using StackExchange.Redis;
using Wolverine.Configuration;
+using Wolverine.Redis.Internal;
using Wolverine.Transports;
namespace Wolverine.Redis.Tests;
@@ -40,11 +42,11 @@ public static async Task configure()
// Configure specific settings for this stream
.BatchSize(50)
- .Inline();
+ .SendInline();
// Listen to Redis streams with consumer groups (uses database 0 by default)
opts.ListenToRedisStream("red", "color-processors")
- .Inline()
+ .ProcessInline()
// Configure consumer settings
.ConsumerName("red-consumer-1")
@@ -61,7 +63,7 @@ public static async Task configure()
.StartFromNewMessages(); // Default: only new messages (like Kafka's AutoOffsetReset.Latest)
opts.ListenToRedisStream("blue", "color-processors", databaseId: 3)
- .Durable()
+ .UseDurableInbox()
.ConsumerName("blue-consumer")
.StartFromBeginning(); // Process existing messages too
@@ -99,13 +101,11 @@ public static async Task configure_with_database_ids()
opts.ListenToRedisStream("payments", "payment-processors", databaseId: 2);
// Advanced configuration with database ID
- opts.ListenToRedisStream("notifications", "notification-processors", databaseId: 3, endpoint =>
- {
- endpoint.ConsumerName("notification-consumer-1");
- endpoint.BatchSize(100);
- endpoint.BlockTimeout(TimeSpan.FromSeconds(10));
- endpoint.Durable();
- });
+ opts.ListenToRedisStream("notifications", "notification-processors", databaseId: 3)
+ .ConsumerName("notification-consumer-1")
+ .BatchSize(100)
+ .BlockTimeout(10.Seconds())
+ .UseDurableInbox();
}).StartAsync();
#endregion
@@ -116,8 +116,8 @@ public static async Task configure_with_uri_helpers()
#region sample_redis_uri_helpers
// Using URI builder helpers
- var ordersUri = RedisStreamEndpointExtensions.BuildRedisStreamUri("orders", databaseId: 1);
- var paymentsUri = RedisStreamEndpointExtensions.BuildRedisStreamUri("payments", databaseId: 2, "payment-processors");
+ var ordersUri = RedisTransport.BuildRedisStreamUri("orders", databaseId: 1);
+ var paymentsUri = RedisTransport.BuildRedisStreamUri("payments", databaseId: 2, "payment-processors");
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs
index 8247462bb..dcda81d50 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/InlineComplianceTests.cs
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
+using JasperFx.Core;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.Configuration;
using Wolverine.Redis;
@@ -18,17 +19,19 @@ public async Task InitializeAsync()
var receiverStream = $"wolverine-tests-inline-receiver-{Guid.NewGuid():N}";
OutboundAddress = new Uri($"redis://stream/0/{receiverStream}");
- await SenderIs(opts =>
+ await ReceiverIs(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
- opts.PublishAllMessages().ToRedisStream(receiverStream).Inline();
+ opts.ListenToRedisStream(receiverStream, "g1").ProcessInline().StartFromBeginning();
});
- await ReceiverIs(opts =>
+ await SenderIs(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
- opts.ListenToRedisStream(receiverStream, "g1").Inline();
+ opts.PublishAllMessages().ToRedisStream(receiverStream).SendInline();
});
+
+
}
public new Task DisposeAsync()
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs
index e453ec88f..babc1ee90 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisAutoClaimIntegrationTests.cs
@@ -1,5 +1,7 @@
using System.Text;
using System.Threading.Tasks;
+using JasperFx.Core;
+using JasperFx.Core.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -57,12 +59,13 @@ await db.StreamAddAsync(streamKey, new[]
})
.UseWolverine(opts =>
{
- opts.UseRedisTransport("localhost:6379");
- var endpoint = opts.ListenToRedisStream(streamKey, group);
- endpoint.EnableAutoClaim(TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(100));
- endpoint.BlockTimeoutMilliseconds = 100;
- endpoint.MessageType = typeof(AutoClaimTestMessage);
-
+ opts.UseRedisTransport("localhost:6379").AutoProvision();
+ opts
+ .ListenToRedisStream(streamKey, group)
+ .EnableAutoClaim(500.Milliseconds(), 100.Milliseconds())
+ .BlockTimeout(100.Milliseconds())
+ .DefaultIncomingMessage();
+
opts.Services.AddSingleton(tcs);
})
.StartAsync();
@@ -93,9 +96,10 @@ public async Task autoclaim_disabled_by_default()
})
.UseWolverine(opts =>
{
- opts.UseRedisTransport("localhost:6379");
- var endpoint = opts.ListenToRedisStream(streamKey, group);
- endpoint.MessageType = typeof(AutoClaimTestMessage);
+ opts.UseRedisTransport("localhost:6379").AutoProvision();
+ var expression = opts.ListenToRedisStream(streamKey, group).DefaultIncomingMessage();
+
+ var endpoint = expression.Endpoint.As();
// AutoClaim should be disabled by default
endpoint.AutoClaimEnabled.ShouldBeFalse();
@@ -115,9 +119,9 @@ public void fluent_api_enables_autoclaim_with_custom_settings()
endpoint.AutoClaimEnabled.ShouldBeFalse(); // Default
endpoint.AutoClaimPeriod.ShouldBe(TimeSpan.FromSeconds(30)); // Default
endpoint.AutoClaimMinIdle.ShouldBe(TimeSpan.FromMinutes(1)); // Default
-
- endpoint.EnableAutoClaim(TimeSpan.FromSeconds(15), TimeSpan.FromMinutes(2));
-
+
+ new RedisListenerConfiguration(endpoint).EnableAutoClaim(TimeSpan.FromSeconds(15), TimeSpan.FromMinutes(2));
+
endpoint.AutoClaimEnabled.ShouldBeTrue();
endpoint.AutoClaimPeriod.ShouldBe(TimeSpan.FromSeconds(15));
endpoint.AutoClaimMinIdle.ShouldBe(TimeSpan.FromMinutes(2));
@@ -128,9 +132,9 @@ public void fluent_api_disables_autoclaim()
{
var transport = new RedisTransport("localhost:6379");
var endpoint = transport.StreamEndpoint("test");
-
- endpoint.EnableAutoClaim().DisableAutoClaim();
-
+
+ new RedisListenerConfiguration(endpoint).EnableAutoClaim().DisableAutoClaim();
+
endpoint.AutoClaimEnabled.ShouldBeFalse();
}
}
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs
index b62cd05e0..966d265a5 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisClaimingTests.cs
@@ -1,4 +1,5 @@
using System.Text;
+using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -53,12 +54,13 @@ await db.StreamAddAsync(streamKey, new[]
})
.UseWolverine(opts =>
{
- opts.UseRedisTransport("localhost:6379");
- var endpoint = opts.ListenToRedisStream(streamKey, group);
- endpoint.EnableAutoClaim(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(1));
- endpoint.BatchSize = 10;
- endpoint.BlockTimeoutMilliseconds = 100;
- endpoint.MessageType = typeof(TestMessage);
+ opts.UseRedisTransport("localhost:6379").AutoProvision();
+ opts
+ .ListenToRedisStream(streamKey, group)
+ .EnableAutoClaim(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(1))
+ .BatchSize(10)
+ .BlockTimeout(100.Milliseconds())
+ .DefaultIncomingMessage();
opts.Services.AddSingleton(tcs);
})
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs
index 802f29f3b..e9b698a65 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs
@@ -87,8 +87,10 @@ public async Task InitializeAsync()
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
- var t = opts.UseRedisTransport("localhost:6379").AutoProvision();
- t.SystemQueuesEnabled = false;
+ var t = opts
+ .UseRedisTransport("localhost:6379")
+ .AutoProvision()
+ .SystemQueuesEnabled(false);
}).StartAsync();
}
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs
index f09dfae75..160a1468b 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/StartFromBehaviorTests.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
+using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -33,9 +34,30 @@ public void Handle(TestMessage message)
public class MessageTracker
{
+ public Task WaitForNumberOfMessages(int count, int timeoutInMilliseconds)
+ {
+ _completion = new TaskCompletionSource(false);
+ _expectedCount = count;
+
+ return _completion.Task.TimeoutAfterAsync(timeoutInMilliseconds);
+ }
+
private readonly List _receivedMessages = new();
+ private TaskCompletionSource? _completion;
+ private int _expectedCount;
public IReadOnlyList ReceivedMessages => _receivedMessages.AsReadOnly();
- public void AddMessage(string id) => _receivedMessages.Add(id);
+
+ public void AddMessage(string id)
+ {
+ _receivedMessages.Add(id);
+ if (_completion != null)
+ {
+ if (_receivedMessages.Count >= _expectedCount)
+ {
+ _completion.TrySetResult(true);
+ }
+ }
+ }
}
[Fact]
@@ -53,7 +75,7 @@ public async Task StartFromNewMessages_should_only_process_messages_after_group_
})
.StartAsync();
- var bus = publisherHost.Services.GetRequiredService();
+ var bus = publisherHost.MessageBus();
// Send 3 messages before creating the consumer group
await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("before-1"));
@@ -73,10 +95,10 @@ public async Task StartFromNewMessages_should_only_process_messages_after_group_
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
- var endpoint = opts.ListenToRedisStream(streamKey, "test-group")
+ opts.ListenToRedisStream(streamKey, "test-group")
.StartFromNewMessages() // Explicit, but this is the default
- .BlockTimeout(TimeSpan.FromMilliseconds(100));
- endpoint.MessageType = typeof(TestMessage);
+ .BlockTimeout(TimeSpan.FromMilliseconds(100))
+ .DefaultIncomingMessage();
opts.Services.AddSingleton(tracker);
opts.Services.AddSingleton(tcs);
@@ -88,7 +110,7 @@ public async Task StartFromNewMessages_should_only_process_messages_after_group_
await Task.Delay(200);
// Send a message after the listener is active
- var listenerBus = listenerHost.Services.GetRequiredService();
+ var listenerBus = listenerHost.MessageBus();
await listenerBus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("after-1"));
// Wait for completion or timeout
@@ -123,17 +145,23 @@ public async Task StartFromBeginning_should_process_existing_messages()
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
+
+ opts.PublishMessage().To(new Uri($"redis://stream/0/{streamKey}"))
+ .SendInline();
})
.StartAsync();
- var bus = publisherHost.Services.GetRequiredService();
+ var bus = publisherHost.MessageBus();
// Send messages before creating the consumer group
- await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("existing-1"));
- await bus.EndpointFor(new Uri($"redis://stream/0/{streamKey}")).SendAsync(new TestMessage("existing-2"));
+
+ await bus.PublishAsync(new TestMessage("existing-1"));
+ await bus.PublishAsync(new TestMessage("existing-2"));
await publisherHost.StopAsync();
+ var waiter = tracker.WaitForNumberOfMessages(2, 10000);
+
// Now create a listener with StartFromBeginning
using var listenerHost = await Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
@@ -145,18 +173,18 @@ public async Task StartFromBeginning_should_process_existing_messages()
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
- var endpoint = opts.ListenToRedisStream(streamKey, "test-group-beginning")
+ opts.ListenToRedisStream(streamKey, "test-group-beginning")
.StartFromBeginning() // Should process existing messages
- .BlockTimeout(TimeSpan.FromMilliseconds(100));
- endpoint.MessageType = typeof(TestMessage);
+ .BlockTimeout(TimeSpan.FromMilliseconds(100))
+ .DefaultIncomingMessage();
opts.Services.AddSingleton(tracker);
opts.Discovery.IncludeAssembly(typeof(StartFromBehaviorTests).Assembly);
})
.StartAsync();
-
+
// Give time for message processing
- await Task.Delay(1000);
+ await waiter;
// Should have received the existing messages
tracker.ReceivedMessages.Count.ShouldBeGreaterThanOrEqualTo(2);
diff --git a/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs b/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs
index 4e1e14c7d..5bc92b701 100644
--- a/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs
+++ b/src/Transports/Redis/Wolverine.Redis/IRedisEnvelopeMapper.cs
@@ -8,7 +8,5 @@ namespace Wolverine.Redis;
///
public interface IRedisEnvelopeMapper : IEnvelopeMapper>
{
- //Task ToRedisStreamFields(Envelope envelope);
- //Envelope CreateEnvelope(string streamKey, StreamEntry message);
}
\ No newline at end of file
diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs
index 2701b4236..4f93d09b3 100644
--- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs
+++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs
@@ -270,32 +270,7 @@ public async ValueTask> GetAttributesAsync()
return dict;
}
-
- ///
- /// Enable auto-claiming of pending entries within the consumer loop every specified period
- ///
- /// This can cause out of order messages. It is recommended to manually deal with expired claimed message if you are using .Inline() or desire ordered message processing.
- /// Period between auto-claim attempts (default: 30 seconds)
- /// Minimum idle time before claiming pending messages (if null, uses MinIdleBeforeClaimMilliseconds)
- /// This endpoint for method chaining
- public RedisStreamEndpoint EnableAutoClaim(TimeSpan? period = null, TimeSpan? minIdle = null)
- {
- AutoClaimEnabled = true;
- if (period.HasValue) AutoClaimPeriod = period.Value;
- if (minIdle.HasValue) AutoClaimMinIdle = minIdle.Value;
- return this;
- }
-
- ///
- /// Disable auto-claiming of pending entries within the consumer loop
- ///
- /// This endpoint for method chaining
- public RedisStreamEndpoint DisableAutoClaim()
- {
- AutoClaimEnabled = false;
- return this;
- }
-
+
public async ValueTask SetupAsync(ILogger logger)
{
try
diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs
index 550e0a5ca..4695eb5ba 100644
--- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs
+++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs
@@ -284,4 +284,27 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime)
_streams[cacheKey] = replyEndpoint;
}
+
+ ///
+ /// Helper method to create a Redis stream URI with database ID
+ ///
+ /// Redis stream key name
+ /// Redis database ID
+ /// Formatted Redis stream URI
+ public static Uri BuildRedisStreamUri(string streamKey, int databaseId = 0)
+ {
+ return new Uri($"redis://stream/{databaseId}/{streamKey}");
+ }
+
+ ///
+ /// Helper method to create a Redis stream URI with database ID and consumer group
+ ///
+ /// Redis stream key name
+ /// Redis database ID
+ /// Consumer group name
+ /// Formatted Redis stream URI
+ public static Uri BuildRedisStreamUri(string streamKey, int databaseId, string consumerGroup)
+ {
+ return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}");
+ }
}
diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransportExpression.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransportExpression.cs
new file mode 100644
index 000000000..72f285e2e
--- /dev/null
+++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransportExpression.cs
@@ -0,0 +1,176 @@
+using Wolverine.Configuration;
+using Wolverine.Runtime;
+using Wolverine.Transports;
+
+namespace Wolverine.Redis.Internal;
+
+public class RedisTransportExpression : BrokerExpression
+{
+ private readonly RedisTransport _transport;
+
+ public RedisTransportExpression(RedisTransport transport, WolverineOptions options) : base(transport, options)
+ {
+ _transport = transport;
+ }
+
+ protected override RedisListenerConfiguration createListenerExpression(RedisStreamEndpoint listenerEndpoint)
+ {
+ return new RedisListenerConfiguration(listenerEndpoint);
+ }
+
+ protected override RedisSubscriberConfiguration createSubscriberExpression(RedisStreamEndpoint subscriberEndpoint)
+ {
+ return new RedisSubscriberConfiguration(subscriberEndpoint);
+ }
+
+ ///
+ /// Set the Redis database ID used for the per-node reply stream endpoint (request/reply mechanics)
+ ///
+ public RedisTransportExpression UseReplyStreamDatabase(int databaseId)
+ {
+ if (databaseId < 0) throw new ArgumentOutOfRangeException(nameof(databaseId));
+ _transport.ReplyDatabaseId = databaseId;
+ return this;
+ }
+
+ ///
+ /// Configure a default consumer name selector applied to all Redis listeners
+ /// when an endpoint-level ConsumerName is not explicitly set.
+ /// Example: transport.ConfigureDefaultConsumerName((rt, ep) => $"{rt.Options.ServiceName}-{rt.DurabilitySettings.AssignedNodeNumber}");
+ ///
+ public RedisTransportExpression ConfigureDefaultConsumerName(
+ Func selector)
+ {
+ _transport.DefaultConsumerNameSelector = selector;
+ return this;
+ }
+
+ ///
+ /// Control whether or not the system queues for intra-Wolverine communication and request/reply
+ /// mechanics are enabled
+ ///
+ ///
+ ///
+ public RedisTransportExpression SystemQueuesEnabled(bool enabled)
+ {
+ _transport.SystemQueuesEnabled = enabled;
+ return this;
+ }
+}
+
+public class RedisListenerConfiguration : ListenerConfiguration
+{
+ public RedisListenerConfiguration(RedisStreamEndpoint endpoint) : base(endpoint)
+ {
+ }
+
+ internal RedisListenerConfiguration(Func source) : base(source)
+ {
+ }
+
+ ///
+ /// Configure the batch size for reading messages from Redis streams
+ ///
+ public RedisListenerConfiguration BatchSize(int batchSize)
+ {
+ _endpoint.BatchSize = batchSize;
+ return this;
+ }
+
+ ///
+ /// Configure the consumer name for this endpoint
+ ///
+ public RedisListenerConfiguration ConsumerName(string consumerName)
+ {
+ _endpoint.ConsumerName = consumerName;
+ return this;
+ }
+
+ ///
+ /// Configure the block timeout when reading from Redis streams
+ ///
+ public RedisListenerConfiguration BlockTimeout(TimeSpan timeout)
+ {
+ _endpoint.BlockTimeoutMilliseconds = (int)timeout.TotalMilliseconds;
+ return this;
+ }
+
+ ///
+ /// Configure the consumer group to start consuming from the beginning of the stream,
+ /// including any existing messages (equivalent to Redis "0-0" position)
+ ///
+ public RedisListenerConfiguration StartFromBeginning()
+ {
+ _endpoint.StartFrom = StartFrom.Beginning;
+ return this;
+ }
+
+ ///
+ /// Configure the consumer group to start consuming only new messages added after
+ /// group creation (equivalent to Redis "$" position). This is the default behavior.
+ ///
+ public RedisListenerConfiguration StartFromNewMessages()
+ {
+ _endpoint.StartFrom = StartFrom.NewMessages;
+ return this;
+ }
+
+ ///
+ /// Enable auto-claiming of pending entries within the consumer loop every specified period
+ ///
+ /// This can cause out of order messages. It is recommended to manually deal with expired claimed message if you are using .Inline() or desire ordered message processing.
+ /// Period between auto-claim attempts (default: 30 seconds)
+ /// Minimum idle time before claiming pending messages (if null, uses MinIdleBeforeClaimMilliseconds)
+ /// This endpoint for method chaining
+ public RedisListenerConfiguration EnableAutoClaim(TimeSpan? period = null, TimeSpan? minIdle = null)
+ {
+ _endpoint.AutoClaimEnabled = true;
+ if (period.HasValue) _endpoint.AutoClaimPeriod = period.Value;
+ if (minIdle.HasValue) _endpoint.AutoClaimMinIdle = minIdle.Value;
+ return this;
+ }
+
+ ///
+ /// Disable auto-claiming of pending entries within the consumer loop
+ ///
+ /// This endpoint for method chaining
+ public RedisListenerConfiguration DisableAutoClaim()
+ {
+ _endpoint.AutoClaimEnabled = false;
+ return this;
+ }
+}
+
+public class RedisSubscriberConfiguration : SubscriberConfiguration
+{
+ internal RedisSubscriberConfiguration(RedisStreamEndpoint endpoint) : base(endpoint)
+ {
+ }
+
+ ///
+ /// Configure the batch size for reading messages from Redis streams
+ ///
+ public RedisSubscriberConfiguration BatchSize(int batchSize)
+ {
+ _endpoint.BatchSize = batchSize;
+ return this;
+ }
+
+ ///
+ /// Configure the consumer name for this endpoint
+ ///
+ public RedisSubscriberConfiguration ConsumerName(string consumerName)
+ {
+ _endpoint.ConsumerName = consumerName;
+ return this;
+ }
+
+ ///
+ /// Configure the block timeout when reading from Redis streams
+ ///
+ public RedisSubscriberConfiguration BlockTimeout(TimeSpan timeout)
+ {
+ _endpoint.BlockTimeoutMilliseconds = (int)timeout.TotalMilliseconds;
+ return this;
+ }
+}
\ No newline at end of file
diff --git a/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs b/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs
index 5e67dc832..fc971f1fd 100644
--- a/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs
+++ b/src/Transports/Redis/Wolverine.Redis/WolverineOptionsExtensions.cs
@@ -33,13 +33,13 @@ public static class WolverineOptionsExtensions
/// Wolverine configuration options
/// Redis connection string (StackExchange.Redis format)
/// RedisTransport for fluent configuration
- public static RedisTransport UseRedisTransport(this WolverineOptions options, string connectionString)
+ public static RedisTransportExpression UseRedisTransport(this WolverineOptions options, string connectionString)
{
var transport = new RedisTransport(connectionString);
options.Transports.Add(transport);
- return transport;
+ return new RedisTransportExpression(transport, options);
}
///
@@ -48,7 +48,7 @@ public static RedisTransport UseRedisTransport(this WolverineOptions options, st
/// Publishing configuration
/// Redis stream key name
/// Stream endpoint for further configuration
- public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publishing, string streamKey)
+ public static RedisSubscriberConfiguration ToRedisStream(this IPublishToExpression publishing, string streamKey)
{
return publishing.ToRedisStream(streamKey, 0);
}
@@ -60,7 +60,7 @@ public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publis
/// Redis stream key name
/// Redis database ID
/// Stream endpoint for further configuration
- public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publishing, string streamKey, int databaseId)
+ public static RedisSubscriberConfiguration ToRedisStream(this IPublishToExpression publishing, string streamKey, int databaseId)
{
// Use correct pattern from Kafka transport
var transports = publishing.As().Parent.Transports;
@@ -71,7 +71,7 @@ public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publis
// Register this as a publishing destination
publishing.To(endpoint.Uri);
- return endpoint;
+ return new RedisSubscriberConfiguration(endpoint);
}
///
@@ -81,7 +81,7 @@ public static RedisStreamEndpoint ToRedisStream(this IPublishToExpression publis
/// Redis stream key name
/// Consumer group name
/// Stream endpoint for further configuration
- public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup)
+ public static RedisListenerConfiguration ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup)
{
return options.ListenToRedisStream(streamKey, consumerGroup, 0);
}
@@ -89,11 +89,12 @@ public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions opti
///
/// Configure Wolverine to listen to a Redis stream with starting position control
///
- public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup, StartFrom startFrom)
+ public static RedisListenerConfiguration ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup, StartFrom startFrom)
{
var endpoint = options.ListenToRedisStream(streamKey, consumerGroup, 0);
- endpoint.StartFrom = startFrom;
- return endpoint;
+
+ if (startFrom == StartFrom.Beginning) return endpoint.StartFromBeginning();
+ return endpoint.StartFromNewMessages();
}
///
@@ -104,7 +105,7 @@ public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions opti
/// Consumer group name
/// Redis database ID
/// Stream endpoint for further configuration
- public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup, int databaseId)
+ public static RedisListenerConfiguration ListenToRedisStream(this WolverineOptions options, string streamKey, string consumerGroup, int databaseId)
{
var transport = options.Transports.GetOrCreate();
if (transport == null)
@@ -118,197 +119,8 @@ public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions opti
e.IsListener = true;
});
- return endpoint;
- }
-
- ///
- /// Configure Wolverine to listen to messages from the specified Redis stream with a consumer group (uses database 0)
- ///
- /// Wolverine configuration options
- /// Redis stream key name
- /// Consumer group name
- /// Configuration action for the endpoint
- /// Stream endpoint for further configuration
- public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey,
- string consumerGroup, Action configure)
- {
- return options.ListenToRedisStream(streamKey, consumerGroup, 0, configure);
- }
-
- ///
- /// Configure Wolverine to listen to messages from the specified Redis stream with a consumer group and database ID
- ///
- /// Wolverine configuration options
- /// Redis stream key name
- /// Consumer group name
- /// Redis database ID
- /// Configuration action for the endpoint
- /// Stream endpoint for further configuration
- public static RedisStreamEndpoint ListenToRedisStream(this WolverineOptions options, string streamKey,
- string consumerGroup, int databaseId, Action configure)
- {
- var endpoint = options.ListenToRedisStream(streamKey, consumerGroup, databaseId);
- configure(endpoint);
- return endpoint;
- }
-}
-
-///
-/// Extension methods for RedisTransport fluent configuration
-///
-public static class RedisTransportExtensions
-{
- ///
- /// Set the Redis database ID used for the per-node reply stream endpoint (request/reply mechanics)
- ///
- public static RedisTransport UseReplyStreamDatabase(this RedisTransport transport, int databaseId)
- {
- if (databaseId < 0) throw new ArgumentOutOfRangeException(nameof(databaseId));
- transport.ReplyDatabaseId = databaseId;
- return transport;
- }
-
- ///
- /// Enable auto-provisioning of Redis streams and consumer groups
- ///
- public static RedisTransport AutoProvision(this RedisTransport transport)
- {
- transport.AutoProvision = true;
- return transport;
- }
-
- ///
- /// Configure Redis transport to auto-purge streams on startup (useful for testing)
- ///
- public static RedisTransport AutoPurgeOnStartup(this RedisTransport transport)
- {
- transport.AutoPurgeAllQueues = true;
- return transport;
- }
-
- ///
- /// Configure a default consumer name selector applied to all Redis listeners
- /// when an endpoint-level ConsumerName is not explicitly set.
- /// Example: transport.ConfigureDefaultConsumerName((rt, ep) => $"{rt.Options.ServiceName}-{rt.DurabilitySettings.AssignedNodeNumber}");
- ///
- public static RedisTransport ConfigureDefaultConsumerName(this RedisTransport transport,
- Func selector)
- {
- transport.DefaultConsumerNameSelector = selector;
- return transport;
- }
-}
-
-///
-/// Extension methods for RedisStreamEndpoint fluent configuration
-///
-public static class RedisStreamEndpointExtensions
-{
- ///
- /// Configure the batch size for reading messages from Redis streams
- ///
- public static RedisStreamEndpoint BatchSize(this RedisStreamEndpoint endpoint, int batchSize)
- {
- endpoint.BatchSize = batchSize;
- return endpoint;
- }
-
- ///
- /// Configure the consumer name for this endpoint
- ///
- public static RedisStreamEndpoint ConsumerName(this RedisStreamEndpoint endpoint, string consumerName)
- {
- endpoint.ConsumerName = consumerName;
- return endpoint;
- }
-
- ///
- /// Configure the block timeout when reading from Redis streams
- ///
- public static RedisStreamEndpoint BlockTimeout(this RedisStreamEndpoint endpoint, TimeSpan timeout)
- {
- endpoint.BlockTimeoutMilliseconds = (int)timeout.TotalMilliseconds;
- return endpoint;
- }
-
- ///
- /// Configure this endpoint to use buffered in-memory processing
- ///
- public static RedisStreamEndpoint BufferedInMemory(this RedisStreamEndpoint endpoint)
- {
- endpoint.Mode = EndpointMode.BufferedInMemory;
- return endpoint;
- }
-
- ///
- /// Configure this endpoint to use buffered in-memory processing
- ///
- public static RedisStreamEndpoint Sequential(this RedisStreamEndpoint endpoint)
- {
- endpoint.MaxDegreeOfParallelism = 1;
- return endpoint;
- }
-
- ///
- /// Configure this endpoint to use durable processing with message persistence
- ///
- public static RedisStreamEndpoint Durable(this RedisStreamEndpoint endpoint)
- {
- endpoint.Mode = EndpointMode.Durable;
- return endpoint;
- }
-
- ///
- /// Configure this endpoint to use inline processing (no queueing)
- ///
- public static RedisStreamEndpoint Inline(this RedisStreamEndpoint endpoint)
- {
- endpoint.Mode = EndpointMode.Inline;
- endpoint.MaxDegreeOfParallelism = 1;
- endpoint.BatchSize = 1;
- return endpoint;
- }
-
- ///
- /// Configure the consumer group to start consuming from the beginning of the stream,
- /// including any existing messages (equivalent to Redis "0-0" position)
- ///
- public static RedisStreamEndpoint StartFromBeginning(this RedisStreamEndpoint endpoint)
- {
- endpoint.StartFrom = StartFrom.Beginning;
- return endpoint;
- }
-
- ///
- /// Configure the consumer group to start consuming only new messages added after
- /// group creation (equivalent to Redis "$" position). This is the default behavior.
- ///
- public static RedisStreamEndpoint StartFromNewMessages(this RedisStreamEndpoint endpoint)
- {
- endpoint.StartFrom = StartFrom.NewMessages;
- return endpoint;
+ return new RedisListenerConfiguration(endpoint);
}
- ///
- /// Helper method to create a Redis stream URI with database ID
- ///
- /// Redis stream key name
- /// Redis database ID
- /// Formatted Redis stream URI
- public static Uri BuildRedisStreamUri(string streamKey, int databaseId = 0)
- {
- return new Uri($"redis://stream/{databaseId}/{streamKey}");
- }
- ///
- /// Helper method to create a Redis stream URI with database ID and consumer group
- ///
- /// Redis stream key name
- /// Redis database ID
- /// Consumer group name
- /// Formatted Redis stream URI
- public static Uri BuildRedisStreamUri(string streamKey, int databaseId, string consumerGroup)
- {
- return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}");
- }
}