diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index 19249836101..50e834095b6 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -88,7 +88,7 @@ public async Task A_Flow_with_SelectAsync_must_produce_task_elements_in_order() sub.Request(1000); foreach (var n in Enumerable.Range(1, 50)) await c.ExpectNextAsync(n); - //Enumerable.Range(1, 50).ForEach(n => c.ExpectNext(n)); + await c.ExpectCompleteAsync(); } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs index 026b18f2b46..c7b13bdb7d2 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs @@ -134,7 +134,7 @@ await this.AssertAllStagesStoppedAsync(async() => { .To(Sink.FromSubscriber(c)).Run(Materializer); var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectError().InnerException.Message.Should().Be("err1"); + (await c.ExpectErrorAsync()).Message.Should().Be("err1"); latch.CountDown(); }, Materializer); } @@ -193,7 +193,7 @@ await this.AssertAllStagesStoppedAsync(async() => { .RunWith(Sink.FromSubscriber(c), Materializer); var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectError().Message.Should().Be("err2"); + (await c.ExpectErrorAsync()).Message.Should().Be("err2"); latch.CountDown(); }, Materializer); } diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index 2eed10cc524..6fa07693026 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -312,14 +312,6 @@ public static Flow StatefulSelectMany(this Fl /// /// /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static Flow SelectAsync(this Flow flow, int parallelism, Func> asyncMapper) { return (Flow)InternalFlowOperations.SelectAsync(flow, parallelism, asyncMapper); @@ -357,14 +349,6 @@ public static Flow SelectAsync(this Flow /// /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static Flow SelectAsyncUnordered(this Flow flow, int parallelism, Func> asyncMapper) { return (Flow)InternalFlowOperations.SelectAsyncUnordered(flow, parallelism, asyncMapper); @@ -386,12 +370,6 @@ public static Flow SelectAsyncUnordered(this /// /// Cancels when downstream cancels /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static Flow Where(this Flow flow, Predicate predicate) { return (Flow)InternalFlowOperations.Where(flow, predicate); @@ -413,12 +391,6 @@ public static Flow Where(this Flow /// Cancels when downstream cancels /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static Flow WhereNot(this Flow flow, Predicate predicate) { return (Flow)InternalFlowOperations.WhereNot(flow, predicate); @@ -446,13 +418,6 @@ public static Flow WhereNot(this Flow /// /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static Flow TakeWhile(this Flow flow, Predicate predicate, bool inclusive = false) { return (Flow)InternalFlowOperations.TakeWhile(flow, predicate, inclusive); @@ -470,12 +435,6 @@ public static Flow TakeWhile(this Flow /// Cancels when downstream cancels /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static Flow SkipWhile(this Flow flow, Predicate predicate) { return (Flow)InternalFlowOperations.SkipWhile(flow, predicate); diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 376539b8f7e..2bff3a9d980 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -369,13 +369,6 @@ public static IFlow SelectAsync(this IFlow /// /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static IFlow SelectAsyncUnordered(this IFlow flow, int parallelism, Func> asyncMapper) { @@ -395,11 +388,6 @@ public static IFlow SelectAsyncUnordered(this IFlow /// /// Cancels when downstream cancels /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static IFlow Where(this IFlow flow, Predicate predicate) { return flow.Via(new Fusing.Where(predicate)); @@ -418,11 +406,6 @@ public static IFlow Where(this IFlow flow, Predicate< /// /// Cancels when downstream cancels /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static IFlow WhereNot(this IFlow flow, Predicate predicate) { return flow.Via(new Fusing.Where(e => !predicate(e))); @@ -451,12 +434,6 @@ public static IFlow WhereNot(this IFlow flow, Predica /// /// /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public static IFlow TakeWhile(this IFlow flow, Predicate predicate, bool inclusive) { return flow.Via(new Fusing.TakeWhile(predicate, inclusive)); diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 55c3d38daff..01fc7321688 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -2808,7 +2808,7 @@ private sealed class Logic : InAndOutGraphStageLogic public Logic(Attributes inheritedAttributes, SelectAsyncUnordered stage) : base(stage.Shape) { _stage = stage; - var attr = inheritedAttributes.GetAttribute(null); + var attr = inheritedAttributes.GetAttribute(); _decider = attr != null ? attr.Decider : Deciders.StoppingDecider; _taskCallback = GetAsyncCallback>(TaskCompleted); @@ -2827,8 +2827,10 @@ public override void OnPush() if (task.IsCompleted) TaskCompleted(Result.FromTask(task)); else - task.ContinueWith(t => _taskCallback(Result.FromTask(t)), - TaskContinuationOptions.ExecuteSynchronously); + { + // use an async local function to await the task and then run the task callback + _ = RunTask(task); + } } catch (Exception e) { @@ -2838,6 +2840,20 @@ public override void OnPush() if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In)) TryPull(_stage.In); + return; + + async Task RunTask(Task tt) + { + try + { + var result = Result.Success(await tt); + _taskCallback(result); + } + catch (Exception ex) + { + _taskCallback(Result.Failure(ex)); + } + } } public override void OnUpstreamFinish() @@ -2909,42 +2925,22 @@ private int Todo private readonly int _parallelism; private readonly Func> _mapFunc; - /// - /// TBD - /// + public readonly Inlet In = new("SelectAsyncUnordered.in"); - /// - /// TBD - /// - public readonly Outlet Out = new("SelectAsyncUnordered.out"); - /// - /// TBD - /// - /// TBD - /// TBD + public readonly Outlet Out = new("SelectAsyncUnordered.out"); + public SelectAsyncUnordered(int parallelism, Func> mapFunc) { _parallelism = parallelism; _mapFunc = mapFunc; Shape = new FlowShape(In, Out); } - - /// - /// TBD - /// + protected override Attributes InitialAttributes { get; } = Attributes.CreateName("selectAsyncUnordered"); - - /// - /// TBD - /// + public override FlowShape Shape { get; } - - /// - /// TBD - /// - /// TBD - /// TBD + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(inheritedAttributes, this); }