Skip to content

[3/N] DynamicListenerAgentFamily — cluster-coordinated dynamic listeners (GH-2685)#2702

Merged
jeremydmiller merged 1 commit intomainfrom
issue-2685-dynamic-listener-agent
May 7, 2026
Merged

[3/N] DynamicListenerAgentFamily — cluster-coordinated dynamic listeners (GH-2685)#2702
jeremydmiller merged 1 commit intomainfrom
issue-2685-dynamic-listener-agent

Conversation

@jeremydmiller
Copy link
Copy Markdown
Member

Summary

Third slice of #2685. Connects the durable listener registry from PR #2700 to the cluster's agent runtime so registered URIs become live listeners on exactly one node — the user-visible surface for "add an MQTT broker at runtime and have the cluster activate it" the original issue called for.

What's in this PR

  • DynamicListenerAgentFamily — re-reads IMessageStore.Listeners.AllListenersAsync() on every assignment cycle (no boot snapshot, so newly registered URIs are picked up within one polling interval, default 30s) and balances the agents across the cluster via assignments.DistributeEvenly(Scheme). Same shape as ExclusiveListenerFamily / StickyPostgresqlQueueListenerAgentFamily.

  • DynamicListenerAgent — single-URI IAgent. StartAsync resolves the listener URI's scheme to a registered transport via Options.Transports.ForScheme(...), asks the transport to materialize an Endpoint from the URI, and hands it to IEndpointCollection.StartListenerAsync. Failure to resolve surfaces as InvalidOperationException with a hint at UseMqtt / UseRabbitMq / etc.

  • DynamicListenerUriEncoding — lossless round-trip between listener URI and agent URI under the reserved wolverine-dynamic-listener:/// scheme. Empty authority + percent-encoded path keeps the embedded :// from colliding with the agent URI's own parsing. Determinism is required — the cluster keys agents by URI, so the encoding has to produce the same agent URI for the same listener URI on every poll.

  • NodeAgentController registers the family when both Durability.Mode == Balanced AND Durability.EnableDynamicListeners == true. Off by default. Solo / MediatorOnly / Serverless modes don't get cluster assignment so the family wouldn't help them anyway.

  • IWolverineRuntime.RegisterListenerAsync / RemoveListenerAsync / AllRegisteredListenersAsync extension methods so user code calls runtime.RegisterListenerAsync(uri) rather than reaching through runtime.Storage.Listeners. Thin pass-through.

