Skip to content

Commit

Permalink
Http2 frame type throttle (apache#394)
Browse files Browse the repository at this point in the history
* parent 38aa25e
author PJ Fanning <[email protected]> 1698583210 +0000
committer PJ Fanning <[email protected]> 1702470760 +0100

test for http2 rapid reset

Co-Authored-By: Johannes Rudolph <[email protected]>

Update Http2ServerSpec.scala

add throttle and config

Co-Authored-By: Johannes Rudolph <[email protected]>

revert code format change

Update Http2ServerSettings.scala

Update Http2ServerSpec.scala

rework test - still needs proper asserts

refactor tests

scalafmt

Create http2-rapid-reset-configs.backwards.excludes

refactor test

update test

add ability to disable throttle

Update Http2ServerDisableResetSpec.scala

use keepLeft after rapidResetMitigation

Update http2-rapid-reset-configs.backwards.excludes

uptake sbt-pekko-build

use updated plugin

refactor

sbt-pekko-build 0.1.0

rename configs

rename vars

* remove imports

* Update http2-rapid-reset-configs.backwards.excludes

* don't throttle header frames

* only throttle reset frames

* rename params

* Update Http2ServerDisableResetSpec.scala

* Rapid reset bench (#2)

* Update H2ClientServerBenchmark.scala

* don't throttle header frames

* only throttle reset frames

* rename params

* Update H2ClientServerBenchmark.scala

* Update Http2ServerDisableResetSpec.scala

* disable throttle by default

* rename methods

* rename configs

* extra config

* scalafmt

* refactor

* Update reference.conf

* use sets

* test frame type alias

* scalafmt

* Update reference.conf

* rename params

* Update Http2Blueprint.scala
  • Loading branch information
pjfanning committed Jan 19, 2024
1 parent 2e28b00 commit faa5579
Show file tree
Hide file tree
Showing 12 changed files with 448 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import pekko.stream.ActorMaterializer
import pekko.stream.TLSProtocol.{ SslTlsInbound, SslTlsOutbound }
import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source }
import pekko.util.ByteString
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._

import java.util.concurrent.{ CountDownLatch, TimeUnit }
Expand All @@ -42,6 +43,9 @@ class H2ClientServerBenchmark extends CommonBenchmark with H2RequestResponseBenc

val numRequests = 1000

@Param(Array("[]", "[\"reset\"]"))
var frameTypeThrottleFrameTypes: String = _

@Benchmark
@OperationsPerInvocation(1000) // should be same as numRequest
def benchRequestProcessing(): Unit = {
Expand Down Expand Up @@ -71,8 +75,9 @@ class H2ClientServerBenchmark extends CommonBenchmark with H2RequestResponseBenc
def setup(): Unit = {
initRequestResponse()

system = ActorSystem("PekkoHttpBenchmarkSystem", config)
mat = ActorMaterializer()
val throttleConfig = ConfigFactory.parseString(
s"pekko.http.server.http2.frame-type-throttle.frame-types=$frameTypeThrottleFrameTypes")
system = ActorSystem("PekkoHttpBenchmarkSystem", throttleConfig.withFallback(config))
val settings = implicitly[ServerSettings]
val log = system.log
implicit val ec = system.dispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# New configs added to support throttling HTTP/2 reset frames
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleFrameTypes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleCost")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleBurst")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.frameTypeThrottleInterval")
12 changes: 12 additions & 0 deletions http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ pekko.http {
# Fail the connection if a sent ping is not acknowledged within this timeout.
# When zero the ping-interval is used, if set the value must be evenly divisible by less than or equal to the ping-interval.
ping-timeout = 0s

frame-type-throttle {
# Configure the throttle for non-data frame types (https://github.com/apache/incubator-pekko-http/issues/332).
# The supported frame-types for throttlng are:
# reset, headers, continuation, go-away, priority, ping, push-promise, window-update
# If you are concerned about CVE-2023-44487, you could set: pekko.http.server.http2.frame-type-throttle.frame-types = ["reset"]
frame-types = []
cost = 100
burst = 100
# interval must be a positive duration
interval = 1s
}
}

websocket {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import pekko.http.scaladsl.settings.{
ParserSettings,
ServerSettings
}
import pekko.stream.{ BidiShape, Graph, StreamTcpException }
import pekko.stream.{ BidiShape, Graph, StreamTcpException, ThrottleMode }
import pekko.stream.TLSProtocol._
import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Source }
import pekko.util.ByteString
Expand Down Expand Up @@ -122,12 +122,20 @@ private[http] object Http2Blueprint {
telemetry: TelemetrySpi,
dateHeaderRendering: DateHeaderRendering): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator] = {
val masterHttpHeaderParser = HttpHeaderParser(settings.parserSettings, log) // FIXME: reuse for framing
telemetry.serverConnection atop

val initialFlow = telemetry.serverConnection atop
httpLayer(settings, log, dateHeaderRendering) atopKeepRight
serverDemux(settings.http2Settings, initialDemuxerSettings, upgraded) atop
FrameLogger.logFramesIfEnabled(settings.http2Settings.logFrames) atop // enable for debugging
hpackCoding(masterHttpHeaderParser, settings.parserSettings) atop
framing(log) atop
hpackCoding(masterHttpHeaderParser, settings.parserSettings)

val frameTypesForThrottle = getFrameTypesForThrottle(settings.http2Settings)

val flowWithPossibleThrottle = if (frameTypesForThrottle.nonEmpty) {
initialFlow atop rapidResetMitigation(settings.http2Settings, frameTypesForThrottle) atopKeepLeft framing(log)
} else initialFlow atop framing(log)

flowWithPossibleThrottle atop
errorHandling(log) atop
idleTimeoutIfConfigured(settings.idleTimeout)
}
Expand Down Expand Up @@ -198,6 +206,41 @@ private[http] object Http2Blueprint {
Flow[FrameEvent].map(FrameRenderer.render).prepend(Source.single(Http2Protocol.ClientConnectionPreface)),
Flow[ByteString].via(new Http2FrameParsing(shouldReadPreface = false, log)))

private def rapidResetMitigation(settings: Http2ServerSettings,
frameTypesForThrottle: Set[String]): BidiFlow[FrameEvent, FrameEvent, FrameEvent, FrameEvent, NotUsed] = {
def frameCost(event: FrameEvent): Int = {
if (frameTypesForThrottle.contains(event.frameTypeName)) 1 else 0
}

BidiFlow.fromFlows(
Flow[FrameEvent],
Flow[FrameEvent].throttle(settings.frameTypeThrottleCost, settings.frameTypeThrottleInterval,
settings.frameTypeThrottleBurst, frameCost, ThrottleMode.Enforcing))
}

private def getFrameTypesForThrottle(settings: Http2ServerSettings): Set[String] = {
val set = settings.frameTypeThrottleFrameTypes
if (set.isEmpty) {
Set.empty
} else {
set.flatMap(frameTypeAliasToFrameTypeName)
}
}

private[http2] def frameTypeAliasToFrameTypeName(frameType: String): Option[String] = {
frameType.toLowerCase match {
case "reset" => Some("RstStreamFrame")
case "headers" => Some("HeadersFrame")
case "continuation" => Some("ContinuationFrame")
case "go-away" => Some("GoAwayFrame")
case "priority" => Some("PriorityFrame")
case "ping" => Some("PingFrame")
case "push-promise" => Some("PushPromiseFrame")
case "window-update" => Some("WindowUpdateFrame")
case _ => None
}
}

/**
* Runs hpack encoding and decoding. Incoming frames that are processed are HEADERS and CONTINUATION.
* Outgoing frame is ParsedHeadersFrame.
Expand Down Expand Up @@ -290,5 +333,9 @@ private[http] object Http2Blueprint {
def atopKeepRight[OO1, II2, Mat2](
other: Graph[BidiShape[O1, OO1, II2, I2], Mat2]): BidiFlow[I1, OO1, II2, O2, Mat2] =
bidi.atopMat(other)(Keep.right)

def atopKeepLeft[OO1, II2, Mat2](
other: Graph[BidiShape[O1, OO1, II2, I2], Mat2]): BidiFlow[I1, OO1, II2, O2, Mat] =
bidi.atopMat(other)(Keep.left)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import java.time.Duration
import org.apache.pekko
import pekko.annotation.DoNotInherit
import pekko.http.scaladsl
import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.Config
import scala.concurrent.duration._

Expand Down Expand Up @@ -51,6 +52,14 @@ trait Http2ServerSettings {

def getPingTimeout: Duration = Duration.ofMillis(pingTimeout.toMillis)
def withPingTimeout(timeout: Duration): Http2ServerSettings = withPingTimeout(timeout.toMillis.millis)

def getFrameTypeThrottleFrameTypes(): java.util.Set[String] = frameTypeThrottleFrameTypes.asJava
def getFrameTypeThrottleCost(): Int = frameTypeThrottleCost
def getFrameTypeThrottleBurst(): Int = frameTypeThrottleBurst
def getFrameTypeThrottleInterval: Duration = Duration.ofMillis(frameTypeThrottleInterval.toMillis)

def withFrameTypeThrottleInterval(interval: Duration): Http2ServerSettings =
withFrameTypeThrottleInterval(interval.toMillis.millis)
}
object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
def create(config: Config): Http2ServerSettings = scaladsl.settings.Http2ServerSettings(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import pekko.annotation.DoNotInherit
import pekko.annotation.InternalApi
import pekko.http.impl.util._
import pekko.http.javadsl
import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.Config

import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -102,6 +103,18 @@ trait Http2ServerSettings extends javadsl.settings.Http2ServerSettings with Http
def pingTimeout: FiniteDuration
def withPingTimeout(timeout: FiniteDuration): Http2ServerSettings = copy(pingTimeout = timeout)

def frameTypeThrottleFrameTypes: Set[String]
def withFrameTypeThrottleFrameTypes(frameTypes: Set[String]) = copy(frameTypeThrottleFrameTypes = frameTypes)

def frameTypeThrottleCost: Int
def withFrameTypeThrottleCost(cost: Int) = copy(frameTypeThrottleCost = cost)

def frameTypeThrottleBurst: Int
def withFrameTypeThrottleBurst(burst: Int) = copy(frameTypeThrottleBurst = burst)

def frameTypeThrottleInterval: FiniteDuration
def withFrameTypeThrottleInterval(interval: FiniteDuration) = copy(frameTypeThrottleInterval = interval)

@InternalApi
private[http] def internalSettings: Option[Http2InternalServerSettings]
@InternalApi
Expand All @@ -124,6 +137,10 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
logFrames: Boolean,
pingInterval: FiniteDuration,
pingTimeout: FiniteDuration,
frameTypeThrottleFrameTypes: Set[String],
frameTypeThrottleCost: Int,
frameTypeThrottleBurst: Int,
frameTypeThrottleInterval: FiniteDuration,
internalSettings: Option[Http2InternalServerSettings])
extends Http2ServerSettings {
require(maxConcurrentStreams >= 0, "max-concurrent-streams must be >= 0")
Expand All @@ -136,6 +153,7 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
require(minCollectStrictEntitySize <= (incomingConnectionLevelBufferSize / maxConcurrentStreams),
"min-collect-strict-entity-size <= incoming-connection-level-buffer-size / max-concurrent-streams")
require(outgoingControlFrameBufferSize > 0, "outgoing-control-frame-buffer-size must be > 0")
require(frameTypeThrottleInterval.toMillis > 0, "frame-type-throttle.interval must be a positive duration")
Http2CommonSettings.validate(this)
}

Expand All @@ -151,6 +169,10 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
logFrames = c.getBoolean("log-frames"),
pingInterval = c.getFiniteDuration("ping-interval"),
pingTimeout = c.getFiniteDuration("ping-timeout"),
frameTypeThrottleFrameTypes = c.getStringList("frame-type-throttle.frame-types").asScala.toSet,
frameTypeThrottleCost = c.getInt("frame-type-throttle.cost"),
frameTypeThrottleBurst = c.getInt("frame-type-throttle.burst"),
frameTypeThrottleInterval = c.getFiniteDuration("frame-type-throttle.interval"),
None // no possibility to configure internal settings with config
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.impl.engine.http2

import org.apache.pekko
import pekko.http.impl.engine.http2.FrameEvent._
import pekko.http.impl.engine.http2.Http2Protocol.ErrorCode
import pekko.util.ByteString
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class Http2BlueprintSpec extends AnyWordSpec with Matchers {
"Http2Blueprint" should {
"match frame type alias (reset)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("reset") shouldEqual
Some(RstStreamFrame(0, ErrorCode.PROTOCOL_ERROR).frameTypeName)
}
"match frame type alias (headers)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("headers") shouldEqual
Some(HeadersFrame(0, true, true, ByteString.empty, None).frameTypeName)
}
"match frame type alias (continuation)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("continuation") shouldEqual
Some(ContinuationFrame(0, true, ByteString.empty).frameTypeName)
}
"match frame type alias (go-away)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("go-away") shouldEqual
Some(GoAwayFrame(0, ErrorCode.PROTOCOL_ERROR).frameTypeName)
}
"match frame type alias (priority)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("priority") shouldEqual
Some(PriorityFrame(0, true, 0, 0).frameTypeName)
}
"match frame type alias (ping)" in {
val rnd = new java.util.Random()
val bytes = new Array[Byte](8)
rnd.nextBytes(bytes)
Http2Blueprint.frameTypeAliasToFrameTypeName("ping") shouldEqual
Some(PingFrame(true, ByteString(bytes)).frameTypeName)
}
"match frame type alias (push-promise)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("push-promise") shouldEqual
Some(PushPromiseFrame(0, true, 0, ByteString.empty).frameTypeName)
}
"match frame type alias (window-update)" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("window-update") shouldEqual
Some(WindowUpdateFrame(0, 0).frameTypeName)
}
"not match empty frame type alias" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("") shouldEqual None
}
"not match unknown frame type alias" in {
Http2Blueprint.frameTypeAliasToFrameTypeName("unknown") shouldEqual None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ trait WithLogCapturing extends SuiteMixin { this: TestSuite =>
*/
protected def failOnSevereMessages: Boolean = false

/**
* We expect a severe message but the message should contain this text. If there are any other severe messages,
* the test will fail.
*/
protected val expectSevereLogsOnlyToMatch: Option[String] = None

/**
* Can be overridden to adapt which events should be considered as severe if `failOnSevereMessages` is
* enabled.
Expand Down Expand Up @@ -86,6 +92,19 @@ trait WithLogCapturing extends SuiteMixin { this: TestSuite =>
Failed(new AssertionError(
s"No severe log messages should be emitted during test run but got [${stats(
Logging.WarningLevel)}] warnings and [${stats(Logging.ErrorLevel)}] errors (see marked lines above)"))
} else if (expectSevereLogsOnlyToMatch.nonEmpty) {
val severeEvents = events.filter(isSevere(_))
val matchingEvents = severeEvents.filter(_.message.toString.contains(expectSevereLogsOnlyToMatch.get))
if (severeEvents.isEmpty || matchingEvents != severeEvents) {
val stats = events.groupBy(_.level).mapValues(_.size).toMap.withDefaultValue(0)
flushLog()

Failed(new AssertionError(
s"Expected an error during test run but got unexpected results - got [${
stats(
Logging.WarningLevel)
}] warnings and [${stats(Logging.ErrorLevel)}] errors (see marked lines above)"))
} else res
} else res

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.http.impl.engine.http2

import org.apache.pekko
import pekko.http.impl.engine.http2.Http2Protocol.FrameType
import pekko.http.impl.engine.http2.framing.FrameRenderer
import pekko.util.ByteStringBuilder

import java.nio.ByteOrder

/**
* This tests the http2 server throttle support for rapid resets is disabled by default.
*/
class Http2ServerDisableFrameTypeThrottleSpec extends Http2SpecWithMaterializer("""
pekko.http.server.remote-address-header = on
pekko.http.server.http2.log-frames = on
""") {
override def failOnSevereMessages: Boolean = true

"The Http/2 server implementation" should {
"not cancel connection during rapid reset attack (throttle disabled)".inAssertAllStagesStopped(
new TestSetup with RequestResponseProbes {
implicit val bigEndian: ByteOrder = ByteOrder.BIG_ENDIAN
val bb = new ByteStringBuilder
bb.putInt(0)
val rstFrame = FrameRenderer.renderFrame(FrameType.RST_STREAM, ByteFlag.Zero, 1, bb.result())
val longFrame = Seq.fill(1000)(rstFrame).reduce(_ ++ _)
network.sendBytes(longFrame)
})
}
}
Loading

0 comments on commit faa5579

Please sign in to comment.