diff --git a/core/src/main/scala/com/spingo/op_rabbit/ConnectionParams.scala b/core/src/main/scala/com/spingo/op_rabbit/ConnectionParams.scala index d20b604..59650c7 100644 --- a/core/src/main/scala/com/spingo/op_rabbit/ConnectionParams.scala +++ b/core/src/main/scala/com/spingo/op_rabbit/ConnectionParams.scala @@ -10,6 +10,8 @@ import com.rabbitmq.client.SaslConfig import com.rabbitmq.client.impl.DefaultExceptionHandler import com.typesafe.config.Config import javax.net.SocketFactory +import javax.net.ssl.SSLContext + import scala.collection.JavaConverters._ import scala.util.Try @@ -39,7 +41,8 @@ case class ConnectionParams( saslConfig: SaslConfig = DefaultSaslConfig.PLAIN, sharedExecutor: Option[java.util.concurrent.ExecutorService] = None, shutdownTimeout: Int = ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT, - socketFactory: SocketFactory = SocketFactory.getDefault + socketFactory: SocketFactory = SocketFactory.getDefault, + sslContextOpt: Option[SSLContext] = None ) { // TODO - eliminate ClusterConnectionFactory after switching to use RabbitMQ's topology recovery features. protected [op_rabbit] def applyTo(factory: ClusterConnectionFactory): Unit = { @@ -58,7 +61,16 @@ case class ConnectionParams( sharedExecutor.foreach(factory.setSharedExecutor) factory.setShutdownTimeout(shutdownTimeout) factory.setSocketFactory(socketFactory) - if (ssl) factory.useSslProtocol() + if (ssl) { + sslContextOpt match { + case Some(sslContext) => + // suitable for production + factory.useSslProtocol(sslContext) + case None => + // suitable for development + factory.useSslProtocol() + } + } } } diff --git a/core/src/main/scala/com/spingo/op_rabbit/Directives.scala b/core/src/main/scala/com/spingo/op_rabbit/Directives.scala index 960bfb3..c52e4a2 100644 --- a/core/src/main/scala/com/spingo/op_rabbit/Directives.scala +++ b/core/src/main/scala/com/spingo/op_rabbit/Directives.scala @@ -7,6 +7,7 @@ import scala.language.implicitConversions import shapeless._ import com.spingo.op_rabbit.Binding._ import com.spingo.op_rabbit.Exchange.ExchangeType +import EnhancedTry._ import scala.util.{Failure, Success, Try} @@ -179,6 +180,27 @@ trait Directives { } } + /** + Extract the message body as a Either. Uses a [[com.spingo.op_rabbit.RabbitUnmarshaller RabbitUnmarshaller]] to deserialize. + In case the body cannot be unmarshalled, the exception is present in the Left of the Either. + In this way the client code can have the unmarshalling error reason. + Example: + + {{{ + bodyOpt(as[JobDescription]) { jobDescriptionEither => ... + } + }}} + */ + def bodyEither[T](um: RabbitUnmarshaller[T]): Directive1[Either[Throwable, T]] = new Directive1[Either[Throwable, T]] { + def happly(fn: ::[Either[Throwable, T], HNil] => Handler): Handler = { (promise, delivery) => + val dataTry = Try { + um.unmarshall(delivery.body, Option(delivery.properties.getContentType), Option(delivery.properties.getContentEncoding)) + } + + fn(dataTry.toEither :: HNil)(promise, delivery) + } + } + /** Extract any arbitrary value from the delivery / Java RabbitMQ objects. Accepts a function which receives a Delivery and returns some value. */ diff --git a/core/src/main/scala/com/spingo/op_rabbit/EnhancedTry.scala b/core/src/main/scala/com/spingo/op_rabbit/EnhancedTry.scala new file mode 100644 index 0000000..03aa2f1 --- /dev/null +++ b/core/src/main/scala/com/spingo/op_rabbit/EnhancedTry.scala @@ -0,0 +1,10 @@ +package com.spingo.op_rabbit + +import scala.util.{Success, Try} + +object EnhancedTry { + implicit class EnhancedTryImpl[T](t: Try[T]) { + def toEither: Either[Throwable, T] = + t.transform(success => Success(Right(success)), exception => Success(Left(exception))).get + } +} diff --git a/core/src/test/scala/com/spingo/op_rabbit/DirectivesSpec.scala b/core/src/test/scala/com/spingo/op_rabbit/DirectivesSpec.scala index e46cd8e..0892078 100644 --- a/core/src/test/scala/com/spingo/op_rabbit/DirectivesSpec.scala +++ b/core/src/test/scala/com/spingo/op_rabbit/DirectivesSpec.scala @@ -107,6 +107,29 @@ class DirectivesSpec extends FunSpec with Matchers with Inside { } } should be (acked) } + + it("yields the value for both directives when one is bodyEither") { + val delivery = testDelivery(body = "hi".getBytes, properties = Seq(ReplyTo("place"))) + resultFor(delivery) { + (bodyEither(as[String]) & property(ReplyTo)) { (bodyEither, replyTo) => + replyTo should be ("place") + bodyEither.right.get should be ("hi") + ack + } + } should be (acked) + } + } + + describe("bodyEither") { + it("yields the value as an option") { + val delivery = testDelivery(body = "hi".getBytes) + resultFor(delivery) { + (bodyEither(as[String])) { (bodyEither) => + bodyEither.right.get should be ("hi") + ack + } + } should be (acked) + } } describe("|") { diff --git a/project/version.properties b/project/version.properties index 16cc23c..1d50c83 100644 --- a/project/version.properties +++ b/project/version.properties @@ -1 +1 @@ -version=2.1.0 \ No newline at end of file +version=2.1.0