Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public OracleMessageStore(DatabaseSettings databaseSettings, DurabilitySettings
public IMessageInbox Inbox => this;
public IMessageOutbox Outbox => this;
public INodeAgentPersistence Nodes => _nodes!;

// Real Oracle-backed listener store wired in once the schema bits land.
// Default no-op keeps the Wolverine boot path clean while
// EnableDynamicListeners is false (no schema, no behavior).
public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance;

public IMessageStoreAdmin Admin => this;
public IDeadLetters DeadLetters => this;
public string Name { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public ValueTask DisposeAsync()
public IMessageInbox Inbox => this;
public IMessageOutbox Outbox => this;
public INodeAgentPersistence Nodes => this;

// Default no-op listener store. CosmosDB-backed listener registry is a
// follow-up implementation; stays a no-op while EnableDynamicListeners
// is false (default).
public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance;

public IMessageStoreAdmin Admin => this;
public IDeadLetters DeadLetters => this;

Expand Down
7 changes: 7 additions & 0 deletions src/Persistence/Wolverine.RDBMS/MessageDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ public void DemoteToAncillary()

public INodeAgentPersistence Nodes { get; }

/// <summary>
/// Set by <see cref="Initialize"/> after <see cref="DurabilitySettings.EnableDynamicListeners"/>
/// is read off the runtime — defaults to <see cref="NullListenerStore.Instance"/>
/// when the flag is off so the listener-registry table is never provisioned.
/// </summary>
public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance;

public IMessageInbox Inbox => this;

public IMessageOutbox Outbox => this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public ValueTask DisposeAsync()
public IMessageInbox Inbox => this;
public IMessageOutbox Outbox => this;
public INodeAgentPersistence Nodes => this;

// Default no-op listener store; real RavenDb-backed listener registry is
// a follow-up implementation. Stays a no-op while EnableDynamicListeners
// is false (default).
public IListenerStore Listeners { get; protected set; } = NullListenerStore.Instance;

public IMessageStoreAdmin Admin => this;
public IDeadLetters DeadLetters => this;
public void Initialize(IWolverineRuntime runtime)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Shouldly;
using Wolverine.Persistence.Durability;
using Xunit;

namespace CoreTests.Persistence.Durability;

/// <summary>
/// Lock down the upgrade-safety contract for the new dynamic-listener registry:
/// the opt-in flag defaults to <c>false</c>, the default
/// <c>IMessageStore.Listeners</c> is the no-op store, and existing apps that
/// bump Wolverine without touching options pay nothing — no schema migration,
/// no behavioural change, no agent family registration.
/// </summary>
public class DynamicListenersDefaultsTests
{
[Fact]
public void enable_dynamic_listeners_defaults_to_false_on_a_fresh_durability_settings()
{
// Direct construction (no host bootstrap) — guards against a future
// ctor or property-initializer regression flipping the default.
new DurabilitySettings().EnableDynamicListeners.ShouldBeFalse();
}

[Fact]
public void enable_dynamic_listeners_defaults_to_false_on_wolverine_options()
{
// Whole-options walk — covers any indirect mutation that
// WolverineOptions' constructor could introduce on Durability.
new WolverineOptions().Durability.EnableDynamicListeners.ShouldBeFalse();
}

[Fact]
public void null_listener_store_register_is_a_no_op()
{
// The default store every IMessageStore.Listeners points at when the
// flag is off. Locks down the no-op contract: no exception, no state.
var store = NullListenerStore.Instance;

store.RegisterListenerAsync(new Uri("mqtt://topic/devices/abc"))
.GetAwaiter().GetResult(); // sync drain — no-op should be sync-completed
}

[Fact]
public async Task null_listener_store_returns_empty_listing()
{
var listeners = await NullListenerStore.Instance.AllListenersAsync();
listeners.ShouldBeEmpty();
}

[Fact]
public async Task null_listener_store_remove_is_a_no_op()
{
await NullListenerStore.Instance.RemoveListenerAsync(new Uri("mqtt://topic/whatever"));
}
}
103 changes: 103 additions & 0 deletions src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,4 +1186,107 @@ public virtual async Task orphaned_incoming_recovery_stamps_envelope_with_origin
"mark-as-handled SQL targets the right database. See GH-2318 / GH-2576.");
}

#region IListenerStore compliance — see GH-2685

