diff --git a/http-bench-jmh/src/main/scala/org/apache/pekko/http/impl/engine/http2/H2ClientServerBenchmark.scala b/http-bench-jmh/src/main/scala/org/apache/pekko/http/impl/engine/http2/H2ClientServerBenchmark.scala index fd1afd074..ce23889b0 100644 --- a/http-bench-jmh/src/main/scala/org/apache/pekko/http/impl/engine/http2/H2ClientServerBenchmark.scala +++ b/http-bench-jmh/src/main/scala/org/apache/pekko/http/impl/engine/http2/H2ClientServerBenchmark.scala @@ -25,6 +25,7 @@ import pekko.stream.ActorMaterializer import pekko.stream.TLSProtocol.{ SslTlsInbound, SslTlsOutbound } import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source } import pekko.util.ByteString +import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ import java.util.concurrent.{ CountDownLatch, TimeUnit } @@ -42,6 +43,9 @@ class H2ClientServerBenchmark extends CommonBenchmark with H2RequestResponseBenc val numRequests = 1000 + @Param(Array("[]", "[\"reset\"]")) + var frameTypeThrottleFrameTypes: String = _ + @Benchmark @OperationsPerInvocation(1000) // should be same as numRequest def benchRequestProcessing(): Unit = { @@ -71,8 +75,9 @@ class H2ClientServerBenchmark extends CommonBenchmark with H2RequestResponseBenc def setup(): Unit = { initRequestResponse() - system = ActorSystem("PekkoHttpBenchmarkSystem", config) - mat = ActorMaterializer() + val throttleConfig = ConfigFactory.parseString( + s"pekko.http.server.http2.frame-type-throttle.frame-types=$frameTypeThrottleFrameTypes") + system = ActorSystem("PekkoHttpBenchmarkSystem", throttleConfig.withFallback(config)) val settings = implicitly[ServerSettings] val log = system.log implicit val ec = system.dispatcher diff --git a/http-core/src/main/mima-filters/1.0.x.backwards.excludes/http2-rapid-reset-configs.backwards.excludes b/http-core/src/main/mima-filters/1.0.x.backwards.excludes/http2-rapid-reset-configs.backwards.excludes new file mode 100644 index 000000000..6c903098e --- /dev/null +++ b/http-core/src/main/mima-filters/1.0.x.backwards.excludes/http2-rapid-reset-configs.backwards.excludes @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# New configs added to support throttling HTTP/2 reset frames +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleFrameTypes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleCost") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleBurst") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleInterval") diff --git a/http-core/src/main/resources/reference.conf b/http-core/src/main/resources/reference.conf index 60af8d6a5..18fac38b3 100644 --- a/http-core/src/main/resources/reference.conf +++ b/http-core/src/main/resources/reference.conf @@ -307,6 +307,18 @@ pekko.http { # Fail the connection if a sent ping is not acknowledged within this timeout. # When zero the ping-interval is used, if set the value must be evenly divisible by less than or equal to the ping-interval. ping-timeout = 0s + + frame-type-throttle { + # Configure the throttle for non-data frame types (https://github.com/apache/incubator-pekko-http/issues/332). + # The supported frame-types for throttlng are: + # reset, headers, continuation, go-away, priority, ping, push-promise, window-update + # If you are concerned about CVE-2023-44487, you could set: pekko.http.server.http2.frame-type-throttle.frame-types = ["reset"] + frame-types = [] + cost = 100 + burst = 100 + # interval must be a positive duration + interval = 1s + } } websocket { diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala index 593659435..131bbec06 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala @@ -35,7 +35,7 @@ import pekko.http.scaladsl.settings.{ ParserSettings, ServerSettings } -import pekko.stream.{ BidiShape, Graph, StreamTcpException } +import pekko.stream.{ BidiShape, Graph, StreamTcpException, ThrottleMode } import pekko.stream.TLSProtocol._ import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Source } import pekko.util.ByteString @@ -122,12 +122,20 @@ private[http] object Http2Blueprint { telemetry: TelemetrySpi, dateHeaderRendering: DateHeaderRendering): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator] = { val masterHttpHeaderParser = HttpHeaderParser(settings.parserSettings, log) // FIXME: reuse for framing - telemetry.serverConnection atop + + val initialFlow = telemetry.serverConnection atop httpLayer(settings, log, dateHeaderRendering) atopKeepRight serverDemux(settings.http2Settings, initialDemuxerSettings, upgraded) atop FrameLogger.logFramesIfEnabled(settings.http2Settings.logFrames) atop // enable for debugging - hpackCoding(masterHttpHeaderParser, settings.parserSettings) atop - framing(log) atop + hpackCoding(masterHttpHeaderParser, settings.parserSettings) + + val frameTypesForThrottle = getFrameTypesForThrottle(settings.http2Settings) + + val flowWithPossibleThrottle = if (frameTypesForThrottle.nonEmpty) { + initialFlow atop rapidResetMitigation(settings.http2Settings, frameTypesForThrottle) atopKeepLeft framing(log) + } else initialFlow atop framing(log) + + flowWithPossibleThrottle atop errorHandling(log) atop idleTimeoutIfConfigured(settings.idleTimeout) } @@ -198,6 +206,41 @@ private[http] object Http2Blueprint { Flow[FrameEvent].map(FrameRenderer.render).prepend(Source.single(Http2Protocol.ClientConnectionPreface)), Flow[ByteString].via(new Http2FrameParsing(shouldReadPreface = false, log))) + private def rapidResetMitigation(settings: Http2ServerSettings, + frameTypesForThrottle: Set[String]): BidiFlow[FrameEvent, FrameEvent, FrameEvent, FrameEvent, NotUsed] = { + def frameCost(event: FrameEvent): Int = { + if (frameTypesForThrottle.contains(event.frameTypeName)) 1 else 0 + } + + BidiFlow.fromFlows( + Flow[FrameEvent], + Flow[FrameEvent].throttle(settings.frameTypeThrottleCost, settings.frameTypeThrottleInterval, + settings.frameTypeThrottleBurst, frameCost, ThrottleMode.Enforcing)) + } + + private def getFrameTypesForThrottle(settings: Http2ServerSettings): Set[String] = { + val set = settings.frameTypeThrottleFrameTypes + if (set.isEmpty) { + Set.empty + } else { + set.flatMap(frameTypeAliasToFrameTypeName) + } + } + + private[http2] def frameTypeAliasToFrameTypeName(frameType: String): Option[String] = { + frameType.toLowerCase match { + case "reset" => Some("RstStreamFrame") + case "headers" => Some("HeadersFrame") + case "continuation" => Some("ContinuationFrame") + case "go-away" => Some("GoAwayFrame") + case "priority" => Some("PriorityFrame") + case "ping" => Some("PingFrame") + case "push-promise" => Some("PushPromiseFrame") + case "window-update" => Some("WindowUpdateFrame") + case _ => None + } + } + /** * Runs hpack encoding and decoding. Incoming frames that are processed are HEADERS and CONTINUATION. * Outgoing frame is ParsedHeadersFrame. @@ -290,5 +333,9 @@ private[http] object Http2Blueprint { def atopKeepRight[OO1, II2, Mat2]( other: Graph[BidiShape[O1, OO1, II2, I2], Mat2]): BidiFlow[I1, OO1, II2, O2, Mat2] = bidi.atopMat(other)(Keep.right) + + def atopKeepLeft[OO1, II2, Mat2]( + other: Graph[BidiShape[O1, OO1, II2, I2], Mat2]): BidiFlow[I1, OO1, II2, O2, Mat] = + bidi.atopMat(other)(Keep.left) } } diff --git a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala index b5705ca6b..a7539298e 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala @@ -18,6 +18,7 @@ import java.time.Duration import org.apache.pekko import pekko.annotation.DoNotInherit import pekko.http.scaladsl +import pekko.util.ccompat.JavaConverters._ import com.typesafe.config.Config import scala.concurrent.duration._ @@ -51,6 +52,14 @@ trait Http2ServerSettings { def getPingTimeout: Duration = Duration.ofMillis(pingTimeout.toMillis) def withPingTimeout(timeout: Duration): Http2ServerSettings = withPingTimeout(timeout.toMillis.millis) + + def getFrameTypeThrottleFrameTypes(): java.util.Set[String] = frameTypeThrottleFrameTypes.asJava + def getFrameTypeThrottleCost(): Int = frameTypeThrottleCost + def getFrameTypeThrottleBurst(): Int = frameTypeThrottleBurst + def getFrameTypeThrottleInterval: Duration = Duration.ofMillis(frameTypeThrottleInterval.toMillis) + + def withFrameTypeThrottleInterval(interval: Duration): Http2ServerSettings = + withFrameTypeThrottleInterval(interval.toMillis.millis) } object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] { def create(config: Config): Http2ServerSettings = scaladsl.settings.Http2ServerSettings(config) diff --git a/http-core/src/main/scala/org/apache/pekko/http/scaladsl/settings/Http2ServerSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/scaladsl/settings/Http2ServerSettings.scala index 700ecccea..692fbd462 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/scaladsl/settings/Http2ServerSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/scaladsl/settings/Http2ServerSettings.scala @@ -19,6 +19,7 @@ import pekko.annotation.DoNotInherit import pekko.annotation.InternalApi import pekko.http.impl.util._ import pekko.http.javadsl +import pekko.util.ccompat.JavaConverters._ import com.typesafe.config.Config import scala.concurrent.duration.Duration @@ -102,6 +103,18 @@ trait Http2ServerSettings extends javadsl.settings.Http2ServerSettings with Http def pingTimeout: FiniteDuration def withPingTimeout(timeout: FiniteDuration): Http2ServerSettings = copy(pingTimeout = timeout) + def frameTypeThrottleFrameTypes: Set[String] + def withFrameTypeThrottleFrameTypes(frameTypes: Set[String]) = copy(frameTypeThrottleFrameTypes = frameTypes) + + def frameTypeThrottleCost: Int + def withFrameTypeThrottleCost(cost: Int) = copy(frameTypeThrottleCost = cost) + + def frameTypeThrottleBurst: Int + def withFrameTypeThrottleBurst(burst: Int) = copy(frameTypeThrottleBurst = burst) + + def frameTypeThrottleInterval: FiniteDuration + def withFrameTypeThrottleInterval(interval: FiniteDuration) = copy(frameTypeThrottleInterval = interval) + @InternalApi private[http] def internalSettings: Option[Http2InternalServerSettings] @InternalApi @@ -124,6 +137,10 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] { logFrames: Boolean, pingInterval: FiniteDuration, pingTimeout: FiniteDuration, + frameTypeThrottleFrameTypes: Set[String], + frameTypeThrottleCost: Int, + frameTypeThrottleBurst: Int, + frameTypeThrottleInterval: FiniteDuration, internalSettings: Option[Http2InternalServerSettings]) extends Http2ServerSettings { require(maxConcurrentStreams >= 0, "max-concurrent-streams must be >= 0") @@ -136,6 +153,7 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] { require(minCollectStrictEntitySize <= (incomingConnectionLevelBufferSize / maxConcurrentStreams), "min-collect-strict-entity-size <= incoming-connection-level-buffer-size / max-concurrent-streams") require(outgoingControlFrameBufferSize > 0, "outgoing-control-frame-buffer-size must be > 0") + require(frameTypeThrottleInterval.toMillis > 0, "frame-type-throttle.interval must be a positive duration") Http2CommonSettings.validate(this) } @@ -151,6 +169,10 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] { logFrames = c.getBoolean("log-frames"), pingInterval = c.getFiniteDuration("ping-interval"), pingTimeout = c.getFiniteDuration("ping-timeout"), + frameTypeThrottleFrameTypes = c.getStringList("frame-type-throttle.frame-types").asScala.toSet, + frameTypeThrottleCost = c.getInt("frame-type-throttle.cost"), + frameTypeThrottleBurst = c.getInt("frame-type-throttle.burst"), + frameTypeThrottleInterval = c.getFiniteDuration("frame-type-throttle.interval"), None // no possibility to configure internal settings with config ) } diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2BlueprintSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2BlueprintSpec.scala new file mode 100644 index 000000000..4858c3149 --- /dev/null +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2BlueprintSpec.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.http.impl.engine.http2 + +import org.apache.pekko +import pekko.http.impl.engine.http2.FrameEvent._ +import pekko.http.impl.engine.http2.Http2Protocol.ErrorCode +import pekko.util.ByteString +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class Http2BlueprintSpec extends AnyWordSpec with Matchers { + "Http2Blueprint" should { + "match frame type alias (reset)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("reset") shouldEqual + Some(RstStreamFrame(0, ErrorCode.PROTOCOL_ERROR).frameTypeName) + } + "match frame type alias (headers)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("headers") shouldEqual + Some(HeadersFrame(0, true, true, ByteString.empty, None).frameTypeName) + } + "match frame type alias (continuation)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("continuation") shouldEqual + Some(ContinuationFrame(0, true, ByteString.empty).frameTypeName) + } + "match frame type alias (go-away)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("go-away") shouldEqual + Some(GoAwayFrame(0, ErrorCode.PROTOCOL_ERROR).frameTypeName) + } + "match frame type alias (priority)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("priority") shouldEqual + Some(PriorityFrame(0, true, 0, 0).frameTypeName) + } + "match frame type alias (ping)" in { + val rnd = new java.util.Random() + val bytes = new Array[Byte](8) + rnd.nextBytes(bytes) + Http2Blueprint.frameTypeAliasToFrameTypeName("ping") shouldEqual + Some(PingFrame(true, ByteString(bytes)).frameTypeName) + } + "match frame type alias (push-promise)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("push-promise") shouldEqual + Some(PushPromiseFrame(0, true, 0, ByteString.empty).frameTypeName) + } + "match frame type alias (window-update)" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("window-update") shouldEqual + Some(WindowUpdateFrame(0, 0).frameTypeName) + } + "not match empty frame type alias" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("") shouldEqual None + } + "not match unknown frame type alias" in { + Http2Blueprint.frameTypeAliasToFrameTypeName("unknown") shouldEqual None + } + } +} diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/util/WithLogCapturing.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/util/WithLogCapturing.scala index 745359fc5..c12e4bc73 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/impl/util/WithLogCapturing.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/util/WithLogCapturing.scala @@ -34,6 +34,12 @@ trait WithLogCapturing extends SuiteMixin { this: TestSuite => */ protected def failOnSevereMessages: Boolean = false + /** + * We expect a severe message but the message should contain this text. If there are any other severe messages, + * the test will fail. + */ + protected val expectSevereLogsOnlyToMatch: Option[String] = None + /** * Can be overridden to adapt which events should be considered as severe if `failOnSevereMessages` is * enabled. @@ -86,6 +92,19 @@ trait WithLogCapturing extends SuiteMixin { this: TestSuite => Failed(new AssertionError( s"No severe log messages should be emitted during test run but got [${stats( Logging.WarningLevel)}] warnings and [${stats(Logging.ErrorLevel)}] errors (see marked lines above)")) + } else if (expectSevereLogsOnlyToMatch.nonEmpty) { + val severeEvents = events.filter(isSevere(_)) + val matchingEvents = severeEvents.filter(_.message.toString.contains(expectSevereLogsOnlyToMatch.get)) + if (severeEvents.isEmpty || matchingEvents != severeEvents) { + val stats = events.groupBy(_.level).mapValues(_.size).toMap.withDefaultValue(0) + flushLog() + + Failed(new AssertionError( + s"Expected an error during test run but got unexpected results - got [${ + stats( + Logging.WarningLevel) + }] warnings and [${stats(Logging.ErrorLevel)}] errors (see marked lines above)")) + } else res } else res } diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerDisableFrameTypeThrottleSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerDisableFrameTypeThrottleSpec.scala new file mode 100644 index 000000000..1d8de5240 --- /dev/null +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerDisableFrameTypeThrottleSpec.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2022 Lightbend Inc. + */ + +package org.apache.pekko.http.impl.engine.http2 + +import org.apache.pekko +import pekko.http.impl.engine.http2.Http2Protocol.FrameType +import pekko.http.impl.engine.http2.framing.FrameRenderer +import pekko.util.ByteStringBuilder + +import java.nio.ByteOrder + +/** + * This tests the http2 server throttle support for rapid resets is disabled by default. + */ +class Http2ServerDisableFrameTypeThrottleSpec extends Http2SpecWithMaterializer(""" + pekko.http.server.remote-address-header = on + pekko.http.server.http2.log-frames = on + """) { + override def failOnSevereMessages: Boolean = true + + "The Http/2 server implementation" should { + "not cancel connection during rapid reset attack (throttle disabled)".inAssertAllStagesStopped( + new TestSetup with RequestResponseProbes { + implicit val bigEndian: ByteOrder = ByteOrder.BIG_ENDIAN + val bb = new ByteStringBuilder + bb.putInt(0) + val rstFrame = FrameRenderer.renderFrame(FrameType.RST_STREAM, ByteFlag.Zero, 1, bb.result()) + val longFrame = Seq.fill(1000)(rstFrame).reduce(_ ++ _) + network.sendBytes(longFrame) + }) + } +} diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerEnableFrameTypeThrottleSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerEnableFrameTypeThrottleSpec.scala new file mode 100644 index 000000000..2ff5a9d08 --- /dev/null +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerEnableFrameTypeThrottleSpec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2022 Lightbend Inc. + */ + +package org.apache.pekko.http.impl.engine.http2 + +import org.apache.pekko +import pekko.http.impl.engine.http2.Http2Protocol.FrameType +import pekko.http.impl.engine.http2.framing.FrameRenderer +import pekko.util.ByteStringBuilder + +import java.nio.ByteOrder + +/** + * This tests the http2 server throttle support for rapid resets. + */ +class Http2ServerEnableFrameTypeThrottleSpec extends Http2SpecWithMaterializer(""" + pekko.http.server.remote-address-header = on + pekko.http.server.http2.log-frames = on + pekko.http.server.http2.frame-type-throttle.frame-types = ["reset"] + """) { + override val expectSevereLogsOnlyToMatch: Option[String] = Some( + "HTTP2 connection failed with error [Maximum throttle throughput exceeded.]. Sending INTERNAL_ERROR and closing connection.") + + "The Http/2 server implementation" should { + "cancel connection during rapid reset attack".inAssertAllStagesStopped(new TestSetup with RequestResponseProbes { + implicit val bigEndian: ByteOrder = ByteOrder.BIG_ENDIAN + val bb = new ByteStringBuilder + bb.putInt(0) + val rstFrame = FrameRenderer.renderFrame(FrameType.RST_STREAM, ByteFlag.Zero, 1, bb.result()) + val longFrame = Seq.fill(1000)(rstFrame).reduce(_ ++ _) + network.sendBytes(longFrame) + }) + } +} diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala index e3b9c2227..cd3f459be 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala @@ -13,47 +13,34 @@ package org.apache.pekko.http.impl.engine.http2 -import java.net.InetSocketAddress import org.apache.pekko import pekko.NotUsed -import pekko.event.Logging import pekko.http.impl.engine.http2.FrameEvent._ -import pekko.http.impl.engine.http2.Http2Protocol.ErrorCode -import pekko.http.impl.engine.http2.Http2Protocol.Flags -import pekko.http.impl.engine.http2.Http2Protocol.FrameType -import pekko.http.impl.engine.http2.Http2Protocol.SettingIdentifier +import pekko.http.impl.engine.http2.Http2Protocol.{ ErrorCode, Flags, FrameType, SettingIdentifier } import pekko.http.impl.engine.server.{ HttpAttributes, ServerTerminator } import pekko.http.impl.engine.ws.ByteStringSinkProbe -import pekko.http.impl.util.PekkoSpecWithMaterializer -import pekko.http.impl.util.LogByteStringTools -import pekko.http.scaladsl.Http import pekko.http.scaladsl.client.RequestBuilding.Get import pekko.http.scaladsl.model._ -import pekko.http.scaladsl.model.headers.CacheDirectives -import pekko.http.scaladsl.model.headers.RawHeader +import pekko.http.scaladsl.model.headers.{ CacheDirectives, RawHeader } import pekko.http.scaladsl.settings.ServerSettings -import pekko.stream.Attributes -import pekko.stream.Attributes.LogLevels import pekko.stream.OverflowStrategy -import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source, SourceQueueWithComplete } -import pekko.stream.testkit.TestPublisher.{ ManualProbe, Probe } +import pekko.stream.scaladsl.{ BidiFlow, Flow, Source, SourceQueueWithComplete } +import pekko.stream.testkit.TestPublisher.ManualProbe import pekko.stream.testkit.scaladsl.StreamTestKit import pekko.stream.testkit.TestPublisher -import pekko.stream.testkit.TestSubscriber import pekko.testkit._ import pekko.util.ByteString -import scala.annotation.nowarn -import javax.net.ssl.SSLContext import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.PatienceConfiguration.Timeout +import java.net.InetSocketAddress +import javax.net.ssl.SSLContext + +import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.Await -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.Promise +import scala.concurrent.{ Await, Promise } /** * This tests the http2 server protocol logic. @@ -64,7 +51,7 @@ import scala.concurrent.Promise * * if applicable: provide application-level response * * validate the produced response frames */ -class Http2ServerSpec extends PekkoSpecWithMaterializer(""" +class Http2ServerSpec extends Http2SpecWithMaterializer(""" pekko.http.server.remote-address-header = on pekko.http.server.http2.log-frames = on """) @@ -1793,100 +1780,4 @@ class Http2ServerSpec extends PekkoSpecWithMaterializer(""" } } - implicit class InWithStoppedStages(name: String) { - def inAssertAllStagesStopped(runTest: => TestSetup) = - name in StreamTestKit.assertAllStagesStopped { - val setup = runTest - - // force connection to shutdown (in case it is an invalid state) - setup.network.fromNet.sendError(new RuntimeException) - setup.network.toNet.cancel() - - // and then assert that all stages, substreams in particular, are stopped - } - } - - protected /* To make ByteFlag warnings go away */ abstract class TestSetupWithoutHandshake { - implicit def ec: ExecutionContext = system.dispatcher - - private val framesOut: Http2FrameProbe = Http2FrameProbe() - private val toNet = framesOut.plainDataProbe - private val fromNet = TestPublisher.probe[ByteString]() - - def handlerFlow: Flow[HttpRequest, HttpResponse, NotUsed] - - // hook to modify server, for example add attributes - def modifyServer(server: BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator]) = server - - // hook to modify server settings - def settings: ServerSettings = ServerSettings(system).withServerHeader(None) - - final def theServer: BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator] = - modifyServer(Http2Blueprint.serverStack(settings, system.log, telemetry = NoOpTelemetry, - dateHeaderRendering = Http().dateHeaderRendering)) - .atop(LogByteStringTools.logByteStringBidi("network-plain-text").addAttributes( - Attributes(LogLevels(Logging.DebugLevel, Logging.DebugLevel, Logging.DebugLevel)))) - - val serverTerminator = - handlerFlow - .joinMat(theServer)(Keep.right) - .join(Flow.fromSinkAndSource(toNet.sink, Source.fromPublisher(fromNet))) - .withAttributes(Attributes.inputBuffer(1, 1)) - .run() - - val network = new NetworkSide(fromNet, toNet, framesOut) with Http2FrameHpackSupport - } - - class NetworkSide(val fromNet: Probe[ByteString], val toNet: ByteStringSinkProbe, val framesOut: Http2FrameProbe) - extends WindowTracking { - override def frameProbeDelegate = framesOut - - def sendBytes(bytes: ByteString): Unit = fromNet.sendNext(bytes) - - } - - /** Basic TestSetup that has already passed the exchange of the connection preface */ - abstract class TestSetup(initialClientSettings: Setting*) extends TestSetupWithoutHandshake { - network.sendBytes(Http2Protocol.ClientConnectionPreface) - network.expectSETTINGS() - - network.sendFrame(SettingsFrame(immutable.Seq.empty ++ initialClientSettings)) - network.expectSettingsAck() - } - - /** Provides the user handler flow as `requestIn` and `responseOut` probes for manual stream interaction */ - trait RequestResponseProbes extends TestSetupWithoutHandshake { - private lazy val requestIn = TestSubscriber.probe[HttpRequest]() - private lazy val responseOut = TestPublisher.probe[HttpResponse]() - - def handlerFlow: Flow[HttpRequest, HttpResponse, NotUsed] = - Flow.fromSinkAndSource(Sink.fromSubscriber(requestIn), Source.fromPublisher(responseOut)) - - lazy val user = new UserSide(requestIn, responseOut) - - def expectGracefulCompletion(): Unit = { - network.toNet.expectComplete() - user.requestIn.expectComplete() - } - } - - class UserSide(val requestIn: TestSubscriber.Probe[HttpRequest], val responseOut: TestPublisher.Probe[HttpResponse]) { - def expectRequest(): HttpRequest = requestIn.requestNext().removeAttribute(Http2.streamId) - def expectRequestRaw(): HttpRequest = requestIn.requestNext() // TODO, make it so that internal headers are not listed in `headers` etc? - def emitResponse(streamId: Int, response: HttpResponse): Unit = - responseOut.sendNext(response.addAttribute(Http2.streamId, streamId)) - - } - - /** Provides the user handler flow as a handler function */ - trait HandlerFunctionSupport extends TestSetupWithoutHandshake { - def parallelism: Int = 2 - def handler: HttpRequest => Future[HttpResponse] = - _ => Future.successful(HttpResponse()) - - def handlerFlow: Flow[HttpRequest, HttpResponse, NotUsed] = - Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler) - } - - def bytes(num: Int, byte: Byte): ByteString = ByteString(Array.fill[Byte](num)(byte)) } diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2SpecWithMaterializer.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2SpecWithMaterializer.scala new file mode 100644 index 000000000..19d237c77 --- /dev/null +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2SpecWithMaterializer.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2022 Lightbend Inc. + */ + +package org.apache.pekko.http.impl.engine.http2 + +import org.apache.pekko +import pekko.NotUsed +import pekko.event.Logging +import pekko.http.impl.engine.http2.FrameEvent._ +import pekko.http.impl.engine.server.ServerTerminator +import pekko.http.impl.engine.ws.ByteStringSinkProbe +import pekko.http.impl.util.PekkoSpecWithMaterializer +import pekko.http.impl.util.LogByteStringTools +import pekko.http.scaladsl.Http +import pekko.http.scaladsl.model._ +import pekko.http.scaladsl.settings.ServerSettings +import pekko.stream.Attributes +import pekko.stream.Attributes.LogLevels +import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source } +import pekko.stream.testkit.TestPublisher.Probe +import pekko.stream.testkit.scaladsl.StreamTestKit +import pekko.stream.testkit.{ TestPublisher, TestSubscriber } +import pekko.util.ByteString + +import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Future } + +abstract class Http2SpecWithMaterializer(configOverrides: String) extends PekkoSpecWithMaterializer(configOverrides) { + implicit class InWithStoppedStages(name: String) { + def inAssertAllStagesStopped(runTest: => TestSetup) = + name in StreamTestKit.assertAllStagesStopped { + val setup = runTest + + // force connection to shutdown (in case it is an invalid state) + setup.network.fromNet.sendError(new RuntimeException) + setup.network.toNet.cancel() + + // and then assert that all stages, substreams in particular, are stopped + } + } + + protected /* To make ByteFlag warnings go away */ abstract class TestSetupWithoutHandshake { + implicit def ec: ExecutionContext = system.dispatcher + + private val framesOut: Http2FrameProbe = Http2FrameProbe() + private val toNet = framesOut.plainDataProbe + private val fromNet = TestPublisher.probe[ByteString]() + + def handlerFlow: Flow[HttpRequest, HttpResponse, NotUsed] + + // hook to modify server, for example add attributes + def modifyServer(server: BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator]) = server + + // hook to modify server settings + def settings: ServerSettings = ServerSettings(system).withServerHeader(None) + + final def theServer: BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator] = + modifyServer(Http2Blueprint.serverStack(settings, system.log, telemetry = NoOpTelemetry, + dateHeaderRendering = Http().dateHeaderRendering)) + .atop(LogByteStringTools.logByteStringBidi("network-plain-text").addAttributes( + Attributes(LogLevels(Logging.DebugLevel, Logging.DebugLevel, Logging.DebugLevel)))) + + val serverTerminator = + handlerFlow + .joinMat(theServer)(Keep.right) + .join(Flow.fromSinkAndSource(toNet.sink, Source.fromPublisher(fromNet))) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val network = new NetworkSide(fromNet, toNet, framesOut) with Http2FrameHpackSupport + } + + class NetworkSide(val fromNet: Probe[ByteString], val toNet: ByteStringSinkProbe, val framesOut: Http2FrameProbe) + extends WindowTracking { + override def frameProbeDelegate = framesOut + + def sendBytes(bytes: ByteString): Unit = fromNet.sendNext(bytes) + + } + + /** Basic TestSetup that has already passed the exchange of the connection preface */ + abstract class TestSetup(initialClientSettings: Setting*) extends TestSetupWithoutHandshake { + network.sendBytes(Http2Protocol.ClientConnectionPreface) + network.expectSETTINGS() + + network.sendFrame(SettingsFrame(immutable.Seq.empty ++ initialClientSettings)) + network.expectSettingsAck() + } + + /** Provides the user handler flow as `requestIn` and `responseOut` probes for manual stream interaction */ + trait RequestResponseProbes extends TestSetupWithoutHandshake { + private lazy val requestIn = TestSubscriber.probe[HttpRequest]() + private lazy val responseOut = TestPublisher.probe[HttpResponse]() + + def handlerFlow: Flow[HttpRequest, HttpResponse, NotUsed] = + Flow.fromSinkAndSource(Sink.fromSubscriber(requestIn), Source.fromPublisher(responseOut)) + + lazy val user = new UserSide(requestIn, responseOut) + + def expectGracefulCompletion(): Unit = { + network.toNet.expectComplete() + user.requestIn.expectComplete() + } + } + + class UserSide(val requestIn: TestSubscriber.Probe[HttpRequest], val responseOut: TestPublisher.Probe[HttpResponse]) { + def expectRequest(): HttpRequest = requestIn.requestNext().removeAttribute(Http2.streamId) + + def expectRequestRaw(): HttpRequest = requestIn.requestNext() // TODO, make it so that internal headers are not listed in `headers` etc? + + def emitResponse(streamId: Int, response: HttpResponse): Unit = + responseOut.sendNext(response.addAttribute(Http2.streamId, streamId)) + + } + + /** Provides the user handler flow as a handler function */ + trait HandlerFunctionSupport extends TestSetupWithoutHandshake { + def parallelism: Int = 2 + + def handler: HttpRequest => Future[HttpResponse] = + _ => Future.successful(HttpResponse()) + + def handlerFlow: Flow[HttpRequest, HttpResponse, NotUsed] = + Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler) + } + + def bytes(num: Int, byte: Byte): ByteString = ByteString(Array.fill[Byte](num)(byte)) +}