-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Akka.Actor: Context.Watch on FutureActorRef<T> creates memory leaks
#7502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a613bcd
c609ce0
553fbf6
c041a51
7bb7147
e3b0a22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| // ----------------------------------------------------------------------- | ||
| // <copyright file="Bugfix7501Specs.cs" company="Akka.NET Project"> | ||
| // Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com> | ||
| // Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
| // </copyright> | ||
| // ----------------------------------------------------------------------- | ||
|
|
||
| using System.Threading.Tasks; | ||
| using Akka.Actor; | ||
| using Akka.Actor.Dsl; | ||
| using Akka.TestKit; | ||
| using Akka.TestKit.TestActors; | ||
| using Xunit; | ||
| using Xunit.Abstractions; | ||
|
|
||
| namespace Akka.Tests.Actor; | ||
|
|
||
| public class Bugfix7501Specs : AkkaSpec | ||
| { | ||
| public Bugfix7501Specs(ITestOutputHelper output) : base(output) | ||
| { | ||
|
|
||
| } | ||
|
|
||
| [Fact] | ||
| public async Task FutureActorRefShouldSupportDeathWatch() | ||
| { | ||
| // arrange | ||
| var customDeathWatchProbe = CreateTestProbe(); | ||
| var watcher = Sys.ActorOf(act => | ||
| { | ||
| act.Receive<string>((_, context) => | ||
| { | ||
| // complete the Ask | ||
| context.Sender.Tell("hi"); | ||
|
|
||
| // DeathWatch the FutureActorRef<T> BEFORE it completes | ||
| context.Watch(context.Sender); | ||
|
|
||
| // deliver the IActorRef of the Ask-er to TestActor | ||
| TestActor.Tell(context.Sender); | ||
| }); | ||
|
|
||
| act.Receive<Terminated>((terminated, context) => | ||
| { | ||
| // shut ourselves down to signal that we got our Terminated from FutureActorRef | ||
| context.Stop(context.Self); | ||
| }); | ||
| }); | ||
|
|
||
| // act | ||
| await customDeathWatchProbe.WatchAsync(watcher); | ||
| await watcher.Ask<string>("boo", RemainingOrDefault); | ||
| var futureActorRef = await ExpectMsgAsync<IActorRef>(); | ||
| await WatchAsync(futureActorRef); // Ask is finished - should immediately dead-letter | ||
|
|
||
| // assert | ||
| await ExpectTerminatedAsync(futureActorRef); | ||
|
|
||
| // get the DeathWatch notification from the original actor | ||
| // this can only be received if the original actor got a Terminated message from FutureActorRef | ||
| await customDeathWatchProbe.ExpectTerminatedAsync(watcher); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,35 @@ public interface IRepointableRef : IActorRefScope | |
| bool IsStarted { get; } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// INTERNAL API - didn't want static helper methods declared inside generic class | ||
| /// </summary> | ||
| internal static class FutureActorRefDeathWatchSupport | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Helper class for sending back |
||
| { | ||
| internal static async Task ScheduleDeathWatch(IInternalActorRef notifier, IActorRef self, Task completionTask) | ||
| { | ||
| try | ||
| { | ||
| await completionTask; | ||
| } | ||
| catch | ||
| { | ||
| // we don't do error handling for this - we do not care | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error-handling is the job of the user code |
||
| } | ||
| finally | ||
| { | ||
| // regardless of whether we succeeded or failed, we notify watchers | ||
| notifier.SendSystemMessage(TerminatedFor(self)); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| internal static DeathWatchNotification TerminatedFor(IActorRef self) | ||
| { | ||
| return new DeathWatchNotification(self, true, false); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// INTERNAL API. | ||
| /// | ||
|
|
@@ -110,9 +139,6 @@ protected override void TellInternal(object message, IActorRef sender) | |
|
|
||
| switch (message) | ||
| { | ||
| case ISystemMessage msg: | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned in #7501, this code never worked because it was in the wrong method. Removing it should actually speed up |
||
| handled = _result.TrySetException(new InvalidOperationException($"system message of type '{msg.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}")); | ||
| break; | ||
| case T t: | ||
| handled = _result.TrySetResult(t); | ||
| break; | ||
|
|
@@ -140,7 +166,35 @@ protected override void TellInternal(object message, IActorRef sender) | |
| if (!handled && !_result.Task.IsCanceled) | ||
| _provider.DeadLetters.Tell(message ?? default(T), this); | ||
| } | ||
|
|
||
|
|
||
| public override void SendSystemMessage(ISystemMessage message) | ||
| { | ||
| if (message is Watch watch) | ||
| { | ||
| if (_result.Task.IsCompleted) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Automatically covers any possible completion state for the task: cancelled, faulted, or ran to completion |
||
| { | ||
| watch.Watcher.SendSystemMessage(FutureActorRefDeathWatchSupport.TerminatedFor(this)); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fast path: this actor is already "finished" |
||
| } | ||
| else | ||
| { | ||
| _ = FutureActorRefDeathWatchSupport.ScheduleDeathWatch(watch.Watcher, watch.Watchee, _result.Task); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Slow path - have to wait for actor to finish |
||
| } | ||
|
|
||
| } | ||
| else if (message is Unwatch unwatch) | ||
| { | ||
| // we're not going to support Unwatch - watchers | ||
| // already have to handle scenarios where the Unwatch arrives too late | ||
| // anyway, so we're just going to treat this like that in order to keep | ||
| // state management as simple as possible | ||
| } | ||
| else | ||
| { | ||
| // TODO: blow up the caller here by just throwing the exception at the callsite? | ||
| _result.TrySetException(new InvalidOperationException($"system message of type '{message.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}")); | ||
| } | ||
| } | ||
|
|
||
| public virtual void DeliverAsk(object message, ICanTell destination){ | ||
| destination.Tell(message, this); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handles both scenarios:
Context.Watchbefore theFutureActorRef<T>completesContext.Watchafter theFutureActorRef<T>has already completed, which should immediately report back with aTerminatedmessage