Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream

/**
 * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element.
 */
sealed abstract class OverflowStrategy

object OverflowStrategy {

  /**
   * INTERNAL API
   */
  private[akka] final case object DropHead extends OverflowStrategy

  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * INTERNAL API
   */
  private[akka] final case object DropTail extends OverflowStrategy

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * INTERNAL API
   */
  private[akka] final case object DropBuffer extends OverflowStrategy

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * INTERNAL API
   */
  private[akka] final case object Backpressure extends OverflowStrategy

  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream producer until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import scala.collection.immutable
import org.reactivestreams.api.{ Consumer, Processor, Producer }
import org.reactivestreams.spi.Subscriber
import akka.actor.ActorRefFactory
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.Transformer
import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer }
import scala.util.Try
import scala.concurrent.Future
import scala.util.Success
Expand Down Expand Up @@ -59,6 +58,18 @@ private[akka] object Ast {
override def name = "concatFlatten"
}

/**
 * INTERNAL API
 *
 * AST node for the `conflate` operation: `seed` builds the initial summary from the first
 * pending element and `aggregate` folds each further pending element into that summary.
 * Element types are erased to `Any` here; the typed DSL casts before constructing the node
 * (see `Builder` in `FlowImpl.scala`).
 */
case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode {
override def name = "conflate"
}

/**
 * INTERNAL API
 *
 * AST node for the `expand` operation: `seed` builds the initial extrapolation state from the
 * first element and `extrapolate` produces an output element plus the next state on each
 * downstream demand. Element types are erased to `Any` here; the typed DSL casts before
 * constructing the node (see `Builder` in `FlowImpl.scala`).
 */
case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode {
override def name = "expand"
}

/**
 * INTERNAL API
 *
 * AST node for the `buffer` operation: holds up to `size` elements and applies
 * `overflowStrategy` when a new element arrives while the buffer is full.
 */
case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode {
override def name = "buffer"
}

trait ProducerNode[I] {
private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I]
}
Expand Down
17 changes: 10 additions & 7 deletions akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ private[akka] object ActorProcessor {
import Ast._
/**
 * Maps a flow AST node to the `Props` of the actor processor implementing that operation,
 * configured to run on the dispatcher given in `settings`.
 *
 * Note: this span as rendered interleaved the pre- and post-image lines of a diff, which
 * duplicated the first seven match cases; this is the merged (post-image) version.
 */
def props(settings: MaterializerSettings, op: AstNode): Props =
  (op match {
    case t: Transform      ⇒ Props(new TransformProcessorImpl(settings, t.transformer))
    case s: SplitWhen      ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
    case g: GroupBy        ⇒ Props(new GroupByProcessorImpl(settings, g.f))
    case m: Merge          ⇒ Props(new MergeImpl(settings, m.other))
    case z: Zip            ⇒ Props(new ZipImpl(settings, z.other))
    case c: Concat         ⇒ Props(new ConcatImpl(settings, c.next))
    case t: Tee            ⇒ Props(new TeeImpl(settings, t.other))
    case cf: Conflate      ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate))
    case ex: Expand        ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate))
    case bf: Buffer        ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy))
    case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n))
    case ConcatAll         ⇒ Props(new ConcatAllImpl(settings))
  }).withDispatcher(settings.dispatcher)
Expand Down
83 changes: 83 additions & 0 deletions akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl

/**
 * INTERNAL API
 */
