Skip to content

Conversation

@He-Pin
Copy link
Contributor

@He-Pin He-Pin commented Jul 30, 2023

References #31958
was: #31602

I ran the tests after changing the default flatMapConcat to FlattenConcat; the tests passed, too.

[info] FlatMapConcatBenchmark.completedFuture         thrpt    5    832464.979 ± 1045741.316  ops/s
[info] FlatMapConcatBenchmark.completedFutureP1       thrpt    5  10560065.545 ±  406623.774  ops/s
[info] FlatMapConcatBenchmark.internalSingleSource    thrpt    5  17202143.700 ±  735428.818  ops/s
[info] FlatMapConcatBenchmark.internalSingleSourceP1  thrpt    5  16402884.417 ±  481353.061  ops/s
[info] FlatMapConcatBenchmark.mapBaseline             thrpt    5  37437826.208 ±  590107.276  ops/s
[info] FlatMapConcatBenchmark.normalFuture            thrpt    5    315802.300 ±   23575.498  ops/s
[info] FlatMapConcatBenchmark.normalFutureP1          thrpt    5    339282.443 ±    6550.085  ops/s
[info] FlatMapConcatBenchmark.oneElementList          thrpt    5   1019333.244 ±   50691.164  ops/s
[info] FlatMapConcatBenchmark.oneElementListP1        thrpt    5  10686004.485 ±  430848.272  ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle         thrpt    5   7971587.554 ± 1193544.525  ops/s
[info] FlatMapConcatBenchmark.sourceDotSingleP1       thrpt    5  11181956.601 ±  124978.076  ops/s


jmh:run -i 5 -wi 5 -f1 -t1 akka.stream.FlatMapConcatBenchmark 

@He-Pin
Copy link
Contributor Author

He-Pin commented Jul 30, 2023

before:

[info] Benchmark                                                         Mode  Cnt         Score        Error   Units
[info] FlatMapConcatBenchmark.internalSingleSource                      thrpt   10   9111519.387 锟斤拷 183996.447   ops/s
[info] FlatMapConcatBenchmark.mapBaseline                               thrpt   10  20336793.367 锟斤拷 887065.039   ops/s
[info] FlatMapConcatBenchmark.oneElementList                            thrpt   10    371981.248 锟斤拷   3765.397   ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle                           thrpt   10   4578547.437 锟斤拷 479604.201   ops/s

after:

info] Benchmark                                                         Mode  Cnt          Score            Error   Units
[info] FlatMapConcatBenchmark.internalSingleSource                      thrpt   10    9027659.060 锟斤拷     136614.084   ops/s
[info] FlatMapConcatBenchmark.mapBaseline                               thrpt   10   20947850.178 锟斤拷    1039800.021   ops/s
[info] FlatMapConcatBenchmark.oneElementList                            thrpt   10    4451591.431 锟斤拷      55815.355   ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle                           thrpt   10    4547444.612 锟斤拷     136653.128   ops/s

@He-Pin He-Pin marked this pull request as ready for review July 30, 2023 14:12
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch from ae99e81 to 0be8cdd Compare July 30, 2023 15:41
@GreyPlane
Copy link
Contributor

I wonder if it is possible to write a Buffer.peek(n: Int) which peek from current to n, and by that, is it easy to impl Merge via this, just change inflightSources.peek() eq self to check through the range from 0 to breadth - 1.

@He-Pin
Copy link
Contributor Author

He-Pin commented Jul 31, 2023

@GreyPlane How about comment in line, thanks

@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch from 0be8cdd to 6f2c507 Compare July 31, 2023 04:21
Copy link

@gael-ft gael-ft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the implementation.
This will allow us to have more than 1 source pulled at a time keeping elements ordered from downstream perspective.

@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch from 6f2c507 to 135ee1f Compare July 31, 2023 17:00
@He-Pin
Copy link
Contributor Author

He-Pin commented Jul 31, 2023

Seems should add another parameter eager too. When true will complete / fail the stream when upstream fail.

