Skip to content

Commit

Permalink
=core Make use of statefulMap instead of statefulMapConcat.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Aug 30, 2023
1 parent 6781b5a commit 6ded069
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,8 @@ private[http] object Http2Blueprint {
def httpLayerClient(masterHttpHeaderParser: HttpHeaderParser, settings: ClientConnectionSettings, log: LoggingAdapter)
: BidiFlow[HttpRequest, Http2SubStream, Http2SubStream, HttpResponse, NotUsed] =
BidiFlow.fromFlows(
Flow[HttpRequest].statefulMapConcat { () =>
val renderer = new RequestRendering(settings, log)
request => renderer(request) :: Nil
},
Flow[HttpRequest].statefulMap(() => new RequestRendering(settings, log))((renderer, request) =>
(renderer, renderer(request)), _ => None),
StreamUtils.statefulAttrsMap[Http2SubStream, HttpResponse] { attrs =>
val headerParser = masterHttpHeaderParser.createShallowCopy()
stream => ResponseParsing.parseResponse(headerParser, settings.parserSettings, attrs)(stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import pekko.util.ByteString
import pekko.stream.scaladsl.{ Flow, Source }
import Protocol.Opcode
import pekko.annotation.InternalApi
import pekko.http.impl.util.StreamUtils
import pekko.http.scaladsl.model.ws._

/**
Expand All @@ -35,20 +34,10 @@ private[http] object MessageToFrameRenderer {
Source.single(FrameEvent.fullFrame(opcode, None, data, fin = true))

def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, Any] =
data.via(StreamUtils.statefulMap(() => {
var isFirst = true

{ data =>
val frameOpcode =
if (isFirst) {
isFirst = false
opcode
} else Opcode.Continuation

FrameEvent.fullFrame(frameOpcode, None, data, fin = false)
}
})) ++
Source.single(FrameEvent.emptyLastContinuationFrame)
data.statefulMap(() => true)((isFirst, data) => {
val frameOpcode = if (isFirst) opcode else Opcode.Continuation
(false, FrameEvent.fullFrame(frameOpcode, None, data, fin = false))
}, _ => None) ++ Source.single(FrameEvent.emptyLastContinuationFrame)

Flow[Message]
.flatMapConcat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,6 @@ private[http] object StreamUtils {
}
}

/**
* Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map.
*/
def statefulMap[T, U](functionConstructor: () => T => U): Flow[T, U, NotUsed] =
Flow[T].statefulMapConcat { () =>
val f = functionConstructor()
i => f(i) :: Nil
}

/**
* Lifts the streams attributes into an element and passes them to the function for each passed through element.
* Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class RequestParsingSpec extends PekkoSpecWithMaterializer with Inside with Insp
.map(parseRequest)
.runWith(Sink.head)
.futureValue
catch { case ex => throw ex.getCause } // unpack futureValue exceptions
catch { case ex: Throwable => throw ex.getCause } // unpack futureValue exceptions
}

def shouldThrowMalformedRequest[T](block: => T): Exception = {
Expand Down

0 comments on commit 6ded069

Please sign in to comment.