/// <summary>
/// Provider opts into the dynamic-listener registry by setting
/// <c>Durability.EnableDynamicListeners = true</c> in its <see cref="BuildCleanHost"/>.
/// Tests below short-circuit when <c>Listeners</c> is the no-op store so providers
/// can adopt the contract incrementally — once a provider opts in, the same suite
/// validates real persistence end-to-end.
/// </summary>
private bool listenersAreSupported => thePersistence.Listeners is not NullListenerStore;

[Fact]
public async Task all_listeners_returns_empty_on_a_clean_store()
{
if (!listenersAreSupported) return;

var listeners = await thePersistence.Listeners.AllListenersAsync();
listeners.ShouldBeEmpty();
}

[Fact]
public async Task register_listener_persists_uri()
{
if (!listenersAreSupported) return;

var uri = new Uri("mqtt://topic/devices/foo/status");
await thePersistence.Listeners.RegisterListenerAsync(uri);

var all = await thePersistence.Listeners.AllListenersAsync();
all.ShouldContain(uri);
}

[Fact]
public async Task register_listener_is_idempotent()
{
if (!listenersAreSupported) return;

var uri = new Uri("mqtt://topic/devices/dup");
await thePersistence.Listeners.RegisterListenerAsync(uri);
await thePersistence.Listeners.RegisterListenerAsync(uri); // no-op, no exception

var all = await thePersistence.Listeners.AllListenersAsync();
all.Count(x => x == uri).ShouldBe(1);
}

[Fact]
public async Task remove_listener_removes_the_uri()
{
if (!listenersAreSupported) return;

var uri = new Uri("mqtt://topic/devices/removable");
await thePersistence.Listeners.RegisterListenerAsync(uri);
await thePersistence.Listeners.RemoveListenerAsync(uri);

var all = await thePersistence.Listeners.AllListenersAsync();
all.ShouldNotContain(uri);
}

[Fact]
public async Task remove_listener_is_idempotent_on_unknown_uri()
{
if (!listenersAreSupported) return;

// Should not throw — the registry tolerates removing a uri that was
// never registered (matches the "register then crash mid-handler"
// recovery shape the runtime API depends on).
await thePersistence.Listeners.RemoveListenerAsync(new Uri("mqtt://topic/never-registered"));
}

[Fact]
public async Task register_then_remove_then_re_register_works()
{
if (!listenersAreSupported) return;

var uri = new Uri("mqtt://topic/devices/cycle");
await thePersistence.Listeners.RegisterListenerAsync(uri);
await thePersistence.Listeners.RemoveListenerAsync(uri);
await thePersistence.Listeners.RegisterListenerAsync(uri);

var all = await thePersistence.Listeners.AllListenersAsync();
all.ShouldContain(uri);
}

[Fact]
public async Task all_listeners_includes_every_registered_uri()
{
if (!listenersAreSupported) return;

var a = new Uri("mqtt://topic/a");
var b = new Uri("mqtt://topic/b");
var c = new Uri("mqtt://topic/c");
await thePersistence.Listeners.RegisterListenerAsync(a);
await thePersistence.Listeners.RegisterListenerAsync(b);
await thePersistence.Listeners.RegisterListenerAsync(c);

var all = await thePersistence.Listeners.AllListenersAsync();
all.ShouldContain(a);
all.ShouldContain(b);
all.ShouldContain(c);
}

#endregion

}
12 changes: 12 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ internal set
/// </summary>
public TimeSpan CheckAssignmentPeriod { get; set; } = 30.Seconds();

/// <summary>
/// Opt-in switch for the dynamic listener registry: persisted listener URIs that
/// are activated at runtime in addition to the listeners declared statically
/// through <see cref="WolverineOptions"/>. When <c>true</c>, <c>IMessageStore.Listeners</c>
/// is backed by durable storage (and database-backed message stores create their
/// listener registry table on first migration); when <c>false</c> (the default),
/// <c>IMessageStore.Listeners</c> is a no-op store and no listener-registry
/// schema is provisioned. Default is <c>false</c> so users upgrading Wolverine
/// see no schema migration churn.
/// </summary>
public bool EnableDynamicListeners { get; set; } = false;

/// <summary>
/// If using any kind of dynamic multi-tenancy where Wolverine should discover new
/// tenants, this is the polling time. Default is 5 seconds
Expand Down
55 changes: 55 additions & 0 deletions src/Wolverine/Persistence/Durability/IListenerStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace Wolverine.Persistence.Durability;

