Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions akka-bench-jmh/src/main/scala/akka/stream/FlatMapConcatBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.remote.artery.BenchTestSource
import akka.remote.artery.LatchSink
import akka.stream.impl.PhasedFusingActorMaterializer
import akka.stream.impl.StreamSupervisor
import akka.stream.scaladsl._
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import akka.stream.impl.fusing.GraphStages

object FlatMapConcatBenchmark {
final val OperationsPerInvocation = 100000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class FlatMapConcatBenchmark {
import FlatMapConcatBenchmark._

private val config = ConfigFactory.parseString(
"""
akka.actor.default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-factor = 1
}
}
"""
)

private implicit val system: ActorSystem = ActorSystem("FlatMapConcatBenchmark", config)

var materializer: ActorMaterializer = _

var testSource: Source[java.lang.Integer, NotUsed] = _

@Setup
def setup(): Unit = {
val settings = ActorMaterializerSettings(system)
materializer = ActorMaterializer(settings)

testSource = Source.fromGraph(new BenchTestSource(OperationsPerInvocation))
}

@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def sourceDotSingle(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(Source.single)
.runWith(new LatchSink(OperationsPerInvocation, latch))(materializer)

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def internalSingleSource(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(elem ⇒ new GraphStages.SingleSource(elem))
.runWith(new LatchSink(OperationsPerInvocation, latch))(materializer)

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def oneElementList(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(n ⇒ Source(n :: Nil))
.runWith(new LatchSink(OperationsPerInvocation, latch))(materializer)

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def mapBaseline(): Unit = {
val latch = new CountDownLatch(1)

testSource
.map(elem ⇒ elem)
.runWith(new LatchSink(OperationsPerInvocation, latch))(materializer)

awaitLatch(latch)
}

private def awaitLatch(latch: CountDownLatch): Unit = {
if (!latch.await(30, TimeUnit.SECONDS)) {
dumpMaterializer()
throw new RuntimeException("Latch didn't complete in time")
}
}

private def dumpMaterializer(): Unit = {
materializer match {
case impl: PhasedFusingActorMaterializer ⇒
val probe = TestProbe()(system)
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
val children = probe.expectMsgType[StreamSupervisor.Children].children
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import akka.stream.testkit.scaladsl.StreamTestKit._

import scala.concurrent._
import scala.concurrent.duration._

import akka.stream.impl.TraversalBuilder
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages.SingleSource
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import org.scalatest.exceptions.TestFailedException
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestLatch
import akka.util.OptionVal

class FlowFlattenMergeSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
Expand Down Expand Up @@ -210,5 +215,85 @@ class FlowFlattenMergeSpec extends StreamSpec {
attributes.indexOf(Attributes.Name("inner")) < attributes.indexOf(Attributes.Name("outer")) should be(true)
}

"work with optimized Source.single" in assertAllStagesStopped {
Source(0 to 3)
.flatMapConcat(Source.single)
.runWith(toSeq)
.futureValue should ===(0 to 3)
}

"work with optimized Source.single when slow demand" in assertAllStagesStopped {
val probe = Source(0 to 4)
.flatMapConcat(Source.single)
.runWith(TestSink.probe)

probe.request(3)
probe.expectNext(0)
probe.expectNext(1)
probe.expectNext(2)
probe.expectNoMessage(100.millis)

probe.request(10)
probe.expectNext(3)
probe.expectNext(4)
probe.expectComplete()
}

"work with mix of Source.single and other sources when slow demand" in assertAllStagesStopped {
val sources: Source[Source[Int, NotUsed], NotUsed] = Source(List(
Source.single(0),
Source.single(1),
Source(2 to 4),
Source.single(5),
Source(6 to 6),
Source.single(7),
Source(8 to 10),
Source.single(11)
))

val probe =
sources
.flatMapConcat(identity)
.runWith(TestSink.probe)

probe.request(3)
probe.expectNext(0)
probe.expectNext(1)
probe.expectNext(2)
probe.expectNoMessage(100.millis)

probe.request(1)
probe.expectNext(3)
probe.expectNoMessage(100.millis)

probe.request(1)
probe.expectNext(4)
probe.expectNoMessage(100.millis)

probe.request(3)
probe.expectNext(5)
probe.expectNext(6)
probe.expectNext(7)
probe.expectNoMessage(100.millis)

probe.request(10)
probe.expectNext(8)
probe.expectNext(9)
probe.expectNext(10)
probe.expectNext(11)
probe.expectComplete()
}

"find Source.single via TraversalBuilder" in assertAllStagesStopped {
TraversalBuilder.getSingleSource(Source.single("a")).get.elem should ===("a")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check also that Source.single.{async/mapMatVal} causes a nested graph and does not match for extra measure? (Pretty sure they do, but just for extra measure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanandren Good point, it was wrong. Matched for those cases also. async is only adding attribute, and mapMaterializedValue is adding Transform traversal. Fixed with additional checks for those in 7ae02d1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, good!

TraversalBuilder.getSingleSource(Source(List("a", "b"))) should be(OptionVal.None)

val singleSourceA = new SingleSource("a")
TraversalBuilder.getSingleSource(singleSourceA) should be(OptionVal.Some(singleSourceA))

TraversalBuilder.getSingleSource(Source.single("c").async) should be(OptionVal.None)
TraversalBuilder.getSingleSource(Source.single("d").mapMaterializedValue(_ ⇒ "Mat")) should be(OptionVal.None)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
import akka.stream.scaladsl.Keep
import akka.util.OptionVal

import scala.language.existentials
import scala.collection.immutable.Map.Map1

import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.SingleSource

/**
* INTERNAL API
*
Expand Down Expand Up @@ -334,6 +336,37 @@ import scala.collection.immutable.Map.Map1
}
slot
}

/**
* Try to find `SingleSource` or wrapped such. This is used as a
* performance optimization in FlattenMerge and possibly other places.
*/
def getSingleSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[SingleSource[A]] = {
graph match {
case single: SingleSource[A] @unchecked ⇒ OptionVal.Some(single)
case _ ⇒
graph.traversalBuilder match {
case l: LinearTraversalBuilder ⇒
l.pendingBuilder match {
case OptionVal.Some(a: AtomicTraversalBuilder) ⇒
a.module match {
case m: GraphStageModule[_, _] ⇒
m.stage match {
case single: SingleSource[A] @unchecked ⇒
// It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync)
OptionVal.Some(single)
else OptionVal.None
case _ ⇒ OptionVal.None
}
case _ ⇒ OptionVal.None
}
case _ ⇒ OptionVal.None
}
case _ ⇒ OptionVal.None
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pyramid of optimization.

}

/**
Expand Down
Loading