diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala index 593659435..f243b71ec 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala @@ -154,10 +154,8 @@ private[http] object Http2Blueprint { def httpLayerClient(masterHttpHeaderParser: HttpHeaderParser, settings: ClientConnectionSettings, log: LoggingAdapter) : BidiFlow[HttpRequest, Http2SubStream, Http2SubStream, HttpResponse, NotUsed] = BidiFlow.fromFlows( - Flow[HttpRequest].statefulMapConcat { () => - val renderer = new RequestRendering(settings, log) - request => renderer(request) :: Nil - }, + Flow[HttpRequest].statefulMap(() => new RequestRendering(settings, log))((renderer, request) => + (renderer, renderer(request)), _ => None), StreamUtils.statefulAttrsMap[Http2SubStream, HttpResponse] { attrs => val headerParser = masterHttpHeaderParser.createShallowCopy() stream => ResponseParsing.parseResponse(headerParser, settings.parserSettings, attrs)(stream) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/MessageToFrameRenderer.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/MessageToFrameRenderer.scala index d5e8e95e7..8ca67aaee 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -19,7 +19,6 @@ import pekko.util.ByteString import pekko.stream.scaladsl.{ Flow, Source } import Protocol.Opcode import pekko.annotation.InternalApi -import pekko.http.impl.util.StreamUtils import pekko.http.scaladsl.model.ws._ /** @@ -35,20 +34,10 @@ private[http] object MessageToFrameRenderer { Source.single(FrameEvent.fullFrame(opcode, None, data, fin = true)) def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, Any] = - data.via(StreamUtils.statefulMap(() => { - var isFirst = true - - { data => - val frameOpcode = - if (isFirst) { - isFirst = false - opcode - } else Opcode.Continuation - - FrameEvent.fullFrame(frameOpcode, None, data, fin = false) - } - })) ++ - Source.single(FrameEvent.emptyLastContinuationFrame) + data.statefulMap(() => true)((isFirst, data) => { + val frameOpcode = if (isFirst) opcode else Opcode.Continuation + (false, FrameEvent.fullFrame(frameOpcode, None, data, fin = false)) + }, _ => None) ++ Source.single(FrameEvent.emptyLastContinuationFrame) Flow[Message] .flatMapConcat { diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala index 20e6097da..6b6343524 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala @@ -260,15 +260,6 @@ private[http] object StreamUtils { } } - /** - * Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map. - */ - def statefulMap[T, U](functionConstructor: () => T => U): Flow[T, U, NotUsed] = - Flow[T].statefulMapConcat { () => - val f = functionConstructor() - i => f(i) :: Nil - } - /** * Lifts the streams attributes into an element and passes them to the function for each passed through element. * Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map. diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/RequestParsingSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/RequestParsingSpec.scala index a8cacebd7..c2b86b8ca 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/RequestParsingSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/RequestParsingSpec.scala @@ -66,7 +66,7 @@ class RequestParsingSpec extends PekkoSpecWithMaterializer with Inside with Insp .map(parseRequest) .runWith(Sink.head) .futureValue - catch { case ex => throw ex.getCause } // unpack futureValue exceptions + catch { case ex: Throwable => throw ex.getCause } // unpack futureValue exceptions } def shouldThrowMalformedRequest[T](block: => T): Exception = {