Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions src/Akka.Hosting.TestKit.Tests/TestActorRecoveryRegistrySpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Hosting;
using Akka.TestKit;
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Xunit;

namespace Akka.Hosting.TestKit.Tests;

file sealed class CachedProbeForwarder : ReceiveActor
{
private readonly IActorRef _cachedProbe;

public sealed class Send
{
public Send(string message)
{
Message = message;
}

public string Message { get; }
}

public sealed class GetCachedProbe
{
public static readonly GetCachedProbe Instance = new();

private GetCachedProbe()
{
}
}

public CachedProbeForwarder(IRequiredActor<TestProbe> probe)
{
_cachedProbe = probe.ActorRef;

Receive<Send>(send => _cachedProbe.Tell(send.Message));
Receive<GetCachedProbe>(_ => Sender.Tell(_cachedProbe));
}
}

sealed class RecoveryTestKit : Akka.Hosting.TestKit.TestKit
{
public RecoveryTestKit(ITestOutputHelper output)
: base($"recovery-{Guid.NewGuid():N}", output: output, startupTimeout: TimeSpan.FromSeconds(10), logLevel: LogLevel.Error)
{
}

public IActorRef Forwarder => ActorRegistry.Get<CachedProbeForwarder>();

protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
builder.WithActors((system, registry, resolver) =>
{
var forwarder = system.ActorOf(resolver.Props<CachedProbeForwarder>(), "cached-probe-forwarder");
registry.Register<CachedProbeForwarder>(forwarder);
});
}
}

public class TestActorRecoveryRegistrySpec
{
private readonly ITestOutputHelper _output;

public TestActorRecoveryRegistrySpec(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task Cached_required_TestProbe_reference_should_survive_TestActor_recovery()
{
await using var kit = new RecoveryTestKit(_output);
await kit.InitializeAsync();

var initialTestActor = kit.TestActor;
var cachedProbe = await kit.Forwarder.Ask<IActorRef>(CachedProbeForwarder.GetCachedProbe.Instance, TimeSpan.FromSeconds(3));
cachedProbe.Should().NotBe(ActorRefs.Nobody);
cachedProbe.Should().NotBe(initialTestActor);

kit.Forwarder.Tell(new CachedProbeForwarder.Send("before-recovery"));
await kit.ExpectMsgAsync<string>("before-recovery", TimeSpan.FromSeconds(3));

initialTestActor.Tell(PoisonPill.Instance);
await WaitUntilProbeStopsAsync(kit, initialTestActor);

await kit.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));

await kit.ForceReinitializeTestActorAsync();
kit.TestActor.Should().NotBe(initialTestActor);

kit.Forwarder.Tell(new CachedProbeForwarder.Send("after-recovery"));
await kit.ExpectMsgAsync<string>("after-recovery", TimeSpan.FromSeconds(3));
}

private static async Task WaitUntilProbeStopsAsync(RecoveryTestKit kit, IActorRef probe)
{
var deadline = DateTime.UtcNow.AddSeconds(3);

while (DateTime.UtcNow < deadline)
{
try
{
await kit.Sys.ActorSelection(probe.Path).ResolveOne(TimeSpan.FromMilliseconds(150));
await Task.Delay(50);
}
catch (ActorNotFoundException)
{
return;
}
}

Assert.Fail($"Probe [{probe.Path}] did not terminate within the expected window.");
}
}
73 changes: 71 additions & 2 deletions src/Akka.Hosting.TestKit/TestKit.Shared.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public abstract partial class TestKit : TestKitBase
protected static XunitAssertions Assertions { get; } = new XunitAssertions();

private IHost? _host;
private IActorRef? _registeredTestProbe;

public IHost Host
{
get
Expand Down Expand Up @@ -129,7 +131,7 @@ private void InternalConfigureServices(HostBuilderContext context, IServiceColle
{
// Initialize TestActor here to ensure it's available before user actors start
base.InitializeTest(actorSystem, (ActorSystemSetup)null!, null, null);
actorRegistry.Register<TestProbe>(TestActor);
actorRegistry.Register<TestProbe>(GetOrCreateRegisteredTestProbe(actorSystem));

// Set implicit sender on initialization thread
if (this is not INoImplicitSender)
Expand Down Expand Up @@ -269,7 +271,8 @@ private async Task EnsureTestActorAliveAsync()
SynchronizationContext.SetSynchronizationContext(savedContext);
}

ActorRegistry.Register<TestProbe>(TestActor, overwrite: true);
await RetargetRegisteredTestProbeAsync(Sys);
ActorRegistry.Register<TestProbe>(GetOrCreateRegisteredTestProbe(Sys), overwrite: true);
}

if (!await IsTestActorAliveAsync())
Expand All @@ -290,6 +293,72 @@ private async Task<bool> IsTestActorAliveAsync()
}
}

private IActorRef GetOrCreateRegisteredTestProbe(ActorSystem system)
{
if (_registeredTestProbe == null)
{
_registeredTestProbe = system.ActorOf(
Props.Create(() => new StableTestProbeRef(TestActor)),
$"testProbe-registry-{Guid.NewGuid():N}");
}

return _registeredTestProbe;
}

internal async Task ForceReinitializeTestActorAsync()
{
var savedContext = SynchronizationContext.Current;
try
{
base.InitializeTest(Sys, (ActorSystemSetup)null!, null, null);
}
finally
{
SynchronizationContext.SetSynchronizationContext(savedContext);
}

await RetargetRegisteredTestProbeAsync(Sys);
ActorRegistry.Register<TestProbe>(GetOrCreateRegisteredTestProbe(Sys), overwrite: true);
}

private async Task RetargetRegisteredTestProbeAsync(ActorSystem system)
{
if (_registeredTestProbe == null)
{
_ = GetOrCreateRegisteredTestProbe(system);
return;
}

_ = await _registeredTestProbe.Ask<Done>(new StableTestProbeRef.UpdateTarget(TestActor), TimeSpan.FromSeconds(3));
}

private sealed class StableTestProbeRef : ReceiveActor
{
public sealed class UpdateTarget
{
public UpdateTarget(IActorRef target)
{
Target = target;
}

public IActorRef Target { get; }
}

private IActorRef _target;

public StableTestProbeRef(IActorRef initialTarget)
{
_target = initialTarget;

Receive<UpdateTarget>(update =>
{
_target = update.Target;
Sender.Tell(Done.Instance);
});
ReceiveAny(message => _target.Forward(message));
}
}

protected sealed override void InitializeTest(ActorSystem system, ActorSystemSetup config, string actorSystemName,
string testActorName)
{
Expand Down
Loading