From 9b0f980d61f51872c8ebf94dba7c1933062a834e Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 17 Apr 2026 21:16:30 +0200 Subject: [PATCH] Add transport endpoint URI helpers for all supported transports (#2502) Introduce EndpointUri static helper classes for every built-in transport so users no longer need to hand-write raw URI strings. Each helper exposes Uri-returning factory methods that cover every URI shape the transport's parser accepts, with ArgumentException validation on every string parameter and XML documentation on every method. New helpers: - RabbitMqEndpointUri (Queue, Exchange, Topic, Routing) - KafkaEndpointUri (Topic) - AzureServiceBusEndpointUri (Queue, Topic, Subscription) - SqsEndpointUri (Queue) - SnsEndpointUri (Topic) - MqttEndpointUri (Topic) - NatsEndpointUri (Subject) - PulsarEndpointUri (PersistentTopic, NonPersistentTopic, Topic(..., bool), Topic(string)) - GcpPubsubEndpointUri (Topic) - RedisEndpointUri (Stream, Stream with consumer group) Migration of existing ad-hoc helpers: - RedisTransport.BuildRedisStreamUri is now a [Obsolete] one-line delegator to RedisEndpointUri.Stream. - PulsarEndpoint.UriFor(string) is now a [Obsolete] delegator to PulsarEndpointUri.Topic(string). - PulsarEndpoint.UriFor(bool, tenant, ns, topic) stays bit-exact and is marked [Obsolete] without delegation: it returns a Pulsar-native topic path ("persistent://..."), which is a different URI form from the Wolverine endpoint URI ("pulsar://...") produced by PulsarEndpointUri. The obsolete message explains the non-delegation for future readers. - Internal library callers that want endpoint URIs (PulsarTransport, PulsarTransportExtensions) now call PulsarEndpointUri.Topic directly. - PulsarListener still needs the topic-path form for the native Pulsar client; its two call sites wrap the obsolete call with #pragma warning disable CS0618. PulsarEndpointUri.Topic(string) validates strictly: the scheme must be "persistent" or "non-persistent" and the path must contain exactly tenant/namespace/topic. Previously-accepted malformed input (e.g. "http://..." or a Wolverine endpoint URI) now throws an ArgumentException at the helper instead of silently producing a corrupt URI that only fails later in PulsarEndpoint.Parse. Tests: - One new test class per transport covering shape, validation, and (where feasible without a live broker) URI-parser roundtrip, totalling ~130 new test cases. - Additional compatibility tests in PulsarEndpointTests pin the deprecated UriFor behaviour for non-persistent scheme input, realistic topic names with dots/dashes, and the -RETRY / -DLQ suffix patterns used by PulsarListener against the native Pulsar client. Documentation: - Added a "## URI reference" section to every transport's Vitepress docs page mapping URI forms to helper calls. - Redis and Pulsar pages include a deprecation tip pointing users from BuildRedisStreamUri / UriFor to the new helpers. - Redis DocumentationSamples and four Pulsar test files that used UriFor(string) incidentally were migrated to PulsarEndpointUri.Topic. --- .../transports/azureservicebus/index.md | 12 ++ .../messaging/transports/gcp-pubsub/index.md | 14 ++ docs/guide/messaging/transports/kafka.md | 14 ++ docs/guide/messaging/transports/mqtt.md | 13 ++ docs/guide/messaging/transports/nats.md | 14 ++ docs/guide/messaging/transports/pulsar.md | 21 +++ .../messaging/transports/rabbitmq/index.md | 18 +++ docs/guide/messaging/transports/redis.md | 17 +++ docs/guide/messaging/transports/sns.md | 16 +++ docs/guide/messaging/transports/sqs/index.md | 16 +++ .../SnsEndpointUriTests.cs | 39 ++++++ .../AWS/Wolverine.AmazonSns/SnsEndpointUri.cs | 21 +++ .../SqsEndpointUriTests.cs | 39 ++++++ .../AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs | 21 +++ .../AzureServiceBusEndpointUriTests.cs | 85 ++++++++++++ .../AzureServiceBusEndpointUri.cs | 51 +++++++ .../GcpPubsubEndpointUriTests.cs | 26 ++++ .../Wolverine.Pubsub/GcpPubsubEndpointUri.cs | 23 ++++ .../KafkaEndpointUriTests.cs | 32 +++++ .../Kafka/Wolverine.Kafka/KafkaEndpointUri.cs | 21 +++ .../MqttEndpointUriTests.cs | 32 +++++ .../MQTT/Wolverine.MQTT/MqttEndpointUri.cs | 22 +++ .../NatsEndpointUriTests.cs | 32 +++++ .../NATS/Wolverine.Nats/NatsEndpointUri.cs | 21 +++ .../InlinePulsarTransportComplianceTests.cs | 2 +- .../PulsarEndpointTests.cs | 42 ++++++ .../PulsarEndpointUriTests.cs | 125 ++++++++++++++++++ .../PulsarTransportComplianceTests.cs | 2 +- .../Wolverine.Pulsar.Tests/WithCloudEvents.cs | 2 +- .../endpoint_configuration.cs | 4 +- .../Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs | 6 +- .../Wolverine.Pulsar/PulsarEndpointUri.cs | 100 ++++++++++++++ .../Pulsar/Wolverine.Pulsar/PulsarListener.cs | 4 + .../Wolverine.Pulsar/PulsarTransport.cs | 4 +- .../PulsarTransportExtensions.cs | 2 +- .../RabbitMqEndpointUriTests.cs | 113 ++++++++++++++++ .../Wolverine.RabbitMQ/RabbitMqEndpointUri.cs | 67 ++++++++++ .../DocumentationSamples.cs | 4 +- .../RedisEndpointUriTests.cs | 70 ++++++++++ .../Internal/RedisTransport.cs | 10 +- .../Redis/Wolverine.Redis/RedisEndpointUri.cs | 43 ++++++ 41 files changed, 1203 insertions(+), 17 deletions(-) create mode 100644 src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs create mode 100644 src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs create mode 100644 src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs create mode 100644 src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs create mode 100644 src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs create mode 100644 src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs create mode 100644 src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs create mode 100644 src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs create mode 100644 src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs create mode 100644 src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs create mode 100644 src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs create mode 100644 src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs create mode 100644 src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs create mode 100644 src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs create mode 100644 src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs create mode 100644 src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs diff --git a/docs/guide/messaging/transports/azureservicebus/index.md b/docs/guide/messaging/transports/azureservicebus/index.md index 285848ec0..d24c09590 100644 --- a/docs/guide/messaging/transports/azureservicebus/index.md +++ b/docs/guide/messaging/transports/azureservicebus/index.md @@ -146,8 +146,20 @@ builder.UseWolverine(opts => snippet source | anchor +## URI reference +The `AzureServiceBusEndpointUri` helper class builds canonical endpoint URIs: +| URI form | Helper call | +|---|---| +| `asb://queue/{name}` | `AzureServiceBusEndpointUri.Queue("name")` | +| `asb://topic/{name}` | `AzureServiceBusEndpointUri.Topic("name")` | +| `asb://topic/{topic}/{subscription}` | `AzureServiceBusEndpointUri.Subscription("topic", "sub")` | +```csharp +using Wolverine.AzureServiceBus; + +var uri = AzureServiceBusEndpointUri.Subscription("events", "audit"); +``` diff --git a/docs/guide/messaging/transports/gcp-pubsub/index.md b/docs/guide/messaging/transports/gcp-pubsub/index.md index dcf563b27..b42327591 100644 --- a/docs/guide/messaging/transports/gcp-pubsub/index.md +++ b/docs/guide/messaging/transports/gcp-pubsub/index.md @@ -100,3 +100,17 @@ opts.UsePubsub("your-project-id") ``` The default delimiter between the prefix and the original name is `.` for GCP Pub/Sub (e.g., `dev-john.orders`). + +## URI reference + +The `GcpPubsubEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `pubsub://{projectId}/{topicName}` | `GcpPubsubEndpointUri.Topic("projectId", "topicName")` | + +```csharp +using Wolverine.Pubsub; + +var uri = GcpPubsubEndpointUri.Topic("my-project", "orders"); +``` diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index 509e7353e..31f354ba1 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -678,3 +678,17 @@ await bus.BroadcastToTopicAsync("my-topic", new KafkaTombstone("record-key-to-de When Wolverine encounters a `KafkaTombstone` message, it produces a Kafka message with the specified key and a `null` value. This signals to Kafka's log compaction process that the record with that key should be removed during the next compaction cycle. This is useful when your Kafka topics use [log compaction](https://docs.confluent.io/platform/current/kafka/design.html#log-compaction) to maintain a key-value snapshot of the latest state. Publishing a tombstone ensures that deleted records are eventually cleaned up from the topic. + +## URI reference + +The `KafkaEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `kafka://topic/{name}` | `KafkaEndpointUri.Topic("name")` | + +```csharp +using Wolverine.Kafka; + +var uri = KafkaEndpointUri.Topic("orders"); +``` diff --git a/docs/guide/messaging/transports/mqtt.md b/docs/guide/messaging/transports/mqtt.md index 14c07dacd..8943f24db 100644 --- a/docs/guide/messaging/transports/mqtt.md +++ b/docs/guide/messaging/transports/mqtt.md @@ -466,3 +466,16 @@ await host.StartAsync(); snippet source | anchor +## URI reference + +The `MqttEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `mqtt://topic/{name}` | `MqttEndpointUri.Topic("name")` | + +```csharp +using Wolverine.MQTT; + +var uri = MqttEndpointUri.Topic("sensor/temperature"); +``` diff --git a/docs/guide/messaging/transports/nats.md b/docs/guide/messaging/transports/nats.md index 1d12acb1d..5014ad4b8 100644 --- a/docs/guide/messaging/transports/nats.md +++ b/docs/guide/messaging/transports/nats.md @@ -423,3 +423,17 @@ docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:latest --jetstream -m 8 # For scheduled delivery tests, use NATS 2.12+ docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:2.12-alpine --jetstream -m 8222 ``` + +## URI reference + +The `NatsEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `nats://subject/{subject}` | `NatsEndpointUri.Subject("subject")` | + +```csharp +using Wolverine.Nats; + +var uri = NatsEndpointUri.Subject("orders.created"); +``` diff --git a/docs/guide/messaging/transports/pulsar.md b/docs/guide/messaging/transports/pulsar.md index 37031c290..d92b2c7b6 100644 --- a/docs/guide/messaging/transports/pulsar.md +++ b/docs/guide/messaging/transports/pulsar.md @@ -159,3 +159,24 @@ Also see the more generic [Wolverine Guide on Interoperability](/tutorials/inter ::: Pulsar interoperability is done through the `IPulsarEnvelopeMapper` interface. + +## URI reference + +The `PulsarEndpointUri` helper class produces Wolverine endpoint URIs of the form `pulsar://persistent/{tenant}/{ns}/{topic}` or `pulsar://non-persistent/{tenant}/{ns}/{topic}` — the form Wolverine's parser accepts. Pulsar-native topic-path strings (`persistent://...`) used by the native Pulsar client are a separate concept and are not built by this helper. + +| Helper call | Resulting URI | +|---|---| +| `PulsarEndpointUri.PersistentTopic("public", "default", "orders")` | `pulsar://persistent/public/default/orders` | +| `PulsarEndpointUri.NonPersistentTopic("public", "default", "orders")` | `pulsar://non-persistent/public/default/orders` | +| `PulsarEndpointUri.Topic("public", "default", "orders", persistent: true)` | `pulsar://persistent/public/default/orders` | +| `PulsarEndpointUri.Topic("persistent://public/default/orders")` | `pulsar://persistent/public/default/orders` | + +```csharp +using Wolverine.Pulsar; + +var uri = PulsarEndpointUri.PersistentTopic("public", "default", "orders"); +``` + +::: tip +`PulsarEndpoint.UriFor` is deprecated. Use `PulsarEndpointUri.Topic` (string overload) or `PersistentTopic`/`NonPersistentTopic` instead. The old `UriFor(bool, ...)` overload returned a Pulsar-native topic path, not a Wolverine endpoint URI — if you need that format, build the string directly. +::: diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md index a1b14a3b9..b3789dba3 100644 --- a/docs/guide/messaging/transports/rabbitmq/index.md +++ b/docs/guide/messaging/transports/rabbitmq/index.md @@ -272,3 +272,21 @@ This creates RabbitMQ queues named `sequenced1` through `sequenced5` with compan ::: info Wolverine with the `WolverineFX.RabbitMQ` transport has also been verified to work against [LavinMQ](https://lavinmq.com/), a modern RabbitMQ-protocol compatible message broker, using the RabbitMQ transport with 100% protocol compatibility when configured through the standard RabbitMQ integration shown above. ::: + +## URI reference + +The `RabbitMqEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `rabbitmq://queue/{name}` | `RabbitMqEndpointUri.Queue("name")` | +| `rabbitmq://exchange/{name}` | `RabbitMqEndpointUri.Exchange("name")` | +| `rabbitmq://topic/{exchange}/{routingKey}` | `RabbitMqEndpointUri.Topic("ex", "key")` | +| `rabbitmq://exchange/{exchange}/routing/{routingKey}` | `RabbitMqEndpointUri.Routing("ex", "key")` | + +```csharp +using Wolverine.RabbitMQ; + +var uri = RabbitMqEndpointUri.Queue("orders"); +// new Uri("rabbitmq://queue/orders") +``` diff --git a/docs/guide/messaging/transports/redis.md b/docs/guide/messaging/transports/redis.md index 29e5b86ae..54d468766 100644 --- a/docs/guide/messaging/transports/redis.md +++ b/docs/guide/messaging/transports/redis.md @@ -259,4 +259,21 @@ using var host = await builder.UseWolverine(opts => snippet source | anchor +## URI reference +The `RedisEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `redis://stream/{databaseId}/{streamKey}` | `RedisEndpointUri.Stream("key", databaseId: 0)` | +| `redis://stream/{databaseId}/{streamKey}?consumerGroup={group}` | `RedisEndpointUri.Stream("key", 0, "group")` | + +```csharp +using Wolverine.Redis; + +var uri = RedisEndpointUri.Stream("orders", databaseId: 3); +``` + +::: tip +`RedisTransport.BuildRedisStreamUri` is deprecated. Use `RedisEndpointUri.Stream` instead. +::: diff --git a/docs/guide/messaging/transports/sns.md b/docs/guide/messaging/transports/sns.md index c61dcfaeb..d72268752 100644 --- a/docs/guide/messaging/transports/sns.md +++ b/docs/guide/messaging/transports/sns.md @@ -199,3 +199,19 @@ Also see the more generic [Wolverine Guide on Interoperability](/tutorials/inter SNS interoperability is done through the `ISnsEnvelopeMapper`. At this point, SNS supports interoperability through MassTransit, NServiceBus, CloudEvents, or user defined mapping strategies. + +## URI reference + +The `SnsEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `sns://{name}` | `SnsEndpointUri.Topic("name")` | + +```csharp +using Wolverine.AmazonSns; + +var uri = SnsEndpointUri.Topic("events"); +// FIFO topic (suffix preserved verbatim): +var fifoUri = SnsEndpointUri.Topic("events.fifo"); +``` diff --git a/docs/guide/messaging/transports/sqs/index.md b/docs/guide/messaging/transports/sqs/index.md index c2444eb89..ae6595ebb 100644 --- a/docs/guide/messaging/transports/sqs/index.md +++ b/docs/guide/messaging/transports/sqs/index.md @@ -264,3 +264,19 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToSqsQueue("send-and-receive"); }).StartAsync(); ``` + +## URI reference + +The `SqsEndpointUri` helper class builds canonical endpoint URIs: + +| URI form | Helper call | +|---|---| +| `sqs://{name}` | `SqsEndpointUri.Queue("name")` | + +```csharp +using Wolverine.AmazonSqs; + +var uri = SqsEndpointUri.Queue("orders"); +// FIFO queue (suffix preserved verbatim): +var fifoUri = SqsEndpointUri.Queue("orders.fifo"); +``` diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs new file mode 100644 index 000000000..1efda575c --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs @@ -0,0 +1,39 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.AmazonSns.Tests; + +public class SnsEndpointUriTests +{ + [Fact] + public void topic_uri_has_expected_shape() + { + SnsEndpointUri.Topic("events") + .ShouldBe(new Uri("sns://events")); + } + + [Fact] + public void topic_uri_preserves_fifo_suffix() + { + SnsEndpointUri.Topic("events.fifo") + .ShouldBe(new Uri("sns://events.fifo")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void topic_rejects_invalid_name(string? name) + { + Should.Throw(() => SnsEndpointUri.Topic(name!)); + } + + [Fact] + public void topic_uri_roundtrips_through_parser() + { + var uri = SnsEndpointUri.Topic("events"); + var transport = new Wolverine.AmazonSns.Internal.AmazonSnsTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs b/src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs new file mode 100644 index 000000000..bdf20e496 --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs @@ -0,0 +1,21 @@ +namespace Wolverine.AmazonSns; + +/// +/// Builds canonical Wolverine endpoint values for AWS SNS transport endpoints. +/// +public static class SnsEndpointUri +{ + /// + /// Builds a URI referencing an SNS topic endpoint in the canonical form + /// sns://{topicName}. FIFO topic names (with .fifo suffix) are preserved verbatim. + /// + /// The SNS topic name. + /// A of the form sns://{topicName}. + /// SnsEndpointUri.Topic("events") returns sns://events. + /// Thrown when is null, empty, or whitespace. + public static Uri Topic(string topicName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + return new Uri($"sns://{topicName}"); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs new file mode 100644 index 000000000..831db7b0d --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs @@ -0,0 +1,39 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.AmazonSqs.Tests; + +public class SqsEndpointUriTests +{ + [Fact] + public void queue_uri_has_expected_shape() + { + SqsEndpointUri.Queue("orders") + .ShouldBe(new Uri("sqs://orders")); + } + + [Fact] + public void queue_uri_preserves_fifo_suffix() + { + SqsEndpointUri.Queue("orders.fifo") + .ShouldBe(new Uri("sqs://orders.fifo")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void queue_rejects_invalid_name(string? name) + { + Should.Throw(() => SqsEndpointUri.Queue(name!)); + } + + [Fact] + public void queue_uri_roundtrips_through_parser() + { + var uri = SqsEndpointUri.Queue("orders"); + var transport = new Wolverine.AmazonSqs.Internal.AmazonSqsTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs b/src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs new file mode 100644 index 000000000..4d52d6328 --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs @@ -0,0 +1,21 @@ +namespace Wolverine.AmazonSqs; + +/// +/// Builds canonical Wolverine endpoint values for AWS SQS transport endpoints. +/// +public static class SqsEndpointUri +{ + /// + /// Builds a URI referencing an SQS queue endpoint in the canonical form + /// sqs://{queueName}. FIFO queue names (with .fifo suffix) are preserved verbatim. + /// + /// The SQS queue name. + /// A of the form sqs://{queueName}. + /// SqsEndpointUri.Queue("orders") returns sqs://orders. + /// Thrown when is null, empty, or whitespace. + public static Uri Queue(string queueName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(queueName); + return new Uri($"sqs://{queueName}"); + } +} diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs new file mode 100644 index 000000000..e703c20b1 --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs @@ -0,0 +1,85 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.AzureServiceBus.Tests; + +public class AzureServiceBusEndpointUriTests +{ + [Fact] + public void queue_uri_has_expected_shape() + { + AzureServiceBusEndpointUri.Queue("orders") + .ShouldBe(new Uri("asb://queue/orders")); + } + + [Fact] + public void topic_uri_has_expected_shape() + { + AzureServiceBusEndpointUri.Topic("events") + .ShouldBe(new Uri("asb://topic/events")); + } + + [Fact] + public void subscription_uri_has_expected_shape() + { + AzureServiceBusEndpointUri.Subscription("events", "audit") + .ShouldBe(new Uri("asb://topic/events/audit")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void queue_rejects_invalid_name(string? name) + { + Should.Throw(() => AzureServiceBusEndpointUri.Queue(name!)); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void topic_rejects_invalid_name(string? name) + { + Should.Throw(() => AzureServiceBusEndpointUri.Topic(name!)); + } + + [Theory] + [InlineData(null, "sub")] + [InlineData("", "sub")] + [InlineData(" ", "sub")] + [InlineData("topic", null)] + [InlineData("topic", "")] + [InlineData("topic", " ")] + public void subscription_rejects_invalid_args(string? topic, string? sub) + { + Should.Throw(() => AzureServiceBusEndpointUri.Subscription(topic!, sub!)); + } + + [Fact] + public void queue_uri_roundtrips_through_parser() + { + var uri = AzureServiceBusEndpointUri.Queue("orders"); + var transport = new AzureServiceBusTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } + + [Fact] + public void topic_uri_roundtrips_through_parser() + { + var uri = AzureServiceBusEndpointUri.Topic("events"); + var transport = new AzureServiceBusTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } + + [Fact] + public void subscription_uri_roundtrips_through_parser() + { + var uri = AzureServiceBusEndpointUri.Subscription("events", "audit"); + var transport = new AzureServiceBusTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs new file mode 100644 index 000000000..1b2e7a220 --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs @@ -0,0 +1,51 @@ +namespace Wolverine.AzureServiceBus; + +/// +/// Builds canonical Wolverine endpoint values for Azure Service Bus transport endpoints. +/// +public static class AzureServiceBusEndpointUri +{ + /// + /// Builds a URI referencing an Azure Service Bus queue endpoint in the canonical form + /// asb://queue/{queueName}. + /// + /// The Azure Service Bus queue name. + /// A of the form asb://queue/{queueName}. + /// AzureServiceBusEndpointUri.Queue("orders") returns asb://queue/orders. + /// Thrown when is null, empty, or whitespace. + public static Uri Queue(string queueName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(queueName); + return new Uri($"asb://queue/{queueName}"); + } + + /// + /// Builds a URI referencing an Azure Service Bus topic endpoint in the canonical form + /// asb://topic/{topicName}. + /// + /// The Azure Service Bus topic name. + /// A of the form asb://topic/{topicName}. + /// AzureServiceBusEndpointUri.Topic("events") returns asb://topic/events. + /// Thrown when is null, empty, or whitespace. + public static Uri Topic(string topicName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + return new Uri($"asb://topic/{topicName}"); + } + + /// + /// Builds a URI referencing an Azure Service Bus topic subscription endpoint in the canonical form + /// asb://topic/{topicName}/{subscriptionName}. + /// + /// The Azure Service Bus topic name. + /// The subscription name bound to the topic. + /// A of the form asb://topic/{topicName}/{subscriptionName}. + /// AzureServiceBusEndpointUri.Subscription("events", "audit") returns asb://topic/events/audit. + /// Thrown when either parameter is null, empty, or whitespace. + public static Uri Subscription(string topicName, string subscriptionName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + ArgumentException.ThrowIfNullOrWhiteSpace(subscriptionName); + return new Uri($"asb://topic/{topicName}/{subscriptionName}"); + } +} diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs new file mode 100644 index 000000000..57cdccd80 --- /dev/null +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs @@ -0,0 +1,26 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Pubsub.Tests; + +public class GcpPubsubEndpointUriTests +{ + [Fact] + public void topic_uri_has_expected_shape() + { + GcpPubsubEndpointUri.Topic("my-project", "orders") + .ShouldBe(new Uri("pubsub://my-project/orders")); + } + + [Theory] + [InlineData(null, "orders")] + [InlineData("", "orders")] + [InlineData(" ", "orders")] + [InlineData("proj", null)] + [InlineData("proj", "")] + [InlineData("proj", " ")] + public void topic_rejects_invalid_args(string? projectId, string? topicName) + { + Should.Throw(() => GcpPubsubEndpointUri.Topic(projectId!, topicName!)); + } +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs b/src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs new file mode 100644 index 000000000..e8a4de03e --- /dev/null +++ b/src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs @@ -0,0 +1,23 @@ +namespace Wolverine.Pubsub; + +/// +/// Builds canonical Wolverine endpoint values for Google Cloud Pub/Sub transport endpoints. +/// +public static class GcpPubsubEndpointUri +{ + /// + /// Builds a URI referencing a Google Cloud Pub/Sub topic endpoint in the canonical form + /// pubsub://{projectId}/{topicName}. + /// + /// The GCP project ID that owns the topic. + /// The Pub/Sub topic name. + /// A of the form pubsub://{projectId}/{topicName}. + /// GcpPubsubEndpointUri.Topic("my-project", "orders") returns pubsub://my-project/orders. + /// Thrown when either parameter is null, empty, or whitespace. + public static Uri Topic(string projectId, string topicName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(projectId); + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + return new Uri($"pubsub://{projectId}/{topicName}"); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs new file mode 100644 index 000000000..70089a672 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs @@ -0,0 +1,32 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Kafka.Tests; + +public class KafkaEndpointUriTests +{ + [Fact] + public void topic_uri_has_expected_shape() + { + KafkaEndpointUri.Topic("orders") + .ShouldBe(new Uri("kafka://topic/orders")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void topic_rejects_invalid_name(string? name) + { + Should.Throw(() => KafkaEndpointUri.Topic(name!)); + } + + [Fact] + public void topic_uri_roundtrips_through_parser() + { + var uri = KafkaEndpointUri.Topic("orders"); + var transport = new Wolverine.Kafka.Internals.KafkaTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs new file mode 100644 index 000000000..fbd468bab --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs @@ -0,0 +1,21 @@ +namespace Wolverine.Kafka; + +/// +/// Builds canonical Wolverine endpoint values for Kafka transport endpoints. +/// +public static class KafkaEndpointUri +{ + /// + /// Builds a URI referencing a Kafka topic endpoint in the canonical form + /// kafka://topic/{topicName}. + /// + /// The Kafka topic name. + /// A of the form kafka://topic/{topicName}. + /// KafkaEndpointUri.Topic("orders") returns kafka://topic/orders. + /// Thrown when is null, empty, or whitespace. + public static Uri Topic(string topicName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + return new Uri($"kafka://topic/{topicName}"); + } +} diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs new file mode 100644 index 000000000..91e1e134f --- /dev/null +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs @@ -0,0 +1,32 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.MQTT.Tests; + +public class MqttEndpointUriTests +{ + [Fact] + public void topic_uri_has_expected_shape() + { + MqttEndpointUri.Topic("sensor/temperature") + .ShouldBe(new Uri("mqtt://topic/sensor/temperature")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void topic_rejects_invalid_name(string? name) + { + Should.Throw(() => MqttEndpointUri.Topic(name!)); + } + + [Fact] + public void topic_uri_roundtrips_through_parser() + { + var uri = MqttEndpointUri.Topic("sensor/temperature"); + var transport = new Wolverine.MQTT.Internals.MqttTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs new file mode 100644 index 000000000..895a3eeea --- /dev/null +++ b/src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs @@ -0,0 +1,22 @@ +namespace Wolverine.MQTT; + +/// +/// Builds canonical Wolverine endpoint values for MQTT transport endpoints. +/// +public static class MqttEndpointUri +{ + /// + /// Builds a URI referencing an MQTT topic endpoint in the canonical form + /// mqtt://topic/{topicName}. Slashes inside the topic name are preserved + /// as path separators. + /// + /// The MQTT topic name (may contain slashes). + /// A of the form mqtt://topic/{topicName}. + /// MqttEndpointUri.Topic("sensor/temperature") returns mqtt://topic/sensor/temperature. + /// Thrown when is null, empty, or whitespace. + public static Uri Topic(string topicName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + return new Uri("mqtt://topic/" + topicName.Trim('/')); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs new file mode 100644 index 000000000..f9a93d7e0 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs @@ -0,0 +1,32 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Nats.Tests; + +public class NatsEndpointUriTests +{ + [Fact] + public void subject_uri_has_expected_shape() + { + NatsEndpointUri.Subject("orders.created") + .ShouldBe(new Uri("nats://subject/orders.created")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void subject_rejects_invalid_name(string? subject) + { + Should.Throw(() => NatsEndpointUri.Subject(subject!)); + } + + [Fact] + public void subject_uri_roundtrips_through_parser() + { + var uri = NatsEndpointUri.Subject("orders.created"); + var transport = new Wolverine.Nats.Internal.NatsTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs b/src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs new file mode 100644 index 000000000..9485e4160 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs @@ -0,0 +1,21 @@ +namespace Wolverine.Nats; + +/// +/// Builds canonical Wolverine endpoint values for NATS transport endpoints. +/// +public static class NatsEndpointUri +{ + /// + /// Builds a URI referencing a NATS subject endpoint in the canonical form + /// nats://subject/{subject}. + /// + /// The NATS subject (dot-separated identifier). + /// A of the form nats://subject/{subject}. + /// NatsEndpointUri.Subject("orders.created") returns nats://subject/orders.created. + /// Thrown when is null, empty, or whitespace. + public static Uri Subject(string subject) + { + ArgumentException.ThrowIfNullOrWhiteSpace(subject); + return new Uri($"nats://subject/{subject}"); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs index 87dd55954..c9f442c05 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs @@ -14,7 +14,7 @@ public async Task InitializeAsync() { var topic = Guid.NewGuid().ToString(); var topicPath = $"persistent://public/default/{topic}"; - OutboundAddress = PulsarEndpoint.UriFor(topicPath); + OutboundAddress = PulsarEndpointUri.Topic(topicPath); await ReceiverIs(opts => { diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs index 77371531f..29e1f3c34 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs @@ -81,4 +81,46 @@ public void uri_for_topic_string() endpoint.Tenant.ShouldBe("t1"); endpoint.TopicName.ShouldBe("aaa"); } + + [Fact] + public void uri_for_topic_string_handles_non_persistent_scheme() + { + var uri = PulsarEndpoint.UriFor("non-persistent://t1/ns1/aaa"); + var endpoint = new PulsarEndpoint(uri, null!); + + endpoint.Persistence.ShouldBe(PulsarEndpoint.NonPersistent); + endpoint.Tenant.ShouldBe("t1"); + endpoint.Namespace.ShouldBe("ns1"); + endpoint.TopicName.ShouldBe("aaa"); + } + + [Fact] + public void uri_for_topic_string_preserves_realistic_topic_names() + { + var uri = PulsarEndpoint.UriFor("persistent://my-tenant/my-namespace/order.created.v2"); + var endpoint = new PulsarEndpoint(uri, null!); + + endpoint.Persistence.ShouldBe(PulsarEndpoint.Persistent); + endpoint.Tenant.ShouldBe("my-tenant"); + endpoint.Namespace.ShouldBe("my-namespace"); + endpoint.TopicName.ShouldBe("order.created.v2"); + } + + [Fact] + public void uri_for_bool_produces_retry_suffix_path_used_by_pulsar_listener() + { + // Mirrors the call pattern in PulsarListener.getRetryLetterTopicUri: + // PulsarEndpoint.UriFor(isPersistent, tenant, namespace, "{topic}-RETRY"). + var uri = PulsarEndpoint.UriFor(true, "public", "default", "orders-RETRY"); + uri.ShouldBe(new Uri("persistent://public/default/orders-RETRY")); + } + + [Fact] + public void uri_for_bool_produces_dlq_suffix_path_used_by_pulsar_listener() + { + // Mirrors the call pattern in PulsarListener.getDeadLetteredTopicUri: + // PulsarEndpoint.UriFor(isPersistent, tenant, namespace, "{topic}-DLQ"). + var uri = PulsarEndpoint.UriFor(true, "public", "default", "orders-DLQ"); + uri.ShouldBe(new Uri("persistent://public/default/orders-DLQ")); + } } \ No newline at end of file diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs new file mode 100644 index 000000000..c33a6ddb2 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs @@ -0,0 +1,125 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Pulsar.Tests; + +public class PulsarEndpointUriTests +{ + [Fact] + public void persistent_topic_returns_wolverine_endpoint_uri() + { + PulsarEndpointUri.PersistentTopic("public", "default", "orders") + .ShouldBe(new Uri("pulsar://persistent/public/default/orders")); + } + + [Fact] + public void non_persistent_topic_returns_wolverine_endpoint_uri() + { + PulsarEndpointUri.NonPersistentTopic("public", "default", "orders") + .ShouldBe(new Uri("pulsar://non-persistent/public/default/orders")); + } + + [Fact] + public void topic_with_persistent_flag_true_matches_persistent_topic() + { + PulsarEndpointUri.Topic("public", "default", "orders", persistent: true) + .ShouldBe(new Uri("pulsar://persistent/public/default/orders")); + } + + [Fact] + public void topic_with_persistent_flag_false_matches_non_persistent_topic() + { + PulsarEndpointUri.Topic("public", "default", "orders", persistent: false) + .ShouldBe(new Uri("pulsar://non-persistent/public/default/orders")); + } + + [Fact] + public void topic_from_pulsar_native_path_returns_wolverine_endpoint_uri() + { + PulsarEndpointUri.Topic("persistent://t1/ns1/aaa") + .ShouldBe(new Uri("pulsar://persistent/t1/ns1/aaa")); + } + + [Fact] + public void topic_from_non_persistent_native_path_returns_wolverine_endpoint_uri() + { + PulsarEndpointUri.Topic("non-persistent://t1/ns1/aaa") + .ShouldBe(new Uri("pulsar://non-persistent/t1/ns1/aaa")); + } + + [Theory] + [InlineData(null, "ns", "topic")] + [InlineData("", "ns", "topic")] + [InlineData(" ", "ns", "topic")] + [InlineData("tenant", null, "topic")] + [InlineData("tenant", "", "topic")] + [InlineData("tenant", " ", "topic")] + [InlineData("tenant", "ns", null)] + [InlineData("tenant", "ns", "")] + [InlineData("tenant", "ns", " ")] + public void persistent_topic_rejects_invalid_args(string? tenant, string? ns, string? topic) + { + Should.Throw(() => PulsarEndpointUri.PersistentTopic(tenant!, ns!, topic!)); + } + + [Theory] + [InlineData(null, "ns", "topic")] + [InlineData("", "ns", "topic")] + [InlineData("tenant", null, "topic")] + [InlineData("tenant", "ns", null)] + public void non_persistent_topic_rejects_invalid_args(string? tenant, string? ns, string? topic) + { + Should.Throw(() => PulsarEndpointUri.NonPersistentTopic(tenant!, ns!, topic!)); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void topic_from_path_rejects_invalid_path(string? path) + { + Should.Throw(() => PulsarEndpointUri.Topic(path!)); + } + + [Theory] + [InlineData("pulsar://persistent/t1/ns1/aaa")] // already a Wolverine endpoint URI + [InlineData("http://example.com/ns/topic")] // unsupported scheme + [InlineData("ftp://server/ns/topic")] // unsupported scheme + [InlineData("tcp://host:5000/a/b")] // unsupported scheme + public void topic_from_path_rejects_unsupported_scheme(string path) + { + Should.Throw(() => PulsarEndpointUri.Topic(path)); + } + + [Theory] + [InlineData("persistent://tenant")] // only tenant, missing ns and topic + [InlineData("persistent://tenant/ns")] // missing topic + [InlineData("persistent://tenant/ns/topic/extra")] // extra segment + [InlineData("non-persistent://tenant/ns/topic/extra")] // extra segment, non-persistent + public void topic_from_path_rejects_wrong_segment_count(string path) + { + Should.Throw(() => PulsarEndpointUri.Topic(path)); + } + + [Fact] + public void persistent_topic_uri_roundtrips_through_parser() + { + var uri = PulsarEndpointUri.PersistentTopic("public", "default", "orders"); + var endpoint = new PulsarEndpoint(uri, null!); + endpoint.Persistence.ShouldBe(PulsarEndpoint.Persistent); + endpoint.Tenant.ShouldBe("public"); + endpoint.Namespace.ShouldBe("default"); + endpoint.TopicName.ShouldBe("orders"); + } + + [Fact] + public void non_persistent_topic_uri_roundtrips_through_parser() + { + var uri = PulsarEndpointUri.NonPersistentTopic("public", "default", "orders"); + var endpoint = new PulsarEndpoint(uri, null!); + endpoint.Persistence.ShouldBe(PulsarEndpoint.NonPersistent); + endpoint.Tenant.ShouldBe("public"); + endpoint.Namespace.ShouldBe("default"); + endpoint.TopicName.ShouldBe("orders"); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs index 9115d958a..f463a90b4 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs @@ -14,7 +14,7 @@ public async Task InitializeAsync() { var topic = Guid.NewGuid().ToString(); var topicPath = $"persistent://public/default/compliance{topic}"; - OutboundAddress = PulsarEndpoint.UriFor(topicPath); + OutboundAddress = PulsarEndpointUri.Topic(topicPath); await SenderIs(opts => { diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs index 26aa73faa..4f9f5d93c 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs @@ -14,7 +14,7 @@ public async Task InitializeAsync() { var topic = Guid.NewGuid().ToString(); var topicPath = $"persistent://public/default/compliance{topic}"; - OutboundAddress = PulsarEndpoint.UriFor(topicPath); + OutboundAddress = PulsarEndpointUri.Topic(topicPath); await SenderIs(opts => { diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs index 9146d3aee..1ca510d06 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs @@ -39,7 +39,7 @@ public void Dispose() [Fact] public void requeue_disabled() { - var uri = PulsarEndpoint.UriFor("persistent://public/default/one"); + var uri = PulsarEndpointUri.Topic("persistent://public/default/one"); var endpoint = theRuntime.Endpoints.EndpointFor(uri)?.As(); endpoint!.EnableRequeue.ShouldBeFalse(); @@ -48,7 +48,7 @@ public void requeue_disabled() [Fact] public void unsubscribe_on_close_disabled() { - var uri = PulsarEndpoint.UriFor("persistent://public/default/one"); + var uri = PulsarEndpointUri.Topic("persistent://public/default/one"); var endpoint = theRuntime.Endpoints.EndpointFor(uri)?.As(); endpoint!.UnsubscribeOnClose.ShouldBeFalse(); diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs index 940db87c9..ba9bfec81 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs @@ -48,17 +48,17 @@ protected override PulsarEnvelopeMapper buildMapper(IWolverineRuntime runtime) public bool IsPersistent => Persistence.Equals(Persistent); + [Obsolete("This overload returns a Pulsar-native topic path (persistent://...), not a Wolverine endpoint URI (pulsar://...). The native Pulsar client requires the former; PulsarEndpointUri produces the latter — they are NOT interchangeable, which is why this overload is not delegated. Build Pulsar-native topic paths directly. This helper will be removed in a future version.")] public static Uri UriFor(bool persistent, string tenant, string @namespace, string topicName) { var scheme = persistent ? "persistent" : "non-persistent"; return new Uri($"{scheme}://{tenant}/{@namespace}/{topicName}"); } + [Obsolete("Use PulsarEndpointUri.Topic instead. This helper will be removed in a future version.")] public static Uri UriFor(string topicPath) { - var uri = new Uri(topicPath); - return new Uri( - $"pulsar://{uri.Scheme}/{uri.Host}/{uri.Segments.Skip(1).Select(x => x.TrimEnd('/')).Join("/")}"); + return PulsarEndpointUri.Topic(topicPath); } public override IDictionary DescribeProperties() diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs new file mode 100644 index 000000000..82ddfb7f9 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs @@ -0,0 +1,100 @@ +namespace Wolverine.Pulsar; + +/// +/// Builds canonical Wolverine endpoint values for Pulsar transport endpoints. +/// All methods return Wolverine endpoint URIs of the form pulsar://persistent/{tenant}/{ns}/{topic} +/// or pulsar://non-persistent/{tenant}/{ns}/{topic}, matching what 's parser +/// accepts. For Pulsar-native topic-path strings (e.g. persistent://...) used by the native Pulsar client, +/// build them directly — they are not Wolverine endpoint URIs and are out of scope for this helper. +/// +public static class PulsarEndpointUri +{ + /// + /// Builds a URI referencing a Pulsar persistent-topic endpoint in the canonical form + /// pulsar://persistent/{tenant}/{namespace}/{topicName}. + /// + /// The Pulsar tenant. + /// The Pulsar namespace. + /// The Pulsar topic name. + /// A of the form pulsar://persistent/{tenant}/{namespace}/{topicName}. + /// PulsarEndpointUri.PersistentTopic("public", "default", "orders") returns pulsar://persistent/public/default/orders. + /// Thrown when any parameter is null, empty, or whitespace. + public static Uri PersistentTopic(string tenant, string @namespace, string topicName) + { + return BuildEndpointUri(PulsarEndpoint.Persistent, tenant, @namespace, topicName); + } + + /// + /// Builds a URI referencing a Pulsar non-persistent-topic endpoint in the canonical form + /// pulsar://non-persistent/{tenant}/{namespace}/{topicName}. + /// + /// The Pulsar tenant. + /// The Pulsar namespace. + /// The Pulsar topic name. + /// A of the form pulsar://non-persistent/{tenant}/{namespace}/{topicName}. + /// PulsarEndpointUri.NonPersistentTopic("public", "default", "orders") returns pulsar://non-persistent/public/default/orders. + /// Thrown when any parameter is null, empty, or whitespace. + public static Uri NonPersistentTopic(string tenant, string @namespace, string topicName) + { + return BuildEndpointUri(PulsarEndpoint.NonPersistent, tenant, @namespace, topicName); + } + + /// + /// Builds a URI referencing a Pulsar topic endpoint, selecting persistent or non-persistent by flag. + /// + /// The Pulsar tenant. + /// The Pulsar namespace. + /// The Pulsar topic name. + /// true for a persistent topic, false for non-persistent. + /// A of the form pulsar://persistent/... or pulsar://non-persistent/.... + /// PulsarEndpointUri.Topic("public", "default", "orders", persistent: true) returns pulsar://persistent/public/default/orders. + /// Thrown when any string parameter is null, empty, or whitespace. + public static Uri Topic(string tenant, string @namespace, string topicName, bool persistent) + { + var scheme = persistent ? PulsarEndpoint.Persistent : PulsarEndpoint.NonPersistent; + return BuildEndpointUri(scheme, tenant, @namespace, topicName); + } + + /// + /// Converts a Pulsar-native topic path (e.g. persistent://tenant/ns/topic) into a + /// Wolverine endpoint URI of the form pulsar://persistent/tenant/ns/topic. + /// + /// A Pulsar-native topic path string with scheme persistent or non-persistent and exactly three path components (tenant, namespace, topic). + /// A of the form pulsar://{persistence}/{tenant}/{namespace}/{topicName}. + /// PulsarEndpointUri.Topic("persistent://t1/ns1/aaa") returns pulsar://persistent/t1/ns1/aaa. + /// Thrown when is null, empty, whitespace, uses a scheme other than persistent/non-persistent, or does not contain exactly tenant/namespace/topic path parts. + /// Thrown when is not a parseable URI. + public static Uri Topic(string topicPath) + { + ArgumentException.ThrowIfNullOrWhiteSpace(topicPath); + + var parsed = new Uri(topicPath); + + if (parsed.Scheme != PulsarEndpoint.Persistent && parsed.Scheme != PulsarEndpoint.NonPersistent) + { + throw new ArgumentException( + $"topicPath must use scheme '{PulsarEndpoint.Persistent}' or '{PulsarEndpoint.NonPersistent}', got '{parsed.Scheme}'. Use PulsarEndpointUri.PersistentTopic or NonPersistentTopic to build Wolverine endpoint URIs from components.", + nameof(topicPath)); + } + + var pathParts = parsed.AbsolutePath.Trim('/').Split('/', StringSplitOptions.RemoveEmptyEntries); + if (pathParts.Length != 2) + { + throw new ArgumentException( + $"topicPath must have the form '{{scheme}}://{{tenant}}/{{namespace}}/{{topic}}', got '{topicPath}'.", + nameof(topicPath)); + } + + return parsed.Scheme == PulsarEndpoint.Persistent + ? PersistentTopic(parsed.Host, pathParts[0], pathParts[1]) + : NonPersistentTopic(parsed.Host, pathParts[0], pathParts[1]); + } + + private static Uri BuildEndpointUri(string persistence, string tenant, string @namespace, string topicName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenant); + ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); + ArgumentException.ThrowIfNullOrWhiteSpace(topicName); + return new Uri($"pulsar://{persistence}/{tenant}/{@namespace}/{topicName}"); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs index 8ffeb0062..0e224fea1 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs @@ -138,16 +138,20 @@ private IConsumer> createRetryConsumer(PulsarEndpoint end private Uri? getRetryLetterTopicUri(PulsarEndpoint endpoint) { +#pragma warning disable CS0618 // Pulsar-native topic-path form is required by the Pulsar client return NativeRetryLetterQueueEnabled ? PulsarEndpoint.UriFor(endpoint.IsPersistent, endpoint.Tenant, endpoint.Namespace, endpoint.RetryLetterTopic?.TopicName ?? $"{endpoint.TopicName}-RETRY") : null; +#pragma warning restore CS0618 } private Uri getDeadLetteredTopicUri(PulsarEndpoint endpoint) { +#pragma warning disable CS0618 // Pulsar-native topic-path form is required by the Pulsar client var topicDql = PulsarEndpoint.UriFor(endpoint.IsPersistent, endpoint.Tenant, endpoint.Namespace, endpoint.DeadLetterTopic?.TopicName ?? $"{endpoint.TopicName}-DLQ"); +#pragma warning restore CS0618 return topicDql; } diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs index 1604b9b52..1c52055d6 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs @@ -95,7 +95,7 @@ public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) public PulsarEndpoint EndpointFor(string topicPath) { - var uri = PulsarEndpoint.UriFor(topicPath); + var uri = PulsarEndpointUri.Topic(topicPath); return this[uri]; } -} \ No newline at end of file +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs index a82d1abe9..e41ac473b 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs @@ -79,7 +79,7 @@ public static PulsarSubscriberConfiguration ToPulsarTopic(this IPublishToExpress /// public static PulsarListenerConfiguration ListenToPulsarTopic(this WolverineOptions endpoints, string topicPath) { - var uri = PulsarEndpoint.UriFor(topicPath); + var uri = PulsarEndpointUri.Topic(topicPath); var endpoint = endpoints.PulsarTransport()[uri]; endpoint.IsListener = true; return new PulsarListenerConfiguration(endpoint); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs new file mode 100644 index 000000000..a53aa39c2 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs @@ -0,0 +1,113 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class RabbitMqEndpointUriTests +{ + [Fact] + public void queue_uri_has_expected_shape() + { + RabbitMqEndpointUri.Queue("orders") + .ShouldBe(new Uri("rabbitmq://queue/orders")); + } + + [Fact] + public void exchange_uri_has_expected_shape() + { + RabbitMqEndpointUri.Exchange("events") + .ShouldBe(new Uri("rabbitmq://exchange/events")); + } + + [Fact] + public void topic_uri_has_expected_shape() + { + RabbitMqEndpointUri.Topic("prices", "usd.eur") + .ShouldBe(new Uri("rabbitmq://topic/prices/usd.eur")); + } + + [Fact] + public void routing_uri_has_expected_shape() + { + RabbitMqEndpointUri.Routing("events", "order.created") + .ShouldBe(new Uri("rabbitmq://exchange/events/routing/order.created")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void queue_rejects_invalid_name(string? name) + { + Should.Throw(() => RabbitMqEndpointUri.Queue(name!)); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void exchange_rejects_invalid_name(string? name) + { + Should.Throw(() => RabbitMqEndpointUri.Exchange(name!)); + } + + [Theory] + [InlineData(null, "key")] + [InlineData("", "key")] + [InlineData(" ", "key")] + [InlineData("ex", null)] + [InlineData("ex", "")] + [InlineData("ex", " ")] + public void topic_rejects_invalid_args(string? exchange, string? key) + { + Should.Throw(() => RabbitMqEndpointUri.Topic(exchange!, key!)); + } + + [Theory] + [InlineData(null, "key")] + [InlineData("", "key")] + [InlineData(" ", "key")] + [InlineData("ex", null)] + [InlineData("ex", "")] + [InlineData("ex", " ")] + public void routing_rejects_invalid_args(string? exchange, string? key) + { + Should.Throw(() => RabbitMqEndpointUri.Routing(exchange!, key!)); + } + + [Fact] + public void queue_uri_roundtrips_through_parser() + { + var uri = RabbitMqEndpointUri.Queue("orders"); + var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } + + [Fact] + public void exchange_uri_roundtrips_through_parser() + { + var uri = RabbitMqEndpointUri.Exchange("events"); + var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } + + [Fact] + public void topic_uri_roundtrips_through_parser() + { + var uri = RabbitMqEndpointUri.Topic("prices", "usd.eur"); + var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } + + [Fact] + public void routing_uri_roundtrips_through_parser() + { + var uri = RabbitMqEndpointUri.Routing("events", "order.created"); + var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs new file mode 100644 index 000000000..d2cca10ba --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs @@ -0,0 +1,67 @@ +namespace Wolverine.RabbitMQ; + +/// +/// Builds canonical Wolverine endpoint values for RabbitMQ transport endpoints. +/// +public static class RabbitMqEndpointUri +{ + /// + /// Builds a URI referencing a RabbitMQ queue endpoint in the canonical form + /// rabbitmq://queue/{queueName}. + /// + /// The RabbitMQ queue name. + /// A of the form rabbitmq://queue/{queueName}. + /// RabbitMqEndpointUri.Queue("orders") returns rabbitmq://queue/orders. + /// Thrown when is null, empty, or whitespace. + public static Uri Queue(string queueName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(queueName); + return new Uri($"rabbitmq://queue/{queueName}"); + } + + /// + /// Builds a URI referencing a RabbitMQ exchange endpoint in the canonical form + /// rabbitmq://exchange/{exchangeName}. + /// + /// The RabbitMQ exchange name. + /// A of the form rabbitmq://exchange/{exchangeName}. + /// RabbitMqEndpointUri.Exchange("events") returns rabbitmq://exchange/events. + /// Thrown when is null, empty, or whitespace. + public static Uri Exchange(string exchangeName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(exchangeName); + return new Uri($"rabbitmq://exchange/{exchangeName}"); + } + + /// + /// Builds a URI referencing a RabbitMQ topic-routed exchange endpoint in the canonical form + /// rabbitmq://topic/{exchangeName}/{routingKey}. + /// + /// The RabbitMQ exchange name. + /// The topic routing key. + /// A of the form rabbitmq://topic/{exchangeName}/{routingKey}. + /// RabbitMqEndpointUri.Topic("prices", "usd.eur") returns rabbitmq://topic/prices/usd.eur. + /// Thrown when either parameter is null, empty, or whitespace. + public static Uri Topic(string exchangeName, string routingKey) + { + ArgumentException.ThrowIfNullOrWhiteSpace(exchangeName); + ArgumentException.ThrowIfNullOrWhiteSpace(routingKey); + return new Uri($"rabbitmq://topic/{exchangeName}/{routingKey}"); + } + + /// + /// Builds a URI referencing a RabbitMQ exchange with an explicit routing key in the canonical form + /// rabbitmq://exchange/{exchangeName}/routing/{routingKey}. + /// + /// The RabbitMQ exchange name. + /// The routing key bound to the exchange. + /// A of the form rabbitmq://exchange/{exchangeName}/routing/{routingKey}. + /// RabbitMqEndpointUri.Routing("events", "order.created") returns rabbitmq://exchange/events/routing/order.created. + /// Thrown when either parameter is null, empty, or whitespace. + public static Uri Routing(string exchangeName, string routingKey) + { + ArgumentException.ThrowIfNullOrWhiteSpace(exchangeName); + ArgumentException.ThrowIfNullOrWhiteSpace(routingKey); + return new Uri($"rabbitmq://exchange/{exchangeName}/routing/{routingKey}"); + } +} diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs index 0ea8a5929..766c62223 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs @@ -116,8 +116,8 @@ public static async Task configure_with_uri_helpers() #region sample_redis_uri_helpers // Using URI builder helpers - var ordersUri = RedisTransport.BuildRedisStreamUri("orders", databaseId: 1); - var paymentsUri = RedisTransport.BuildRedisStreamUri("payments", databaseId: 2, "payment-processors"); + var ordersUri = RedisEndpointUri.Stream("orders", databaseId: 1); + var paymentsUri = RedisEndpointUri.Stream("payments", databaseId: 2, "payment-processors"); using var host = await Host.CreateDefaultBuilder() .UseWolverine(opts => diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs new file mode 100644 index 000000000..7c9df3fe5 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs @@ -0,0 +1,70 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class RedisEndpointUriTests +{ + [Fact] + public void stream_uri_has_expected_shape_with_default_database() + { + RedisEndpointUri.Stream("orders") + .ShouldBe(new Uri("redis://stream/0/orders")); + } + + [Fact] + public void stream_uri_has_expected_shape_with_explicit_database() + { + RedisEndpointUri.Stream("orders", databaseId: 3) + .ShouldBe(new Uri("redis://stream/3/orders")); + } + + [Fact] + public void stream_uri_includes_consumer_group_query_parameter() + { + RedisEndpointUri.Stream("orders", 3, "order-processors") + .ShouldBe(new Uri("redis://stream/3/orders?consumerGroup=order-processors")); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void stream_rejects_invalid_key(string? key) + { + Should.Throw(() => RedisEndpointUri.Stream(key!)); + } + + [Fact] + public void stream_rejects_negative_database_id() + { + Should.Throw(() => RedisEndpointUri.Stream("orders", databaseId: -1)); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + [InlineData(" ")] + public void stream_with_consumer_group_rejects_invalid_group(string? group) + { + Should.Throw(() => RedisEndpointUri.Stream("orders", 0, group!)); + } + + [Fact] + public void stream_uri_roundtrips_through_parser() + { + var uri = RedisEndpointUri.Stream("orders", databaseId: 3); + var transport = new Wolverine.Redis.Internal.RedisTransport(); + var endpoint = transport.GetOrCreateEndpoint(uri); + endpoint.Uri.ShouldBe(uri); + } + + [Fact] + public void consumer_group_uri_roundtrips_through_parser() + { + var uri = RedisEndpointUri.Stream("orders", databaseId: 3, consumerGroup: "order-processors"); + var transport = new Wolverine.Redis.Internal.RedisTransport(); + var endpoint = (Wolverine.Redis.Internal.RedisStreamEndpoint)transport.GetOrCreateEndpoint(uri); + endpoint.ConsumerGroup.ShouldBe("order-processors"); + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs index 4e7de2cee..972633fcf 100644 --- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs @@ -293,25 +293,27 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) } /// - /// Helper method to create a Redis stream URI with database ID + /// Helper method to create a Redis stream URI with database ID. /// /// Redis stream key name /// Redis database ID /// Formatted Redis stream URI + [Obsolete("Use RedisEndpointUri.Stream instead. This helper will be removed in a future version.")] public static Uri BuildRedisStreamUri(string streamKey, int databaseId = 0) { - return new Uri($"redis://stream/{databaseId}/{streamKey}"); + return RedisEndpointUri.Stream(streamKey, databaseId); } /// - /// Helper method to create a Redis stream URI with database ID and consumer group + /// Helper method to create a Redis stream URI with database ID and consumer group. /// /// Redis stream key name /// Redis database ID /// Consumer group name /// Formatted Redis stream URI + [Obsolete("Use RedisEndpointUri.Stream instead. This helper will be removed in a future version.")] public static Uri BuildRedisStreamUri(string streamKey, int databaseId, string consumerGroup) { - return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}"); + return RedisEndpointUri.Stream(streamKey, databaseId, consumerGroup); } } diff --git a/src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs b/src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs new file mode 100644 index 000000000..acad7219a --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs @@ -0,0 +1,43 @@ +namespace Wolverine.Redis; + +/// +/// Builds canonical Wolverine endpoint values for Redis stream transport endpoints. +/// +public static class RedisEndpointUri +{ + /// + /// Builds a URI referencing a Redis stream endpoint in the canonical form + /// redis://stream/{databaseId}/{streamKey}. + /// + /// The Redis stream key name. + /// The Redis database ID (default 0). + /// A of the form redis://stream/{databaseId}/{streamKey}. + /// RedisEndpointUri.Stream("orders", 3) returns redis://stream/3/orders. + /// Thrown when is null, empty, or whitespace. + /// Thrown when is negative. + public static Uri Stream(string streamKey, int databaseId = 0) + { + ArgumentException.ThrowIfNullOrWhiteSpace(streamKey); + ArgumentOutOfRangeException.ThrowIfNegative(databaseId); + return new Uri($"redis://stream/{databaseId}/{streamKey}"); + } + + /// + /// Builds a URI referencing a Redis stream endpoint with a consumer group in the canonical form + /// redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}. + /// + /// The Redis stream key name. + /// The Redis database ID. + /// The Redis consumer group name. + /// A of the form redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}. + /// RedisEndpointUri.Stream("orders", 3, "order-processors") returns redis://stream/3/orders?consumerGroup=order-processors. + /// Thrown when or is null, empty, or whitespace. + /// Thrown when is negative. + public static Uri Stream(string streamKey, int databaseId, string consumerGroup) + { + ArgumentException.ThrowIfNullOrWhiteSpace(streamKey); + ArgumentOutOfRangeException.ThrowIfNegative(databaseId); + ArgumentException.ThrowIfNullOrWhiteSpace(consumerGroup); + return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}"); + } +}