diff --git a/.travis.yml b/.travis.yml index 7e1f6a27..023dee26 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,8 @@ jdk: services: - rabbitmq +script: sbt '+ test' + sudo: false cache: diff --git a/addons/akka-stream/src/main/scala/com/spingo/op_rabbit/stream/MessagePublisherSink.scala b/addons/akka-stream/src/main/scala/com/spingo/op_rabbit/stream/MessagePublisherSink.scala index a7da6fbe..9913c10e 100644 --- a/addons/akka-stream/src/main/scala/com/spingo/op_rabbit/stream/MessagePublisherSink.scala +++ b/addons/akka-stream/src/main/scala/com/spingo/op_rabbit/stream/MessagePublisherSink.scala @@ -1,141 +1,68 @@ package com.spingo.op_rabbit package stream -import akka.actor.{ActorRef,Props} -import akka.actor.FSM -import akka.pattern.ask -import akka.stream.scaladsl.Sink -import akka.stream.actor._ import scala.concurrent.{Future, Promise} import scala.concurrent.duration._ -import com.timcharper.acked.AckedSink import scala.util.{Try,Success,Failure} -private [stream] object MessagePublisherSinkActor { - sealed trait State - case object Running extends State - case object Stopping extends State - case object AllDoneFuturePlease -} - -private class MessagePublisherSinkActor(rabbitControl: ActorRef, timeoutAfter: FiniteDuration, qos: Int) extends ActorSubscriber with FSM[MessagePublisherSinkActor.State, Unit] { - import ActorSubscriberMessage._ - import MessagePublisherSinkActor._ - - private val queue = scala.collection.mutable.Map.empty[Long, Promise[Unit]] - private val completed = Promise[Unit] - - startWith(Running, ()) - - override val requestStrategy = new MaxInFlightRequestStrategy(max = qos) { - override def inFlightInternally: Int = queue.size - } - - override def postRestart(reason: Throwable): Unit = { - stopWith(Failure(reason)) - super.postRestart(reason) - } - - private def stopWith(reason: Try[Unit]): Unit = { - context stop self - completed.tryComplete(reason) - } - - when(Running) { - case Event(response: Message.ConfirmResponse, _) => - handleResponse(response) - stay - - case Event(OnError(e), _) => - completed.tryFailure(e) - goto(Stopping) - - case Event(OnComplete, _) => - goto(Stopping) - } - - when(Stopping) { - case Event(response: Message.ConfirmResponse, _) => - handleResponse(response) - if(queue.isEmpty) - stop - else - stay - } - - whenUnhandled { - case Event(OnNext((p: Promise[Unit] @unchecked, msg: Message)), _) => - queue(msg.id) = p - rabbitControl ! msg - stay - - case Event(MessagePublisherSinkActor.AllDoneFuturePlease,_) => - sender ! completed.future - stay - } - - onTransition { - case Running -> Stopping if queue.isEmpty => - stopWith(Success(())) - } - - onTermination { - case e: StopEvent => - stopWith(Success(())) - } +import akka.actor.{ActorRef,Props} +import akka.actor.typed.scaladsl.Behaviors +import akka.pattern.ask +import akka.stream.{Attributes, Inlet, SinkShape} +import akka.stream.scaladsl.Source +import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic} - private val handleResponse: Message.ConfirmResponse => Unit = { - case Message.Ack(id) => - queue.remove(id).get.success(()) +import com.timcharper.acked.{AckTup, AckedSink} +import akka.stream.scaladsl.Flow +import akka.actor.typed.Behavior +import akka.actor.ActorSystem +import akka.actor.Actor - case Message.Nack(id) => - queue.remove(id).get.failure(new MessageNacked(id)) +object MessagePublisherSink { + private type In = AckTup[Message] - case Message.Fail(id, exception: Throwable) => - queue.remove(id).get.failure(exception) + def acked(name: String, rabbitControl: ActorRef, actorSystem: ActorSystem) = AckedSink { + MessagePublisherSink(rabbitControl, actorSystem).named(name) } } -/** - A MessagePublisherSink (an [[https://github.com/timcharper/acked-stream/blob/master/src/main/scala/com/timcharper/acked/AckedSink.scala AckedSink]]) publishes each input [[Message]], and either acks or fails the upstream element, depending on [[Message$.ConfirmResponse ConfirmResponse]]. - Using a [[RabbitSource$ RabbitSource]] with a [[MessagePublisherSink$ MessagePublisherSink]] is a great way to get persistent, recoverable streams. +case class MessagePublisherSink(rabbitControl: ActorRef, actorSystem: ActorSystem) extends GraphStage[SinkShape[AckTup[Message]]] { + import MessagePublisherSink.In - Note - MessagePublisherSink uses ActorPublisher and due to AkkaStream limitations, it DOES NOT abide your configured supervisor strategy. + val in: Inlet[In] = Inlet.create("MessagePublisherSinkActor.in") - == [[com.spingo.op_rabbit.Message$.ConfirmResponse Message.ConfirmResponse]] handling == + override val shape: SinkShape[In] = SinkShape.of(in) - After the sink publishes the [[Message]], it listens for the [[Message$.ConfirmResponse Message.ConfirmResponse]], and handles it accordingly: + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) { + val queue = scala.collection.mutable.Map.empty[Long, Promise[Unit]] + val completed = Promise[Unit] - - On [[Message$.Ack Message.Ack]], ack the upstream element. - - - On [[Message$.Nack Message.Nack]], fail the upstream element with - [[MessageNacked]]. '''Does not''' throw a stream - exception. Processing continues. - - - On [[Message$.Fail Message.Fail]], fail the upstream element with - publisher exception. '''Does not''' throw a stream - exception. Processing continues. - - == Future[Unit] materialized type: == - - This sinks materialized type is Future[Unit]. The following applies: - - - It yields any upstream failure as soon as it reaches the sink (potentially before messages are confirmed). - - After the stream completes, and all [[Message$.ConfirmResponse Message.ConfirmResponse]]'s have have been processed, the Future[Unit] is completed. - */ -object MessagePublisherSink { - /** - @param rabbitControl An actor - @param timeoutAfter The duration for which we'll wait for a message to be acked; note, timeouts and non-acknowledged messages will cause the upstream elements to fail. The sink will not throw an exception. - */ - def apply(rabbitControl: ActorRef, timeoutAfter: FiniteDuration = 30 seconds, qos: Int = 8): AckedSink[Message, Future[Unit]] = AckedSink { - Sink.actorSubscriber[(Promise[Unit], Message)](Props(new MessagePublisherSinkActor(rabbitControl, timeoutAfter, qos))). - mapMaterializedValue { subscriber => - implicit val akkaTimeout = akka.util.Timeout(timeoutAfter) - implicit val ec = SameThreadExecutionContext - - (subscriber ? MessagePublisherSinkActor.AllDoneFuturePlease).mapTo[Future[Unit]].flatMap(identity) + class ConfirmationActor extends Actor { + override def receive: Receive = { + case Message.Ack(id) => + queue.remove(id).get.success(()) + case Message.Nack(id) => + queue.remove(id).get.failure(new MessageNacked(id)) + case Message.Fail(id, exception: Throwable) => + queue.remove(id).get.failure(exception) + } } - } + + val confirmationActor = actorSystem.actorOf(Props[ConfirmationActor]) + + setHandler(in, new AbstractInHandler() { + override def onPush() { + val (promise, msg) = grab(in) + queue(msg.id) = promise + rabbitControl.tell(msg, sender = confirmationActor) + } + + override def onUpstreamFinish() { + completed.success(()) + super.onUpstreamFinish() + } + }) + } } diff --git a/addons/akka-stream/src/test/scala/com/spingo/op_rabbit/stream/MessagePublisherSinkSpec.scala b/addons/akka-stream/src/test/scala/com/spingo/op_rabbit/stream/MessagePublisherSinkSpec.scala index 60509f92..9c93f7c4 100644 --- a/addons/akka-stream/src/test/scala/com/spingo/op_rabbit/stream/MessagePublisherSinkSpec.scala +++ b/addons/akka-stream/src/test/scala/com/spingo/op_rabbit/stream/MessagePublisherSinkSpec.scala @@ -1,6 +1,9 @@ package com.spingo.op_rabbit package stream +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Try,Failure} + import akka.actor._ import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Keep, Sink} @@ -11,8 +14,6 @@ import com.spingo.op_rabbit.helpers.RabbitTestHelpers import com.timcharper.acked.AckedSource import com.spingo.scoped_fixtures.ScopedFixtures import org.scalatest.{FunSpec, Matchers} -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Try,Failure} class MessagePublisherSinkSpec extends FunSpec with ScopedFixtures with Matchers with RabbitTestHelpers { implicit val executionContext = ExecutionContext.global @@ -62,9 +63,9 @@ class MessagePublisherSinkSpec extends FunSpec with ScopedFixtures with Matchers val published = AckedSource(data). map(Message.queue(_, queueName)). - runWith(MessagePublisherSink(rabbitControl)) + to(MessagePublisherSink.acked("test-sink", rabbitControl, actorSystem)) - await(published) + published.run() await(Future.sequence(data.map(_._1.future))) // this asserts that all of the promises were fulfilled await(consumed) should be (range) } @@ -73,20 +74,18 @@ class MessagePublisherSinkSpec extends FunSpec with ScopedFixtures with Matchers it("propagates publish exceptions to promise") { new RabbitFixtures { val factory = Message.factory(Publisher.queue(Queue.passive("no-existe"))) - val sink = MessagePublisherSink(rabbitControl) + val sink = MessagePublisherSink.acked("test-sink", rabbitControl, actorSystem) val data = range map { i => (Promise[Unit], i) } val published = AckedSource(data). map(Message(_, Publisher.queue(Queue.passive("no-existe")))). - runWith(sink) + to(sink) - await(published) + published.run() val Failure(ex) = Try(await(data.head._1.future)) ex.getMessage should include ("no queue 'no-existe'") } } } - - } diff --git a/build.sbt b/build.sbt index 0c1cb086..3de2cd20 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ import java.util.Properties -val json4sVersion = "3.6.6" -val circeVersion = "0.12.3" -val akkaVersion = "2.5.25" +val json4sVersion = "3.6.11" +val circeVersion = "0.14.1" +val akkaVersion = "2.6.17" val playVersion = "2.7.4" val appProperties = { @@ -16,17 +16,18 @@ val assertNoApplicationConf = taskKey[Unit]("Makes sure application.conf isn't p val commonSettings = Seq( organization := "com.spingo", version := appProperties.getProperty("version"), - scalaVersion := "2.12.10", - crossScalaVersions := Seq("2.12.10", "2.13.1"), + scalaVersion := "2.13.6", + crossScalaVersions := Seq("2.12.15", "2.13.6"), libraryDependencies ++= Seq( - "com.chuusai" %% "shapeless" % "2.3.3", - "com.typesafe" % "config" % "1.3.4", - "com.newmotion" %% "akka-rabbitmq" % "5.1.2", - "org.slf4j" % "slf4j-api" % "1.7.26", - "ch.qos.logback" % "logback-classic" % "1.2.3" % "test", + "com.chuusai" %% "shapeless" % "2.3.7", + "com.typesafe" % "config" % "1.4.1", + "com.newmotion" %% "akka-rabbitmq" % "6.0.0", + "org.slf4j" % "slf4j-api" % "1.7.32", + "ch.qos.logback" % "logback-classic" % "1.2.5" % "test", "org.scalatest" %% "scalatest" % "3.0.8" % "test", "com.spingo" %% "scoped-fixtures" % "2.0.0" % "test", "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % "test" ), @@ -120,7 +121,7 @@ lazy val upickle = (project in file("./addons/upickle")). libraryDependencies += "com.lihaoyi" %% "upickle" % ( CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, 11)) => "0.7.4" - case _ => "0.8.0" + case _ => "1.2.2" } )). dependsOn(core) @@ -136,9 +137,14 @@ lazy val `akka-stream` = (project in file("./addons/akka-stream")). settings(commonSettings: _*). settings( name := "op-rabbit-akka-stream", + // Temporarily depend on jitpack published version of acked-streams for scala 2.13 + resolvers += "jitpack" at "https://jitpack.io", libraryDependencies ++= Seq( - "com.timcharper" %% "acked-streams" % "2.1.1", - "com.typesafe.akka" %% "akka-stream" % akkaVersion), + // TODO: remove and switch to com.timcharper when https://github.com/timcharper/acked-stream/pull/10 gets merged and published + // "com.timcharper" %% "acked-streams" % "2.1.1", + "com.github.deal-engine.acked-stream" %% "acked-streams" % "5babfe7f85", + "com.typesafe.akka" %% "akka-stream" % akkaVersion + ), unmanagedResourceDirectories in Test ++= Seq( file(".").getAbsoluteFile / "core" / "src" / "test" / "resources"), unmanagedSourceDirectories in Test ++= Seq( diff --git a/jitpack.yml b/jitpack.yml new file mode 100644 index 00000000..7577c477 --- /dev/null +++ b/jitpack.yml @@ -0,0 +1,2 @@ +install: + - sbt '+ publishM2'