-
Notifications
You must be signed in to change notification settings - Fork 3.6k
=str Add IterableSource. #31372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
=str Add IterableSource. #31372
Conversation
939df37 to
387904e
Compare
5929665 to
57bb0e7
Compare
|
Can you explain a bit more what the problem this PR is trying to solve is? Did you observe performance problems with the composed operator? |
|
@johanandren I was trying to solve refs: #21462 |
|
I see, so it is to be able to determine and optimize iterable sources in for example Akka HTTP. @jrudolph is the new stage here useful for that? |
akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
Outdated
Show resolved
Hide resolved
063ee82 to
1fb5bcf
Compare
86586a6 to
690513f
Compare
patriknw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good
690513f to
57f66e4
Compare
johanandren
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
57f66e4 to
122a879
Compare
patriknw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
| if (isAvailable(out)) { | ||
| push(out, currentIterator.next()) | ||
| } | ||
| if (!currentIterator.hasNext) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why call hasNext here,is it ok not to to call here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it just pushed the last element it can immediately complete the stage after that instead of waiting for next pull to make that decision.
I think the
single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)is too longThe current
Source.fromIterableis implemented with asingleandStatefulMapConcat, when the upstream'sSingleSourceis completed, the internal state ofStatefulMapConcatis discarded and completed, and therestartingDeciderwill not make it purpose becauseisClosed(in)is true.I think this can enable further optimization in the FlattenMerge, but PR will change the current behavior of
Source.fromIterable.More optimization can be done in
FlatMapMerge