Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using NSubstitute;
using Shouldly;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime.Agents;
using Xunit;

namespace CoreTests.Runtime.Agents.DynamicListeners;

/// <summary>
/// Unit coverage for <see cref="DynamicListenerAgentFamily"/> — verifies the
/// family reads from <see cref="IListenerStore"/> on every assignment cycle
/// (no cached snapshot), produces stable agent URIs that round-trip back to
/// the original listener URI, and balances them via
/// <c>assignments.DistributeEvenly(Scheme)</c>. Pairs with the
/// <see cref="DynamicListenerUriEncodingTests"/> round-trip coverage.
/// </summary>
public class DynamicListenerAgentFamilyTests
{
private readonly MockWolverineRuntime _runtime;
private readonly IListenerStore _listenerStore;
private readonly DynamicListenerAgentFamily _family;

public DynamicListenerAgentFamilyTests()
{
_runtime = new MockWolverineRuntime();
_listenerStore = Substitute.For<IListenerStore>();
_runtime.Storage.Listeners.Returns(_listenerStore);

_family = new DynamicListenerAgentFamily(_runtime);
}

[Fact]
public void scheme_is_the_dynamic_listener_uri_scheme()
{
// The family's Scheme is the dictionary key in NodeAgentController's
// _agentFamilies; it must match the scheme of every agent URI the
// family hands out so dispatch from a URI back to the family works.
_family.Scheme.ShouldBe(DynamicListenerUriEncoding.SchemeName);
}

[Fact]
public async Task all_known_agents_returns_empty_when_store_is_empty()
{
_listenerStore.AllListenersAsync(Arg.Any<CancellationToken>())
.Returns(Task.FromResult<IReadOnlyList<Uri>>(Array.Empty<Uri>()));

var agents = await _family.AllKnownAgentsAsync();

agents.ShouldBeEmpty();
}

[Fact]
public async Task all_known_agents_projects_each_listener_uri_through_encoder()
{
var listenerA = new Uri("mqtt://broker/topic-a");
var listenerB = new Uri("mqtt://broker/topic-b");
_listenerStore.AllListenersAsync(Arg.Any<CancellationToken>())
.Returns(Task.FromResult<IReadOnlyList<Uri>>(new[] { listenerA, listenerB }));

var agents = await _family.AllKnownAgentsAsync();

agents.Count.ShouldBe(2);
agents.ShouldContain(DynamicListenerUriEncoding.ToAgentUri(listenerA));
agents.ShouldContain(DynamicListenerUriEncoding.ToAgentUri(listenerB));
}

[Fact]
public async Task all_known_agents_re_reads_store_on_every_call()
{
// The family deliberately doesn't cache — each cluster assignment
// cycle must see freshly registered URIs without a host restart.
_listenerStore.AllListenersAsync(Arg.Any<CancellationToken>())
.Returns(
Task.FromResult<IReadOnlyList<Uri>>(Array.Empty<Uri>()),
Task.FromResult<IReadOnlyList<Uri>>(new[] { new Uri("mqtt://broker/new-topic") }));

var first = await _family.AllKnownAgentsAsync();
var second = await _family.AllKnownAgentsAsync();

first.ShouldBeEmpty();
second.Count.ShouldBe(1);
await _listenerStore.Received(2).AllListenersAsync(Arg.Any<CancellationToken>());
}

[Fact]
public async Task supported_agents_matches_all_known_agents()
{
// Every node has the family registered when EnableDynamicListeners is
// on, so any node can run any of the listeners — supported set ==
// known set. Transport-level "this node can't reach the broker" lives
// in StartAsync, not in supported-agents filtering.
_listenerStore.AllListenersAsync(Arg.Any<CancellationToken>())
.Returns(Task.FromResult<IReadOnlyList<Uri>>(new[]
{
new Uri("mqtt://broker/topic-a"),
new Uri("mqtt://broker/topic-b")
}));

var supported = await _family.SupportedAgentsAsync();
var known = await _family.AllKnownAgentsAsync();

supported.ShouldBe(known);
}

[Fact]
public async Task build_agent_async_decodes_uri_and_constructs_dynamic_listener_agent()
{
var listener = new Uri("mqtt://broker/devices/foo/status");
var agentUri = DynamicListenerUriEncoding.ToAgentUri(listener);

var agent = await _family.BuildAgentAsync(agentUri, _runtime);

agent.ShouldBeOfType<DynamicListenerAgent>();
agent.Uri.ShouldBe(agentUri);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using Shouldly;
using Wolverine.Runtime.Agents;
using Xunit;

namespace CoreTests.Runtime.Agents.DynamicListeners;

/// <summary>
/// Unit coverage for <see cref="DynamicListenerUriEncoding"/> — the encoding
/// has to be lossless and idempotent for cluster assignment to work, since
/// every cycle the cluster compares the agent URIs returned by
/// <c>AllKnownAgentsAsync</c> against the previous cycle's set. Any encoding
/// jitter (different escape rules for the same input) would manifest as
/// "agents thrashing on/off" between assignment polls.
/// </summary>
public class DynamicListenerUriEncodingTests
{
[Theory]
[InlineData("mqtt://broker:1883/devices/foo/status")]
[InlineData("mqtt://localhost/iot/+/temp")] // MQTT topic wildcards
[InlineData("rabbitmq://localhost:5672/queue.name")]
[InlineData("kafka://broker.local:9092/orders")]
[InlineData("amqp://user:pass@host:5672/vhost/queue")] // userinfo, port, multi-segment path
[InlineData("mqtt://broker/path?qos=1")] // query string
public void encoding_round_trips_listener_uri(string raw)
{
var listener = new Uri(raw);

var agentUri = DynamicListenerUriEncoding.ToAgentUri(listener);
var decoded = DynamicListenerUriEncoding.ToListenerUri(agentUri);

decoded.ShouldBe(listener);
}

[Fact]
public void agent_uri_uses_dynamic_listener_scheme()
{
var agentUri = DynamicListenerUriEncoding.ToAgentUri(new Uri("mqtt://broker/topic"));
agentUri.Scheme.ShouldBe(DynamicListenerUriEncoding.SchemeName);
}

[Fact]
public void encoding_is_deterministic_for_the_same_input()
{
// The cluster's assignment grid keys agents by their URI string —
// the same listener URI must always produce the same agent URI so
// an agent that was running before a poll cycle is recognized as the
// same agent after the poll.
var listener = new Uri("mqtt://broker/devices/foo/status");

var first = DynamicListenerUriEncoding.ToAgentUri(listener);
var second = DynamicListenerUriEncoding.ToAgentUri(listener);

second.ShouldBe(first);
}

[Fact]
public void to_listener_uri_rejects_wrong_scheme()
{
var bogus = new Uri("wolverine-listener://something");

Should.Throw<ArgumentException>(() => DynamicListenerUriEncoding.ToListenerUri(bogus))
.Message.ShouldContain(DynamicListenerUriEncoding.SchemeName);
}

[Fact]
public void to_listener_uri_rejects_empty_path()
{
// wolverine-dynamic-listener:/// with nothing after the slashes is not
// a valid encoded agent URI — guard the decoder so a malformed entry
// doesn't silently produce a garbage listener URI.
var emptyPath = new Uri($"{DynamicListenerUriEncoding.SchemeName}:///");

Should.Throw<ArgumentException>(() => DynamicListenerUriEncoding.ToListenerUri(emptyPath))
.Message.ShouldContain("no encoded listener URI");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using NSubstitute;
using Shouldly;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Xunit;

namespace CoreTests.Runtime.Agents.DynamicListeners;

/// <summary>
/// Coverage for the public <see cref="WolverineRuntimeListenerExtensions"/>
/// surface — these are the methods user code calls (e.g. on an HTTP request
/// to add an MQTT broker), so the public contract needs to argument-validate
/// and pass straight through to <see cref="IListenerStore"/> without any
/// transformation. Most of the substance is exercised through the
/// <see cref="DynamicListenerAgentFamilyTests"/> suite — the tests here just
/// pin down the thin wrapper.
/// </summary>
public class WolverineRuntimeListenerExtensionsTests
{
private readonly MockWolverineRuntime _runtime = new();
private readonly IListenerStore _store = Substitute.For<IListenerStore>();

public WolverineRuntimeListenerExtensionsTests()
{
_runtime.Storage.Listeners.Returns(_store);
}

[Fact]
public async Task register_listener_async_delegates_to_store()
{
var uri = new Uri("mqtt://broker/topic");
await _runtime.RegisterListenerAsync(uri);

await _store.Received(1).RegisterListenerAsync(uri, Arg.Any<CancellationToken>());
}

[Fact]
public async Task remove_listener_async_delegates_to_store()
{
var uri = new Uri("mqtt://broker/topic");
await _runtime.RemoveListenerAsync(uri);

await _store.Received(1).RemoveListenerAsync(uri, Arg.Any<CancellationToken>());
}

[Fact]
public async Task all_registered_listeners_async_delegates_to_store()
{
var listed = new[]
{
new Uri("mqtt://broker/a"),
new Uri("mqtt://broker/b")
};
_store.AllListenersAsync(Arg.Any<CancellationToken>())
.Returns(Task.FromResult<IReadOnlyList<Uri>>(listed));

var result = await _runtime.AllRegisteredListenersAsync();

result.ShouldBe(listed);
}

[Fact]
public void register_throws_argument_null_for_null_runtime()
{
IWolverineRuntime? runtime = null;
var uri = new Uri("mqtt://broker/topic");
Should.Throw<ArgumentNullException>(() => runtime!.RegisterListenerAsync(uri));
}

[Fact]
public void register_throws_argument_null_for_null_uri()
{
Should.Throw<ArgumentNullException>(() => _runtime.RegisterListenerAsync(null!));
}

[Fact]
public void remove_throws_argument_null_for_null_uri()
{
Should.Throw<ArgumentNullException>(() => _runtime.RemoveListenerAsync(null!));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using JasperFx;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NSubstitute;
using Shouldly;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Xunit;

namespace CoreTests.Runtime.Agents.DynamicListeners;

/// <summary>
/// End-to-end coverage for <see cref="DynamicListenerAgent"/>: spin up a real
/// Wolverine host with the stub transport in solo mode, manually drive the
/// agent's lifecycle, and verify the listener actually goes live and is
/// recognizable to the rest of the runtime via
/// <see cref="IEndpointCollection.FindListeningAgent"/>.
///
/// This is intentionally below the cluster-assignment layer: we don't wait
/// for <see cref="DurabilitySettings.CheckAssignmentPeriod"/> to fire — that
/// path is exercised by the broader cluster compliance tests once a
/// transport-specific dynamic registration test ships in PR-3 (MQTT).
/// </summary>
public class dynamic_listener_agent_lifecycle_integration : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await WolverineHost.ForAsync(opts =>
{
// Solo mode keeps us out of the cluster orchestration path so the
// test exercises just the agent — the family registration logic
// is covered separately by DynamicListenerAgentFamilyTests.
opts.Durability.Mode = DurabilityMode.Solo;
});
}

public Task DisposeAsync()
{
_host.Dispose();
return Task.CompletedTask;
}

[Fact]
public async Task agent_start_activates_the_listener_on_the_runtime()
{
// Arrange: a stub-transport listener URI that the runtime's existing
// stub transport knows how to materialize into an Endpoint.
var listenerUri = new Uri("stub://gh-2685-dynamic-target");

var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();
var agent = new DynamicListenerAgent(runtime, listenerUri);

// Act
await agent.StartAsync(CancellationToken.None);

try
{
// Assert: the runtime is now listening at the requested URI. This
// verifies the agent went all the way through transport-resolution
// → endpoint creation → ListeningAgent registration without us
// pre-configuring the URI in WolverineOptions.
runtime.Endpoints.FindListeningAgent(listenerUri).ShouldNotBeNull();
}
finally
{
await agent.StopAsync(CancellationToken.None);
agent.Status.ShouldBe(AgentStatus.Stopped);
}
}

[Fact]
public async Task agent_start_throws_when_no_transport_supports_the_scheme()
{
// Misconfiguration scenario: a listener URI was registered for a
// transport the host doesn't include. Surface it as a hard error at
// StartAsync rather than letting the listener silently never come up.
var bogusUri = new Uri("not-a-real-scheme://host/topic");

var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();
var agent = new DynamicListenerAgent(runtime, bogusUri);

var ex = await Should.ThrowAsync<InvalidOperationException>(() =>
agent.StartAsync(CancellationToken.None));

ex.Message.ShouldContain("not-a-real-scheme");
ex.Message.ShouldContain("UseMqtt"); // hint baked into the message
}

[Fact]
public async Task agent_stop_before_start_is_a_no_op()
{
// The agent runtime can call StopAsync on an agent that never had a
// chance to start (e.g. node deactivation between assignment and
// first poll); tolerate that without throwing.
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();
var agent = new DynamicListenerAgent(runtime, new Uri("stub://gh-2685-never-started"));

await Should.NotThrowAsync(() => agent.StopAsync(CancellationToken.None));
agent.Status.ShouldBe(AgentStatus.Stopped);
}
}
Loading
Loading