diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index ce2fac489e5..442fb717694 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -1388,7 +1388,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow DivertToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.Flow Expand(this Akka.Streams.Dsl.Flow flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Flow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } @@ -2089,6 +2091,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source DivertToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.Source Expand(this Akka.Streams.Dsl.Source flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, int maxSubstreams, System.Func groupingFunc) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Source, TMat> Grouped(this Akka.Streams.Dsl.Source flow, int n) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index f37a016c412..e23359dba13 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -1386,7 +1386,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow DivertToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.Flow Expand(this Akka.Streams.Dsl.Flow flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Flow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } @@ -2087,6 +2089,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source DivertToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.Source Expand(this Akka.Streams.Dsl.Source flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, int maxSubstreams, System.Func groupingFunc) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Source, TMat> Grouped(this Akka.Streams.Dsl.Source flow, int n) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index 5b3a8b1871d..efc19e400a7 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -377,6 +377,29 @@ await this.AssertAllStagesStoppedAsync(async () => }, Materializer); } + [Fact(DisplayName = "GroupBy must not have substream limit when maxSubStream is set to negative numbers")] + public async Task GroupBy_UnlimitedSubstreamTest() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var f = Flow.Create().GroupBy(-1, x => x).PrefixAndTail(0).MergeSubstreams(); + var (up, down) = ((Flow, Source), NotUsed>)f) + .RunWith(this.SourceProbe(), this.SinkProbe<(IImmutableList, Source)>(), Materializer); + + await down.RequestAsync(100); + + foreach (var i in Enumerable.Range(0, 100)) + { + await up.SendNextAsync(i); + var (_, source) = await down.ExpectNextAsync(); + var (sub, probe) = await StreamPuppet(source.RunWith(Sink.AsPublisher(false), Materializer), this); + + sub.Request(1); + await probe.ExpectNextAsync(i); + } + }, Materializer); + } + [Fact] public async Task GroupBy_must_resume_when_exceeding_maxSubStreams() { diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index 6fa07693026..0a9705b5699 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -1346,13 +1346,70 @@ public static Flow Transform(this Flo /// TBD /// TBD /// TBD - /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. /// Computes the key for each element /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion /// TBD public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation) => flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), allowClosedSubstreamRecreation); + /// + /// This operation demultiplexes the incoming stream into separate output + /// streams, one for each element key. The key is computed for each element + /// using the given function. When a new key is encountered for the first time + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: If is set to false (default behavior) the operator + /// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this + /// can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. + /// + /// + /// Note: If is set to true substream completion and incoming + /// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing + /// these elements might get lost. + /// + /// + /// The object returned from this method is not a normal , it is a + /// . This means that after this operator + /// all transformations are applied to all encountered substreams in the same fashion. + /// Substream mode is exited either by closing the substream (i.e. connecting it to a ) + /// or by merging the substreams back together; see the To and MergeBack methods + /// on for more information. + /// + /// + /// It is important to note that the substreams also propagate back-pressure as any other stream, which means + /// that blocking one substream will block the GroupBy operator itself —and thereby all substreams— once all + /// internal or explicit buffers are filled. + /// + /// + /// If the group by function throws an exception and the supervision decision + /// is the stream and substreams will be completed with failure. + /// + /// + /// If the group by throws an exception and the supervision decision + /// is or + /// the element is dropped and the stream and substreams continue. + /// + /// + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. + /// + /// **Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group. + /// **Backpressures when** there is an element pending for a group whose substream backpressures + /// **Completes when** upstream completes + /// **Cancels when** downstream cancels and all substreams cancel + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Computes the key for each element + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion + /// TBD + public static SubFlow> GroupBy(this Flow flow, Func groupingFunc, bool allowClosedSubstreamRecreation) => + flow.GroupBy(-1, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), allowClosedSubstreamRecreation); + /// /// This operation demultiplexes the incoming stream into separate output /// streams, one for each element key. The key is computed for each element @@ -1366,9 +1423,38 @@ public static SubFlow> GroupBy /// See /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. + /// Computes the key for each element public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc) => flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), false); + /// + /// This operation demultiplexes the incoming stream into separate output + /// streams, one for each element key. The key is computed for each element + /// using the given function. When a new key is encountered for the first time + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: The stage keeps track of all keys of streams that have already been closed. + /// If you expect an infinite number of keys this can cause memory issues. Elements belonging + /// to those keys are drained directly and not send to the substream. + /// + /// See + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Computes the key for each element + public static SubFlow> GroupBy(this Flow flow, Func groupingFunc) => + flow.GroupBy(-1, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), false); + /// /// This operation applies the given predicate to all incoming elements and /// emits them to a stream of output streams, always beginning a new one with diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index a8dd60c3129..804519bd71f 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -1314,7 +1314,7 @@ public static IFlow Transform(this IFlow /// TBD /// TBD /// TBD - /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. /// Computes the key for each element /// TBD /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion @@ -1328,14 +1328,14 @@ public static SubFlow GroupBy( { var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation); - TClosed finish(Sink s) + return new SubFlowImpl(Flow.Create(), merge, Finish); + + TClosed Finish(Sink s) { return toFunc( flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); } - - return new SubFlowImpl(Flow.Create(), merge, finish); } /// @@ -1354,8 +1354,8 @@ TClosed finish(Sink s) /// /// /// - /// - /// + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. + /// Computes the key for each element /// /// public static SubFlow GroupBy( @@ -1381,9 +1381,9 @@ internal sealed class GroupByMergeBack : IMergeBack /// TBD - /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. + /// Computes the key for each element + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion public GroupByMergeBack(IFlow self, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation = false) { _self = self; diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index 557b2158826..8b88a5ed71f 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -1215,14 +1215,53 @@ public static Source Transform(this SourceTBD /// TBD /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. + /// Computes the key for each element /// TBD public static SubFlow> GroupBy(this Source flow, int maxSubstreams, Func groupingFunc) { return flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Source, TMat>)f).To(s)); } + /// + /// This operation demultiplexes the incoming stream into separate output + /// streams, one for each element key. The key is computed for each element + /// using the given function. When a new key is encountered for the first time + /// it is emitted to the downstream subscriber together with a fresh + /// flow that will eventually produce all the elements of the substream + /// for that key. Not consuming the elements from the created streams will + /// stop this processor from processing more elements, therefore you must take + /// care to unblock (or cancel) all of the produced streams even if you want + /// to consume only one of them. + /// + /// If the group by function throws an exception and the supervision decision + /// is the stream and substreams will be completed + /// with failure. + /// + /// If the group by throws an exception and the supervision decision + /// is or + /// the element is dropped and the stream and substreams continue. + /// + /// Emits when an element for which the grouping function returns a group that has not yet been created. + /// Emits the new group + /// + /// Backpressures when there is an element pending for a group whose substream backpressures + /// + /// Completes when upstream completes + /// + /// Cancels when downstream cancels and all substreams cancel + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// Computes the key for each element + /// TBD + public static SubFlow> GroupBy(this Source flow, Func groupingFunc) + { + return flow.GroupBy(-1, groupingFunc, (f, s) => ((Source, TMat>)f).To(s)); + } + /// /// This operation applies the given predicate to all incoming elements and /// emits them to a stream of output streams, always beginning a new one with diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index e4d9c2d03c9..3b702b6306e 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -419,9 +419,9 @@ public void OnPush() } else { - if (_activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams) + if (_stage._maxSubstreams > -1 && _activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams) throw new TooManySubstreamsOpenException(); - else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) + if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) Pull(_stage.In); else RunSubstream(key, element); @@ -640,9 +640,9 @@ public void OnDownstreamFinish(Exception cause) /// /// TBD /// - /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams. + /// Computes the key for each element + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion public GroupBy(int maxSubstreams, Func keyFor, bool allowClosedSubstreamRecreation = false) { _maxSubstreams = maxSubstreams;