Skip to content

Optimize flatMapConcat for single element source, #25241#25242

Merged
patriknw merged 4 commits intomasterfrom
wip-flatMapConcat-patriknw
Jul 11, 2018
Merged

Optimize flatMapConcat for single element source, #25241#25242
patriknw merged 4 commits intomasterfrom
wip-flatMapConcat-patriknw

Conversation

@patriknw
Copy link
Contributor

  • This first stab is only looking for SingleSource and note
    that Source.single is not returning that. Source.single has
    wrap with fromGraph and then the original SingleSource is
    lost.

Refs #25241

Benchmark results speak for themselves, 287k vs 6641k elements/s:

jmh:run -prof gc -i 5 -wi 5 -f2 -t1 akka.stream.FlatMapConcatBenchmark

[info] FlatMapConcatBenchmark.single                                        thrpt   10    287130.513 ±    4748.851   ops/s
[info] FlatMapConcatBenchmark.single:·gc.alloc.rate                         thrpt   10      1327.483 ±    1058.667  MB/sec
[info] FlatMapConcatBenchmark.single:·gc.alloc.rate.norm                    thrpt   10      5095.151 ±    4060.888    B/op

[info] FlatMapConcatBenchmark.optimizedSingle                               thrpt   10   6641485.752 ±   42696.132   ops/s
[info] FlatMapConcatBenchmark.optimizedSingle:·gc.alloc.rate                thrpt   10       502.781 ±     400.356  MB/sec
[info] FlatMapConcatBenchmark.optimizedSingle:·gc.alloc.rate.norm           thrpt   10        83.397 ±      66.405    B/op

[info] Benchmark                                                             Mode  Cnt         Score         Error   Units
[info] FlatMapConcatBenchmark.mapBaseline                                   thrpt   10  14275746.640 ± 1653684.721   ops/s
[info] FlatMapConcatBenchmark.mapBaseline:·gc.alloc.rate                    thrpt   10         1.847 ±       1.005  MB/sec
[info] FlatMapConcatBenchmark.mapBaseline:·gc.alloc.rate.norm               thrpt   10         0.143 ±       0.076    B/op

@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed validating PR is currently being validated by Jenkins labels Jun 18, 2018
@akka-ci
Copy link

akka-ci commented Jun 18, 2018

Test FAILed.

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice numbers. Still a bit sad there isn't a way to make it more general though.