Test plan

  • DynamicListenerUriEncodingTests — 9/9. Parameterized round-trip cases (MQTT topic wildcards, userinfo+port, query strings) + determinism + scheme-mismatch + empty-path guards.
  • DynamicListenerAgentFamilyTests — 6/6. Empty-store → empty agents; listener URIs project through encoder; store re-read on every call (no caching); BuildAgentAsync decodes back to a DynamicListenerAgent.
  • WolverineRuntimeListenerExtensionsTests — 7/7. Pass-through contract + arg-null guards.
  • dynamic_listener_agent_lifecycle_integration — 3/3. Real WolverineHost in solo mode; drives an agent through Start/Stop; asserts listener actually goes live via runtime.Endpoints.FindListeningAgent(uri). Covers no-transport error path + "stop before start" tolerance.
  • CoreTests Runtime.Agents: 88/88.
  • CoreTests Persistence.Durability: 17/17 (foundation defaults unchanged).
  • PostgresqlTests PostgresqlMessageStoreTests: 48/48 (PR [2/N] real RDBMS-backed IListenerStore (GH-2685) #2700 storage still green).

Deferred to follow-up PR (final piece of #2685)

  • Marten-backed IListenerStore.
  • MQTT-specific multiplexed listener (one shared MqttListener + BufferedReceiver, N broker subscriptions) — Lau's 10K-IoT case.
  • IWolverineRuntime topic-name → Uri convenience extensions for MQTT.
  • Documentation in MQTT and Durability guides.

🤖 Generated with Claude Code

…stered listener URIs [3/N]

Third slice of GH-2685. The previous PRs (#2699 foundation, #2700 RDBMS
storage) gave us a durable place to persist listener URIs and the
opt-in flag to gate it. This PR connects that storage to the cluster's
agent runtime so registered URIs become live listeners on exactly one
node — the user-visible API for "add an MQTT broker at runtime and
have the cluster activate it" the original issue called for.

What's in this PR:

- New `Wolverine.Runtime.Agents.DynamicListenerAgentFamily` —
  `IAgentFamily` implementation that re-reads
  `IMessageStore.Listeners.AllListenersAsync()` on every assignment
  cycle (no boot-time snapshot, so newly registered URIs are picked
  up within one polling interval, default 30s) and balances the
  resulting agents across the cluster via
  `assignments.DistributeEvenly(Scheme)`. Same pattern as the
  existing `ExclusiveListenerFamily` and
  `StickyPostgresqlQueueListenerAgentFamily`.

- New `Wolverine.Runtime.Agents.DynamicListenerAgent` — single-URI
  `IAgent` whose StartAsync resolves the listener URI's scheme to a
  registered transport via `Options.Transports.ForScheme(...)`,
  asks the transport to materialize an `Endpoint` from the URI, and
  hands it to `IEndpointCollection.StartListenerAsync`. Failure to
  resolve the transport surfaces as a hard `InvalidOperationException`
  with a hint pointing at `UseMqtt` / `UseRabbitMq` / etc., so a
  misconfigured host doesn't silently never run the listener.

- New `Wolverine.Runtime.Agents.DynamicListenerUriEncoding` — lossless
  round-trip between a listener URI and an agent URI under the
  reserved `wolverine-dynamic-listener:///` scheme (percent-encoded
  payload as the path, empty authority so embedded `://` in the
  listener URI doesn't collide with the agent URI's own parsing).
  The cluster keys agents by their URI, so the encoding has to be
  deterministic — covered by parameterized round-trip tests.

- `NodeAgentController` registers the family when both
  `Durability.Mode == Balanced` AND
  `Durability.EnableDynamicListeners == true`. Off by default;
  Solo / MediatorOnly / Serverless modes don't get cluster
  assignment so the family wouldn't help them anyway.

- New `IWolverineRuntime.RegisterListenerAsync` /
  `RemoveListenerAsync` / `AllRegisteredListenersAsync` extension
  methods in `Wolverine.Runtime.WolverineRuntimeListenerExtensions`
  so user code calls `runtime.RegisterListenerAsync(uri)` rather
  than `runtime.Storage.Listeners.RegisterListenerAsync(uri)`.
  Thin pass-through; no transformation — the public surface is just
  to keep dependency reach narrower in user code.

Tests:

- `DynamicListenerUriEncodingTests` — 6 round-trip cases covering
  MQTT topic wildcards, userinfo + port, query strings; plus
  determinism + scheme-mismatch + empty-path guards. 9/9.
- `DynamicListenerAgentFamilyTests` — verifies empty-store →
  empty agents, listener URIs project through the encoder, the
  store is re-read on every call (no caching), and BuildAgentAsync
  decodes back to a `DynamicListenerAgent`. 6/6.
- `WolverineRuntimeListenerExtensionsTests` — pins the public
  pass-through contract and argument-null guards. 7/7.
- `dynamic_listener_agent_lifecycle_integration` — spins up a real
  WolverineHost in solo mode, drives an agent through Start/Stop,
  asserts the listener actually goes live via
  `runtime.Endpoints.FindListeningAgent(uri)`. Also covers the
  no-transport error path and the "stop before start" tolerance
  needed for cluster reassignment edge cases. 3/3.

Regression checks:
- CoreTests `Runtime.Agents`: 88/88 (existing agent family suite
  plus the new 25 dynamic-listener tests).
- CoreTests `Persistence.Durability`: 17/17 (foundation defaults
  unchanged).
- PostgresqlTests `PostgresqlMessageStoreTests`: 48/48
  (RDBMS-backed listener store from PR #2700 still green).

Deferred to follow-up:
- Marten-backed `IListenerStore`.
- MQTT-specific multiplexed listener (one shared MqttListener +
  BufferedReceiver, N broker subscriptions) for the 10K-IoT case.
- IWolverineRuntime topic-name → Uri convenience extensions.
- Documentation in MQTT and Durability guides.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jeremydmiller jeremydmiller marked this pull request as ready for review May 7, 2026 18:52
@jeremydmiller jeremydmiller merged commit c764d25 into main May 7, 2026
21 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant