Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 85 additions & 124 deletions src/Netclaw.Daemon.Tests/Gateway/SessionRegistryTests.cs
Original file line number Diff line number Diff line change
@@ -1,184 +1,145 @@
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Microsoft.AspNetCore.SignalR;
using Akka.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Daemon.Gateway;
using Xunit;

namespace Netclaw.Daemon.Tests.Gateway;

/// <summary>
/// Integration tests for <see cref="SessionRegistry"/> using a real ActorSystem
/// for Akka.Streams materialization and a fake <see cref="ISessionPipeline"/>.
/// Unit tests for <see cref="SessionRegistry"/> coordination behavior.
/// Stream materialization and pipeline lifecycle are now handled by
/// <see cref="SignalRSessionActor"/> — these tests focus on the registry's
/// session-tracking, connection-binding, and message-routing responsibilities.
/// </summary>
public sealed class SessionRegistryTests : IAsyncLifetime
public sealed class SessionRegistryTests
{
private ActorSystem _system = null!;
private SessionRegistry BuildRegistry()
=> new(
new StubRequiredActor(),
TimeProvider.System,
NullLogger<SessionRegistry>.Instance);

public Task InitializeAsync()
[Fact]
public async Task CreateSession_returns_valid_session_id()
{
_system = ActorSystem.Create("session-registry-tests",
"akka { loglevel = WARNING }");
return Task.CompletedTask;
}
var registry = BuildRegistry();

public async Task DisposeAsync()
{
await _system.Terminate();
var sessionId = await registry.CreateSessionAsync("conn-1", "tui");

Assert.False(string.IsNullOrWhiteSpace(sessionId));
Assert.StartsWith("signalr/", sessionId, StringComparison.Ordinal);
}

[Fact]
public async Task EnsureSession_rematerializes_pipeline_when_output_stream_has_completed()
public async Task EnsureSession_creates_new_session_when_no_id_provided()
{
var pipeline = new TrackingFakeSessionPipeline(_system, neverComplete: false);
var registry = BuildRegistry(pipeline);

var sessionId = await registry.CreateSessionAsync("conn-1", "tui");
var sessionIdParsed = new SessionId(sessionId);

// Source.Empty completes immediately; wait for the output sink to finish
var outputCompletion = registry.GetOutputCompletionForTesting(sessionIdParsed);
Assert.NotNull(outputCompletion);
await outputCompletion.WaitAsync(TimeSpan.FromSeconds(5));
var registry = BuildRegistry();

// EnsureSession from a new connection ID (simulate reconnect)
var result = await registry.EnsureSessionAsync("conn-2", sessionId, "tui");
var result = await registry.EnsureSessionAsync("conn-1", null, "tui");

// Pipeline should have been re-created because the output stream was dead
Assert.Equal(2, pipeline.CreateCount);
Assert.Equal(sessionId, result.SessionId);
Assert.False(result.Created);
Assert.True(result.Created);
Assert.False(string.IsNullOrWhiteSpace(result.SessionId));
}

[Fact]
public async Task EnsureSession_reuses_pipeline_when_output_stream_still_active()
public async Task EnsureSession_reuses_existing_session_when_id_is_known()
{
var pipeline = new TrackingFakeSessionPipeline(_system, neverComplete: true);
var registry = BuildRegistry(pipeline);

var registry = BuildRegistry();
var sessionId = await registry.CreateSessionAsync("conn-1", "tui");

// EnsureSession from another connection — stream is alive, no re-materialization needed
var result = await registry.EnsureSessionAsync("conn-2", sessionId, "tui");

Assert.Equal(1, pipeline.CreateCount);
Assert.Equal(sessionId, result.SessionId);
Assert.False(result.Created);
Assert.Equal(sessionId, result.SessionId);
}

[Fact]
public async Task AttachSession_rematerializes_pipeline_when_output_stream_has_completed()
public async Task EnsureSession_creates_binding_when_id_is_unknown()
{
var pipeline = new TrackingFakeSessionPipeline(_system, neverComplete: false);
var registry = BuildRegistry(pipeline);

var sessionId = await registry.CreateSessionAsync("conn-1", "tui");
var sessionIdParsed = new SessionId(sessionId);
var registry = BuildRegistry();
// Provide a session ID we haven't seen before
var unknownId = "signalr/00000000000000000000000000000000";

var outputCompletion = registry.GetOutputCompletionForTesting(sessionIdParsed);
Assert.NotNull(outputCompletion);
await outputCompletion.WaitAsync(TimeSpan.FromSeconds(5));
var result = await registry.EnsureSessionAsync("conn-1", unknownId, "tui");

// AttachSession from new connection after passivation
await registry.AttachSessionAsync("conn-2", sessionId);
Assert.False(result.Created);
Assert.Equal(unknownId, result.SessionId);

Assert.Equal(2, pipeline.CreateCount);
// Subsequent EnsureSession should find it now
var result2 = await registry.EnsureSessionAsync("conn-2", unknownId, "tui");
Assert.False(result2.Created);
Assert.Equal(unknownId, result2.SessionId);
}

[Fact]
public async Task AttachSession_reuses_pipeline_when_output_stream_still_active()
public async Task AttachSession_throws_when_session_not_found()
{
var pipeline = new TrackingFakeSessionPipeline(_system, neverComplete: true);
var registry = BuildRegistry(pipeline);
var registry = BuildRegistry();

await Assert.ThrowsAsync<Microsoft.AspNetCore.SignalR.HubException>(
() => registry.AttachSessionAsync("conn-1", "signalr/nonexistent"));
}

[Fact]
public async Task AttachSession_succeeds_for_known_session()
{
var registry = BuildRegistry();
var sessionId = await registry.CreateSessionAsync("conn-1", "tui");

// Attach from another connection — stream is alive, no re-materialization needed
// Should not throw
await registry.AttachSessionAsync("conn-2", sessionId);

Assert.Equal(1, pipeline.CreateCount);
}

private SessionRegistry BuildRegistry(ISessionPipeline pipeline)
=> new(
pipeline,
_system,
TimeProvider.System,
new NoopHubContext(),
NullLogger<SessionRegistry>.Instance);

/// <summary>
/// Fake pipeline that returns <see cref="MaterializedSession"/> instances backed by
/// controllable Akka.Streams sources. When <paramref name="neverComplete"/> is false,
/// output uses <see cref="Source.Empty{TOut}"/> which completes immediately — simulating
/// a dead output stream after actor passivation. When true, uses
/// <see cref="Source.Never{T}"/> to keep the stream alive.
/// </summary>
private sealed class TrackingFakeSessionPipeline : ISessionPipeline
[Fact]
public async Task SendMessage_throws_when_connection_has_no_session()
{
private readonly ActorSystem _system;
private readonly bool _neverComplete;
private int _createCount;

public int CreateCount => _createCount;

public TrackingFakeSessionPipeline(ActorSystem system, bool neverComplete)
{
_system = system;
_neverComplete = neverComplete;
}

public Task<MaterializedSession> CreateAsync(
SessionId sessionId,
SessionPipelineOptions options,
IMaterializer? materializer = null,
CancellationToken cancellationToken = default)
{
var n = Interlocked.Increment(ref _createCount);
var registry = BuildRegistry();
await registry.CreateSessionAsync("conn-1", "tui");

var killSwitch = KillSwitches.Shared($"test-{sessionId.Value}-{n}");

var input = Sink.Ignore<ChannelInput>()
.MapMaterializedValue<NotUsed>(_ => NotUsed.Instance);

Source<SessionOutput, NotUsed> output = _neverComplete
? Source.Never<SessionOutput>().Via(killSwitch.Flow<SessionOutput>())
: Source.Empty<SessionOutput>().Via(killSwitch.Flow<SessionOutput>());

return Task.FromResult(new MaterializedSession(input, output, killSwitch));
}
// conn-2 is not attached to any session
await Assert.ThrowsAsync<Microsoft.AspNetCore.SignalR.HubException>(
() => registry.SendMessageAsync("conn-2", "signalr/any", "hello"));
}

/// <summary>Minimal hub context that silently accepts all output.</summary>
private sealed class NoopHubContext : IHubContext<SessionHub, ISessionHubClient>
[Fact]
public async Task SendMessage_throws_when_connection_attached_to_different_session()
{
private static readonly NoopHubClients s_clients = new();
var registry = BuildRegistry();
var sessionId = await registry.CreateSessionAsync("conn-1", "tui");
var otherSessionId = await registry.CreateSessionAsync("conn-2", "tui");

public IHubClients<ISessionHubClient> Clients => s_clients;
public IGroupManager Groups => throw new NotSupportedException();
// conn-1 is attached to sessionId, not otherSessionId
await Assert.ThrowsAsync<Microsoft.AspNetCore.SignalR.HubException>(
() => registry.SendMessageAsync("conn-1", otherSessionId, "hello"));
}

private sealed class NoopHubClients : IHubClients<ISessionHubClient>
[Fact]
public async Task ShutdownAsync_clears_all_sessions()
{
private static readonly NoopClient s_client = new();

public ISessionHubClient All => s_client;
public ISessionHubClient AllExcept(IReadOnlyList<string> excluded) => s_client;
public ISessionHubClient Client(string connectionId) => s_client;
public ISessionHubClient Clients(IReadOnlyList<string> connectionIds) => s_client;
public ISessionHubClient Group(string groupName) => s_client;
public ISessionHubClient GroupExcept(string groupName, IReadOnlyList<string> excluded) => s_client;
public ISessionHubClient Groups(IReadOnlyList<string> groupNames) => s_client;
public ISessionHubClient User(string userId) => s_client;
public ISessionHubClient Users(IReadOnlyList<string> userIds) => s_client;
var registry = BuildRegistry();
await registry.CreateSessionAsync("conn-1", "tui");
await registry.CreateSessionAsync("conn-2", "tui");

await registry.ShutdownAsync(CancellationToken.None);

// After shutdown, no sessions should be known; EnsureSession creates a fresh one
var result = await registry.EnsureSessionAsync("conn-3", "signalr/any-old-id", "tui");
// The old ID was unknown after shutdown, so it's created fresh
Assert.Equal("signalr/any-old-id", result.SessionId);
}

private sealed class NoopClient : ISessionHubClient
/// <summary>
/// Stub implementation of <see cref="IRequiredActor{T}"/> that returns
/// <see cref="ActorRefs.Nobody"/> for all requests. Used to isolate
/// <see cref="SessionRegistry"/> from the actor system in unit tests.
/// </summary>
private sealed class StubRequiredActor : IRequiredActor<SignalRGatewayActorKey>
{
public Task ReceiveOutput(SessionOutputDto dto) => Task.CompletedTask;
public IActorRef ActorRef => ActorRefs.Nobody;

public Task<IActorRef> GetAsync(CancellationToken cancellationToken = default)
=> Task.FromResult<IActorRef>(ActorRefs.Nobody);
}
}
Loading