diff --git a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.cs b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.cs index 274f0392c..e536e25a5 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.cs +++ b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.cs @@ -113,6 +113,12 @@ public OracleMessageStore(DatabaseSettings databaseSettings, DurabilitySettings public IMessageInbox Inbox => this; public IMessageOutbox Outbox => this; public INodeAgentPersistence Nodes => _nodes!; + + // Real Oracle-backed listener store wired in once the schema bits land. + // Default no-op keeps the Wolverine boot path clean while + // EnableDynamicListeners is false (no schema, no behavior). + public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance; + public IMessageStoreAdmin Admin => this; public IDeadLetters DeadLetters => this; public string Name { get; set; } diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs index b825ee98a..f0fdb1e22 100644 --- a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs @@ -66,6 +66,12 @@ public ValueTask DisposeAsync() public IMessageInbox Inbox => this; public IMessageOutbox Outbox => this; public INodeAgentPersistence Nodes => this; + + // Default no-op listener store. CosmosDB-backed listener registry is a + // follow-up implementation; stays a no-op while EnableDynamicListeners + // is false (default). + public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance; + public IMessageStoreAdmin Admin => this; public IDeadLetters DeadLetters => this; diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs index eb91a37d3..efdd325b4 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs @@ -127,6 +127,13 @@ public void DemoteToAncillary() public INodeAgentPersistence Nodes { get; } + /// + /// Set by after + /// is read off the runtime — defaults to + /// when the flag is off so the listener-registry table is never provisioned. + /// + public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance; + public IMessageInbox Inbox => this; public IMessageOutbox Outbox => this; diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs index c430a70cb..a740fd895 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs @@ -60,6 +60,12 @@ public ValueTask DisposeAsync() public IMessageInbox Inbox => this; public IMessageOutbox Outbox => this; public INodeAgentPersistence Nodes => this; + + // Default no-op listener store; real RavenDb-backed listener registry is + // a follow-up implementation. Stays a no-op while EnableDynamicListeners + // is false (default). + public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance; + public IMessageStoreAdmin Admin => this; public IDeadLetters DeadLetters => this; public void Initialize(IWolverineRuntime runtime) diff --git a/src/Testing/CoreTests/Persistence/Durability/DynamicListenersDefaultsTests.cs b/src/Testing/CoreTests/Persistence/Durability/DynamicListenersDefaultsTests.cs new file mode 100644 index 000000000..475404dda --- /dev/null +++ b/src/Testing/CoreTests/Persistence/Durability/DynamicListenersDefaultsTests.cs @@ -0,0 +1,55 @@ +using Shouldly; +using Wolverine.Persistence.Durability; +using Xunit; + +namespace CoreTests.Persistence.Durability; + +/// +/// Lock down the upgrade-safety contract for the new dynamic-listener registry: +/// the opt-in flag defaults to false, the default +/// IMessageStore.Listeners is the no-op store, and existing apps that +/// bump Wolverine without touching options pay nothing — no schema migration, +/// no behavioural change, no agent family registration. +/// +public class DynamicListenersDefaultsTests +{ + [Fact] + public void enable_dynamic_listeners_defaults_to_false_on_a_fresh_durability_settings() + { + // Direct construction (no host bootstrap) — guards against a future + // ctor or property-initializer regression flipping the default. + new DurabilitySettings().EnableDynamicListeners.ShouldBeFalse(); + } + + [Fact] + public void enable_dynamic_listeners_defaults_to_false_on_wolverine_options() + { + // Whole-options walk — covers any indirect mutation that + // WolverineOptions' constructor could introduce on Durability. + new WolverineOptions().Durability.EnableDynamicListeners.ShouldBeFalse(); + } + + [Fact] + public void null_listener_store_register_is_a_no_op() + { + // The default store every IMessageStore.Listeners points at when the + // flag is off. Locks down the no-op contract: no exception, no state. + var store = NullListenerStore.Instance; + + store.RegisterListenerAsync(new Uri("mqtt://topic/devices/abc")) + .GetAwaiter().GetResult(); // sync drain — no-op should be sync-completed + } + + [Fact] + public async Task null_listener_store_returns_empty_listing() + { + var listeners = await NullListenerStore.Instance.AllListenersAsync(); + listeners.ShouldBeEmpty(); + } + + [Fact] + public async Task null_listener_store_remove_is_a_no_op() + { + await NullListenerStore.Instance.RemoveListenerAsync(new Uri("mqtt://topic/whatever")); + } +} diff --git a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs index 4a4cbcd4c..f9066faa5 100644 --- a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs @@ -1186,4 +1186,107 @@ public virtual async Task orphaned_incoming_recovery_stamps_envelope_with_origin "mark-as-handled SQL targets the right database. See GH-2318 / GH-2576."); } + #region IListenerStore compliance — see GH-2685 + + /// + /// Provider opts into the dynamic-listener registry by setting + /// Durability.EnableDynamicListeners = true in its . + /// Tests below short-circuit when Listeners is the no-op store so providers + /// can adopt the contract incrementally — once a provider opts in, the same suite + /// validates real persistence end-to-end. + /// + private bool listenersAreSupported => thePersistence.Listeners is not NullListenerStore; + + [Fact] + public async Task all_listeners_returns_empty_on_a_clean_store() + { + if (!listenersAreSupported) return; + + var listeners = await thePersistence.Listeners.AllListenersAsync(); + listeners.ShouldBeEmpty(); + } + + [Fact] + public async Task register_listener_persists_uri() + { + if (!listenersAreSupported) return; + + var uri = new Uri("mqtt://topic/devices/foo/status"); + await thePersistence.Listeners.RegisterListenerAsync(uri); + + var all = await thePersistence.Listeners.AllListenersAsync(); + all.ShouldContain(uri); + } + + [Fact] + public async Task register_listener_is_idempotent() + { + if (!listenersAreSupported) return; + + var uri = new Uri("mqtt://topic/devices/dup"); + await thePersistence.Listeners.RegisterListenerAsync(uri); + await thePersistence.Listeners.RegisterListenerAsync(uri); // no-op, no exception + + var all = await thePersistence.Listeners.AllListenersAsync(); + all.Count(x => x == uri).ShouldBe(1); + } + + [Fact] + public async Task remove_listener_removes_the_uri() + { + if (!listenersAreSupported) return; + + var uri = new Uri("mqtt://topic/devices/removable"); + await thePersistence.Listeners.RegisterListenerAsync(uri); + await thePersistence.Listeners.RemoveListenerAsync(uri); + + var all = await thePersistence.Listeners.AllListenersAsync(); + all.ShouldNotContain(uri); + } + + [Fact] + public async Task remove_listener_is_idempotent_on_unknown_uri() + { + if (!listenersAreSupported) return; + + // Should not throw — the registry tolerates removing a uri that was + // never registered (matches the "register then crash mid-handler" + // recovery shape the runtime API depends on). + await thePersistence.Listeners.RemoveListenerAsync(new Uri("mqtt://topic/never-registered")); + } + + [Fact] + public async Task register_then_remove_then_re_register_works() + { + if (!listenersAreSupported) return; + + var uri = new Uri("mqtt://topic/devices/cycle"); + await thePersistence.Listeners.RegisterListenerAsync(uri); + await thePersistence.Listeners.RemoveListenerAsync(uri); + await thePersistence.Listeners.RegisterListenerAsync(uri); + + var all = await thePersistence.Listeners.AllListenersAsync(); + all.ShouldContain(uri); + } + + [Fact] + public async Task all_listeners_includes_every_registered_uri() + { + if (!listenersAreSupported) return; + + var a = new Uri("mqtt://topic/a"); + var b = new Uri("mqtt://topic/b"); + var c = new Uri("mqtt://topic/c"); + await thePersistence.Listeners.RegisterListenerAsync(a); + await thePersistence.Listeners.RegisterListenerAsync(b); + await thePersistence.Listeners.RegisterListenerAsync(c); + + var all = await thePersistence.Listeners.AllListenersAsync(); + all.ShouldContain(a); + all.ShouldContain(b); + all.ShouldContain(c); + } + + #endregion + } \ No newline at end of file diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs index ede09b5ae..6c723bad7 100644 --- a/src/Wolverine/DurabilitySettings.cs +++ b/src/Wolverine/DurabilitySettings.cs @@ -199,6 +199,18 @@ internal set /// public TimeSpan CheckAssignmentPeriod { get; set; } = 30.Seconds(); + /// + /// Opt-in switch for the dynamic listener registry: persisted listener URIs that + /// are activated at runtime in addition to the listeners declared statically + /// through . When true, IMessageStore.Listeners + /// is backed by durable storage (and database-backed message stores create their + /// listener registry table on first migration); when false (the default), + /// IMessageStore.Listeners is a no-op store and no listener-registry + /// schema is provisioned. Default is false so users upgrading Wolverine + /// see no schema migration churn. + /// + public bool EnableDynamicListeners { get; set; } = false; + /// /// If using any kind of dynamic multi-tenancy where Wolverine should discover new /// tenants, this is the polling time. Default is 5 seconds diff --git a/src/Wolverine/Persistence/Durability/IListenerStore.cs b/src/Wolverine/Persistence/Durability/IListenerStore.cs new file mode 100644 index 000000000..00b054db7 --- /dev/null +++ b/src/Wolverine/Persistence/Durability/IListenerStore.cs @@ -0,0 +1,55 @@ +namespace Wolverine.Persistence.Durability; + +/// +/// Persistence contract for the dynamic-listener registry. Stores the set of +/// listener URIs Wolverine should activate at runtime in addition to the +/// listeners declared statically through WolverineOptions. The registry +/// is opt-in via — when +/// disabled, providers must NOT create the backing storage so that users +/// upgrading Wolverine see no schema migration churn. +/// +/// The store is transport-agnostic: each entry is a single . +/// The pluggable DynamicListenerAgentFamily turns each persisted URI +/// into a runtime listener via the appropriate transport. +/// +public interface IListenerStore +{ + /// + /// Persist as a registered listener. Idempotent — a + /// repeat registration of the same URI is a no-op rather than an error. + /// + Task RegisterListenerAsync(Uri uri, CancellationToken cancellationToken = default); + + /// + /// Remove from the registry. Idempotent — removing a + /// URI that isn't registered is a no-op rather than an error. + /// + Task RemoveListenerAsync(Uri uri, CancellationToken cancellationToken = default); + + /// + /// Snapshot of every currently-registered listener URI. The order is + /// implementation-defined; callers that need a deterministic ordering should + /// sort the result themselves. + /// + Task> AllListenersAsync(CancellationToken cancellationToken = default); +} + +/// +/// Default no-op listener store. Returned by +/// when dynamic listeners are disabled ( +/// is false) or when the message store has no durable backing (e.g. NullMessageStore +/// in solo-mode-without-DB scenarios). All operations are no-ops or return empty. +/// +public sealed class NullListenerStore : IListenerStore +{ + public static NullListenerStore Instance { get; } = new(); + + public Task RegisterListenerAsync(Uri uri, CancellationToken cancellationToken = default) + => Task.CompletedTask; + + public Task RemoveListenerAsync(Uri uri, CancellationToken cancellationToken = default) + => Task.CompletedTask; + + public Task> AllListenersAsync(CancellationToken cancellationToken = default) + => Task.FromResult>(Array.Empty()); +} diff --git a/src/Wolverine/Persistence/Durability/IMessageStore.cs b/src/Wolverine/Persistence/Durability/IMessageStore.cs index 403b5c803..7aca74b3a 100644 --- a/src/Wolverine/Persistence/Durability/IMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/IMessageStore.cs @@ -98,6 +98,17 @@ public interface IMessageStore : IAsyncDisposable INodeAgentPersistence Nodes { get; } + /// + /// Registry of dynamic, runtime-registered listener URIs. Backed by a + /// transport-agnostic store (one URI per entry); each URI is turned into + /// an actual listener at runtime by DynamicListenerAgentFamily via + /// the appropriate transport. Opt-in via + /// ; providers must + /// return (and skip schema + /// migrations for the listener table) when the flag is false. + /// + IListenerStore Listeners { get; } + IMessageStoreAdmin Admin { get; } IDeadLetters DeadLetters { get; } diff --git a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs index 1937fea94..6566c3648 100644 --- a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs @@ -358,6 +358,13 @@ public void Initialize(IWolverineRuntime runtime) public IDeadLetters DeadLetters => this; public IScheduledMessages ScheduledMessages => Main.ScheduledMessages; public INodeAgentPersistence Nodes => this; + + // Multi-tenant store delegates dynamic-listener registration to the main + // store. Listener URIs aren't tenant-scoped — registering the same URI + // across tenants would create duplicate listeners — so the master is + // authoritative for the registry. + public IListenerStore Listeners => Main.Listeners; + public IMessageStoreAdmin Admin => this; public DatabaseDescriptor Describe() diff --git a/src/Wolverine/Persistence/Durability/NullMessageStore.cs b/src/Wolverine/Persistence/Durability/NullMessageStore.cs index 1dce8a885..c844d112c 100644 --- a/src/Wolverine/Persistence/Durability/NullMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/NullMessageStore.cs @@ -145,6 +145,10 @@ public void Initialize(IWolverineRuntime runtime) public IScheduledMessages ScheduledMessages => this; public INodeAgentPersistence Nodes => throw new NotSupportedException(); + // No durable backing → no dynamic-listener registry. Solo-mode hosts that + // *do* want dynamic listeners need a real message store. + public IListenerStore Listeners => NullListenerStore.Instance; + public IMessageStoreAdmin Admin => this; public DatabaseDescriptor Describe()