private[akka] object FixedSizeBuffer {

  /**
   * INTERNAL API
   *
   * Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check against overflow or
   * underflow; it is the responsibility of the user to track or check the capacity of the buffer before enqueueing,
   * dequeueing or dropping.
   *
   * Returns a specialized instance for power-of-two sized buffers.
   *
   * @throws IllegalArgumentException if `size` is not positive (0 would otherwise pass the
   *                                  power-of-two test and yield a mask of -1)
   */
  def apply(size: Int): FixedSizeBuffer = {
    require(size > 0, s"buffer size must be positive but was [$size]")
    // size is a power of two iff exactly one bit is set, i.e. (size - 1) & size == 0
    if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
    else new ModuloFixedSizeBuffer(size)
  }

  sealed abstract class FixedSizeBuffer(val size: Int) {
    protected var readIdx = 0
    protected var writeIdx = 0
    private var remainingCapacity = size
    private val buffer = Array.ofDim[Any](size)

    // Index wrap-around is delegated to subclasses so the power-of-two variant can use a
    // cheap bit mask instead of a modulo.
    protected def incWriteIdx(): Unit
    protected def decWriteIdx(): Unit
    protected def incReadIdx(): Unit

    def isFull: Boolean = remainingCapacity == 0
    def isEmpty: Boolean = remainingCapacity == size

    /** Appends `elem`; the caller must have checked `!isFull`. */
    def enqueue(elem: Any): Unit = {
      buffer(writeIdx) = elem
      incWriteIdx()
      remainingCapacity -= 1
    }

    /** Removes and returns the oldest element; the caller must have checked `!isEmpty`. */
    def dequeue(): Any = {
      val result = buffer(readIdx)
      dropHead()
      result
    }

    /** Removes all elements, clearing the stored references so they can be garbage collected. */
    def clear(): Unit = {
      java.util.Arrays.fill(buffer.asInstanceOf[Array[Object]], null)
      readIdx = 0
      writeIdx = 0
      remainingCapacity = size
    }

    /** Discards the oldest element; the caller must have checked `!isEmpty`. */
    def dropHead(): Unit = {
      buffer(readIdx) = null
      incReadIdx()
      remainingCapacity += 1
    }

    /** Discards the youngest element; the caller must have checked `!isEmpty`. */
    def dropTail(): Unit = {
      decWriteIdx()
      // Null out the slot (this was commented out before), otherwise the dropped element
      // stays referenced by the array and cannot be garbage collected.
      buffer(writeIdx) = null
      remainingCapacity += 1
    }
  }

  /** General variant wrapping indices with modulo arithmetic; used for non-power-of-two sizes. */
  private final class ModuloFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) {
    override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) % size
    override protected def decWriteIdx(): Unit = writeIdx = (writeIdx + size - 1) % size
    override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) % size
  }

  /** Variant for power-of-two sizes wrapping indices with a bit mask. */
  private final class PowerOfTwoFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) {
    private val Mask = size - 1
    override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) & Mask
    override protected def decWriteIdx(): Unit = writeIdx = (writeIdx - 1) & Mask
    override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) & Mask
  }

}

12 changes: 12 additions & 0 deletions akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import Ast.{ AstNode, Transform }
import akka.stream.{ OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.Flow
import scala.util.Success
Expand Down Expand Up @@ -248,6 +249,17 @@ private[akka] trait Builder[Out] {

// Fan-out: attaches `other` as an additional consumer of this flow's elements; the cast to
// Consumer[Any] erases the element type as required by the untyped Tee AST node.
def tee(other: Consumer[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Consumer[Any]]))

/**
 * Conflates elements from a faster upstream into a summary for a slower downstream.
 * The casts erase the element types to `Any` as required by the untyped [[Ast.Conflate]] node.
 */
def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Thing[S] =
andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any]))

/**
 * Extrapolates output elements for a faster downstream from the most recent upstream element.
 * The casts erase the element types to `Any` as required by the untyped [[Ast.Expand]] node.
 */
def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Thing[U] =
andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)]))

/**
 * Adds a fixed-size buffer stage; `overflowStrategy` decides what happens when an element
 * arrives while the buffer is full.
 *
 * @throws IllegalArgumentException if `size` is not positive
 */
def buffer(size: Int, overflowStrategy: OverflowStrategy): Thing[Out] = {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
andThen(Buffer(size, overflowStrategy))
}

