diff --git a/http-json/src/main/scala/org/mdedetrich/pekko/http/JsonSupport.scala b/http-json/src/main/scala/org/mdedetrich/pekko/http/JsonSupport.scala index d36a729..b0fc4eb 100644 --- a/http-json/src/main/scala/org/mdedetrich/pekko/http/JsonSupport.scala +++ b/http-json/src/main/scala/org/mdedetrich/pekko/http/JsonSupport.scala @@ -22,21 +22,80 @@ import pekko.http.scaladsl.model.HttpEntity import pekko.http.scaladsl.model.MediaTypes.`application/json` import pekko.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} import pekko.http.scaladsl.util.FastFuture +import pekko.stream._ +import pekko.stream.scaladsl.Sink +import pekko.stream.stage._ import org.mdedetrich.pekko.json.stream.JsonStreamParser import org.typelevel.jawn.Facade +import scala.concurrent.{Future, Promise} + trait JsonSupport { - implicit def jsonUnmarshaller[J: Facade]: FromEntityUnmarshaller[J] = + implicit def jsonUnmarshaller[J <: AnyRef: Facade]: FromEntityUnmarshaller[J] = Unmarshaller .withMaterializer[HttpEntity, J](_ => implicit mat => { case HttpEntity.Strict(_, data) => FastFuture(JsonStreamParser.parse[J](data)) - case entity => entity.dataBytes.runWith(JsonStreamParser.head[J]) + case entity => entity.dataBytes.via(JsonStreamParser[J]).runWith(JsonSupport.firstElementSink[J]) } ) .forContentTypes(`application/json`) } -object JsonSupport extends JsonSupport +object JsonSupport extends JsonSupport { + private def firstElementSink[J <: AnyRef]: Sink[J, Future[J]] = + Sink.fromGraph(new FirstElementSinkStage[J]) + + private final class FirstElementSinkStage[J <: AnyRef] + extends GraphStageWithMaterializedValue[SinkShape[J], Future[J]] { + private[this] val in: Inlet[J] = Inlet("firstElement.in") + + override val shape: SinkShape[J] = SinkShape.of(in) + override protected def initialAttributes: Attributes = Attributes.name("firstElement") + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[J]) = { + val p: Promise[J] = Promise() + (new GraphStageLogic(shape) with InHandler { + private[this] var element: J = null.asInstanceOf[J] + + override def preStart(): Unit = pull(in) + + def onPush(): Unit = { + if (element eq null) + element = grab(in) + pull(in) + } + + override def onUpstreamFinish(): Unit = { + val el = element + element = null.asInstanceOf[J] + if (el ne null) + p.trySuccess(el) + else + p.tryFailure(new NoSuchElementException("No complete json entity consumed")) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + element = null.asInstanceOf[J] + p.tryFailure(ex) + failStage(ex) + } + + override def postStop(): Unit = + if (!p.isCompleted) { + p.failure(new AbruptStageTerminationException(this)) + () + } + + setHandler(in, this) + }, + p.future + ) + } + + override def toString: String = "FirstElementSinkStage" + } +} diff --git a/tests/src/test/scala/org/mdedetrich/pekko/http/JsonSupportSpec.scala b/tests/src/test/scala/org/mdedetrich/pekko/http/JsonSupportSpec.scala index bbdb783..84ec67b 100644 --- a/tests/src/test/scala/org/mdedetrich/pekko/http/JsonSupportSpec.scala +++ b/tests/src/test/scala/org/mdedetrich/pekko/http/JsonSupportSpec.scala @@ -249,7 +249,7 @@ class JsonSupportSpec ) futureException.map { - _.getMessage shouldBe "head of empty stream" + _.getMessage shouldBe "No complete json entity consumed" } }