Skip to content
81 changes: 42 additions & 39 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Akka.Streams.Util;
using Akka.Util;
using Akka.Util.Internal;
using Debug = System.Diagnostics.Debug;
using Decider = Akka.Streams.Supervision.Decider;
using Directive = Akka.Streams.Supervision.Directive;

Expand Down Expand Up @@ -2512,61 +2513,47 @@ public Expand(Func<TIn, IEnumerator<TOut>> extrapolate)
/// </returns>
public override string ToString() => "Expand";
}

#nullable enable

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enable nullable to take advantage of the changes made in #7520


/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
[InternalApi]
public sealed class SelectAsync<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
#region internal classes

private sealed class Logic : InAndOutGraphStageLogic
{
private class Holder<T>
private sealed class Holder<T>(object message, Result<T> element)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used a primary CTOR because I felt like it 🤫

{
private readonly Action<Holder<T>> _callback;

public Holder(object message, Result<T> element, Action<Holder<T>> callback)
{
_callback = callback;
Message = message;
Element = element;
}

public Result<T> Element { get; private set; }
public object Message { get; }
public object Message { get; private set; } = message;

public Result<T> Element { get; private set; } = element;

public void SetElement(Result<T> result)
{
Element = result.IsSuccess && result.Value == null
? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
: result;
}

public void Invoke(Result<T> result)
{
SetElement(result);
_callback(this);
}
}

private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());

private readonly SelectAsync<TIn, TOut> _stage;
private readonly Decider _decider;
private IBuffer<Holder<TOut>> _buffer;
private readonly Action<Holder<TOut>> _taskCallback;
private readonly Action<(Holder<TOut>, Result<TOut>)> _taskCallback;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncCallbacks in Akka.Streams can only accept a single input parameter, so we have to go with a ValueTuple here


public Logic(Attributes inheritedAttributes, SelectAsync<TIn, TOut> stage) : base(stage.Shape)
{
_stage = stage;
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>();
_decider = attr != null ? attr.Decider : Deciders.StoppingDecider;

_taskCallback = GetAsyncCallback<Holder<TOut>>(HolderCompleted);
_taskCallback = GetAsyncCallback<(Holder<TOut> holder, Result<TOut> result)>(t => HolderCompleted(t.holder, t.result));

SetHandlers(stage.In, stage.Out, this);
}
Expand All @@ -2577,19 +2564,34 @@ public override void OnPush()
try
{
var task = _stage._mapFunc(message);
var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
Debug.Assert(message != null, nameof(message) + " != null");
var holder = new Holder<TOut>(message!, NotYetThere);
_buffer.Enqueue(holder);

// We dispatch the task if it's ready to optimize away
// scheduling it to an execution context
if (task.IsCompleted)
{
holder.SetElement(Result.FromTask(task));
HolderCompleted(holder);
{;
Comment thread
Aaronontheweb marked this conversation as resolved.
Outdated
HolderCompleted(holder, Result.FromTask(task));
}
else
task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
TaskContinuationOptions.ExecuteSynchronously);
{
async Task WaitForTask()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used the latest .NET meta for relying on a local function + detached Task + await on parent task instead of using ContinueWith - this is designed to tremendously simplify error-handling mostly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the side effect of eliminating the AggregateExceptions previously emitted from SelectAsync, which is why I've had to update some of the unit tests.

{
try
{
var result = Result.Success(await task);
_taskCallback((holder, result));
}
catch(Exception ex){
var result = Result.Failure<TOut>(ex);
_taskCallback((holder, result));
}
}

_ = WaitForTask();
}

}
catch (Exception e)
{
Expand All @@ -2606,7 +2608,7 @@ public override void OnPush()
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", e);
}
}
if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
Expand Down Expand Up @@ -2663,12 +2665,12 @@ private void PushOne()
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}");
}
continue;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since SetElement() always get invoked in the same thread now, does that mean that this bit of code isn't needed anymore?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole error handling code inside PushOne() I mean

@Aaronontheweb Aaronontheweb Mar 13, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we decided in Slack that we probably still need error-handling in both places since there can be a delay on FailStage taking effect in the AsyncCallback - we'd still want to check for errors whenever else we're calling PushOne


Push(_stage.Out, result.Value);
Push(_stage.Out!, result.Value);
if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
TryPull(inlet);
}
Expand All @@ -2677,17 +2679,18 @@ private void PushOne()
}
}

private void HolderCompleted(Holder<TOut> holder)
private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real fix - we now pass in both the Holder<TOut> and the Result<TOut> together into the StageActor's context and perform the assignment there, where it's guaranteed to be thread-safe, rather than doing it half-in / half-out like before.

{
var element = holder.Element;
if (element.IsSuccess)
// we may not be at the front of the line right now, so save the result for later
holder.SetElement(result);
if (result.IsSuccess)
{
if (IsAvailable(_stage.Out))
PushOne();
return;
}

var exception = element.Exception;
var exception = result.Exception;
var strategy = _decider(exception);
Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
switch (strategy)
Expand All @@ -2703,7 +2706,7 @@ private void HolderCompleted(Holder<TOut> holder)
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", exception ?? new Exception("Null exception"));
}
}

Expand Down Expand Up @@ -2758,8 +2761,6 @@ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
[InternalApi]
public sealed class SelectAsyncUnordered<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
Expand Down Expand Up @@ -2904,6 +2905,8 @@ public SelectAsyncUnordered(int parallelism, Func<TIn, Task<TOut>> mapFunc)
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
=> new Logic(inheritedAttributes, this);
}

#nullable disable

/// <summary>
/// INTERNAL API
Expand Down