/// <summary>
/// Persistence contract for the dynamic-listener registry. Stores the set of
/// listener URIs Wolverine should activate at runtime in addition to the
/// listeners declared statically through <c>WolverineOptions</c>. The registry
/// is opt-in via <see cref="DurabilitySettings.EnableDynamicListeners"/> — when
/// disabled, providers must NOT create the backing storage so that users
/// upgrading Wolverine see no schema migration churn.
///
/// The store is transport-agnostic: each entry is a single <see cref="Uri"/>.
/// The pluggable <c>DynamicListenerAgentFamily</c> turns each persisted URI
/// into a runtime listener via the appropriate transport.
/// </summary>
public interface IListenerStore
{
/// <summary>
/// Persist <paramref name="uri"/> as a registered listener. Idempotent — a
/// repeat registration of the same URI is a no-op rather than an error.
/// </summary>
Task RegisterListenerAsync(Uri uri, CancellationToken cancellationToken = default);

/// <summary>
/// Remove <paramref name="uri"/> from the registry. Idempotent — removing a
/// URI that isn't registered is a no-op rather than an error.
/// </summary>
Task RemoveListenerAsync(Uri uri, CancellationToken cancellationToken = default);

/// <summary>
/// Snapshot of every currently-registered listener URI. The order is
/// implementation-defined; callers that need a deterministic ordering should
/// sort the result themselves.
/// </summary>
Task<IReadOnlyList<Uri>> AllListenersAsync(CancellationToken cancellationToken = default);
}

/// <summary>
/// Default no-op listener store. Returned by <see cref="IMessageStore.Listeners"/>
/// when dynamic listeners are disabled (<see cref="DurabilitySettings.EnableDynamicListeners"/>
/// is <c>false</c>) or when the message store has no durable backing (e.g. <c>NullMessageStore</c>
/// in solo-mode-without-DB scenarios). All operations are no-ops or return empty.
/// </summary>
public sealed class NullListenerStore : IListenerStore
{
public static NullListenerStore Instance { get; } = new();

public Task RegisterListenerAsync(Uri uri, CancellationToken cancellationToken = default)
=> Task.CompletedTask;

public Task RemoveListenerAsync(Uri uri, CancellationToken cancellationToken = default)
=> Task.CompletedTask;

public Task<IReadOnlyList<Uri>> AllListenersAsync(CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<Uri>>(Array.Empty<Uri>());
}
11 changes: 11 additions & 0 deletions src/Wolverine/Persistence/Durability/IMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ public interface IMessageStore : IAsyncDisposable

INodeAgentPersistence Nodes { get; }

/// <summary>
/// Registry of dynamic, runtime-registered listener URIs. Backed by a
/// transport-agnostic store (one URI per entry); each URI is turned into
/// an actual listener at runtime by <c>DynamicListenerAgentFamily</c> via
/// the appropriate transport. Opt-in via
/// <see cref="DurabilitySettings.EnableDynamicListeners"/>; providers must
/// return <see cref="NullListenerStore.Instance"/> (and skip schema
/// migrations for the listener table) when the flag is <c>false</c>.
/// </summary>
IListenerStore Listeners { get; }

IMessageStoreAdmin Admin { get; }

IDeadLetters DeadLetters { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,13 @@ public void Initialize(IWolverineRuntime runtime)
public IDeadLetters DeadLetters => this;
public IScheduledMessages ScheduledMessages => Main.ScheduledMessages;
public INodeAgentPersistence Nodes => this;

// Multi-tenant store delegates dynamic-listener registration to the main
// store. Listener URIs aren't tenant-scoped — registering the same URI
// across tenants would create duplicate listeners — so the master is
// authoritative for the registry.
public IListenerStore Listeners => Main.Listeners;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, that is OK for our situation.

We would then associate the listener data with a tenant as the device will belong to a tenant, but that can happen outside of this process.


public IMessageStoreAdmin Admin => this;

public DatabaseDescriptor Describe()
Expand Down
4 changes: 4 additions & 0 deletions src/Wolverine/Persistence/Durability/NullMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public void Initialize(IWolverineRuntime runtime)
public IScheduledMessages ScheduledMessages => this;
public INodeAgentPersistence Nodes => throw new NotSupportedException();

// No durable backing → no dynamic-listener registry. Solo-mode hosts that
// *do* want dynamic listeners need a real message store.
public IListenerStore Listeners => NullListenerStore.Instance;

public IMessageStoreAdmin Admin => this;

public DatabaseDescriptor Describe()
Expand Down
Loading