Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public async Task A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
sub.Request(1000);
foreach (var n in Enumerable.Range(1, 50))
await c.ExpectNextAsync(n);
//Enumerable.Range(1, 50).ForEach(n => c.ExpectNext(n));

await c.ExpectCompleteAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ await this.AssertAllStagesStoppedAsync(async() => {
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().InnerException.Message.Should().Be("err1");
(await c.ExpectErrorAsync()).Message.Should().Be("err1");

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer get AggregateExceptions here unless the test completes immediately, in which case we might have to harden this in the future.

latch.CountDown();
}, Materializer);
}
Expand Down Expand Up @@ -193,7 +193,7 @@ await this.AssertAllStagesStoppedAsync(async() => {
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
(await c.ExpectErrorAsync()).Message.Should().Be("err2");

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer get AggregateExceptions here unless the test completes immediately, in which case we might have to harden this in the future.

latch.CountDown();
}, Materializer);
}
Expand Down
41 changes: 0 additions & 41 deletions src/core/Akka.Streams/Dsl/FlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,6 @@ public static Flow<T, TOut, TMat> StatefulSelectMany<T, TIn, TOut, TMat>(this Fl
/// </para>
/// </summary>
/// <seealso cref="SelectAsyncUnordered{T,TIn,TOut,TMat}"/>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="parallelism">TBD</param>
/// <param name="asyncMapper">TBD</param>
/// <returns>TBD</returns>
public static Flow<T, TOut, TMat> SelectAsync<T, TIn, TOut, TMat>(this Flow<T, TIn, TMat> flow, int parallelism, Func<TIn, Task<TOut>> asyncMapper)
{
return (Flow<T, TOut, TMat>)InternalFlowOperations.SelectAsync(flow, parallelism, asyncMapper);
Expand Down Expand Up @@ -357,14 +349,6 @@ public static Flow<T, TOut, TMat> SelectAsync<T, TIn, TOut, TMat>(this Flow<T, T
/// </para>
/// </summary>
/// <seealso cref="SelectAsync{T,TIn,TOut,TMat}"/>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="parallelism">TBD</param>
/// <param name="asyncMapper">TBD</param>
/// <returns>TBD</returns>
public static Flow<T, TOut, TMat> SelectAsyncUnordered<T, TIn, TOut, TMat>(this Flow<T, TIn, TMat> flow, int parallelism, Func<TIn, Task<TOut>> asyncMapper)
{
return (Flow<T, TOut, TMat>)InternalFlowOperations.SelectAsyncUnordered(flow, parallelism, asyncMapper);
Expand All @@ -386,12 +370,6 @@ public static Flow<T, TOut, TMat> SelectAsyncUnordered<T, TIn, TOut, TMat>(this
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, TMat> Where<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Predicate<TOut> predicate)
{
return (Flow<TIn, TOut, TMat>)InternalFlowOperations.Where(flow, predicate);
Expand All @@ -413,12 +391,6 @@ public static Flow<TIn, TOut, TMat> Where<TIn, TOut, TMat>(this Flow<TIn, TOut,
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, TMat> WhereNot<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Predicate<TOut> predicate)
{
return (Flow<TIn, TOut, TMat>)InternalFlowOperations.WhereNot(flow, predicate);
Expand Down Expand Up @@ -446,13 +418,6 @@ public static Flow<TIn, TOut, TMat> WhereNot<TIn, TOut, TMat>(this Flow<TIn, TOu
/// </para>
/// <seealso cref="Limit{TIn,TOut,TMat}"/> <seealso cref="LimitWeighted{TIn,TOut,TMat}"/>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <param name="inclusive">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, TMat> TakeWhile<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Predicate<TOut> predicate, bool inclusive = false)
{
return (Flow<TIn, TOut, TMat>)InternalFlowOperations.TakeWhile(flow, predicate, inclusive);
Expand All @@ -470,12 +435,6 @@ public static Flow<TIn, TOut, TMat> TakeWhile<TIn, TOut, TMat>(this Flow<TIn, TO
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, TMat> SkipWhile<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Predicate<TOut> predicate)
{
return (Flow<TIn, TOut, TMat>)InternalFlowOperations.SkipWhile(flow, predicate);
Expand Down
23 changes: 0 additions & 23 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,6 @@ public static IFlow<TOut, TMat> SelectAsync<TIn, TOut, TMat>(this IFlow<TIn, TMa
/// </para>
/// </summary>
/// <seealso cref="SelectAsync{TIn,TOut,TMat}"/>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="parallelism">TBD</param>
/// <param name="asyncMapper">TBD</param>
/// <returns>TBD</returns>
public static IFlow<TOut, TMat> SelectAsyncUnordered<TIn, TOut, TMat>(this IFlow<TIn, TMat> flow, int parallelism,
Func<TIn, Task<TOut>> asyncMapper)
{
Expand All @@ -395,11 +388,6 @@ public static IFlow<TOut, TMat> SelectAsyncUnordered<TIn, TOut, TMat>(this IFlow
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <returns>TBD</returns>
public static IFlow<T, TMat> Where<T, TMat>(this IFlow<T, TMat> flow, Predicate<T> predicate)
{
return flow.Via(new Fusing.Where<T>(predicate));
Expand All @@ -418,11 +406,6 @@ public static IFlow<T, TMat> Where<T, TMat>(this IFlow<T, TMat> flow, Predicate<
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <returns>TBD</returns>
public static IFlow<T, TMat> WhereNot<T, TMat>(this IFlow<T, TMat> flow, Predicate<T> predicate)
{
return flow.Via(new Fusing.Where<T>(e => !predicate(e)));
Expand Down Expand Up @@ -451,12 +434,6 @@ public static IFlow<T, TMat> WhereNot<T, TMat>(this IFlow<T, TMat> flow, Predica
/// <seealso cref="Limit{T, TMat}(IFlow{T, TMat}, long)"/>
/// <seealso cref="LimitWeighted{T, TMat}(IFlow{T, TMat}, long, Func{T, long})"/>
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="predicate">TBD</param>
/// <param name="inclusive">TBD</param>
/// <returns>TBD</returns>
public static IFlow<T, TMat> TakeWhile<T, TMat>(this IFlow<T, TMat> flow, Predicate<T> predicate, bool inclusive)
{
return flow.Via(new Fusing.TakeWhile<T>(predicate, inclusive));
Expand Down
54 changes: 25 additions & 29 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2808,7 +2808,7 @@ private sealed class Logic : InAndOutGraphStageLogic
public Logic(Attributes inheritedAttributes, SelectAsyncUnordered<TIn, TOut> stage) : base(stage.Shape)
{
_stage = stage;
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>();
_decider = attr != null ? attr.Decider : Deciders.StoppingDecider;

_taskCallback = GetAsyncCallback<Result<TOut>>(TaskCompleted);
Expand All @@ -2827,8 +2827,10 @@ public override void OnPush()
if (task.IsCompleted)
TaskCompleted(Result.FromTask(task));
else
task.ContinueWith(t => _taskCallback(Result.FromTask(t)),
TaskContinuationOptions.ExecuteSynchronously);
{
// use an async local function to await the task and then run the task callback
_ = RunTask(task);
}
}
catch (Exception e)
{
Expand All @@ -2838,6 +2840,20 @@ public override void OnPush()

if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
TryPull(_stage.In);
return;

async Task RunTask(Task<TOut> tt)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an async local function with an await on the end-user Task<TOut> instead of using ContinueWith - this will result in better error handling and fewer weird task context issues.

Replicates the same work we did on #7521

{
try
{
var result = Result.Success(await tt);
_taskCallback(result);
}
catch (Exception ex)
{
_taskCallback(Result.Failure<TOut>(ex));
}
}
}

public override void OnUpstreamFinish()
Expand Down Expand Up @@ -2909,42 +2925,22 @@ private int Todo

private readonly int _parallelism;
private readonly Func<TIn, Task<TOut>> _mapFunc;
/// <summary>
/// TBD
/// </summary>

public readonly Inlet<TIn> In = new("SelectAsyncUnordered.in");
/// <summary>
/// TBD
/// </summary>
public readonly Outlet<TOut> Out = new("SelectAsyncUnordered.out");

/// <summary>
/// TBD
/// </summary>
/// <param name="parallelism">TBD</param>
/// <param name="mapFunc">TBD</param>
public readonly Outlet<TOut> Out = new("SelectAsyncUnordered.out");

public SelectAsyncUnordered(int parallelism, Func<TIn, Task<TOut>> mapFunc)
{
_parallelism = parallelism;
_mapFunc = mapFunc;
Shape = new FlowShape<TIn, TOut>(In, Out);
}

/// <summary>
/// TBD
/// </summary>

protected override Attributes InitialAttributes { get; } = Attributes.CreateName("selectAsyncUnordered");

/// <summary>
/// TBD
/// </summary>

public override FlowShape<TIn, TOut> Shape { get; }

/// <summary>
/// TBD
/// </summary>
/// <param name="inheritedAttributes">TBD</param>
/// <returns>TBD</returns>

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
=> new Logic(inheritedAttributes, this);
}
Expand Down