def flatten[U](strategy: FlattenStrategy[Out, U]): Thing[U] = strategy match {
case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll)
case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl

import akka.stream.{ OverflowStrategy, MaterializerSettings }

/**
 * Processor implementation of the `conflate` operation: while the downstream has no demand,
 * incoming elements are folded into a summary via `aggregate`; once demand arrives the summary
 * is emitted and the cycle restarts with `seed` on the next element.
 */
class ConflateImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends ActorProcessorImpl(_settings) {
// Current summary; null means no unconsumed element has been seen since the last emit.
var conflated: Any = null

// Waits for the first unconsumed element and seeds the summary from it.
val waitNextZero: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
conflated = seed(primaryInputs.dequeueInputElement())
nextPhase(conflateThenEmit)
}

// Runs while input OR demand is available: folds any available input into the summary, and
// when demand is available emits the summary and starts over.
val conflateThenEmit: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒
if (primaryInputs.inputsAvailable) conflated = aggregate(conflated, primaryInputs.dequeueInputElement())
if (primaryOutputs.demandAvailable) {
primaryOutputs.enqueueOutputElement(conflated)
conflated = null
nextPhase(waitNextZero)
}
}

nextPhase(waitNextZero)
}

/**
 * Processor implementation of the `expand` operation: each upstream element seeds an
 * extrapolation state, and on every downstream demand `extrapolate` produces an output element
 * plus the next state — so a faster downstream keeps receiving (extrapolated) elements while
 * no upstream element is dropped.
 */
class ExpandImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends ActorProcessorImpl(_settings) {
// Current extrapolation state; null until the first upstream element has been seen.
var extrapolateState: Any = null

// Waits for the first upstream element and seeds the extrapolation state from it.
val waitFirst: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
extrapolateState = seed(primaryInputs.dequeueInputElement())
nextPhase(emitFirst)
}

// Emits one extrapolated element as soon as demand is available, guaranteeing every upstream
// element goes through at least one extrapolation step.
val emitFirst: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
emitExtrapolate()
nextPhase(extrapolateOrReset)
}

// Completes when upstream is depleted; reseeds when a new upstream element is available;
// otherwise (demand only) keeps emitting extrapolations of the current state.
val extrapolateOrReset: TransferPhase = TransferPhase(primaryInputs.NeedsInputOrComplete || primaryOutputs.NeedsDemand) { () ⇒
if (primaryInputs.inputsDepleted) nextPhase(completedPhase)
else if (primaryInputs.inputsAvailable) {
extrapolateState = seed(primaryInputs.dequeueInputElement())
nextPhase(emitFirst)
} else emitExtrapolate()
}

// Emits one element derived from the current state and advances the state.
def emitExtrapolate(): Unit = {
val (emit, nextState) = extrapolate(extrapolateState)
primaryOutputs.enqueueOutputElement(emit)
extrapolateState = nextState
}

nextPhase(waitFirst)
}

/**
 * Processor implementation of the `buffer` operation: stores up to `size` elements from a
 * faster upstream and applies `overflowStrategy` when a new element arrives while the buffer
 * is full.
 *
 * (This span as rendered had review-comment page chrome spliced into the middle of
 * `bufferNonEmpty`; the code below is the chrome-free version, behavior unchanged.)
 */
class BufferImpl(_settings: MaterializerSettings, size: Int, overflowStrategy: OverflowStrategy) extends ActorProcessorImpl(_settings) {
  import OverflowStrategy._

  val buffer = FixedSizeBuffer(size)

  // Action taken when an element arrives while the buffer is full.
  val dropAction: () ⇒ Unit = overflowStrategy match {
    case DropHead     ⇒ buffer.dropHead // make room by discarding the oldest buffered element
    case DropTail     ⇒ buffer.dropTail // make room by discarding the youngest buffered element
    case DropBuffer   ⇒ buffer.clear // discard everything buffered so far
    case Backpressure ⇒ () ⇒ nextPhase(bufferFull) // stop reading input until demand drains the buffer
  }

