Skip to content

Commit

Permalink
Fix .head causing JsonSupport sink to terminate
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 12, 2024
1 parent 2030f74 commit 3c19fcf
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,80 @@ import pekko.http.scaladsl.model.HttpEntity
import pekko.http.scaladsl.model.MediaTypes.`application/json`
import pekko.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import pekko.http.scaladsl.util.FastFuture
import pekko.stream._
import pekko.stream.scaladsl.Sink
import pekko.stream.stage._
import org.mdedetrich.pekko.json.stream.JsonStreamParser
import org.typelevel.jawn.Facade

import scala.concurrent.{Future, Promise}

trait JsonSupport {

implicit def jsonUnmarshaller[J: Facade]: FromEntityUnmarshaller[J] =
implicit def jsonUnmarshaller[J <: AnyRef: Facade]: FromEntityUnmarshaller[J] =
Unmarshaller
.withMaterializer[HttpEntity, J](_ =>
implicit mat => {
case HttpEntity.Strict(_, data) =>
FastFuture(JsonStreamParser.parse[J](data))
case entity => entity.dataBytes.runWith(JsonStreamParser.head[J])
case entity => entity.dataBytes.via(JsonStreamParser[J]).runWith(JsonSupport.firstElementSink[J])
}
)
.forContentTypes(`application/json`)
}

object JsonSupport extends JsonSupport
object JsonSupport extends JsonSupport {
private def firstElementSink[J <: AnyRef]: Sink[J, Future[J]] =
Sink.fromGraph(new FirstElementSinkStage[J])

private final class FirstElementSinkStage[J <: AnyRef]
extends GraphStageWithMaterializedValue[SinkShape[J], Future[J]] {
private[this] val in: Inlet[J] = Inlet("firstElement.in")

override val shape: SinkShape[J] = SinkShape.of(in)
override protected def initialAttributes: Attributes = Attributes.name("firstElement")

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[J]) = {
val p: Promise[J] = Promise()
(new GraphStageLogic(shape) with InHandler {
private[this] var element: J = null.asInstanceOf[J]

override def preStart(): Unit = pull(in)

def onPush(): Unit = {
if (element eq null)
element = grab(in)
pull(in)
}

override def onUpstreamFinish(): Unit = {
val el = element
element = null.asInstanceOf[J]
if (el ne null)
p.trySuccess(el)
else
p.tryFailure(new NoSuchElementException("No complete json entity consumed"))
completeStage()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
element = null.asInstanceOf[J]
p.tryFailure(ex)
failStage(ex)
}

override def postStop(): Unit =
if (!p.isCompleted) {
p.failure(new AbruptStageTerminationException(this))
()
}

setHandler(in, this)
},
p.future
)
}

override def toString: String = "FirstElementSinkStage"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class JsonSupportSpec
)

futureException.map {
_.getMessage shouldBe "head of empty stream"
_.getMessage shouldBe "No complete json entity consumed"
}

}
Expand Down

0 comments on commit 3c19fcf

Please sign in to comment.