"work with flatMapConcat optimized GraphStages.SingleSource" in assertAllStagesStopped {
Source(0 to 3)
.flatMapConcat(elem ⇒ new GraphStages.SingleSource(elem))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we should make the constructor of SingleSource a public API or we keep it to ourselves?

@akka-ci akka-ci added validating PR is currently being validated by Jenkins and removed needs-attention Indicates a PR validation failure (set by CI infrastructure) labels Jun 20, 2018
@patriknw
Copy link
Contributor Author

I found a way to grab the original SingleSource via the TraversalBuilder.

flatMapConcat(Source.single) improves from 295k ops/s to 3697k ops/s.

We could get an additional boost to 6743k ops/s by using the SourceSingle without the wrapping in fromGraph. That might not be needed. I think it's more than enough for the websocket case anyway. If we want to do that I prefer to add it as public API.

  def singleElementGraph[T](element: T): Graph[SourceShape[T], NotUsed] =
    new GraphStages.SingleSource(element)

I also verified that the extra work doesn't introduce a regression for non single, see oneElementList in results below.

BEFORE

[info] Benchmark                                                                  Mode  Cnt         Score         Error   Units
[info] FlatMapConcatBenchmark.internalSingleSource                               thrpt   10    304469.550 ±    3026.728   ops/s
[info] FlatMapConcatBenchmark.internalSingleSource:·gc.alloc.rate                thrpt   10      1394.921 ±    1111.594  MB/sec
[info] FlatMapConcatBenchmark.internalSingleSource:·gc.alloc.rate.norm           thrpt   10      5043.950 ±    4019.065    B/op

[info] FlatMapConcatBenchmark.mapBaseline                                        thrpt   10  14286020.955 ± 1613689.270   ops/s
[info] FlatMapConcatBenchmark.mapBaseline:·gc.alloc.rate                         thrpt   10         1.875 ±       1.050  MB/sec
[info] FlatMapConcatBenchmark.mapBaseline:·gc.alloc.rate.norm                    thrpt   10         0.142 ±       0.076    B/op

[info] FlatMapConcatBenchmark.oneElementList                                     thrpt   10    222884.964 ±   14273.282   ops/s
[info] FlatMapConcatBenchmark.oneElementList:·gc.alloc.rate                      thrpt   10      1246.652 ±     994.710  MB/sec
[info] FlatMapConcatBenchmark.oneElementList:·gc.alloc.rate.norm                 thrpt   10      6093.632 ±    4855.525    B/op

[info] FlatMapConcatBenchmark.sourceDotSingle                                    thrpt   10    295337.449 ±   14037.933   ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle:·gc.alloc.rate                     thrpt   10      1361.842 ±    1088.199  MB/sec
[info] FlatMapConcatBenchmark.sourceDotSingle:·gc.alloc.rate.norm                thrpt   10      5063.151 ±    4034.767    B/op

AFTER

[info] Benchmark                                                                  Mode  Cnt         Score        Error   Units
[info] FlatMapConcatBenchmark.internalSingleSource                               thrpt   10   6743400.176 ±  30127.278   ops/s
[info] FlatMapConcatBenchmark.internalSingleSource:·gc.alloc.rate                thrpt   10       510.081 ±    406.160  MB/sec
[info] FlatMapConcatBenchmark.internalSingleSource:·gc.alloc.rate.norm           thrpt   10        83.397 ±     66.405    B/op

[info] FlatMapConcatBenchmark.mapBaseline                                        thrpt   10  14618585.353 ±  26890.408   ops/s
[info] FlatMapConcatBenchmark.mapBaseline:·gc.alloc.rate                         thrpt   10         1.904 ±      1.010  MB/sec
[info] FlatMapConcatBenchmark.mapBaseline:·gc.alloc.rate.norm                    thrpt   10         0.143 ±      0.076    B/op

[info] FlatMapConcatBenchmark.oneElementList                                     thrpt   10    228616.358 ±  22694.113   ops/s
[info] FlatMapConcatBenchmark.oneElementList:·gc.alloc.rate                      thrpt   10      1260.890 ±   1016.359  MB/sec
[info] FlatMapConcatBenchmark.oneElementList:·gc.alloc.rate.norm                 thrpt   10      6084.032 ±   4847.906    B/op

[info] FlatMapConcatBenchmark.sourceDotSingle                                    thrpt   10   3697623.899 ± 124067.029   ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle:·gc.alloc.rate                     thrpt   10      1010.126 ±    804.724  MB/sec
[info] FlatMapConcatBenchmark.sourceDotSingle:·gc.alloc.rate.norm                thrpt   10       301.000 ±    240.064    B/op

@akka-ci akka-ci added tested PR that was successfully built and tested by Jenkins and removed validating PR is currently being validated by Jenkins labels Jun 20, 2018
@akka-ci
Copy link

akka-ci commented Jun 20, 2018

Test PASSed.

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent! LGTM

@akka-ci akka-ci added validating PR is currently being validated by Jenkins and removed tested PR that was successfully built and tested by Jenkins labels Jun 20, 2018
@patriknw
Copy link
Contributor Author

Pushed another change to take care of the case when there is no demand b4ea15e

@akka-ci akka-ci added tested PR that was successfully built and tested by Jenkins and removed validating PR is currently being validated by Jenkins labels Jun 20, 2018
@akka-ci
Copy link

akka-ci commented Jun 20, 2018

Test PASSed.

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

"find Source.single via TraversalBuilder" in assertAllStagesStopped {
TraversalBuilder.getSingleSource(Source.single("a")).get.elem should ===("a")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check also that Source.single.{async/mapMatVal} causes a nested graph and does not match for extra measure? (Pretty sure they do, but just for extra measure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanandren Good point, it was wrong. Matched for those cases also. async is only adding attribute, and mapMaterializedValue is adding Transform traversal. Fixed with additional checks for those in 7ae02d1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, good!

@patriknw patriknw added the 2 - pick next Used to mark issues which are next up in the queue to be worked on. The tag is non-binding label Jul 9, 2018
patriknw added 4 commits July 10, 2018 16:08
* This first stab is only looking for `SingleSource` and note
  that `Source.single` is not returning that. Source.single has
  wrap with fromGraph and then the original `SingleSource` is
  lost.
@patriknw patriknw force-pushed the wip-flatMapConcat-patriknw branch from b4ea15e to 7ae02d1 Compare July 10, 2018 14:43
@akka-ci akka-ci added validating PR is currently being validated by Jenkins tested PR that was successfully built and tested by Jenkins and removed tested PR that was successfully built and tested by Jenkins validating PR is currently being validated by Jenkins labels Jul 10, 2018
@akka-ci
Copy link

akka-ci commented Jul 10, 2018

Test PASSed.

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A third LGTM!

}

"find Source.single via TraversalBuilder" in assertAllStagesStopped {
TraversalBuilder.getSingleSource(Source.single("a")).get.elem should ===("a")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, good!

case _ ⇒ OptionVal.None
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pyramid of optimization.

Copy link
Contributor

@raboof raboof left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sidesteps quite some ceremony in this case 👍

var q: BufferImpl[SubSinkInlet[T]] = _
// To be able to optimize for SingleSource without materializing them the queue may hold either
// SubSinkInlet[T] or SingleSource
var queue: BufferImpl[AnyRef] = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😈

TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match {
case OptionVal.Some(single) ⇒
if (isAvailable(out) && queue.isEmpty) {
push(out, single.elem.asInstanceOf[T])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

}

def removeSource(src: SubSinkInlet[T]): Unit = {
def removeSource(src: AnyRef): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe explicitly make it private? OK though.

@patriknw patriknw merged commit d76b27b into master Jul 11, 2018
@patriknw patriknw deleted the wip-flatMapConcat-patriknw branch July 11, 2018 16:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

2 - pick next Used to mark issues which are next up in the queue to be worked on. The tag is non-binding tested PR that was successfully built and tested by Jenkins

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants