[1/N] dynamic listeners — foundation only (GH-2685)#2699
Merged
jeremydmiller merged 1 commit intomainfrom May 7, 2026
Merged
Conversation
First slice of GH-2685. Establishes the contract surface every IMessageStore implementation will satisfy in subsequent PRs, but ships zero behaviour change for existing apps — the opt-in flag defaults to false and every existing IMessageStore returns NullListenerStore.Instance. What's in this PR (foundation only): - New IListenerStore abstraction in Wolverine.Persistence.Durability. Three Uri-based operations: RegisterListenerAsync, RemoveListenerAsync, AllListenersAsync. Idempotent register and remove. Transport-agnostic — every entry is a single Uri so the same registry shape works for MQTT, RabbitMQ, Kafka, Service Bus, etc. - IMessageStore.Listeners property added to the contract; every existing implementation (MultiTenantedMessageStore, NullMessageStore, MessageDatabase, OracleMessageStore, RavenDbMessageStore, CosmosDbMessageStore) returns NullListenerStore.Instance for now. The multi-tenant store delegates to its main store — listener URIs are not tenant-scoped. - New WolverineOptions.Durability.EnableDynamicListeners flag, default false. Database-backed message stores must NOT create the listener table unless this flag is set, so users upgrading Wolverine see no schema migration churn. - Compliance contract added to MessageStoreCompliance: all_listeners_returns_empty_on_a_clean_store, register_listener_persists_uri, register_listener_is_idempotent, remove_listener_removes_the_uri, remove_listener_is_idempotent_on_unknown_uri, register_then_remove_then_re_register_works, all_listeners_includes_every_registered_uri. Each test pass-throughs when thePersistence.Listeners is the no-op store, so providers can adopt the contract incrementally — once a provider opts in (sets Durability.EnableDynamicListeners = true and swaps Listeners to a real impl), the same suite validates real persistence end-to-end. - DynamicListenersDefaultsTests: locks down that the opt-in flag defaults to false on a fresh DurabilitySettings AND on a fresh WolverineOptions, plus that NullListenerStore.Register/Remove/AllListeners are no-ops. 5/5 passing. What's deferred to follow-up PRs (referenced in GH-2685): - Real RDBMS-backed IListenerStore in Wolverine.RDBMS that all five RDBMS providers (Postgres, SqlServer, MySql, Oracle, Sqlite) inherit. - Marten-backed IListenerStore. - DynamicListenerAgentFamily for cluster coordination + opt-in registration in UseWolverine. - MQTT-specific multiplexed listener (one shared MqttListener + BufferedReceiver, N broker subscriptions) for the 10K-IoT-device scaling case from Lau's comment. - IWolverineRuntime extension methods for MQTT topic-name → Uri convenience. - Documentation in the MQTT and Durability guides. - Provider impls for RavenDb, CosmosDb, Polecat (separate follow-up issue). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
uniquelau
approved these changes
May 7, 2026
| // store. Listener URIs aren't tenant-scoped — registering the same URI | ||
| // across tenants would create duplicate listeners — so the master is | ||
| // authoritative for the registry. | ||
| public IListenerStore Listeners => Main.Listeners; |
Contributor
There was a problem hiding this comment.
Understood, that is OK for our situation.
We would then associate the listener data with a tenant as the device will belong to a tenant, but that can happen outside of this process.
Member
Author
|
@uniquelau It's going to take a couple follow ups, this isn't the whole show:) |
6 tasks
jeremydmiller
added a commit
that referenced
this pull request
May 7, 2026
Second slice of GH-2685. Implements the durable listener registry behind IMessageStore.Listeners for the five RDBMS providers, gated on the existing Durability.EnableDynamicListeners opt-in. PR #2699 established the contract surface and a NullListenerStore default — this PR makes the same compliance suite go green against real persistence. What's in this PR (storage only, no agent yet): - New `wolverine_listeners` table. Single uri column, primary key on uri. Cross-provider DDL via Weasel Table abstractions: * Postgres / SqlServer / MySQL: `string` uri PK * Sqlite: `TEXT` PK * Oracle: `VARCHAR2(500)` PK (uppercase identifier) Provisioned only when `Durability.EnableDynamicListeners == true` AND `Role == MessageStoreRole.Main`. Each provider's `AllObjects()` yields the table conditionally so existing apps see no migration churn on upgrade. - `RdbmsListenerStore` in Wolverine.RDBMS/DynamicListeners. Used by the four `MessageDatabase<T>` subclasses (Postgres, SqlServer, MySQL, Sqlite). Cross-provider SQL with `@`-prefixed bind variables via `DbDataSource.CreateCommand`. Idempotent register uses a try-INSERT / swallow-unique-violation pattern: catches DbException through the existing `IsDuplicateEnvelopeException` per-provider predicate (renamed internally to `IsUniqueConstraintViolation` for clarity since the same predicate now classifies envelope and listener uniqueness violations equivalently). Idempotent remove is a plain DELETE — naturally idempotent on missing rows. - `OracleListenerStore` in Wolverine.Oracle. Oracle's managed driver doesn't ship a native `DbDataSource` and Wolverine.Oracle uses a thin wrapper plus `:`-prefixed bind variables, so the shared RdbmsListenerStore isn't a clean fit. Mirrors the contract using the existing `OpenConnectionAsync` + `conn.CreateCommand` idiom from `OracleNodePersistence`. Same try-insert / swallow- ORA-00001 pattern for idempotent register. - `MessageDatabase<T>` ctor now wires the listener store after Role is determined, via a new virtual `BuildListenerStore()` factory hook. Default returns `RdbmsListenerStore`; OracleMessageStore doesn't inherit from MessageDatabase<T> so it wires `OracleListenerStore` directly in its own ctor. - `MessageDatabase.Admin.truncateEnvelopeDataAsync` now also clears the wolverine_listeners table when the flag is on, so test hosts that call `ResetResourceState` / `ClearAllAsync` get a truly clean slate between tests. - The four base-provider compliance hosts (`PostgresqlMessageStoreTests`, `SqlServerMessageStoreTests`, `SqliteMessageStoreTests`) now flip `EnableDynamicListeners = true` in their `BuildCleanHost()`. This activates the seven listener-store compliance tests that were short-circuited in PR #2699 — the same suite now validates real persistence end-to-end on each provider. What's deferred to follow-up PRs (still tracked in GH-2685): - DynamicListenerAgentFamily for cluster coordination + opt-in registration in UseWolverine. - Marten-backed IListenerStore. - MQTT-specific multiplexed listener (one shared MqttListener + BufferedReceiver, N broker subscriptions). - IWolverineRuntime topic-name → Uri convenience extensions. - Documentation in MQTT and Durability guides. - Provider impls for RavenDb, CosmosDb, Polecat (separate follow-up issue). Tests: - PostgresqlMessageStoreTests: 48/48 on net9.0 (Postgres on :5433). - SqlServerMessageStoreTests: 49/49 on net9.0 (SqlServer on :1434). - SqliteMessageStoreTests: 48/48 on net9.0 (in-memory). - CoreTests Durability suite: 17/17 (DynamicListenersDefaultsTests still passing, as expected — the no-op path is unchanged). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jeremydmiller
added a commit
that referenced
this pull request
May 7, 2026
…stered listener URIs [3/N] Third slice of GH-2685. The previous PRs (#2699 foundation, #2700 RDBMS storage) gave us a durable place to persist listener URIs and the opt-in flag to gate it. This PR connects that storage to the cluster's agent runtime so registered URIs become live listeners on exactly one node — the user-visible API for "add an MQTT broker at runtime and have the cluster activate it" the original issue called for. What's in this PR: - New `Wolverine.Runtime.Agents.DynamicListenerAgentFamily` — `IAgentFamily` implementation that re-reads `IMessageStore.Listeners.AllListenersAsync()` on every assignment cycle (no boot-time snapshot, so newly registered URIs are picked up within one polling interval, default 30s) and balances the resulting agents across the cluster via `assignments.DistributeEvenly(Scheme)`. Same pattern as the existing `ExclusiveListenerFamily` and `StickyPostgresqlQueueListenerAgentFamily`. - New `Wolverine.Runtime.Agents.DynamicListenerAgent` — single-URI `IAgent` whose StartAsync resolves the listener URI's scheme to a registered transport via `Options.Transports.ForScheme(...)`, asks the transport to materialize an `Endpoint` from the URI, and hands it to `IEndpointCollection.StartListenerAsync`. Failure to resolve the transport surfaces as a hard `InvalidOperationException` with a hint pointing at `UseMqtt` / `UseRabbitMq` / etc., so a misconfigured host doesn't silently never run the listener. - New `Wolverine.Runtime.Agents.DynamicListenerUriEncoding` — lossless round-trip between a listener URI and an agent URI under the reserved `wolverine-dynamic-listener:///` scheme (percent-encoded payload as the path, empty authority so embedded `://` in the listener URI doesn't collide with the agent URI's own parsing). The cluster keys agents by their URI, so the encoding has to be deterministic — covered by parameterized round-trip tests. - `NodeAgentController` registers the family when both `Durability.Mode == Balanced` AND `Durability.EnableDynamicListeners == true`. Off by default; Solo / MediatorOnly / Serverless modes don't get cluster assignment so the family wouldn't help them anyway. - New `IWolverineRuntime.RegisterListenerAsync` / `RemoveListenerAsync` / `AllRegisteredListenersAsync` extension methods in `Wolverine.Runtime.WolverineRuntimeListenerExtensions` so user code calls `runtime.RegisterListenerAsync(uri)` rather than `runtime.Storage.Listeners.RegisterListenerAsync(uri)`. Thin pass-through; no transformation — the public surface is just to keep dependency reach narrower in user code. Tests: - `DynamicListenerUriEncodingTests` — 6 round-trip cases covering MQTT topic wildcards, userinfo + port, query strings; plus determinism + scheme-mismatch + empty-path guards. 9/9. - `DynamicListenerAgentFamilyTests` — verifies empty-store → empty agents, listener URIs project through the encoder, the store is re-read on every call (no caching), and BuildAgentAsync decodes back to a `DynamicListenerAgent`. 6/6. - `WolverineRuntimeListenerExtensionsTests` — pins the public pass-through contract and argument-null guards. 7/7. - `dynamic_listener_agent_lifecycle_integration` — spins up a real WolverineHost in solo mode, drives an agent through Start/Stop, asserts the listener actually goes live via `runtime.Endpoints.FindListeningAgent(uri)`. Also covers the no-transport error path and the "stop before start" tolerance needed for cluster reassignment edge cases. 3/3. Regression checks: - CoreTests `Runtime.Agents`: 88/88 (existing agent family suite plus the new 25 dynamic-listener tests). - CoreTests `Persistence.Durability`: 17/17 (foundation defaults unchanged). - PostgresqlTests `PostgresqlMessageStoreTests`: 48/48 (RDBMS-backed listener store from PR #2700 still green). Deferred to follow-up: - Marten-backed `IListenerStore`. - MQTT-specific multiplexed listener (one shared MqttListener + BufferedReceiver, N broker subscriptions) for the 10K-IoT case. - IWolverineRuntime topic-name → Uri convenience extensions. - Documentation in MQTT and Durability guides. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
First slice of #2685. Foundation only — establishes the contract surface every
IMessageStoreimplementation will satisfy in subsequent PRs, but ships zero behaviour change for existing apps. The opt-in flag defaults tofalseand every existingIMessageStorereturnsNullListenerStore.Instance.What lands in this PR
IListenerStoreabstraction inWolverine.Persistence.Durability. Three URI-based operations:RegisterListenerAsync,RemoveListenerAsync,AllListenersAsync. Idempotent register/remove. Transport-agnostic — every entry is a singleUriso the same registry shape works for MQTT, RabbitMQ, Kafka, Service Bus, etc.IMessageStore.Listenersproperty added to the contract; every existing implementation returnsNullListenerStore.Instancefor now. The multi-tenant store delegates to its main store (listener URIs aren't tenant-scoped).WolverineOptions.Durability.EnableDynamicListenersflag, defaultfalse. Database-backed stores must NOT create the listener table unless the flag is set — users upgrading Wolverine see no schema migration churn.MessageStoreCompliance(7 new tests): empty-on-clean, register persists, register is idempotent, remove removes, remove is idempotent on unknown URI, register-then-remove-then-re-register, all-listeners-includes-every-registered. Each test pass-throughs whenthePersistence.Listeners is NullListenerStoreso providers adopt the contract incrementally — once a provider opts in, the same suite validates real persistence end-to-end.DynamicListenersDefaultsTestslocks down that the opt-in flag defaults tofalseon a freshDurabilitySettingsAND on a freshWolverineOptions, plus the no-op contract onNullListenerStore. 5/5 passing.What's deferred to subsequent PRs (still tracked by #2685)
IListenerStoreinWolverine.RDBMS(covers Postgres / SqlServer / MySql / Oracle / Sqlite via the shared base) + opt-in schema gate in each provider'sAllObjects().IListenerStore.DynamicListenerAgentFamilyfor cluster coordination, with Solo-mode local fan-out, registered conditionally duringUseWolverine(not in theWolverineOptionsctor — to avoid race/order-of-operations issues).MqttListener+ oneBufferedReceiver, N broker subscriptions, topic-name routing insideReceiveAsync. Targets the 10K-IoT-device scaling case from @uniquelau's comment on Ability to Dynamically Add Mqtt Brokers as Listeners at Runtime #2685; staticListenToMqttTopic(...)callers are unaffected.IWolverineRuntimeextension methods for MQTT topic-name → URI convenience.Naming decisions baked in
IMessageStore.Listeners : IListenerStoreper @jeremydmiller's directionRegisterListenerAsync()/RemoveListenerAsync()/AllListenersAsync()Durability.EnableDynamicListeners(boolean property style, matchesDurability.NodeAssignmentHealthCheckTracingEnabledprecedent)MessageStoreCompliance(not a new file)Test plan
dotnet test src/Testing/CoreTests --filter DynamicListenersDefaultsTests— 5/5 passedWolverine.RDBMS,Wolverine.Postgresql,Wolverine.SqlServer,Wolverine.MySql,Wolverine.Oracle,Wolverine.Sqlite,Wolverine.Marten,Wolverine.RavenDb,Wolverine.CosmosDb,Wolverine.Polecat)Listeners is NullListenerStore(i.e. everywhere, until provider impls land)Why this is shipped as a draft
This is staged work; the headline feature for #2685 (Lau's IoT broker case) needs the deferred pieces to actually function. The foundation is shipping first to:
Marking
[1/N]so the staged plan is visible. Promote to ready-for-review once the next PR lands the real RDBMS + Marten + agent + MQTT pieces.🤖 Generated with Claude Code