diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index c55d9e61e..1bd369490 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -447,9 +447,9 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper // We're not planning on sending any data on this stream anymore, so we don't care about window updates. this case rst@RstStreamFrame(streamId, _) => - //TODO if errorCode is REFUSED_STREAM, we should try to open a new stream - val frame = ParsedHeadersFrame(streamId, endStream = true, Seq((":status", "429")), None) - dispatchStream(streamId, frame, ByteString.empty, correlationAttributes, _ => Closed) + val headers = ParsedHeadersFrame(streamId, endStream = false, Seq((":status", "429")), None) + dispatchSubstream(headers, Right(Source.failed(new PeerClosedStreamException(rst.streamId, rst.errorCode))), correlationAttributes) + Closed case _ => expectIncomingStream(event, Closed, HalfClosedLocal(_), correlationAttributes) } diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala index 4a8e2e9c3..33b868475 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala @@ -293,14 +293,21 @@ class Http2ClientSpec extends PekkoSpecWithMaterializer(""" headerPayload.take(3) shouldBe dynamicTableUpdateTo8192 }) "close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped(new TestSetup with NetProbes { - user.emitRequest(Post("/", HttpEntity("hello"))) + val data = ByteString("abcd") + user.emitRequest(Post("/", HttpEntity(data))) val TheStreamId = network.expect[HeadersFrame]().streamId + network.expectDATA(TheStreamId, endStream = true, data) network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM) val response = user.expectResponse() response.status should be(StatusCodes.TooManyRequests) + val entityDataIn = ByteStringSinkProbe(response.entity.dataBytes) + val error = entityDataIn.expectError() + error.getMessage shouldBe "Stream with ID [1] was closed by peer with code REFUSED_STREAM(0x07)" + + connectionShouldStillBeUsable() }) }