diff --git a/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/DynamicListenerAgentFamilyTests.cs b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/DynamicListenerAgentFamilyTests.cs
new file mode 100644
index 000000000..900833907
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/DynamicListenerAgentFamilyTests.cs
@@ -0,0 +1,116 @@
+using NSubstitute;
+using Shouldly;
+using Wolverine.Persistence.Durability;
+using Wolverine.Runtime.Agents;
+using Xunit;
+
+namespace CoreTests.Runtime.Agents.DynamicListeners;
+
+///
+/// Unit coverage for — verifies the
+/// family reads from on every assignment cycle
+/// (no cached snapshot), produces stable agent URIs that round-trip back to
+/// the original listener URI, and balances them via
+/// assignments.DistributeEvenly(Scheme). Pairs with the
+/// round-trip coverage.
+///
+public class DynamicListenerAgentFamilyTests
+{
+ private readonly MockWolverineRuntime _runtime;
+ private readonly IListenerStore _listenerStore;
+ private readonly DynamicListenerAgentFamily _family;
+
+ public DynamicListenerAgentFamilyTests()
+ {
+ _runtime = new MockWolverineRuntime();
+ _listenerStore = Substitute.For();
+ _runtime.Storage.Listeners.Returns(_listenerStore);
+
+ _family = new DynamicListenerAgentFamily(_runtime);
+ }
+
+ [Fact]
+ public void scheme_is_the_dynamic_listener_uri_scheme()
+ {
+ // The family's Scheme is the dictionary key in NodeAgentController's
+ // _agentFamilies; it must match the scheme of every agent URI the
+ // family hands out so dispatch from a URI back to the family works.
+ _family.Scheme.ShouldBe(DynamicListenerUriEncoding.SchemeName);
+ }
+
+ [Fact]
+ public async Task all_known_agents_returns_empty_when_store_is_empty()
+ {
+ _listenerStore.AllListenersAsync(Arg.Any())
+ .Returns(Task.FromResult>(Array.Empty()));
+
+ var agents = await _family.AllKnownAgentsAsync();
+
+ agents.ShouldBeEmpty();
+ }
+
+ [Fact]
+ public async Task all_known_agents_projects_each_listener_uri_through_encoder()
+ {
+ var listenerA = new Uri("mqtt://broker/topic-a");
+ var listenerB = new Uri("mqtt://broker/topic-b");
+ _listenerStore.AllListenersAsync(Arg.Any())
+ .Returns(Task.FromResult>(new[] { listenerA, listenerB }));
+
+ var agents = await _family.AllKnownAgentsAsync();
+
+ agents.Count.ShouldBe(2);
+ agents.ShouldContain(DynamicListenerUriEncoding.ToAgentUri(listenerA));
+ agents.ShouldContain(DynamicListenerUriEncoding.ToAgentUri(listenerB));
+ }
+
+ [Fact]
+ public async Task all_known_agents_re_reads_store_on_every_call()
+ {
+ // The family deliberately doesn't cache — each cluster assignment
+ // cycle must see freshly registered URIs without a host restart.
+ _listenerStore.AllListenersAsync(Arg.Any())
+ .Returns(
+ Task.FromResult>(Array.Empty()),
+ Task.FromResult>(new[] { new Uri("mqtt://broker/new-topic") }));
+
+ var first = await _family.AllKnownAgentsAsync();
+ var second = await _family.AllKnownAgentsAsync();
+
+ first.ShouldBeEmpty();
+ second.Count.ShouldBe(1);
+ await _listenerStore.Received(2).AllListenersAsync(Arg.Any());
+ }
+
+ [Fact]
+ public async Task supported_agents_matches_all_known_agents()
+ {
+ // Every node has the family registered when EnableDynamicListeners is
+ // on, so any node can run any of the listeners — supported set ==
+ // known set. Transport-level "this node can't reach the broker" lives
+ // in StartAsync, not in supported-agents filtering.
+ _listenerStore.AllListenersAsync(Arg.Any())
+ .Returns(Task.FromResult>(new[]
+ {
+ new Uri("mqtt://broker/topic-a"),
+ new Uri("mqtt://broker/topic-b")
+ }));
+
+ var supported = await _family.SupportedAgentsAsync();
+ var known = await _family.AllKnownAgentsAsync();
+
+ supported.ShouldBe(known);
+ }
+
+ [Fact]
+ public async Task build_agent_async_decodes_uri_and_constructs_dynamic_listener_agent()
+ {
+ var listener = new Uri("mqtt://broker/devices/foo/status");
+ var agentUri = DynamicListenerUriEncoding.ToAgentUri(listener);
+
+ var agent = await _family.BuildAgentAsync(agentUri, _runtime);
+
+ agent.ShouldBeOfType();
+ agent.Uri.ShouldBe(agentUri);
+ }
+}
diff --git a/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/DynamicListenerUriEncodingTests.cs b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/DynamicListenerUriEncodingTests.cs
new file mode 100644
index 000000000..c8f2d925b
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/DynamicListenerUriEncodingTests.cs
@@ -0,0 +1,76 @@
+using Shouldly;
+using Wolverine.Runtime.Agents;
+using Xunit;
+
+namespace CoreTests.Runtime.Agents.DynamicListeners;
+
+///
+/// Unit coverage for — the encoding
+/// has to be lossless and idempotent for cluster assignment to work, since
+/// every cycle the cluster compares the agent URIs returned by
+/// AllKnownAgentsAsync against the previous cycle's set. Any encoding
+/// jitter (different escape rules for the same input) would manifest as
+/// "agents thrashing on/off" between assignment polls.
+///
+public class DynamicListenerUriEncodingTests
+{
+ [Theory]
+ [InlineData("mqtt://broker:1883/devices/foo/status")]
+ [InlineData("mqtt://localhost/iot/+/temp")] // MQTT topic wildcards
+ [InlineData("rabbitmq://localhost:5672/queue.name")]
+ [InlineData("kafka://broker.local:9092/orders")]
+ [InlineData("amqp://user:pass@host:5672/vhost/queue")] // userinfo, port, multi-segment path
+ [InlineData("mqtt://broker/path?qos=1")] // query string
+ public void encoding_round_trips_listener_uri(string raw)
+ {
+ var listener = new Uri(raw);
+
+ var agentUri = DynamicListenerUriEncoding.ToAgentUri(listener);
+ var decoded = DynamicListenerUriEncoding.ToListenerUri(agentUri);
+
+ decoded.ShouldBe(listener);
+ }
+
+ [Fact]
+ public void agent_uri_uses_dynamic_listener_scheme()
+ {
+ var agentUri = DynamicListenerUriEncoding.ToAgentUri(new Uri("mqtt://broker/topic"));
+ agentUri.Scheme.ShouldBe(DynamicListenerUriEncoding.SchemeName);
+ }
+
+ [Fact]
+ public void encoding_is_deterministic_for_the_same_input()
+ {
+ // The cluster's assignment grid keys agents by their URI string —
+ // the same listener URI must always produce the same agent URI so
+ // an agent that was running before a poll cycle is recognized as the
+ // same agent after the poll.
+ var listener = new Uri("mqtt://broker/devices/foo/status");
+
+ var first = DynamicListenerUriEncoding.ToAgentUri(listener);
+ var second = DynamicListenerUriEncoding.ToAgentUri(listener);
+
+ second.ShouldBe(first);
+ }
+
+ [Fact]
+ public void to_listener_uri_rejects_wrong_scheme()
+ {
+ var bogus = new Uri("wolverine-listener://something");
+
+ Should.Throw(() => DynamicListenerUriEncoding.ToListenerUri(bogus))
+ .Message.ShouldContain(DynamicListenerUriEncoding.SchemeName);
+ }
+
+ [Fact]
+ public void to_listener_uri_rejects_empty_path()
+ {
+ // wolverine-dynamic-listener:/// with nothing after the slashes is not
+ // a valid encoded agent URI — guard the decoder so a malformed entry
+ // doesn't silently produce a garbage listener URI.
+ var emptyPath = new Uri($"{DynamicListenerUriEncoding.SchemeName}:///");
+
+ Should.Throw(() => DynamicListenerUriEncoding.ToListenerUri(emptyPath))
+ .Message.ShouldContain("no encoded listener URI");
+ }
+}
diff --git a/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/WolverineRuntimeListenerExtensionsTests.cs b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/WolverineRuntimeListenerExtensionsTests.cs
new file mode 100644
index 000000000..ed406ca1e
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/WolverineRuntimeListenerExtensionsTests.cs
@@ -0,0 +1,81 @@
+using NSubstitute;
+using Shouldly;
+using Wolverine.Persistence.Durability;
+using Wolverine.Runtime;
+using Xunit;
+
+namespace CoreTests.Runtime.Agents.DynamicListeners;
+
+///
+/// Coverage for the public
+/// surface — these are the methods user code calls (e.g. on an HTTP request
+/// to add an MQTT broker), so the public contract needs to argument-validate
+/// and pass straight through to without any
+/// transformation. Most of the substance is exercised through the
+/// suite — the tests here just
+/// pin down the thin wrapper.
+///
+public class WolverineRuntimeListenerExtensionsTests
+{
+ private readonly MockWolverineRuntime _runtime = new();
+ private readonly IListenerStore _store = Substitute.For();
+
+ public WolverineRuntimeListenerExtensionsTests()
+ {
+ _runtime.Storage.Listeners.Returns(_store);
+ }
+
+ [Fact]
+ public async Task register_listener_async_delegates_to_store()
+ {
+ var uri = new Uri("mqtt://broker/topic");
+ await _runtime.RegisterListenerAsync(uri);
+
+ await _store.Received(1).RegisterListenerAsync(uri, Arg.Any());
+ }
+
+ [Fact]
+ public async Task remove_listener_async_delegates_to_store()
+ {
+ var uri = new Uri("mqtt://broker/topic");
+ await _runtime.RemoveListenerAsync(uri);
+
+ await _store.Received(1).RemoveListenerAsync(uri, Arg.Any());
+ }
+
+ [Fact]
+ public async Task all_registered_listeners_async_delegates_to_store()
+ {
+ var listed = new[]
+ {
+ new Uri("mqtt://broker/a"),
+ new Uri("mqtt://broker/b")
+ };
+ _store.AllListenersAsync(Arg.Any())
+ .Returns(Task.FromResult>(listed));
+
+ var result = await _runtime.AllRegisteredListenersAsync();
+
+ result.ShouldBe(listed);
+ }
+
+ [Fact]
+ public void register_throws_argument_null_for_null_runtime()
+ {
+ IWolverineRuntime? runtime = null;
+ var uri = new Uri("mqtt://broker/topic");
+ Should.Throw(() => runtime!.RegisterListenerAsync(uri));
+ }
+
+ [Fact]
+ public void register_throws_argument_null_for_null_uri()
+ {
+ Should.Throw(() => _runtime.RegisterListenerAsync(null!));
+ }
+
+ [Fact]
+ public void remove_throws_argument_null_for_null_uri()
+ {
+ Should.Throw(() => _runtime.RemoveListenerAsync(null!));
+ }
+}
diff --git a/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/dynamic_listener_agent_lifecycle_integration.cs b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/dynamic_listener_agent_lifecycle_integration.cs
new file mode 100644
index 000000000..35b6ea98e
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/Agents/DynamicListeners/dynamic_listener_agent_lifecycle_integration.cs
@@ -0,0 +1,106 @@
+using JasperFx;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using NSubstitute;
+using Shouldly;
+using Wolverine;
+using Wolverine.ComplianceTests;
+using Wolverine.Persistence.Durability;
+using Wolverine.Runtime;
+using Wolverine.Runtime.Agents;
+using Xunit;
+
+namespace CoreTests.Runtime.Agents.DynamicListeners;
+
+///
+/// End-to-end coverage for : spin up a real
+/// Wolverine host with the stub transport in solo mode, manually drive the
+/// agent's lifecycle, and verify the listener actually goes live and is
+/// recognizable to the rest of the runtime via
+/// .
+///
+/// This is intentionally below the cluster-assignment layer: we don't wait
+/// for to fire — that
+/// path is exercised by the broader cluster compliance tests once a
+/// transport-specific dynamic registration test ships in PR-3 (MQTT).
+///
+public class dynamic_listener_agent_lifecycle_integration : IAsyncLifetime
+{
+ private IHost _host = null!;
+
+ public async Task InitializeAsync()
+ {
+ _host = await WolverineHost.ForAsync(opts =>
+ {
+ // Solo mode keeps us out of the cluster orchestration path so the
+ // test exercises just the agent — the family registration logic
+ // is covered separately by DynamicListenerAgentFamilyTests.
+ opts.Durability.Mode = DurabilityMode.Solo;
+ });
+ }
+
+ public Task DisposeAsync()
+ {
+ _host.Dispose();
+ return Task.CompletedTask;
+ }
+
+ [Fact]
+ public async Task agent_start_activates_the_listener_on_the_runtime()
+ {
+ // Arrange: a stub-transport listener URI that the runtime's existing
+ // stub transport knows how to materialize into an Endpoint.
+ var listenerUri = new Uri("stub://gh-2685-dynamic-target");
+
+ var runtime = _host.Services.GetRequiredService();
+ var agent = new DynamicListenerAgent(runtime, listenerUri);
+
+ // Act
+ await agent.StartAsync(CancellationToken.None);
+
+ try
+ {
+ // Assert: the runtime is now listening at the requested URI. This
+ // verifies the agent went all the way through transport-resolution
+ // → endpoint creation → ListeningAgent registration without us
+ // pre-configuring the URI in WolverineOptions.
+ runtime.Endpoints.FindListeningAgent(listenerUri).ShouldNotBeNull();
+ }
+ finally
+ {
+ await agent.StopAsync(CancellationToken.None);
+ agent.Status.ShouldBe(AgentStatus.Stopped);
+ }
+ }
+
+ [Fact]
+ public async Task agent_start_throws_when_no_transport_supports_the_scheme()
+ {
+ // Misconfiguration scenario: a listener URI was registered for a
+ // transport the host doesn't include. Surface it as a hard error at
+ // StartAsync rather than letting the listener silently never come up.
+ var bogusUri = new Uri("not-a-real-scheme://host/topic");
+
+ var runtime = _host.Services.GetRequiredService();
+ var agent = new DynamicListenerAgent(runtime, bogusUri);
+
+ var ex = await Should.ThrowAsync(() =>
+ agent.StartAsync(CancellationToken.None));
+
+ ex.Message.ShouldContain("not-a-real-scheme");
+ ex.Message.ShouldContain("UseMqtt"); // hint baked into the message
+ }
+
+ [Fact]
+ public async Task agent_stop_before_start_is_a_no_op()
+ {
+ // The agent runtime can call StopAsync on an agent that never had a
+ // chance to start (e.g. node deactivation between assignment and
+ // first poll); tolerate that without throwing.
+ var runtime = _host.Services.GetRequiredService();
+ var agent = new DynamicListenerAgent(runtime, new Uri("stub://gh-2685-never-started"));
+
+ await Should.NotThrowAsync(() => agent.StopAsync(CancellationToken.None));
+ agent.Status.ShouldBe(AgentStatus.Stopped);
+ }
+}
diff --git a/src/Wolverine/Runtime/Agents/DynamicListenerAgent.cs b/src/Wolverine/Runtime/Agents/DynamicListenerAgent.cs
new file mode 100644
index 000000000..21a16736d
--- /dev/null
+++ b/src/Wolverine/Runtime/Agents/DynamicListenerAgent.cs
@@ -0,0 +1,110 @@
+using JasperFx;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Wolverine.Configuration;
+using Wolverine.Transports;
+
+namespace Wolverine.Runtime.Agents;
+
+///
+/// Cluster-coordinated agent that activates a single dynamically-registered listener
+/// URI on whichever node the assigns it to.
+/// The persisted listener URI lives in ;
+/// the agent simply resolves the URI to its transport via
+/// and delegates to
+/// /
+/// on Start/Stop.
+///
+/// Failure to resolve the transport is treated as a hard error rather than a
+/// silent skip — this surfaces "the user registered an MQTT URI but the host
+/// doesn't have UseMqtt" misconfigurations early instead of letting the
+/// listener silently never run.
+///
+internal sealed class DynamicListenerAgent : IAgent
+{
+ private readonly IWolverineRuntime _runtime;
+ private readonly Uri _listenerUri;
+ private Endpoint? _endpoint;
+
+ public DynamicListenerAgent(IWolverineRuntime runtime, Uri listenerUri)
+ {
+ _runtime = runtime;
+ _listenerUri = listenerUri;
+ Uri = DynamicListenerUriEncoding.ToAgentUri(listenerUri);
+ }
+
+ public Uri Uri { get; }
+
+ public AgentStatus Status { get; set; } = AgentStatus.Running;
+
+ public async Task StartAsync(CancellationToken cancellationToken)
+ {
+ _endpoint ??= resolveEndpoint();
+ await _runtime.Endpoints.StartListenerAsync(_endpoint, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async Task StopAsync(CancellationToken cancellationToken)
+ {
+ if (_endpoint is null)
+ {
+ // Never started — nothing to do.
+ Status = AgentStatus.Stopped;
+ return;
+ }
+
+ await _runtime.Endpoints.StopListenerAsync(_endpoint, cancellationToken).ConfigureAwait(false);
+ Status = AgentStatus.Stopped;
+ }
+
+ public string Description =>
+ $"Dynamic listener for {_listenerUri} — registered at runtime via " +
+ $"{nameof(Persistence.Durability.IListenerStore)}, activated on this node by the cluster.";
+
+ public Task CheckHealthAsync(HealthCheckContext context,
+ CancellationToken cancellationToken = default)
+ {
+ if (Status != AgentStatus.Running)
+ {
+ return Task.FromResult(HealthCheckResult.Unhealthy($"Agent {Uri} is {Status}"));
+ }
+
+ if (_endpoint is null)
+ {
+ // We're "Running" but have never been started — the agent runtime
+ // calls StartAsync before health checks ever fire, so reaching this
+ // branch means something else is wrong.
+ return Task.FromResult(HealthCheckResult.Degraded(
+ $"Dynamic listener {_listenerUri} has not yet been started"));
+ }
+
+ var listeningAgent = _runtime.Endpoints.FindListeningAgent(_endpoint.Uri);
+ if (listeningAgent is null)
+ {
+ return Task.FromResult(HealthCheckResult.Healthy());
+ }
+
+ return Task.FromResult(listeningAgent.Status switch
+ {
+ ListeningStatus.TooBusy => HealthCheckResult.Degraded(
+ $"Dynamic listener {_listenerUri} is too busy"),
+ ListeningStatus.GloballyLatched => HealthCheckResult.Unhealthy(
+ $"Dynamic listener {_listenerUri} is globally latched"),
+ _ => HealthCheckResult.Healthy()
+ });
+ }
+
+ private Endpoint resolveEndpoint()
+ {
+ var transport = _runtime.Options.Transports.ForScheme(_listenerUri.Scheme);
+ if (transport is null)
+ {
+ throw new InvalidOperationException(
+ $"No registered transport supports scheme '{_listenerUri.Scheme}' — " +
+ $"the dynamic listener URI '{_listenerUri}' cannot be activated. " +
+ $"Did the host's WolverineOptions register the relevant transport (e.g. UseMqtt)?");
+ }
+
+ var endpoint = transport.GetOrCreateEndpoint(_listenerUri);
+ endpoint.IsListener = true;
+ return endpoint;
+ }
+}
diff --git a/src/Wolverine/Runtime/Agents/DynamicListenerAgentFamily.cs b/src/Wolverine/Runtime/Agents/DynamicListenerAgentFamily.cs
new file mode 100644
index 000000000..fb5103a05
--- /dev/null
+++ b/src/Wolverine/Runtime/Agents/DynamicListenerAgentFamily.cs
@@ -0,0 +1,79 @@
+using Wolverine.Persistence.Durability;
+
+namespace Wolverine.Runtime.Agents;
+
+///
+/// Agent family backing the GH-2685 dynamic-listener registry. The set of agents
+/// is not fixed at boot — every assignment cycle
+/// queries for the current registered
+/// listener URIs and projects each one through
+/// . The cluster's
+/// then uses
+/// to balance them across the
+/// running nodes — one node per listener URI, so registering an MQTT topic
+/// activates exactly one consumer somewhere in the cluster regardless of how
+/// many nodes are alive.
+///
+/// Registration is opt-in via :
+/// when the flag is off, never instantiates
+/// this family and the listener-registry table is never queried. Combined with
+/// the storage-side gate from PR #2700, the entire feature is zero-cost when
+/// not in use.
+///
+/// Re-evaluation cadence is the cluster's existing
+/// (default 30s). A newly
+/// registered URI takes up to one polling interval to be picked up — that's
+/// good enough for the IoT-device add/remove cadence in the original use case
+/// without needing a separate change-stream / pub-sub channel.
+///
+internal sealed class DynamicListenerAgentFamily : IAgentFamily
+{
+ private readonly IWolverineRuntime _runtime;
+
+ public DynamicListenerAgentFamily(IWolverineRuntime runtime)
+ {
+ _runtime = runtime;
+ }
+
+ public string Scheme => DynamicListenerUriEncoding.SchemeName;
+
+ public async ValueTask> AllKnownAgentsAsync()
+ {
+ var listenerUris = await _runtime.Storage.Listeners.AllListenersAsync(_runtime.Cancellation)
+ .ConfigureAwait(false);
+
+ if (listenerUris.Count == 0)
+ {
+ return Array.Empty();
+ }
+
+ var agents = new List(listenerUris.Count);
+ foreach (var uri in listenerUris)
+ {
+ agents.Add(DynamicListenerUriEncoding.ToAgentUri(uri));
+ }
+
+ return agents;
+ }
+
+ public ValueTask BuildAgentAsync(Uri uri, IWolverineRuntime wolverineRuntime)
+ {
+ var listenerUri = DynamicListenerUriEncoding.ToListenerUri(uri);
+ var agent = new DynamicListenerAgent(wolverineRuntime, listenerUri);
+ return ValueTask.FromResult(agent);
+ }
+
+ public ValueTask> SupportedAgentsAsync()
+ {
+ // Every node that has the family registered is capable of running any of
+ // the listeners — actual transport-level "can this node reach the broker"
+ // failures surface at StartAsync time rather than at assignment time.
+ return AllKnownAgentsAsync();
+ }
+
+ public ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments)
+ {
+ assignments.DistributeEvenly(Scheme);
+ return new ValueTask();
+ }
+}
diff --git a/src/Wolverine/Runtime/Agents/DynamicListenerUriEncoding.cs b/src/Wolverine/Runtime/Agents/DynamicListenerUriEncoding.cs
new file mode 100644
index 000000000..9ed06ca90
--- /dev/null
+++ b/src/Wolverine/Runtime/Agents/DynamicListenerUriEncoding.cs
@@ -0,0 +1,58 @@
+namespace Wolverine.Runtime.Agents;
+
+///
+/// Round-trippable encoding for the agent URI carried inside the cluster's
+/// when a listener URI is registered via
+/// .
+///
+/// The agent URI is a single-segment hierarchical Uri whose path is the
+/// percent-encoded listener URI. Concrete example:
+///
+/// listener: mqtt://broker:1883/devices/foo/status
+/// agent: wolverine-dynamic-listener:///mqtt%3A%2F%2Fbroker%3A1883%2Fdevices%2Ffoo%2Fstatus
+///
+/// The empty authority (:///) keeps the encoded payload entirely inside
+/// the path component, so a listener URI with embedded scheme delimiters
+/// can't collide with the agent URI's own scheme/host parsing. Decoding is
+/// the symmetric operation.
+///
+internal static class DynamicListenerUriEncoding
+{
+ ///
+ /// Scheme assigned to the dynamic-listener agent family. Used as both the
+ /// dictionary key in and the scheme of
+ /// every URI.
+ ///
+ public const string SchemeName = "wolverine-dynamic-listener";
+
+ public static Uri ToAgentUri(Uri listenerUri)
+ {
+ if (listenerUri is null) throw new ArgumentNullException(nameof(listenerUri));
+
+ var encoded = Uri.EscapeDataString(listenerUri.ToString());
+ return new Uri($"{SchemeName}:///{encoded}");
+ }
+
+ public static Uri ToListenerUri(Uri agentUri)
+ {
+ if (agentUri is null) throw new ArgumentNullException(nameof(agentUri));
+
+ if (!string.Equals(agentUri.Scheme, SchemeName, StringComparison.Ordinal))
+ {
+ throw new ArgumentException(
+ $"Expected an agent URI with scheme '{SchemeName}' but got '{agentUri.Scheme}'",
+ nameof(agentUri));
+ }
+
+ var path = agentUri.AbsolutePath.TrimStart('/');
+ if (path.Length == 0)
+ {
+ throw new ArgumentException(
+ $"Agent URI '{agentUri}' has no encoded listener URI in its path",
+ nameof(agentUri));
+ }
+
+ var listenerString = Uri.UnescapeDataString(path);
+ return new Uri(listenerString);
+ }
+}
diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.cs
index ed2b9fc18..cb18ecbf4 100644
--- a/src/Wolverine/Runtime/Agents/NodeAgentController.cs
+++ b/src/Wolverine/Runtime/Agents/NodeAgentController.cs
@@ -70,6 +70,15 @@ internal NodeAgentController(IWolverineRuntime runtime,
{
_agentFamilies[ExclusiveListenerFamily.SchemeName] = new ExclusiveListenerFamily(runtime);
_agentFamilies[LeaderPinnedListenerFamily.SchemeName] = new LeaderPinnedListenerFamily(runtime);
+
+ // GH-2685: durable, dynamically-registered listener URIs (e.g. per-IoT-device
+ // MQTT topics). Opt-in via Durability.EnableDynamicListeners — when off, the
+ // family isn't instantiated and the listener-registry table is never queried.
+ if (runtime.Options.Durability.EnableDynamicListeners)
+ {
+ _agentFamilies[DynamicListenerUriEncoding.SchemeName] =
+ new DynamicListenerAgentFamily(runtime);
+ }
}
if (runtime.Options.Durability.DurabilityAgentEnabled)
diff --git a/src/Wolverine/Runtime/WolverineRuntimeListenerExtensions.cs b/src/Wolverine/Runtime/WolverineRuntimeListenerExtensions.cs
new file mode 100644
index 000000000..63113e92b
--- /dev/null
+++ b/src/Wolverine/Runtime/WolverineRuntimeListenerExtensions.cs
@@ -0,0 +1,63 @@
+using Wolverine.Persistence.Durability;
+
+namespace Wolverine.Runtime;
+
+///
+/// Convenience surface on top of for
+/// registering, removing, and listing the dynamic-listener URIs at runtime
+/// (GH-2685). All three calls delegate straight through to the underlying
+/// — they exist purely so consumers don't have
+/// to reach through runtime.Storage.Listeners in user code.
+///
+/// Registration is durable: once persisted, the listener URI is picked up by
+/// on the cluster's next
+/// assignment cycle (default 30s) and activated on whichever node the
+/// cluster picks. Removal is also durable: the assigned node stops the
+/// listener within one polling interval. Both operations are idempotent.
+///
+/// Requires to be set
+/// at host configuration time. When the flag is off these methods are still
+/// callable but they hit : register/remove
+/// no-op and the all-listeners list is always empty.
+///
+public static class WolverineRuntimeListenerExtensions
+{
+ ///
+ /// Persist as a registered listener that the
+ /// cluster will activate on one node. Idempotent — repeat registrations of
+ /// the same URI are no-ops.
+ ///
+ public static Task RegisterListenerAsync(this IWolverineRuntime runtime, Uri listenerUri,
+ CancellationToken cancellationToken = default)
+ {
+ if (runtime is null) throw new ArgumentNullException(nameof(runtime));
+ if (listenerUri is null) throw new ArgumentNullException(nameof(listenerUri));
+
+ return runtime.Storage.Listeners.RegisterListenerAsync(listenerUri, cancellationToken);
+ }
+
+ ///
+ /// Remove from the registry. Within one
+ /// cluster assignment cycle (default 30s) the assigned node stops the
+ /// listener. Idempotent — removing an unregistered URI is a no-op.
+ ///
+ public static Task RemoveListenerAsync(this IWolverineRuntime runtime, Uri listenerUri,
+ CancellationToken cancellationToken = default)
+ {
+ if (runtime is null) throw new ArgumentNullException(nameof(runtime));
+ if (listenerUri is null) throw new ArgumentNullException(nameof(listenerUri));
+
+ return runtime.Storage.Listeners.RemoveListenerAsync(listenerUri, cancellationToken);
+ }
+
+ ///
+ /// Snapshot of every currently-registered listener URI. Order is
+ /// implementation-defined.
+ ///
+ public static Task> AllRegisteredListenersAsync(this IWolverineRuntime runtime,
+ CancellationToken cancellationToken = default)
+ {
+ if (runtime is null) throw new ArgumentNullException(nameof(runtime));
+ return runtime.Storage.Listeners.AllListenersAsync(cancellationToken);
+ }
+}