@gael-ft
Copy link

gael-ft commented Aug 1, 2023

Seems should add another parameter eager too. When true will complete / fail the stream when upstream fail.

Not obvious to me.
Goal flatMapConcat is to keep elements ordered.
If order is not a concern, you could just use merge (which has the concept of eager).

So I can't see a scenario where you would like to continue after one of the upstream sources cancelled when using this operator.

That being said, if you feel that could be useful, let's go 😉

@He-Pin
Copy link
Contributor Author

He-Pin commented Aug 1, 2023

@gael-ft that's true. but I saw your PR and you can return Source.failed(...) sometime. and for the eager I mean eagerError.
If when materilization, the upstream is Source.failed would I failed the stream ASAP when the parallesm > 1?

And for most case, I think we should make the eagerError = false, even there is an inflight Source.failed(...).

I was wantted to optimize for Source.future and Source.failed, but in which way the Source can be already failed. and there are Flux.concatMapDelayError/flatMapSequentialDelayError, I think delaying error should be the default behavior as the operator only parallelism materialization but not emitting.

@gael-ft
Copy link

gael-ft commented Aug 1, 2023

@He-Pin I might be a bit too much use case centric (i.e. S3 object upload where we don't want any missing part).

If when materilization, the upstream is Source.failed would I failed the stream ASAP when the parallesm > 1?

So this question can probably be a valid question in some cases.

But regarding the default value of eager I think it should be intentional to continue processing after any error occured.
As with Flux, using flatMapSequentialDelayError is intentional vs flatMapSequential.

In my understanding of flatMapConcat it guarantees that the element you process will be in order.
If for some reason you can afford skipping element N+1 and process directly N+2 that should be intentional.

Once again, just my opinion, but as long as this is parameter, users can use it as they want.

@He-Pin
Copy link
Contributor Author

He-Pin commented Aug 1, 2023

@gael-ft Yes, let's just keep it simple for now. And I saw the FailedSource which will failStage when preStartI().
And the FutureSource only trigger the failStage after be pulled. And in your code you are using the FailedSource which will cause the stream fails very soon, So I think I need adjust my code for that too.

@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 3 times, most recently from 7d4965b to 9813a33 Compare August 10, 2023 16:27
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 4 times, most recently from bd40615 to 92172b4 Compare August 27, 2023 09:59
@He-Pin He-Pin marked this pull request as draft December 31, 2024 14:25
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 7 times, most recently from cfd75c9 to 05d5b17 Compare January 1, 2025 15:16
@He-Pin He-Pin marked this pull request as ready for review January 1, 2025 15:17
@He-Pin
Copy link
Contributor Author

He-Pin commented Jan 1, 2025

@gael-ft @johanandren @patriknw I think it's ready now, and happy new year.

@He-Pin He-Pin marked this pull request as draft January 1, 2025 19:46
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 3 times, most recently from 78bda86 to 8103cbc Compare January 1, 2025 20:21
@He-Pin He-Pin marked this pull request as ready for review January 2, 2025 02:26
@He-Pin He-Pin marked this pull request as draft January 3, 2025 14:19
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch from ea3ca90 to ec4d0c0 Compare January 3, 2025 14:33
@He-Pin He-Pin marked this pull request as ready for review January 3, 2025 14:33
Comment on lines +94 to +99
.statefulMap(() => -1)((pre, current) => {
if (pre + 1 != current) {
throw new IllegalStateException(s"expected $pre + 1 == $current")
}
(current, current)
}, _ => None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check the ordering

@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 3 times, most recently from 3ffe355 to 5e084f4 Compare January 10, 2025 11:56
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch from 5e084f4 to 9ed93c5 Compare January 10, 2025 12:33
@johanandren
Copy link
Contributor

Sorry for not reviewing this yet, busy day, we'll get there eventually so please be patient.

@He-Pin
Copy link
Contributor Author

He-Pin commented Jan 13, 2025

@johanandren take easy, $Work first :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants