Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 13 additions & 27 deletions akka-remote/src/main/scala/akka/remote/artery/Control.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private[remote] object InboundControlJunction {
def notify(inboundEnvelope: InboundEnvelope): Unit
}

// messages for the CallbackWrapper
// messages for the stream callback
private[InboundControlJunction] sealed trait CallbackMessage
private[InboundControlJunction] final case class Attach(observer: ControlMessageObserver, done: Promise[Done])
extends CallbackMessage
Expand All @@ -93,8 +93,7 @@ private[remote] class InboundControlJunction

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stoppedPromise = Promise[Done]()
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler {
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ControlMessageSubject {

private var observers: Vector[ControlMessageObserver] = Vector.empty

Expand All @@ -106,10 +105,6 @@ private[remote] class InboundControlJunction
observers = observers.filterNot(_ == observer)
}

override def preStart(): Unit = {
initCallback(callback.invoke)
}

override def postStop(): Unit = stoppedPromise.success(Done)

// InHandler
Expand All @@ -127,24 +122,22 @@ private[remote] class InboundControlJunction
override def onPull(): Unit = pull(in)

setHandlers(in, out, this)
}

// materialized value
val controlSubject: ControlMessageSubject = new ControlMessageSubject {
// ControlMessageSubject impl
override def attach(observer: ControlMessageObserver): Future[Done] = {
val p = Promise[Done]()
logic.invoke(Attach(observer, p))
callback.invoke(Attach(observer, p))
p.future
}

override def detach(observer: ControlMessageObserver): Unit =
logic.invoke(Dettach(observer))
callback.invoke(Dettach(observer))

override def stopped: Future[Done] =
stoppedPromise.future
}

(logic, controlSubject)
(logic, logic)
}
}

Expand All @@ -169,18 +162,14 @@ private[remote] class OutboundControlJunction(
override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler with StageLogging {

val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundControlIngress {
import OutboundControlJunction._

private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
private val maxControlMessageBufferSize: Int = outboundContext.settings.Advanced.OutboundControlQueueSize
private val buffer = new ArrayDeque[OutboundEnvelope]

override def preStart(): Unit = {
initCallback(sendControlMessageCallback.invoke)
}

// InHandler
override def onPush(): Unit = {
if (buffer.isEmpty && isAvailable(out))
Expand Down Expand Up @@ -212,16 +201,13 @@ private[remote] class OutboundControlJunction(
outboundEnvelopePool.acquire().init(
recipient = OptionVal.None, message = message, sender = OptionVal.None)

setHandlers(in, out, this)
}

// materialized value
val outboundControlIngress = new OutboundControlIngress {
override def sendControlMessage(message: ControlMessage): Unit =
logic.invoke(message)
sendControlMessageCallback.invoke(message)

setHandlers(in, out, this)
}

(logic, outboundControlIngress)
(logic, logic)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, Te
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.Utils._
import akka.testkit.TestProbe

import scala.concurrent.duration._
import scala.concurrent._
import akka.Done
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.time.Span

class QueueSourceSpec extends StreamSpec {
Expand Down Expand Up @@ -184,7 +186,7 @@ class QueueSourceSpec extends StreamSpec {
expectMsgClass(classOf[Status.Failure])
}

"return false when elemen was not added to buffer" in assertAllStagesStopped {
"return false when element was not added to buffer" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
Expand Down Expand Up @@ -300,6 +302,15 @@ class QueueSourceSpec extends StreamSpec {
.expectComplete()
source.watchCompletion().futureValue should ===(Done)
}

"some elements not yet delivered to stage" in {
val (queue, probe) =
Source.queue[Unit](10, OverflowStrategy.fail).toMat(TestSink.probe)(Keep.both).run()
intercept[StreamDetachedException] {
Await.result(
(1 to 15).map(_ ⇒ queue.offer(())).last, 3.seconds)
}
}
}

"fail the stream" when {
Expand Down
2 changes: 1 addition & 1 deletion akka-stream/src/main/mima-filters/2.5.3.backwards.excludes
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.attributes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.this")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$")
19 changes: 18 additions & 1 deletion akka-stream/src/main/mima-filters/2.5.6.backwards.excludes
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this")

# #23111 AsyncCallbacks to just-finishing stages can be lost
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Offer")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Completion$")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSink$Pull")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSink$Cancel$")
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("akka.stream.impl.QueueSink$Output")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Failure")
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("akka.stream.impl.QueueSource$Input")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.stage.AsyncCallback.invokeWithFeedback")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$Stopped")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$NotInitialized")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$Stopped$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$Initialized")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$Initialized$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$NotInitialized$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$CallbackState")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper")

# Optimize TCP stream writes
ProblemFilters.exclude[Problem]("akka.stream.impl.io.*")

52 changes: 25 additions & 27 deletions akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,29 @@
*/
package akka.stream.impl

import java.util.concurrent.CompletionStage

import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
import akka.stream.OverflowStrategies._
import akka.stream._
import akka.stream.stage._
import akka.stream.scaladsl.SourceQueueWithComplete
import akka.Done
import java.util.concurrent.CompletionStage

import akka.annotation.InternalApi

import scala.concurrent.{ Future, Promise }
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal

/**
* INTERNAL API
*/
@InternalApi private[akka] object QueueSource {

sealed trait Input[+T]
final case class Offer[+T](elem: T, promise: Promise[QueueOfferResult]) extends Input[T]
case object Completion extends Input[Nothing]
final case class Failure(ex: Throwable) extends Input[Nothing]

}

/**
Expand All @@ -36,22 +39,18 @@ import scala.compat.java8.FutureConverters._

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val completion = Promise[Done]
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Input[T]] with OutHandler {

val stageLogic = new GraphStageLogic(shape) with OutHandler with SourceQueueWithComplete[T] {
var buffer: Buffer[T] = _
var pendingOffer: Option[Offer[T]] = None
var terminating = false

override def preStart(): Unit = {
if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer)
initCallback(callback.invoke)
}
override def postStop(): Unit = {
val exception = new StreamDetachedException()
completion.tryFailure(exception)
stopCallback {
case Offer(elem, promise) ⇒ promise.failure(exception)
case _ ⇒ // ignore
}
}

private def enqueueAndSuccess(offer: Offer[T]): Unit = {
Expand All @@ -75,7 +74,7 @@ import scala.compat.java8.FutureConverters._
case DropNew ⇒
offer.promise.success(QueueOfferResult.Dropped)
case Fail ⇒
val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!")
val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!")
offer.promise.success(QueueOfferResult.Failure(bufferOverflowException))
completion.failure(bufferOverflowException)
failStage(bufferOverflowException)
Expand All @@ -89,8 +88,7 @@ import scala.compat.java8.FutureConverters._
}
}

private val callback: AsyncCallback[Input[T]] = getAsyncCallback {

private val callback = getAsyncCallback[Input[T]] {
case offer @ Offer(elem, promise) ⇒
if (maxBuffer != 0) {
bufferElem(offer)
Expand All @@ -107,7 +105,7 @@ import scala.compat.java8.FutureConverters._
case DropTail | DropNew ⇒
promise.success(QueueOfferResult.Dropped)
case Fail ⇒
val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!")
val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!")
promise.success(QueueOfferResult.Failure(bufferOverflowException))
completion.failure(bufferOverflowException)
failStage(bufferOverflowException)
Expand All @@ -131,7 +129,7 @@ import scala.compat.java8.FutureConverters._

override def onDownstreamFinish(): Unit = {
pendingOffer match {
case Some(Offer(elem, promise)) ⇒
case Some(Offer(_, promise)) ⇒
promise.success(QueueOfferResult.QueueClosed)
pendingOffer = None
case None ⇒ // do nothing
Expand Down Expand Up @@ -167,22 +165,22 @@ import scala.compat.java8.FutureConverters._
}
}
}
}

(stageLogic, new SourceQueueWithComplete[T] {
// SourceQueueWithComplete impl
override def watchCompletion() = completion.future
override def offer(element: T): Future[QueueOfferResult] = {
val p = Promise[QueueOfferResult]
stageLogic.invoke(Offer(element, p))
callback.invokeWithFeedback(Offer(element, p))
.onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
p.future
}
override def complete(): Unit = {
stageLogic.invoke(Completion)
}
override def fail(ex: Throwable): Unit = {
stageLogic.invoke(Failure(ex))
}
})
override def complete(): Unit = callback.invoke(Completion)

override def fail(ex: Throwable): Unit = callback.invoke(Failure(ex))

}

(stageLogic, stageLogic)
}
}

Expand Down
48 changes: 23 additions & 25 deletions akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,9 @@ import akka.util.OptionVal
override def toString: String = "QueueSink"

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] with InHandler {
var logicCallback: AsyncCallback[Output[T]] = null

val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] {
type Received[E] = Try[Option[E]]

val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
Expand All @@ -348,29 +350,22 @@ import akka.util.OptionVal
// closed/failure indicators
buffer = Buffer(maxBuffer + 1, materializer)
setKeepGoing(true)
initCallback(callback.invoke)
pull(in)
}

override def postStop(): Unit = stopCallback {
case Pull(promise) ⇒ promise.failure(new StreamDetachedException())
case _ ⇒ //do nothing
}

private val callback: AsyncCallback[Output[T]] =
getAsyncCallback {
case QueueSink.Pull(pullPromise) ⇒ currentRequest match {
case Some(_) ⇒
pullPromise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
case None ⇒
if (buffer.isEmpty) currentRequest = Some(pullPromise)
else {
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(pullPromise)
}
}
case QueueSink.Cancel ⇒ completeStage()
private val callback = getAsyncCallback[Output[T]] {
case QueueSink.Pull(pullPromise) ⇒ currentRequest match {
case Some(_) ⇒
pullPromise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
case None ⇒
if (buffer.isEmpty) currentRequest = Some(pullPromise)
else {
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(pullPromise)
}
}
case QueueSink.Cancel ⇒ completeStage()
}

def sendDownstream(promise: Requested[T]): Unit = {
val e = buffer.dequeue()
Expand Down Expand Up @@ -400,19 +395,22 @@ import akka.util.OptionVal
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))

logicCallback = callback
setHandler(in, this)
}

(stageLogic, new SinkQueueWithCancel[T] {
// SinkQueueWithCancel impl
override def pull(): Future[Option[T]] = {
val p = Promise[Option[T]]
stageLogic.invoke(Pull(p))
logicCallback.invokeWithFeedback(Pull(p))
.onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
p.future
}
override def cancel(): Unit = {
stageLogic.invoke(QueueSink.Cancel)
logicCallback.invoke(QueueSink.Cancel)
}
})
}

(stageLogic, stageLogic)
}
}

Expand Down
Loading