Fix race condition in UnfoldResourceAsyncSource callback invocation#7859
Merged
Aaronontheweb merged 8 commits intoOct 5, 2025
Merged
Conversation
Changed UnfoldResourceSourceAsync to use InvokeWithFeedback instead of direct callback invocation to properly handle StreamDetachedException when the stream stops before resource creation completes. The previous implementation used Action<T> invocation which silently drops callbacks after stage shutdown. This caused resource cleanup to be skipped when streams were cancelled quickly (e.g., with Sink.Cancelled), leading to test timeouts waiting for close callbacks. The fix uses GetTypedAsyncCallback to obtain IAsyncCallback<T> and calls InvokeWithFeedback with an async local function pattern. This ensures StreamDetachedException is properly caught and resource cleanup occurs even when the callback cannot be processed due to stream completion. Fixes UnfoldResourceAsyncSourceSpec.A_UnfoldResourceAsyncSource_must_close_resource_when_stream_is_quickly_cancelled_reproducer_2
Aaronontheweb
commented
Oct 4, 2025
Aaronontheweb
left a comment
Member
Author
There was a problem hiding this comment.
Detailed my changes
| return strategy != null ? strategy.Decider : Deciders.StoppingDecider; | ||
| }); | ||
|
|
||
| _createdCallback = new Lazy<IAsyncCallback<Try<TSource>>>(() => |
Member
Author
There was a problem hiding this comment.
This is another stage that really needs the InvokeWithFeedback changes we introduced in #7578
| } | ||
| } | ||
|
|
||
| _ = InvokeCallback(); |
Member
Author
There was a problem hiding this comment.
Invokes the callback and ensures that the stage gets closed afterwards in the event of failure, so that way we can't miss / fail resource clean-up.
Aaronontheweb
added a commit
to Aaronontheweb/akka.net
that referenced
this pull request
Oct 5, 2025
…kkadotnet#7859) Changed UnfoldResourceSourceAsync to use InvokeWithFeedback instead of direct callback invocation to properly handle StreamDetachedException when the stream stops before resource creation completes. The previous implementation used Action<T> invocation which silently drops callbacks after stage shutdown. This caused resource cleanup to be skipped when streams were cancelled quickly (e.g., with Sink.Cancelled), leading to test timeouts waiting for close callbacks. The fix uses GetTypedAsyncCallback to obtain IAsyncCallback<T> and calls InvokeWithFeedback with an async local function pattern. This ensures StreamDetachedException is properly caught and resource cleanup occurs even when the callback cannot be processed due to stream completion. Fixes UnfoldResourceAsyncSourceSpec.A_UnfoldResourceAsyncSource_must_close_resource_when_stream_is_quickly_cancelled_reproducer_2
Arkatufus
pushed a commit
to Arkatufus/akka.net
that referenced
this pull request
Mar 23, 2026
…kkadotnet#7859) Changed UnfoldResourceSourceAsync to use InvokeWithFeedback instead of direct callback invocation to properly handle StreamDetachedException when the stream stops before resource creation completes. The previous implementation used Action<T> invocation which silently drops callbacks after stage shutdown. This caused resource cleanup to be skipped when streams were cancelled quickly (e.g., with Sink.Cancelled), leading to test timeouts waiting for close callbacks. The fix uses GetTypedAsyncCallback to obtain IAsyncCallback<T> and calls InvokeWithFeedback with an async local function pattern. This ensures StreamDetachedException is properly caught and resource cleanup occurs even when the callback cannot be processed due to stream completion. Fixes UnfoldResourceAsyncSourceSpec.A_UnfoldResourceAsyncSource_must_close_resource_when_stream_is_quickly_cancelled_reproducer_2 (cherry picked from commit ccad240)
This was referenced Mar 24, 2026
This was referenced Mar 31, 2026
This was referenced Apr 8, 2026
Merged
This was referenced May 21, 2026
Open
Closed
Open
Open
This was referenced May 29, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fixed a race condition in
UnfoldResourceSourceAsyncwhere resource cleanup was skipped when streams were cancelled quickly (e.g., withSink.Cancelled).Changes
UnfoldResourceSourceAsyncto useInvokeWithFeedbackinstead of direct callback invocationGetTypedAsyncCallbackto obtainIAsyncCallback<T>for proper feedback handlingStreamDetachedExceptionRoot Cause
The previous implementation used
Action<T>invocation which silently drops callbacks after stage shutdown. This caused the close callback to never execute when the stream stopped before the resource creation callback could be invoked.Technical Details
The fix matches the Scala Akka implementation's use of
invokeWithFeedback, which returns a Future/Task that fails withStreamDetachedExceptionwhen the stage has stopped. This allows proper cleanup in the exception handler.Test Results
UnfoldResourceAsyncSourceSpecpassA_UnfoldResourceAsyncSource_must_close_resource_when_stream_is_quickly_cancelled_reproducer_2Files Changed
src/core/Akka.Streams/Implementation/Sources.cs