diff --git a/docs/guide/messaging/transports/azureservicebus/index.md b/docs/guide/messaging/transports/azureservicebus/index.md
index 285848ec0..d24c09590 100644
--- a/docs/guide/messaging/transports/azureservicebus/index.md
+++ b/docs/guide/messaging/transports/azureservicebus/index.md
@@ -146,8 +146,20 @@ builder.UseWolverine(opts =>
snippet source | anchor
+## URI reference
+The `AzureServiceBusEndpointUri` helper class builds canonical endpoint URIs:
+| URI form | Helper call |
+|---|---|
+| `asb://queue/{name}` | `AzureServiceBusEndpointUri.Queue("name")` |
+| `asb://topic/{name}` | `AzureServiceBusEndpointUri.Topic("name")` |
+| `asb://topic/{topic}/{subscription}` | `AzureServiceBusEndpointUri.Subscription("topic", "sub")` |
+```csharp
+using Wolverine.AzureServiceBus;
+
+var uri = AzureServiceBusEndpointUri.Subscription("events", "audit");
+```
diff --git a/docs/guide/messaging/transports/gcp-pubsub/index.md b/docs/guide/messaging/transports/gcp-pubsub/index.md
index dcf563b27..b42327591 100644
--- a/docs/guide/messaging/transports/gcp-pubsub/index.md
+++ b/docs/guide/messaging/transports/gcp-pubsub/index.md
@@ -100,3 +100,17 @@ opts.UsePubsub("your-project-id")
```
The default delimiter between the prefix and the original name is `.` for GCP Pub/Sub (e.g., `dev-john.orders`).
+
+## URI reference
+
+The `GcpPubsubEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `pubsub://{projectId}/{topicName}` | `GcpPubsubEndpointUri.Topic("projectId", "topicName")` |
+
+```csharp
+using Wolverine.Pubsub;
+
+var uri = GcpPubsubEndpointUri.Topic("my-project", "orders");
+```
diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md
index 509e7353e..31f354ba1 100644
--- a/docs/guide/messaging/transports/kafka.md
+++ b/docs/guide/messaging/transports/kafka.md
@@ -678,3 +678,17 @@ await bus.BroadcastToTopicAsync("my-topic", new KafkaTombstone("record-key-to-de
When Wolverine encounters a `KafkaTombstone` message, it produces a Kafka message with the specified key and a `null` value. This signals to Kafka's log compaction process that the record with that key should be removed during the next compaction cycle.
This is useful when your Kafka topics use [log compaction](https://docs.confluent.io/platform/current/kafka/design.html#log-compaction) to maintain a key-value snapshot of the latest state. Publishing a tombstone ensures that deleted records are eventually cleaned up from the topic.
+
+## URI reference
+
+The `KafkaEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `kafka://topic/{name}` | `KafkaEndpointUri.Topic("name")` |
+
+```csharp
+using Wolverine.Kafka;
+
+var uri = KafkaEndpointUri.Topic("orders");
+```
diff --git a/docs/guide/messaging/transports/mqtt.md b/docs/guide/messaging/transports/mqtt.md
index 14c07dacd..8943f24db 100644
--- a/docs/guide/messaging/transports/mqtt.md
+++ b/docs/guide/messaging/transports/mqtt.md
@@ -466,3 +466,16 @@ await host.StartAsync();
snippet source | anchor
+## URI reference
+
+The `MqttEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `mqtt://topic/{name}` | `MqttEndpointUri.Topic("name")` |
+
+```csharp
+using Wolverine.MQTT;
+
+var uri = MqttEndpointUri.Topic("sensor/temperature");
+```
diff --git a/docs/guide/messaging/transports/nats.md b/docs/guide/messaging/transports/nats.md
index 1d12acb1d..5014ad4b8 100644
--- a/docs/guide/messaging/transports/nats.md
+++ b/docs/guide/messaging/transports/nats.md
@@ -423,3 +423,17 @@ docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:latest --jetstream -m 8
# For scheduled delivery tests, use NATS 2.12+
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:2.12-alpine --jetstream -m 8222
```
+
+## URI reference
+
+The `NatsEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `nats://subject/{subject}` | `NatsEndpointUri.Subject("subject")` |
+
+```csharp
+using Wolverine.Nats;
+
+var uri = NatsEndpointUri.Subject("orders.created");
+```
diff --git a/docs/guide/messaging/transports/pulsar.md b/docs/guide/messaging/transports/pulsar.md
index 37031c290..d92b2c7b6 100644
--- a/docs/guide/messaging/transports/pulsar.md
+++ b/docs/guide/messaging/transports/pulsar.md
@@ -159,3 +159,24 @@ Also see the more generic [Wolverine Guide on Interoperability](/tutorials/inter
:::
Pulsar interoperability is done through the `IPulsarEnvelopeMapper` interface.
+
+## URI reference
+
+The `PulsarEndpointUri` helper class produces Wolverine endpoint URIs of the form `pulsar://persistent/{tenant}/{ns}/{topic}` or `pulsar://non-persistent/{tenant}/{ns}/{topic}` — the form Wolverine's parser accepts. Pulsar-native topic-path strings (`persistent://...`) used by the native Pulsar client are a separate concept and are not built by this helper.
+
+| Helper call | Resulting URI |
+|---|---|
+| `PulsarEndpointUri.PersistentTopic("public", "default", "orders")` | `pulsar://persistent/public/default/orders` |
+| `PulsarEndpointUri.NonPersistentTopic("public", "default", "orders")` | `pulsar://non-persistent/public/default/orders` |
+| `PulsarEndpointUri.Topic("public", "default", "orders", persistent: true)` | `pulsar://persistent/public/default/orders` |
+| `PulsarEndpointUri.Topic("persistent://public/default/orders")` | `pulsar://persistent/public/default/orders` |
+
+```csharp
+using Wolverine.Pulsar;
+
+var uri = PulsarEndpointUri.PersistentTopic("public", "default", "orders");
+```
+
+::: tip
+`PulsarEndpoint.UriFor` is deprecated. Use `PulsarEndpointUri.Topic` (string overload) or `PersistentTopic`/`NonPersistentTopic` instead. The old `UriFor(bool, ...)` overload returned a Pulsar-native topic path, not a Wolverine endpoint URI — if you need that format, build the string directly.
+:::
diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md
index a1b14a3b9..b3789dba3 100644
--- a/docs/guide/messaging/transports/rabbitmq/index.md
+++ b/docs/guide/messaging/transports/rabbitmq/index.md
@@ -272,3 +272,21 @@ This creates RabbitMQ queues named `sequenced1` through `sequenced5` with compan
::: info
Wolverine with the `WolverineFX.RabbitMQ` transport has also been verified to work against [LavinMQ](https://lavinmq.com/), a modern RabbitMQ-protocol compatible message broker, using the RabbitMQ transport with 100% protocol compatibility when configured through the standard RabbitMQ integration shown above.
:::
+
+## URI reference
+
+The `RabbitMqEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `rabbitmq://queue/{name}` | `RabbitMqEndpointUri.Queue("name")` |
+| `rabbitmq://exchange/{name}` | `RabbitMqEndpointUri.Exchange("name")` |
+| `rabbitmq://topic/{exchange}/{routingKey}` | `RabbitMqEndpointUri.Topic("ex", "key")` |
+| `rabbitmq://exchange/{exchange}/routing/{routingKey}` | `RabbitMqEndpointUri.Routing("ex", "key")` |
+
+```csharp
+using Wolverine.RabbitMQ;
+
+var uri = RabbitMqEndpointUri.Queue("orders");
+// new Uri("rabbitmq://queue/orders")
+```
diff --git a/docs/guide/messaging/transports/redis.md b/docs/guide/messaging/transports/redis.md
index 29e5b86ae..54d468766 100644
--- a/docs/guide/messaging/transports/redis.md
+++ b/docs/guide/messaging/transports/redis.md
@@ -259,4 +259,21 @@ using var host = await builder.UseWolverine(opts =>
snippet source | anchor
+## URI reference
+The `RedisEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `redis://stream/{databaseId}/{streamKey}` | `RedisEndpointUri.Stream("key", databaseId: 0)` |
+| `redis://stream/{databaseId}/{streamKey}?consumerGroup={group}` | `RedisEndpointUri.Stream("key", 0, "group")` |
+
+```csharp
+using Wolverine.Redis;
+
+var uri = RedisEndpointUri.Stream("orders", databaseId: 3);
+```
+
+::: tip
+`RedisTransport.BuildRedisStreamUri` is deprecated. Use `RedisEndpointUri.Stream` instead.
+:::
diff --git a/docs/guide/messaging/transports/sns.md b/docs/guide/messaging/transports/sns.md
index c61dcfaeb..d72268752 100644
--- a/docs/guide/messaging/transports/sns.md
+++ b/docs/guide/messaging/transports/sns.md
@@ -199,3 +199,19 @@ Also see the more generic [Wolverine Guide on Interoperability](/tutorials/inter
SNS interoperability is done through the `ISnsEnvelopeMapper`. At this point, SNS supports interoperability through
MassTransit, NServiceBus, CloudEvents, or user defined mapping strategies.
+
+## URI reference
+
+The `SnsEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `sns://{name}` | `SnsEndpointUri.Topic("name")` |
+
+```csharp
+using Wolverine.AmazonSns;
+
+var uri = SnsEndpointUri.Topic("events");
+// FIFO topic (suffix preserved verbatim):
+var fifoUri = SnsEndpointUri.Topic("events.fifo");
+```
diff --git a/docs/guide/messaging/transports/sqs/index.md b/docs/guide/messaging/transports/sqs/index.md
index c2444eb89..ae6595ebb 100644
--- a/docs/guide/messaging/transports/sqs/index.md
+++ b/docs/guide/messaging/transports/sqs/index.md
@@ -264,3 +264,19 @@ using var host = await Host.CreateDefaultBuilder()
opts.PublishAllMessages().ToSqsQueue("send-and-receive");
}).StartAsync();
```
+
+## URI reference
+
+The `SqsEndpointUri` helper class builds canonical endpoint URIs:
+
+| URI form | Helper call |
+|---|---|
+| `sqs://{name}` | `SqsEndpointUri.Queue("name")` |
+
+```csharp
+using Wolverine.AmazonSqs;
+
+var uri = SqsEndpointUri.Queue("orders");
+// FIFO queue (suffix preserved verbatim):
+var fifoUri = SqsEndpointUri.Queue("orders.fifo");
+```
diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs
new file mode 100644
index 000000000..1efda575c
--- /dev/null
+++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/SnsEndpointUriTests.cs
@@ -0,0 +1,39 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.AmazonSns.Tests;
+
+public class SnsEndpointUriTests
+{
+ [Fact]
+ public void topic_uri_has_expected_shape()
+ {
+ SnsEndpointUri.Topic("events")
+ .ShouldBe(new Uri("sns://events"));
+ }
+
+ [Fact]
+ public void topic_uri_preserves_fifo_suffix()
+ {
+ SnsEndpointUri.Topic("events.fifo")
+ .ShouldBe(new Uri("sns://events.fifo"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void topic_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => SnsEndpointUri.Topic(name!));
+ }
+
+ [Fact]
+ public void topic_uri_roundtrips_through_parser()
+ {
+ var uri = SnsEndpointUri.Topic("events");
+ var transport = new Wolverine.AmazonSns.Internal.AmazonSnsTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs b/src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs
new file mode 100644
index 000000000..bdf20e496
--- /dev/null
+++ b/src/Transports/AWS/Wolverine.AmazonSns/SnsEndpointUri.cs
@@ -0,0 +1,21 @@
+namespace Wolverine.AmazonSns;
+
+///
+/// Builds canonical Wolverine endpoint values for AWS SNS transport endpoints.
+///
+public static class SnsEndpointUri
+{
+ ///
+ /// Builds a URI referencing an SNS topic endpoint in the canonical form
+ /// sns://{topicName}. FIFO topic names (with .fifo suffix) are preserved verbatim.
+ ///
+ /// The SNS topic name.
+ /// A of the form sns://{topicName}.
+ /// SnsEndpointUri.Topic("events") returns sns://events.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Topic(string topicName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ return new Uri($"sns://{topicName}");
+ }
+}
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs
new file mode 100644
index 000000000..831db7b0d
--- /dev/null
+++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/SqsEndpointUriTests.cs
@@ -0,0 +1,39 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.AmazonSqs.Tests;
+
+public class SqsEndpointUriTests
+{
+ [Fact]
+ public void queue_uri_has_expected_shape()
+ {
+ SqsEndpointUri.Queue("orders")
+ .ShouldBe(new Uri("sqs://orders"));
+ }
+
+ [Fact]
+ public void queue_uri_preserves_fifo_suffix()
+ {
+ SqsEndpointUri.Queue("orders.fifo")
+ .ShouldBe(new Uri("sqs://orders.fifo"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void queue_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => SqsEndpointUri.Queue(name!));
+ }
+
+ [Fact]
+ public void queue_uri_roundtrips_through_parser()
+ {
+ var uri = SqsEndpointUri.Queue("orders");
+ var transport = new Wolverine.AmazonSqs.Internal.AmazonSqsTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs b/src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs
new file mode 100644
index 000000000..4d52d6328
--- /dev/null
+++ b/src/Transports/AWS/Wolverine.AmazonSqs/SqsEndpointUri.cs
@@ -0,0 +1,21 @@
+namespace Wolverine.AmazonSqs;
+
+///
+/// Builds canonical Wolverine endpoint values for AWS SQS transport endpoints.
+///
+public static class SqsEndpointUri
+{
+ ///
+ /// Builds a URI referencing an SQS queue endpoint in the canonical form
+ /// sqs://{queueName}. FIFO queue names (with .fifo suffix) are preserved verbatim.
+ ///
+ /// The SQS queue name.
+ /// A of the form sqs://{queueName}.
+ /// SqsEndpointUri.Queue("orders") returns sqs://orders.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Queue(string queueName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(queueName);
+ return new Uri($"sqs://{queueName}");
+ }
+}
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs
new file mode 100644
index 000000000..e703c20b1
--- /dev/null
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/AzureServiceBusEndpointUriTests.cs
@@ -0,0 +1,85 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.AzureServiceBus.Tests;
+
+public class AzureServiceBusEndpointUriTests
+{
+ [Fact]
+ public void queue_uri_has_expected_shape()
+ {
+ AzureServiceBusEndpointUri.Queue("orders")
+ .ShouldBe(new Uri("asb://queue/orders"));
+ }
+
+ [Fact]
+ public void topic_uri_has_expected_shape()
+ {
+ AzureServiceBusEndpointUri.Topic("events")
+ .ShouldBe(new Uri("asb://topic/events"));
+ }
+
+ [Fact]
+ public void subscription_uri_has_expected_shape()
+ {
+ AzureServiceBusEndpointUri.Subscription("events", "audit")
+ .ShouldBe(new Uri("asb://topic/events/audit"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void queue_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => AzureServiceBusEndpointUri.Queue(name!));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void topic_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => AzureServiceBusEndpointUri.Topic(name!));
+ }
+
+ [Theory]
+ [InlineData(null, "sub")]
+ [InlineData("", "sub")]
+ [InlineData(" ", "sub")]
+ [InlineData("topic", null)]
+ [InlineData("topic", "")]
+ [InlineData("topic", " ")]
+ public void subscription_rejects_invalid_args(string? topic, string? sub)
+ {
+ Should.Throw(() => AzureServiceBusEndpointUri.Subscription(topic!, sub!));
+ }
+
+ [Fact]
+ public void queue_uri_roundtrips_through_parser()
+ {
+ var uri = AzureServiceBusEndpointUri.Queue("orders");
+ var transport = new AzureServiceBusTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+
+ [Fact]
+ public void topic_uri_roundtrips_through_parser()
+ {
+ var uri = AzureServiceBusEndpointUri.Topic("events");
+ var transport = new AzureServiceBusTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+
+ [Fact]
+ public void subscription_uri_roundtrips_through_parser()
+ {
+ var uri = AzureServiceBusEndpointUri.Subscription("events", "audit");
+ var transport = new AzureServiceBusTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs
new file mode 100644
index 000000000..1b2e7a220
--- /dev/null
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusEndpointUri.cs
@@ -0,0 +1,51 @@
+namespace Wolverine.AzureServiceBus;
+
+///
+/// Builds canonical Wolverine endpoint values for Azure Service Bus transport endpoints.
+///
+public static class AzureServiceBusEndpointUri
+{
+ ///
+ /// Builds a URI referencing an Azure Service Bus queue endpoint in the canonical form
+ /// asb://queue/{queueName}.
+ ///
+ /// The Azure Service Bus queue name.
+ /// A of the form asb://queue/{queueName}.
+ /// AzureServiceBusEndpointUri.Queue("orders") returns asb://queue/orders.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Queue(string queueName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(queueName);
+ return new Uri($"asb://queue/{queueName}");
+ }
+
+ ///
+ /// Builds a URI referencing an Azure Service Bus topic endpoint in the canonical form
+ /// asb://topic/{topicName}.
+ ///
+ /// The Azure Service Bus topic name.
+ /// A of the form asb://topic/{topicName}.
+ /// AzureServiceBusEndpointUri.Topic("events") returns asb://topic/events.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Topic(string topicName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ return new Uri($"asb://topic/{topicName}");
+ }
+
+ ///
+ /// Builds a URI referencing an Azure Service Bus topic subscription endpoint in the canonical form
+ /// asb://topic/{topicName}/{subscriptionName}.
+ ///
+ /// The Azure Service Bus topic name.
+ /// The subscription name bound to the topic.
+ /// A of the form asb://topic/{topicName}/{subscriptionName}.
+ /// AzureServiceBusEndpointUri.Subscription("events", "audit") returns asb://topic/events/audit.
+ /// Thrown when either parameter is null, empty, or whitespace.
+ public static Uri Subscription(string topicName, string subscriptionName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ ArgumentException.ThrowIfNullOrWhiteSpace(subscriptionName);
+ return new Uri($"asb://topic/{topicName}/{subscriptionName}");
+ }
+}
diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs
new file mode 100644
index 000000000..57cdccd80
--- /dev/null
+++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/GcpPubsubEndpointUriTests.cs
@@ -0,0 +1,26 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.Pubsub.Tests;
+
+public class GcpPubsubEndpointUriTests
+{
+ [Fact]
+ public void topic_uri_has_expected_shape()
+ {
+ GcpPubsubEndpointUri.Topic("my-project", "orders")
+ .ShouldBe(new Uri("pubsub://my-project/orders"));
+ }
+
+ [Theory]
+ [InlineData(null, "orders")]
+ [InlineData("", "orders")]
+ [InlineData(" ", "orders")]
+ [InlineData("proj", null)]
+ [InlineData("proj", "")]
+ [InlineData("proj", " ")]
+ public void topic_rejects_invalid_args(string? projectId, string? topicName)
+ {
+ Should.Throw(() => GcpPubsubEndpointUri.Topic(projectId!, topicName!));
+ }
+}
diff --git a/src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs b/src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs
new file mode 100644
index 000000000..e8a4de03e
--- /dev/null
+++ b/src/Transports/GCP/Wolverine.Pubsub/GcpPubsubEndpointUri.cs
@@ -0,0 +1,23 @@
+namespace Wolverine.Pubsub;
+
+///
+/// Builds canonical Wolverine endpoint values for Google Cloud Pub/Sub transport endpoints.
+///
+public static class GcpPubsubEndpointUri
+{
+ ///
+ /// Builds a URI referencing a Google Cloud Pub/Sub topic endpoint in the canonical form
+ /// pubsub://{projectId}/{topicName}.
+ ///
+ /// The GCP project ID that owns the topic.
+ /// The Pub/Sub topic name.
+ /// A of the form pubsub://{projectId}/{topicName}.
+ /// GcpPubsubEndpointUri.Topic("my-project", "orders") returns pubsub://my-project/orders.
+ /// Thrown when either parameter is null, empty, or whitespace.
+ public static Uri Topic(string projectId, string topicName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(projectId);
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ return new Uri($"pubsub://{projectId}/{topicName}");
+ }
+}
diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs
new file mode 100644
index 000000000..70089a672
--- /dev/null
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaEndpointUriTests.cs
@@ -0,0 +1,32 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.Kafka.Tests;
+
+public class KafkaEndpointUriTests
+{
+ [Fact]
+ public void topic_uri_has_expected_shape()
+ {
+ KafkaEndpointUri.Topic("orders")
+ .ShouldBe(new Uri("kafka://topic/orders"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void topic_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => KafkaEndpointUri.Topic(name!));
+ }
+
+ [Fact]
+ public void topic_uri_roundtrips_through_parser()
+ {
+ var uri = KafkaEndpointUri.Topic("orders");
+ var transport = new Wolverine.Kafka.Internals.KafkaTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs
new file mode 100644
index 000000000..fbd468bab
--- /dev/null
+++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaEndpointUri.cs
@@ -0,0 +1,21 @@
+namespace Wolverine.Kafka;
+
+///
+/// Builds canonical Wolverine endpoint values for Kafka transport endpoints.
+///
+public static class KafkaEndpointUri
+{
+ ///
+ /// Builds a URI referencing a Kafka topic endpoint in the canonical form
+ /// kafka://topic/{topicName}.
+ ///
+ /// The Kafka topic name.
+ /// A of the form kafka://topic/{topicName}.
+ /// KafkaEndpointUri.Topic("orders") returns kafka://topic/orders.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Topic(string topicName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ return new Uri($"kafka://topic/{topicName}");
+ }
+}
diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs
new file mode 100644
index 000000000..91e1e134f
--- /dev/null
+++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttEndpointUriTests.cs
@@ -0,0 +1,32 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.MQTT.Tests;
+
+public class MqttEndpointUriTests
+{
+ [Fact]
+ public void topic_uri_has_expected_shape()
+ {
+ MqttEndpointUri.Topic("sensor/temperature")
+ .ShouldBe(new Uri("mqtt://topic/sensor/temperature"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void topic_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => MqttEndpointUri.Topic(name!));
+ }
+
+ [Fact]
+ public void topic_uri_roundtrips_through_parser()
+ {
+ var uri = MqttEndpointUri.Topic("sensor/temperature");
+ var transport = new Wolverine.MQTT.Internals.MqttTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs
new file mode 100644
index 000000000..895a3eeea
--- /dev/null
+++ b/src/Transports/MQTT/Wolverine.MQTT/MqttEndpointUri.cs
@@ -0,0 +1,22 @@
+namespace Wolverine.MQTT;
+
+///
+/// Builds canonical Wolverine endpoint values for MQTT transport endpoints.
+///
+public static class MqttEndpointUri
+{
+ ///
+ /// Builds a URI referencing an MQTT topic endpoint in the canonical form
+ /// mqtt://topic/{topicName}. Slashes inside the topic name are preserved
+ /// as path separators.
+ ///
+ /// The MQTT topic name (may contain slashes).
+ /// A of the form mqtt://topic/{topicName}.
+ /// MqttEndpointUri.Topic("sensor/temperature") returns mqtt://topic/sensor/temperature.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Topic(string topicName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ return new Uri("mqtt://topic/" + topicName.Trim('/'));
+ }
+}
diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs
new file mode 100644
index 000000000..f9a93d7e0
--- /dev/null
+++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsEndpointUriTests.cs
@@ -0,0 +1,32 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.Nats.Tests;
+
+public class NatsEndpointUriTests
+{
+ [Fact]
+ public void subject_uri_has_expected_shape()
+ {
+ NatsEndpointUri.Subject("orders.created")
+ .ShouldBe(new Uri("nats://subject/orders.created"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void subject_rejects_invalid_name(string? subject)
+ {
+ Should.Throw(() => NatsEndpointUri.Subject(subject!));
+ }
+
+ [Fact]
+ public void subject_uri_roundtrips_through_parser()
+ {
+ var uri = NatsEndpointUri.Subject("orders.created");
+ var transport = new Wolverine.Nats.Internal.NatsTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs b/src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs
new file mode 100644
index 000000000..9485e4160
--- /dev/null
+++ b/src/Transports/NATS/Wolverine.Nats/NatsEndpointUri.cs
@@ -0,0 +1,21 @@
+namespace Wolverine.Nats;
+
+///
+/// Builds canonical Wolverine endpoint values for NATS transport endpoints.
+///
+public static class NatsEndpointUri
+{
+ ///
+ /// Builds a URI referencing a NATS subject endpoint in the canonical form
+ /// nats://subject/{subject}.
+ ///
+ /// The NATS subject (dot-separated identifier).
+ /// A of the form nats://subject/{subject}.
+ /// NatsEndpointUri.Subject("orders.created") returns nats://subject/orders.created.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Subject(string subject)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(subject);
+ return new Uri($"nats://subject/{subject}");
+ }
+}
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs
index 87dd55954..c9f442c05 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/InlinePulsarTransportComplianceTests.cs
@@ -14,7 +14,7 @@ public async Task InitializeAsync()
{
var topic = Guid.NewGuid().ToString();
var topicPath = $"persistent://public/default/{topic}";
- OutboundAddress = PulsarEndpoint.UriFor(topicPath);
+ OutboundAddress = PulsarEndpointUri.Topic(topicPath);
await ReceiverIs(opts =>
{
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs
index 77371531f..29e1f3c34 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointTests.cs
@@ -81,4 +81,46 @@ public void uri_for_topic_string()
endpoint.Tenant.ShouldBe("t1");
endpoint.TopicName.ShouldBe("aaa");
}
+
+ [Fact]
+ public void uri_for_topic_string_handles_non_persistent_scheme()
+ {
+ var uri = PulsarEndpoint.UriFor("non-persistent://t1/ns1/aaa");
+ var endpoint = new PulsarEndpoint(uri, null!);
+
+ endpoint.Persistence.ShouldBe(PulsarEndpoint.NonPersistent);
+ endpoint.Tenant.ShouldBe("t1");
+ endpoint.Namespace.ShouldBe("ns1");
+ endpoint.TopicName.ShouldBe("aaa");
+ }
+
+ [Fact]
+ public void uri_for_topic_string_preserves_realistic_topic_names()
+ {
+ var uri = PulsarEndpoint.UriFor("persistent://my-tenant/my-namespace/order.created.v2");
+ var endpoint = new PulsarEndpoint(uri, null!);
+
+ endpoint.Persistence.ShouldBe(PulsarEndpoint.Persistent);
+ endpoint.Tenant.ShouldBe("my-tenant");
+ endpoint.Namespace.ShouldBe("my-namespace");
+ endpoint.TopicName.ShouldBe("order.created.v2");
+ }
+
+ [Fact]
+ public void uri_for_bool_produces_retry_suffix_path_used_by_pulsar_listener()
+ {
+ // Mirrors the call pattern in PulsarListener.getRetryLetterTopicUri:
+ // PulsarEndpoint.UriFor(isPersistent, tenant, namespace, "{topic}-RETRY").
+ var uri = PulsarEndpoint.UriFor(true, "public", "default", "orders-RETRY");
+ uri.ShouldBe(new Uri("persistent://public/default/orders-RETRY"));
+ }
+
+ [Fact]
+ public void uri_for_bool_produces_dlq_suffix_path_used_by_pulsar_listener()
+ {
+ // Mirrors the call pattern in PulsarListener.getDeadLetteredTopicUri:
+ // PulsarEndpoint.UriFor(isPersistent, tenant, namespace, "{topic}-DLQ").
+ var uri = PulsarEndpoint.UriFor(true, "public", "default", "orders-DLQ");
+ uri.ShouldBe(new Uri("persistent://public/default/orders-DLQ"));
+ }
}
\ No newline at end of file
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs
new file mode 100644
index 000000000..c33a6ddb2
--- /dev/null
+++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarEndpointUriTests.cs
@@ -0,0 +1,125 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.Pulsar.Tests;
+
+public class PulsarEndpointUriTests
+{
+ [Fact]
+ public void persistent_topic_returns_wolverine_endpoint_uri()
+ {
+ PulsarEndpointUri.PersistentTopic("public", "default", "orders")
+ .ShouldBe(new Uri("pulsar://persistent/public/default/orders"));
+ }
+
+ [Fact]
+ public void non_persistent_topic_returns_wolverine_endpoint_uri()
+ {
+ PulsarEndpointUri.NonPersistentTopic("public", "default", "orders")
+ .ShouldBe(new Uri("pulsar://non-persistent/public/default/orders"));
+ }
+
+ [Fact]
+ public void topic_with_persistent_flag_true_matches_persistent_topic()
+ {
+ PulsarEndpointUri.Topic("public", "default", "orders", persistent: true)
+ .ShouldBe(new Uri("pulsar://persistent/public/default/orders"));
+ }
+
+ [Fact]
+ public void topic_with_persistent_flag_false_matches_non_persistent_topic()
+ {
+ PulsarEndpointUri.Topic("public", "default", "orders", persistent: false)
+ .ShouldBe(new Uri("pulsar://non-persistent/public/default/orders"));
+ }
+
+ [Fact]
+ public void topic_from_pulsar_native_path_returns_wolverine_endpoint_uri()
+ {
+ PulsarEndpointUri.Topic("persistent://t1/ns1/aaa")
+ .ShouldBe(new Uri("pulsar://persistent/t1/ns1/aaa"));
+ }
+
+ [Fact]
+ public void topic_from_non_persistent_native_path_returns_wolverine_endpoint_uri()
+ {
+ PulsarEndpointUri.Topic("non-persistent://t1/ns1/aaa")
+ .ShouldBe(new Uri("pulsar://non-persistent/t1/ns1/aaa"));
+ }
+
+ [Theory]
+ [InlineData(null, "ns", "topic")]
+ [InlineData("", "ns", "topic")]
+ [InlineData(" ", "ns", "topic")]
+ [InlineData("tenant", null, "topic")]
+ [InlineData("tenant", "", "topic")]
+ [InlineData("tenant", " ", "topic")]
+ [InlineData("tenant", "ns", null)]
+ [InlineData("tenant", "ns", "")]
+ [InlineData("tenant", "ns", " ")]
+ public void persistent_topic_rejects_invalid_args(string? tenant, string? ns, string? topic)
+ {
+ Should.Throw(() => PulsarEndpointUri.PersistentTopic(tenant!, ns!, topic!));
+ }
+
+ [Theory]
+ [InlineData(null, "ns", "topic")]
+ [InlineData("", "ns", "topic")]
+ [InlineData("tenant", null, "topic")]
+ [InlineData("tenant", "ns", null)]
+ public void non_persistent_topic_rejects_invalid_args(string? tenant, string? ns, string? topic)
+ {
+ Should.Throw(() => PulsarEndpointUri.NonPersistentTopic(tenant!, ns!, topic!));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void topic_from_path_rejects_invalid_path(string? path)
+ {
+ Should.Throw(() => PulsarEndpointUri.Topic(path!));
+ }
+
+ [Theory]
+ [InlineData("pulsar://persistent/t1/ns1/aaa")] // already a Wolverine endpoint URI
+ [InlineData("http://example.com/ns/topic")] // unsupported scheme
+ [InlineData("ftp://server/ns/topic")] // unsupported scheme
+ [InlineData("tcp://host:5000/a/b")] // unsupported scheme
+ public void topic_from_path_rejects_unsupported_scheme(string path)
+ {
+ Should.Throw(() => PulsarEndpointUri.Topic(path));
+ }
+
+ [Theory]
+ [InlineData("persistent://tenant")] // only tenant, missing ns and topic
+ [InlineData("persistent://tenant/ns")] // missing topic
+ [InlineData("persistent://tenant/ns/topic/extra")] // extra segment
+ [InlineData("non-persistent://tenant/ns/topic/extra")] // extra segment, non-persistent
+ public void topic_from_path_rejects_wrong_segment_count(string path)
+ {
+ Should.Throw(() => PulsarEndpointUri.Topic(path));
+ }
+
+ [Fact]
+ public void persistent_topic_uri_roundtrips_through_parser()
+ {
+ var uri = PulsarEndpointUri.PersistentTopic("public", "default", "orders");
+ var endpoint = new PulsarEndpoint(uri, null!);
+ endpoint.Persistence.ShouldBe(PulsarEndpoint.Persistent);
+ endpoint.Tenant.ShouldBe("public");
+ endpoint.Namespace.ShouldBe("default");
+ endpoint.TopicName.ShouldBe("orders");
+ }
+
+ [Fact]
+ public void non_persistent_topic_uri_roundtrips_through_parser()
+ {
+ var uri = PulsarEndpointUri.NonPersistentTopic("public", "default", "orders");
+ var endpoint = new PulsarEndpoint(uri, null!);
+ endpoint.Persistence.ShouldBe(PulsarEndpoint.NonPersistent);
+ endpoint.Tenant.ShouldBe("public");
+ endpoint.Namespace.ShouldBe("default");
+ endpoint.TopicName.ShouldBe("orders");
+ }
+}
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs
index 9115d958a..f463a90b4 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarTransportComplianceTests.cs
@@ -14,7 +14,7 @@ public async Task InitializeAsync()
{
var topic = Guid.NewGuid().ToString();
var topicPath = $"persistent://public/default/compliance{topic}";
- OutboundAddress = PulsarEndpoint.UriFor(topicPath);
+ OutboundAddress = PulsarEndpointUri.Topic(topicPath);
await SenderIs(opts =>
{
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs
index 26aa73faa..4f9f5d93c 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/WithCloudEvents.cs
@@ -14,7 +14,7 @@ public async Task InitializeAsync()
{
var topic = Guid.NewGuid().ToString();
var topicPath = $"persistent://public/default/compliance{topic}";
- OutboundAddress = PulsarEndpoint.UriFor(topicPath);
+ OutboundAddress = PulsarEndpointUri.Topic(topicPath);
await SenderIs(opts =>
{
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs
index 9146d3aee..1ca510d06 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/endpoint_configuration.cs
@@ -39,7 +39,7 @@ public void Dispose()
[Fact]
public void requeue_disabled()
{
- var uri = PulsarEndpoint.UriFor("persistent://public/default/one");
+ var uri = PulsarEndpointUri.Topic("persistent://public/default/one");
var endpoint = theRuntime.Endpoints.EndpointFor(uri)?.As();
endpoint!.EnableRequeue.ShouldBeFalse();
@@ -48,7 +48,7 @@ public void requeue_disabled()
[Fact]
public void unsubscribe_on_close_disabled()
{
- var uri = PulsarEndpoint.UriFor("persistent://public/default/one");
+ var uri = PulsarEndpointUri.Topic("persistent://public/default/one");
var endpoint = theRuntime.Endpoints.EndpointFor(uri)?.As();
endpoint!.UnsubscribeOnClose.ShouldBeFalse();
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs
index 940db87c9..ba9bfec81 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs
@@ -48,17 +48,17 @@ protected override PulsarEnvelopeMapper buildMapper(IWolverineRuntime runtime)
public bool IsPersistent => Persistence.Equals(Persistent);
+ [Obsolete("This overload returns a Pulsar-native topic path (persistent://...), not a Wolverine endpoint URI (pulsar://...). The native Pulsar client requires the former; PulsarEndpointUri produces the latter — they are NOT interchangeable, which is why this overload is not delegated. Build Pulsar-native topic paths directly. This helper will be removed in a future version.")]
public static Uri UriFor(bool persistent, string tenant, string @namespace, string topicName)
{
var scheme = persistent ? "persistent" : "non-persistent";
return new Uri($"{scheme}://{tenant}/{@namespace}/{topicName}");
}
+ [Obsolete("Use PulsarEndpointUri.Topic instead. This helper will be removed in a future version.")]
public static Uri UriFor(string topicPath)
{
- var uri = new Uri(topicPath);
- return new Uri(
- $"pulsar://{uri.Scheme}/{uri.Host}/{uri.Segments.Skip(1).Select(x => x.TrimEnd('/')).Join("/")}");
+ return PulsarEndpointUri.Topic(topicPath);
}
public override IDictionary DescribeProperties()
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs
new file mode 100644
index 000000000..82ddfb7f9
--- /dev/null
+++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpointUri.cs
@@ -0,0 +1,100 @@
+namespace Wolverine.Pulsar;
+
+///
+/// Builds canonical Wolverine endpoint values for Pulsar transport endpoints.
+/// All methods return Wolverine endpoint URIs of the form pulsar://persistent/{tenant}/{ns}/{topic}
+/// or pulsar://non-persistent/{tenant}/{ns}/{topic}, matching what 's parser
+/// accepts. For Pulsar-native topic-path strings (e.g. persistent://...) used by the native Pulsar client,
+/// build them directly — they are not Wolverine endpoint URIs and are out of scope for this helper.
+///
+public static class PulsarEndpointUri
+{
+ ///
+ /// Builds a URI referencing a Pulsar persistent-topic endpoint in the canonical form
+ /// pulsar://persistent/{tenant}/{namespace}/{topicName}.
+ ///
+ /// The Pulsar tenant.
+ /// The Pulsar namespace.
+ /// The Pulsar topic name.
+ /// A of the form pulsar://persistent/{tenant}/{namespace}/{topicName}.
+ /// PulsarEndpointUri.PersistentTopic("public", "default", "orders") returns pulsar://persistent/public/default/orders.
+ /// Thrown when any parameter is null, empty, or whitespace.
+ public static Uri PersistentTopic(string tenant, string @namespace, string topicName)
+ {
+ return BuildEndpointUri(PulsarEndpoint.Persistent, tenant, @namespace, topicName);
+ }
+
+ ///
+ /// Builds a URI referencing a Pulsar non-persistent-topic endpoint in the canonical form
+ /// pulsar://non-persistent/{tenant}/{namespace}/{topicName}.
+ ///
+ /// The Pulsar tenant.
+ /// The Pulsar namespace.
+ /// The Pulsar topic name.
+ /// A of the form pulsar://non-persistent/{tenant}/{namespace}/{topicName}.
+ /// PulsarEndpointUri.NonPersistentTopic("public", "default", "orders") returns pulsar://non-persistent/public/default/orders.
+ /// Thrown when any parameter is null, empty, or whitespace.
+ public static Uri NonPersistentTopic(string tenant, string @namespace, string topicName)
+ {
+ return BuildEndpointUri(PulsarEndpoint.NonPersistent, tenant, @namespace, topicName);
+ }
+
+ ///
+ /// Builds a URI referencing a Pulsar topic endpoint, selecting persistent or non-persistent by flag.
+ ///
+ /// The Pulsar tenant.
+ /// The Pulsar namespace.
+ /// The Pulsar topic name.
+ /// true for a persistent topic, false for non-persistent.
+ /// A of the form pulsar://persistent/... or pulsar://non-persistent/....
+ /// PulsarEndpointUri.Topic("public", "default", "orders", persistent: true) returns pulsar://persistent/public/default/orders.
+ /// Thrown when any string parameter is null, empty, or whitespace.
+ public static Uri Topic(string tenant, string @namespace, string topicName, bool persistent)
+ {
+ var scheme = persistent ? PulsarEndpoint.Persistent : PulsarEndpoint.NonPersistent;
+ return BuildEndpointUri(scheme, tenant, @namespace, topicName);
+ }
+
+ ///
+ /// Converts a Pulsar-native topic path (e.g. persistent://tenant/ns/topic) into a
+ /// Wolverine endpoint URI of the form pulsar://persistent/tenant/ns/topic.
+ ///
+ /// A Pulsar-native topic path string with scheme persistent or non-persistent and exactly three path components (tenant, namespace, topic).
+ /// A of the form pulsar://{persistence}/{tenant}/{namespace}/{topicName}.
+ /// PulsarEndpointUri.Topic("persistent://t1/ns1/aaa") returns pulsar://persistent/t1/ns1/aaa.
+ /// Thrown when is null, empty, whitespace, uses a scheme other than persistent/non-persistent, or does not contain exactly tenant/namespace/topic path parts.
+ /// Thrown when is not a parseable URI.
+ public static Uri Topic(string topicPath)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicPath);
+
+ var parsed = new Uri(topicPath);
+
+ if (parsed.Scheme != PulsarEndpoint.Persistent && parsed.Scheme != PulsarEndpoint.NonPersistent)
+ {
+ throw new ArgumentException(
+ $"topicPath must use scheme '{PulsarEndpoint.Persistent}' or '{PulsarEndpoint.NonPersistent}', got '{parsed.Scheme}'. Use PulsarEndpointUri.PersistentTopic or NonPersistentTopic to build Wolverine endpoint URIs from components.",
+ nameof(topicPath));
+ }
+
+ var pathParts = parsed.AbsolutePath.Trim('/').Split('/', StringSplitOptions.RemoveEmptyEntries);
+ if (pathParts.Length != 2)
+ {
+ throw new ArgumentException(
+ $"topicPath must have the form '{{scheme}}://{{tenant}}/{{namespace}}/{{topic}}', got '{topicPath}'.",
+ nameof(topicPath));
+ }
+
+ return parsed.Scheme == PulsarEndpoint.Persistent
+ ? PersistentTopic(parsed.Host, pathParts[0], pathParts[1])
+ : NonPersistentTopic(parsed.Host, pathParts[0], pathParts[1]);
+ }
+
+ private static Uri BuildEndpointUri(string persistence, string tenant, string @namespace, string topicName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(tenant);
+ ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
+ ArgumentException.ThrowIfNullOrWhiteSpace(topicName);
+ return new Uri($"pulsar://{persistence}/{tenant}/{@namespace}/{topicName}");
+ }
+}
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs
index 8ffeb0062..0e224fea1 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs
@@ -138,16 +138,20 @@ private IConsumer> createRetryConsumer(PulsarEndpoint end
private Uri? getRetryLetterTopicUri(PulsarEndpoint endpoint)
{
+#pragma warning disable CS0618 // Pulsar-native topic-path form is required by the Pulsar client
return NativeRetryLetterQueueEnabled
? PulsarEndpoint.UriFor(endpoint.IsPersistent, endpoint.Tenant, endpoint.Namespace,
endpoint.RetryLetterTopic?.TopicName ?? $"{endpoint.TopicName}-RETRY")
: null;
+#pragma warning restore CS0618
}
private Uri getDeadLetteredTopicUri(PulsarEndpoint endpoint)
{
+#pragma warning disable CS0618 // Pulsar-native topic-path form is required by the Pulsar client
var topicDql = PulsarEndpoint.UriFor(endpoint.IsPersistent, endpoint.Tenant, endpoint.Namespace,
endpoint.DeadLetterTopic?.TopicName ?? $"{endpoint.TopicName}-DLQ");
+#pragma warning restore CS0618
return topicDql;
}
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs
index 1604b9b52..1c52055d6 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs
@@ -95,7 +95,7 @@ public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime)
public PulsarEndpoint EndpointFor(string topicPath)
{
- var uri = PulsarEndpoint.UriFor(topicPath);
+ var uri = PulsarEndpointUri.Topic(topicPath);
return this[uri];
}
-}
\ No newline at end of file
+}
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs
index a82d1abe9..e41ac473b 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs
@@ -79,7 +79,7 @@ public static PulsarSubscriberConfiguration ToPulsarTopic(this IPublishToExpress
///
public static PulsarListenerConfiguration ListenToPulsarTopic(this WolverineOptions endpoints, string topicPath)
{
- var uri = PulsarEndpoint.UriFor(topicPath);
+ var uri = PulsarEndpointUri.Topic(topicPath);
var endpoint = endpoints.PulsarTransport()[uri];
endpoint.IsListener = true;
return new PulsarListenerConfiguration(endpoint);
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs
new file mode 100644
index 000000000..a53aa39c2
--- /dev/null
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqEndpointUriTests.cs
@@ -0,0 +1,113 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.RabbitMQ.Tests;
+
+public class RabbitMqEndpointUriTests
+{
+ [Fact]
+ public void queue_uri_has_expected_shape()
+ {
+ RabbitMqEndpointUri.Queue("orders")
+ .ShouldBe(new Uri("rabbitmq://queue/orders"));
+ }
+
+ [Fact]
+ public void exchange_uri_has_expected_shape()
+ {
+ RabbitMqEndpointUri.Exchange("events")
+ .ShouldBe(new Uri("rabbitmq://exchange/events"));
+ }
+
+ [Fact]
+ public void topic_uri_has_expected_shape()
+ {
+ RabbitMqEndpointUri.Topic("prices", "usd.eur")
+ .ShouldBe(new Uri("rabbitmq://topic/prices/usd.eur"));
+ }
+
+ [Fact]
+ public void routing_uri_has_expected_shape()
+ {
+ RabbitMqEndpointUri.Routing("events", "order.created")
+ .ShouldBe(new Uri("rabbitmq://exchange/events/routing/order.created"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void queue_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => RabbitMqEndpointUri.Queue(name!));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void exchange_rejects_invalid_name(string? name)
+ {
+ Should.Throw(() => RabbitMqEndpointUri.Exchange(name!));
+ }
+
+ [Theory]
+ [InlineData(null, "key")]
+ [InlineData("", "key")]
+ [InlineData(" ", "key")]
+ [InlineData("ex", null)]
+ [InlineData("ex", "")]
+ [InlineData("ex", " ")]
+ public void topic_rejects_invalid_args(string? exchange, string? key)
+ {
+ Should.Throw(() => RabbitMqEndpointUri.Topic(exchange!, key!));
+ }
+
+ [Theory]
+ [InlineData(null, "key")]
+ [InlineData("", "key")]
+ [InlineData(" ", "key")]
+ [InlineData("ex", null)]
+ [InlineData("ex", "")]
+ [InlineData("ex", " ")]
+ public void routing_rejects_invalid_args(string? exchange, string? key)
+ {
+ Should.Throw(() => RabbitMqEndpointUri.Routing(exchange!, key!));
+ }
+
+ [Fact]
+ public void queue_uri_roundtrips_through_parser()
+ {
+ var uri = RabbitMqEndpointUri.Queue("orders");
+ var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+
+ [Fact]
+ public void exchange_uri_roundtrips_through_parser()
+ {
+ var uri = RabbitMqEndpointUri.Exchange("events");
+ var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+
+ [Fact]
+ public void topic_uri_roundtrips_through_parser()
+ {
+ var uri = RabbitMqEndpointUri.Topic("prices", "usd.eur");
+ var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+
+ [Fact]
+ public void routing_uri_roundtrips_through_parser()
+ {
+ var uri = RabbitMqEndpointUri.Routing("events", "order.created");
+ var transport = new Wolverine.RabbitMQ.Internal.RabbitMqTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+}
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs
new file mode 100644
index 000000000..d2cca10ba
--- /dev/null
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqEndpointUri.cs
@@ -0,0 +1,67 @@
+namespace Wolverine.RabbitMQ;
+
+///
+/// Builds canonical Wolverine endpoint values for RabbitMQ transport endpoints.
+///
+public static class RabbitMqEndpointUri
+{
+ ///
+ /// Builds a URI referencing a RabbitMQ queue endpoint in the canonical form
+ /// rabbitmq://queue/{queueName}.
+ ///
+ /// The RabbitMQ queue name.
+ /// A of the form rabbitmq://queue/{queueName}.
+ /// RabbitMqEndpointUri.Queue("orders") returns rabbitmq://queue/orders.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Queue(string queueName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(queueName);
+ return new Uri($"rabbitmq://queue/{queueName}");
+ }
+
+ ///
+ /// Builds a URI referencing a RabbitMQ exchange endpoint in the canonical form
+ /// rabbitmq://exchange/{exchangeName}.
+ ///
+ /// The RabbitMQ exchange name.
+ /// A of the form rabbitmq://exchange/{exchangeName}.
+ /// RabbitMqEndpointUri.Exchange("events") returns rabbitmq://exchange/events.
+ /// Thrown when is null, empty, or whitespace.
+ public static Uri Exchange(string exchangeName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(exchangeName);
+ return new Uri($"rabbitmq://exchange/{exchangeName}");
+ }
+
+ ///
+ /// Builds a URI referencing a RabbitMQ topic-routed exchange endpoint in the canonical form
+ /// rabbitmq://topic/{exchangeName}/{routingKey}.
+ ///
+ /// The RabbitMQ exchange name.
+ /// The topic routing key.
+ /// A of the form rabbitmq://topic/{exchangeName}/{routingKey}.
+ /// RabbitMqEndpointUri.Topic("prices", "usd.eur") returns rabbitmq://topic/prices/usd.eur.
+ /// Thrown when either parameter is null, empty, or whitespace.
+ public static Uri Topic(string exchangeName, string routingKey)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(exchangeName);
+ ArgumentException.ThrowIfNullOrWhiteSpace(routingKey);
+ return new Uri($"rabbitmq://topic/{exchangeName}/{routingKey}");
+ }
+
+ ///
+ /// Builds a URI referencing a RabbitMQ exchange with an explicit routing key in the canonical form
+ /// rabbitmq://exchange/{exchangeName}/routing/{routingKey}.
+ ///
+ /// The RabbitMQ exchange name.
+ /// The routing key bound to the exchange.
+ /// A of the form rabbitmq://exchange/{exchangeName}/routing/{routingKey}.
+ /// RabbitMqEndpointUri.Routing("events", "order.created") returns rabbitmq://exchange/events/routing/order.created.
+ /// Thrown when either parameter is null, empty, or whitespace.
+ public static Uri Routing(string exchangeName, string routingKey)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(exchangeName);
+ ArgumentException.ThrowIfNullOrWhiteSpace(routingKey);
+ return new Uri($"rabbitmq://exchange/{exchangeName}/routing/{routingKey}");
+ }
+}
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs
index 0ea8a5929..766c62223 100644
--- a/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/DocumentationSamples.cs
@@ -116,8 +116,8 @@ public static async Task configure_with_uri_helpers()
#region sample_redis_uri_helpers
// Using URI builder helpers
- var ordersUri = RedisTransport.BuildRedisStreamUri("orders", databaseId: 1);
- var paymentsUri = RedisTransport.BuildRedisStreamUri("payments", databaseId: 2, "payment-processors");
+ var ordersUri = RedisEndpointUri.Stream("orders", databaseId: 1);
+ var paymentsUri = RedisEndpointUri.Stream("payments", databaseId: 2, "payment-processors");
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs
new file mode 100644
index 000000000..7c9df3fe5
--- /dev/null
+++ b/src/Transports/Redis/Wolverine.Redis.Tests/RedisEndpointUriTests.cs
@@ -0,0 +1,70 @@
+using Shouldly;
+using Xunit;
+
+namespace Wolverine.Redis.Tests;
+
+public class RedisEndpointUriTests
+{
+ [Fact]
+ public void stream_uri_has_expected_shape_with_default_database()
+ {
+ RedisEndpointUri.Stream("orders")
+ .ShouldBe(new Uri("redis://stream/0/orders"));
+ }
+
+ [Fact]
+ public void stream_uri_has_expected_shape_with_explicit_database()
+ {
+ RedisEndpointUri.Stream("orders", databaseId: 3)
+ .ShouldBe(new Uri("redis://stream/3/orders"));
+ }
+
+ [Fact]
+ public void stream_uri_includes_consumer_group_query_parameter()
+ {
+ RedisEndpointUri.Stream("orders", 3, "order-processors")
+ .ShouldBe(new Uri("redis://stream/3/orders?consumerGroup=order-processors"));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void stream_rejects_invalid_key(string? key)
+ {
+ Should.Throw(() => RedisEndpointUri.Stream(key!));
+ }
+
+ [Fact]
+ public void stream_rejects_negative_database_id()
+ {
+ Should.Throw(() => RedisEndpointUri.Stream("orders", databaseId: -1));
+ }
+
+ [Theory]
+ [InlineData(null)]
+ [InlineData("")]
+ [InlineData(" ")]
+ public void stream_with_consumer_group_rejects_invalid_group(string? group)
+ {
+ Should.Throw(() => RedisEndpointUri.Stream("orders", 0, group!));
+ }
+
+ [Fact]
+ public void stream_uri_roundtrips_through_parser()
+ {
+ var uri = RedisEndpointUri.Stream("orders", databaseId: 3);
+ var transport = new Wolverine.Redis.Internal.RedisTransport();
+ var endpoint = transport.GetOrCreateEndpoint(uri);
+ endpoint.Uri.ShouldBe(uri);
+ }
+
+ [Fact]
+ public void consumer_group_uri_roundtrips_through_parser()
+ {
+ var uri = RedisEndpointUri.Stream("orders", databaseId: 3, consumerGroup: "order-processors");
+ var transport = new Wolverine.Redis.Internal.RedisTransport();
+ var endpoint = (Wolverine.Redis.Internal.RedisStreamEndpoint)transport.GetOrCreateEndpoint(uri);
+ endpoint.ConsumerGroup.ShouldBe("order-processors");
+ }
+}
diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs
index 4e7de2cee..972633fcf 100644
--- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs
+++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs
@@ -293,25 +293,27 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime)
}
///
- /// Helper method to create a Redis stream URI with database ID
+ /// Helper method to create a Redis stream URI with database ID.
///
/// Redis stream key name
/// Redis database ID
/// Formatted Redis stream URI
+ [Obsolete("Use RedisEndpointUri.Stream instead. This helper will be removed in a future version.")]
public static Uri BuildRedisStreamUri(string streamKey, int databaseId = 0)
{
- return new Uri($"redis://stream/{databaseId}/{streamKey}");
+ return RedisEndpointUri.Stream(streamKey, databaseId);
}
///
- /// Helper method to create a Redis stream URI with database ID and consumer group
+ /// Helper method to create a Redis stream URI with database ID and consumer group.
///
/// Redis stream key name
/// Redis database ID
/// Consumer group name
/// Formatted Redis stream URI
+ [Obsolete("Use RedisEndpointUri.Stream instead. This helper will be removed in a future version.")]
public static Uri BuildRedisStreamUri(string streamKey, int databaseId, string consumerGroup)
{
- return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}");
+ return RedisEndpointUri.Stream(streamKey, databaseId, consumerGroup);
}
}
diff --git a/src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs b/src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs
new file mode 100644
index 000000000..acad7219a
--- /dev/null
+++ b/src/Transports/Redis/Wolverine.Redis/RedisEndpointUri.cs
@@ -0,0 +1,43 @@
+namespace Wolverine.Redis;
+
+///
+/// Builds canonical Wolverine endpoint values for Redis stream transport endpoints.
+///
+public static class RedisEndpointUri
+{
+ ///
+ /// Builds a URI referencing a Redis stream endpoint in the canonical form
+ /// redis://stream/{databaseId}/{streamKey}.
+ ///
+ /// The Redis stream key name.
+ /// The Redis database ID (default 0).
+ /// A of the form redis://stream/{databaseId}/{streamKey}.
+ /// RedisEndpointUri.Stream("orders", 3) returns redis://stream/3/orders.
+ /// Thrown when is null, empty, or whitespace.
+ /// Thrown when is negative.
+ public static Uri Stream(string streamKey, int databaseId = 0)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(streamKey);
+ ArgumentOutOfRangeException.ThrowIfNegative(databaseId);
+ return new Uri($"redis://stream/{databaseId}/{streamKey}");
+ }
+
+ ///
+ /// Builds a URI referencing a Redis stream endpoint with a consumer group in the canonical form
+ /// redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}.
+ ///
+ /// The Redis stream key name.
+ /// The Redis database ID.
+ /// The Redis consumer group name.
+ /// A of the form redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}.
+ /// RedisEndpointUri.Stream("orders", 3, "order-processors") returns redis://stream/3/orders?consumerGroup=order-processors.
+ /// Thrown when or is null, empty, or whitespace.
+ /// Thrown when is negative.
+ public static Uri Stream(string streamKey, int databaseId, string consumerGroup)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(streamKey);
+ ArgumentOutOfRangeException.ThrowIfNegative(databaseId);
+ ArgumentException.ThrowIfNullOrWhiteSpace(consumerGroup);
+ return new Uri($"redis://stream/{databaseId}/{streamKey}?consumerGroup={consumerGroup}");
+ }
+}