  // NOTE: the explicit TransferPhase type annotations are required — the vals reference each
  // other recursively and the compiler rejects the definitions without a declared type.
  val bufferEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
    buffer.enqueue(primaryInputs.dequeueInputElement())
    nextPhase(bufferNonEmpty)
  }

  val bufferNonEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒
    if (primaryOutputs.demandAvailable) {
      primaryOutputs.enqueueOutputElement(buffer.dequeue())
      if (buffer.isEmpty) nextPhase(bufferEmpty)
    } else {
      // No demand, so the phase fired because input is available; apply the overflow action
      // when full, otherwise just buffer the element.
      if (buffer.isFull) dropAction()
      else buffer.enqueue(primaryInputs.dequeueInputElement())
    }
  }

  // Entered only in Backpressure mode: drain the buffer on demand without reading more input.
  val bufferFull: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
    primaryOutputs.enqueueOutputElement(buffer.dequeue())
    if (buffer.isEmpty) nextPhase(bufferEmpty)
    else nextPhase(bufferNonEmpty)
  }

  nextPhase(bufferEmpty)
}
52 changes: 51 additions & 1 deletion akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.japi.Pair
import akka.japi.Predicate
import akka.japi.Procedure
import akka.japi.Util.immutableSeq
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.{ Duct ⇒ SDuct }
import akka.stream.impl.Ast

Expand Down Expand Up @@ -201,6 +201,44 @@ abstract class Duct[In, Out] {
*/
def append[U](duct: Duct[_ >: In, U]): Duct[In, U]

/**
 * Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary
 * until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the
 * upstream producer is faster.
 *
 * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
 * duplicate elements.
 *
 * @tparam S the type of the summary the pending elements are rolled up into
 * @param seed Provides the first state for a conflated value using the first unconsumed element as a start
 * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
 */
def conflate[S](seed: Function[Out, S], aggregate: Function2[S, Out, S]): Duct[In, S]

/**
 * Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older
 * element until new element comes from the upstream. For example an expand step might repeat the last element for
 * the consumer until it receives an update from upstream.
 *
 * This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
 * This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
 * consumer.
 *
 * @tparam S the type of the extrapolation state
 * @tparam U the type of the extrapolated output elements
 * @param seed Provides the first state for extrapolation using the first unconsumed element
 * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
 *                    state.
 */
def expand[S, U](seed: Function[Out, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U]

/**
 * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
 * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no
 * space available.
 *
 * @param size The size of the buffer in element count
 * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
 */
def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out]

/**
* Materialize this `Duct` by attaching it to the specified downstream `consumer`
* and return a `Consumer` representing the input side of the `Duct`.
Expand Down Expand Up @@ -311,6 +349,18 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
// Delegates to the underlying Scala Duct, wrapping the result for the Java API.
override def tee(other: Consumer[_ >: T]): Duct[In, T] =
new DuctAdapter(delegate.tee(other))

// Delegates to the underlying Scala Duct, wrapping the result for the Java API.
override def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, T] =
new DuctAdapter(delegate.buffer(size, overflowStrategy))

// Adapts the Java API (akka.japi.Function returning a Pair) to the Scala Duct's
// function-returning-a-tuple signature before delegating.
override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U] =
new DuctAdapter(delegate.expand(seed.apply, (s: S) ⇒ {
val p = extrapolate.apply(s)
(p.first, p.second)
}))

// Adapts the Java functional interfaces to Scala functions and delegates.
override def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Duct[In, S] =
new DuctAdapter(delegate.conflate(seed.apply, aggregate.apply))

// Delegates to the underlying Scala Duct, wrapping the result for the Java API.
override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] =
new DuctAdapter(delegate.flatten(strategy))

Expand Down
Loading