From e9b3ab2c96cf90b55475482ce871c76ca14ce9bd Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 30 May 2023 23:51:47 -0700 Subject: [PATCH 1/2] `KeepAliveManager`: add tests for #2611 Motivation: #2611 fixed behavior, but didn't add new tests to assert fixed scenarios. Modifications: - Test that the input will be shutdown after timeout; - Test that channel is closes if any write operation inside `KeepAliveManager` fails; - Reduce `KeepAliveTest` timeouts to speed up local tests; Result: Behavior fixed in #2611 is tested now. --- .../servicetalk/grpc/netty/KeepAliveTest.java | 7 +- ...GracefulConnectionClosureHandlingTest.java | 4 +- .../http/netty/KeepAliveManagerTest.java | 127 +++++++++++++++--- 3 files changed, 117 insertions(+), 21 deletions(-) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java index 36e39a533b..a3836698ca 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java @@ -45,6 +45,7 @@ import static io.servicetalk.concurrent.api.Publisher.never; import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI; import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy; import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED; import static io.servicetalk.grpc.netty.GrpcServers.forAddress; @@ -57,7 +58,7 @@ import static io.servicetalk.transport.api.ServiceTalkSocketOptions.IDLE_TIMEOUT; import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; -import static java.time.Duration.ofSeconds; +import static java.time.Duration.ofMillis; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -112,8 +113,8 @@ private void setUp(final boolean keepAlivesFromClient, static Stream data() { return Stream.of( - Arguments.of(true, ofSeconds(1), ofSeconds(2)), - Arguments.of(false, ofSeconds(1), ofSeconds(2)) + Arguments.of(true, ofMillis(CI ? 1000 : 200), ofMillis(CI ? 2000 : 400)), + Arguments.of(false, ofMillis(CI ? 1000 : 200), ofMillis(CI ? 2000 : 400)) ); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java index 421d766e8a..296c2c4c07 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java @@ -77,6 +77,7 @@ import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable; import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH; import static io.servicetalk.http.api.HttpHeaderValues.ZERO; @@ -101,7 +102,6 @@ import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.GRACEFUL_USER_CLOSING; import static io.servicetalk.utils.internal.PlatformDependent.throwException; -import static java.lang.Boolean.parseBoolean; import static java.lang.Integer.parseInt; import static java.lang.String.valueOf; import static java.nio.charset.StandardCharsets.US_ASCII; @@ -120,7 +120,7 @@ class GracefulConnectionClosureHandlingTest { private static final Logger LOGGER = LoggerFactory.getLogger(GracefulConnectionClosureHandlingTest.class); - private static final long TIMEOUT_MILLIS = parseBoolean(System.getenv("CI")) ? 1000 : 200; + private static final long TIMEOUT_MILLIS = CI ? 1000 : 200; private static final Collection TRUE_FALSE = asList(true, false); static final HttpStreamingSerializer RAW_STRING_SERIALIZER = diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java index ee249ac794..1c402dfaec 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.socket.ChannelInputShutdownReadComplete; @@ -35,16 +36,22 @@ import io.netty.util.concurrent.Promise; import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.time.Duration; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import static io.netty.util.ReferenceCountUtil.release; -import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI; import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_IDLE_DURATION; +import static java.time.Duration.ofMillis; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; @@ -60,7 +67,10 @@ class KeepAliveManagerTest { + private static final Duration ACK_TIMEOUT = CI ? ofMillis(1000) : ofMillis(200); + private final BlockingQueue scheduledTasks = new LinkedBlockingQueue<>(); + private final AtomicBoolean failWrite = new AtomicBoolean(); private EmbeddedChannel channel; private KeepAliveManager manager; @@ -71,7 +81,9 @@ void tearDown() { private void setUp(boolean duplex, boolean allowPingWithoutActiveStreams) { KeepAliveManagerHandler managerHandler = new KeepAliveManagerHandler(); - channel = duplex ? new EmbeddedDuplexChannel(true, managerHandler) : new EmbeddedChannel(managerHandler); + FailWriteHandler failWriteHandler = new FailWriteHandler(); + channel = duplex ? new EmbeddedDuplexChannel(true, failWriteHandler, managerHandler) + : new EmbeddedChannel(failWriteHandler, managerHandler); manager = newManager(allowPingWithoutActiveStreams, channel); managerHandler.keepAliveManager(manager); } @@ -116,7 +128,7 @@ void keepAlivePingAckReceived(boolean duplex) { manager.pingReceived(new DefaultHttp2PingFrame(ping.content(), true)); assertThat("Ping ack timeout task not cancelled.", ackTimeoutTask.promise.isCancelled(), is(true)); - ackTimeoutTask.task.run(); + ackTimeoutTask.runTask(); verifyNoWrite(); verifyNoScheduledTasks(); assertThat("Channel unexpectedly closed.", channel.isOpen(), is(true)); @@ -155,7 +167,7 @@ void gracefulCloseNoActiveStreams(boolean duplex) throws Exception { sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex); shutdownInputIfDuplexChannel(); - channel.closeFuture().sync().await(); + channel.closeFuture().await(); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -170,7 +182,7 @@ void gracefulCloseWithActiveStreams(boolean duplex) throws Exception { assertThat("Channel not closed.", channel.isOpen(), is(true)); activeStream.close().sync().await(); shutdownInputIfDuplexChannel(); - channel.closeFuture().sync().await(); + channel.closeFuture().await(); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -180,10 +192,10 @@ void gracefulCloseNoActiveStreamsMissingPingAck(boolean duplex) throws Exception initiateGracefulCloseVerifyGoAwayAndPing(manager); ScheduledTask pingAckTimeoutTask = scheduledTasks.take(); - pingAckTimeoutTask.task.run(); + pingAckTimeoutTask.runTask(); verifySecondGoAway(duplex); shutdownInputIfDuplexChannel(); - channel.closeFuture().sync().await(); + channel.closeFuture().await(); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -194,13 +206,13 @@ void gracefulCloseActiveStreamsMissingPingAck(boolean duplex) throws Exception { initiateGracefulCloseVerifyGoAwayAndPing(manager); ScheduledTask pingAckTimeoutTask = scheduledTasks.take(); - pingAckTimeoutTask.task.run(); + pingAckTimeoutTask.runTask(); verifySecondGoAway(duplex); assertThat("Channel closed.", channel.isOpen(), is(true)); activeStream.close().sync().await(); shutdownInputIfDuplexChannel(); - channel.closeFuture().sync().await(); + channel.closeFuture().await(); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -261,6 +273,66 @@ void channelClosedDuringPing(boolean duplex) { verifyNoOtherActionPostClose(manager); } + @Test + void duplexGracefulCloseNoInputShutdown() throws Exception { + setUp(true, false); + Http2PingFrame pingFrame = initiateGracefulCloseVerifyGoAwayAndPing(manager); + + sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, true); + EmbeddedDuplexChannel duplexChannel = (EmbeddedDuplexChannel) channel; + duplexChannel.awaitOutputShutdown(); + // Don't shutdown input, verify that timeout will force channel closure. + // Use CountDownLatch instead of channel.closeFuture().await() to avoid BlockingOperationException. + CountDownLatch closeLatch = new CountDownLatch(1); + channel.closeFuture().addListener(f -> closeLatch.countDown()); + assertThat("Channel closed unexpectedly", + closeLatch.await(ACK_TIMEOUT.toMillis(), MILLISECONDS), is(false)); + ScheduledTask inputShutdownTimeoutTask = scheduledTasks.take(); + inputShutdownTimeoutTask.runTask(); + closeLatch.await(); + channel.closeFuture().await(); + } + + @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") + @ValueSource(booleans = {true, false}) + void failureToWritePingClosesChannel(boolean duplex) throws Exception { + setUp(duplex, true); + failWrite.set(true); + manager.channelIdle(); + channel.closeFuture().await(); + } + + @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") + @ValueSource(booleans = {true, false}) + void failureToWriteLastGoAwayAfterPingAckTimeoutClosesChannel(boolean duplex) throws Exception { + setUp(duplex, true); + manager.channelIdle(); + verifyWrite(instanceOf(Http2PingFrame.class)); + ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled(); + failWrite.set(true); + ackTimeoutTask.runTask(); + channel.closeFuture().await(); + } + + @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") + @ValueSource(booleans = {true, false}) + void failureToWriteFirstGoAwayClosesChannel(boolean duplex) throws Exception { + setUp(duplex, true); + failWrite.set(true); + initiateGracefulClose(manager); + channel.closeFuture().await(); + } + + @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") + @ValueSource(booleans = {true, false}) + void failureToWriteSecondGoAwayClosesChannel(boolean duplex) throws Exception { + setUp(duplex, true); + Http2PingFrame pingFrame = initiateGracefulCloseVerifyGoAwayAndPing(manager); + failWrite.set(true); + manager.pingReceived(new DefaultHttp2PingFrame(pingFrame.content(), true)); + channel.closeFuture().await(); + } + private void verifyNoOtherActionPostClose(final KeepAliveManager manager) { manager.channelIdle(); verifyNoWrite(); @@ -277,7 +349,7 @@ private ScheduledTask verifyPingAckTimeoutScheduled() { ScheduledTask ackTimeoutTask = scheduledTasks.poll(); assertThat("Ping ack timeout not scheduled.", ackTimeoutTask, is(notNullValue())); assertThat("Unexpected ping ack timeout duration.", ackTimeoutTask.delayMillis, - is(DEFAULT_ACK_TIMEOUT.toMillis())); + is(ACK_TIMEOUT.toMillis())); return ackTimeoutTask; } @@ -289,7 +361,7 @@ private void sendGracefulClosePingAckAndVerifySecondGoAway(final KeepAliveManage assertThat("Ping ack task not cancelled.", pingAckTimeoutTask.promise.isCancelled(), is(true)); verifySecondGoAway(duplex); - pingAckTimeoutTask.task.run(); + pingAckTimeoutTask.runTask(); verifyNoWrite(); if (duplex) { @@ -312,9 +384,7 @@ private void verifySecondGoAway(boolean duplex) { } private Http2PingFrame initiateGracefulCloseVerifyGoAwayAndPing(final KeepAliveManager manager) { - Runnable whenInitiated = mock(Runnable.class); - manager.initiateGracefulClose(whenInitiated); - verify(whenInitiated).run(); + initiateGracefulClose(manager); Http2GoAwayFrame firstGoAway = verifyWrite(instanceOf(Http2GoAwayFrame.class)); assertThat("Unexpected error in go_away", firstGoAway.errorCode(), is(Http2Error.NO_ERROR.code())); @@ -325,6 +395,12 @@ private Http2PingFrame initiateGracefulCloseVerifyGoAwayAndPing(final KeepAliveM return pingFrame; } + private void initiateGracefulClose(final KeepAliveManager manager) { + Runnable whenInitiated = mock(Runnable.class); + manager.initiateGracefulClose(whenInitiated); + verify(whenInitiated).run(); + } + @SuppressWarnings("unchecked") private T verifyWrite(Matcher writeMatcher) { Object written = channel.outboundMessages().poll(); @@ -369,7 +445,7 @@ private Http2StreamChannel addActiveStream(final KeepAliveManager manager) { private void verifyChannelCloseOnMissingPingAck(final ScheduledTask ackTimeoutTask, boolean duplex) throws InterruptedException { - ackTimeoutTask.task.run(); + ackTimeoutTask.runTask(); verifyWrite(instanceOf(Http2GoAwayFrame.class)); if (duplex) { verifyAtMostOneScheduledTasks(); @@ -383,7 +459,7 @@ private void verifyChannelCloseOnMissingPingAck(final ScheduledTask ackTimeoutTa private KeepAliveManager newManager(final boolean allowPingWithoutActiveStreams, final Channel channel) { KeepAlivePolicy policy = mock(KeepAlivePolicy.class); when(policy.idleDuration()).thenReturn(DEFAULT_IDLE_DURATION); - when(policy.ackTimeout()).thenReturn(DEFAULT_ACK_TIMEOUT); + when(policy.ackTimeout()).thenReturn(ACK_TIMEOUT); when(policy.withoutActiveStreams()).thenReturn(allowPingWithoutActiveStreams); return new KeepAliveManager(channel, policy, (task, delay, unit) -> { @@ -416,6 +492,13 @@ private static final class ScheduledTask { this.promise = promise; this.delayMillis = delayMillis; } + + void runTask() { + if (promise.isCancelled()) { + return; + } + task.run(); + } } private static final class KeepAliveManagerHandler extends ChannelInboundHandlerAdapter { @@ -446,4 +529,16 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt } } } + + private final class FailWriteHandler extends ChannelOutboundHandlerAdapter { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (failWrite.get()) { + release(msg); + promise.tryFailure(DELIBERATE_EXCEPTION); + return; + } + ctx.write(msg, promise); + } + } } From e5da28f79fe20a59c6bf26560ca488eccefbc7d3 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 31 May 2023 13:01:08 -0700 Subject: [PATCH 2/2] `KeepAliveManager`: report an exception to `ConnectionObserver` Motivation: When `KeepAliveManager` closes a channel due to some error, it force closes the `Channel` but does not report an error to `ConnectionObserver`. Modifications: - `KeepAliveManager` uses `ChannelCloseUtils` to make errors visible for `ConnectionObserver`; - When timeout fires, create `TimeoutException` that will be reported to `ConnectionObserver`; - If it doesn't receive `PING(ACK)` during graceful closure process, force close the channel regardless of a number of active streams; - Enhance `KeepAliveManagerTest` to verify new behavior; Result: All errors observed by `KeepAliveManager` are visible to `ConnectionObserver`. --- .../http/netty/KeepAliveManager.java | 79 ++++++++++++++----- .../http/netty/KeepAliveManagerTest.java | 44 +++++++++-- 2 files changed, 98 insertions(+), 25 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java index 2a38eca6d9..ba6b97c002 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java @@ -15,7 +15,9 @@ */ package io.servicetalk.http.netty; +import io.servicetalk.concurrent.internal.ThrowableUtils; import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy; +import io.servicetalk.transport.netty.internal.ChannelCloseUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -35,6 +37,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -43,6 +46,7 @@ import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT; +import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed; import static java.lang.Boolean.TRUE; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -141,17 +145,23 @@ protected void channelIdle(final ChannelHandlerContext ctx, final IdleStateEvent keepAliveState = scheduler.afterDuration(() -> { if (keepAliveState != null) { keepAliveState = State.KEEP_ALIVE_ACK_TIMEDOUT; + final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos); LOGGER.debug( "{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY and " + "closing the channel with activeStreams={}", - this.channel, NANOSECONDS.toMillis(pingAckTimeoutNanos), activeStreams); + this.channel, timeoutMillis, activeStreams); + final TimeoutException cause = StacklessTimeoutException.newInstance( + "Timeout after " + timeoutMillis + "ms waiting for keep-alive PING(ACK)", + KeepAliveManager.class, "keepAlivePingAckTimeout()"); channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)) .addListener(f -> { + Throwable closeCause = cause; if (!f.isSuccess()) { + closeCause = addSuppressed(f.cause(), cause); LOGGER.debug("{} Failed to write the last GO_AWAY after PING(ACK) " + - "timeout, closing the channel", channel, f.cause()); + "timeout, closing the channel", channel, closeCause); } - close0(f.cause()); + close0(closeCause); }); } }, pingAckTimeoutNanos, NANOSECONDS); @@ -178,7 +188,7 @@ void pingReceived(final Http2PingFrame pingFrame) { LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY, activeStreams={}", channel, activeStreams); cancelIfStateIsAFuture(gracefulCloseState); - gracefulCloseWriteSecondGoAway(); + gracefulCloseWriteSecondGoAway(null); } else if (pingAckContent == KEEP_ALIVE_PING_CONTENT) { LOGGER.trace("{} PING(ACK) received, activeStreams={}", channel, activeStreams); cancelIfStateIsAFuture(keepAliveState); @@ -294,12 +304,18 @@ private void channelHalfShutdown(String side, Predicate otherSide gracefulCloseState != State.CLOSED) { // If we have not started the graceful close process, or waiting for ack/read to complete the graceful // close process just force a close now because we will not read any more data. - LOGGER.debug("{} Observed {} shutdown, graceful close is not started or in progress, must force " + + final String state = gracefulCloseState == null ? "not started" : "in progress"; + final IllegalStateException cause = new IllegalStateException("Observed " + side + + " shutdown while graceful closure is " + state); + LOGGER.debug("{} Observed {} shutdown while graceful closure is {}, must force " + "channel closure with activeStreams={}, gracefulCloseState={}, keepAliveState={}", - channel, side, activeStreams, gracefulCloseState, keepAliveState); - channel.close(); + channel, side, state, activeStreams, gracefulCloseState, keepAliveState, cause); + ChannelCloseUtils.close(channel, cause); } } else { + LOGGER.debug("{} Observed {} shutdown, closing non-duplex channel with " + + "activeStreams={}, gracefulCloseState={}, keepAliveState={}", + channel, side, activeStreams, gracefulCloseState, keepAliveState); channel.close(); } } @@ -342,16 +358,19 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) { // If the PING(ACK) times out we may have under estimated the 2RTT time so we // optimistically keep the connection open and rely upon higher level timeouts to tear // down the connection. - LOGGER.debug( - "{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second GO_AWAY", - channel, NANOSECONDS.toMillis(pingAckTimeoutNanos)); - gracefulCloseWriteSecondGoAway(); + final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos); + LOGGER.debug("{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second " + + "GO_AWAY and closing the channel with activeStreams={}", + channel, timeoutMillis, activeStreams); + gracefulCloseWriteSecondGoAway(StacklessTimeoutException.newInstance( + "Timeout after " + timeoutMillis + "ms waiting for graceful close PING(ACK)", + KeepAliveManager.class, "gracefulClosePingAckTimeout()")); }, pingAckTimeoutNanos, NANOSECONDS); } }); } - private void gracefulCloseWriteSecondGoAway() { + private void gracefulCloseWriteSecondGoAway(@Nullable final Throwable cause) { assert channel.eventLoop().inEventLoop(); if (gracefulCloseState == State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT) { @@ -362,10 +381,11 @@ private void gracefulCloseWriteSecondGoAway() { channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)).addListener(future -> { if (!future.isSuccess()) { - LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, future.cause()); - close0(future.cause()); - } else if (activeStreams == 0) { - close0(null); + final Throwable closeCause = cause == null ? future.cause() : addSuppressed(future.cause(), cause); + LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, closeCause); + close0(closeCause); + } else if (cause != null || activeStreams == 0) { + close0(cause); } }); } @@ -383,7 +403,7 @@ private void close0(@Nullable Throwable cause) { if (cause != null) { // Previous write failed with an exception, close immediately. - channel.close(); + ChannelCloseUtils.close(channel, cause); return; } // The way netty H2 stream state machine works, we may trigger stream closures during writes with flushes @@ -422,9 +442,12 @@ private void doShutdownOutput() { if (duplexChannel.isInputShutdown()) { return; } + final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos); LOGGER.debug("{} Timeout after {}ms waiting for InputShutdown, closing the channel", - channel, NANOSECONDS.toMillis(pingAckTimeoutNanos)); - channel.close(); + channel, timeoutMillis); + ChannelCloseUtils.close(channel, StacklessTimeoutException.newInstance( + "Timeout after " + timeoutMillis + "ms waiting for InputShutdown", + KeepAliveManager.class, "doShutdownOutput()")); }, pingAckTimeoutNanos, NANOSECONDS); } }); @@ -441,4 +464,22 @@ private void cancelIfStateIsAFuture(@Nullable final Object state) { } } } + + private static final class StacklessTimeoutException extends TimeoutException { + private static final long serialVersionUID = -8647261218787418981L; + + private StacklessTimeoutException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + // Don't fill in the stacktrace to reduce performance overhead + return this; + } + + static StacklessTimeoutException newInstance(final String message, final Class clazz, final String method) { + return ThrowableUtils.unknownStackTrace(new StacklessTimeoutException(message), clazz, method); + } + } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java index 1c402dfaec..7db1475c96 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java @@ -15,7 +15,10 @@ */ package io.servicetalk.http.netty; +import io.servicetalk.concurrent.internal.DeliberateException; import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy; +import io.servicetalk.transport.api.ConnectionObserver; +import io.servicetalk.transport.netty.internal.ConnectionObserverInitializer; import io.servicetalk.transport.netty.internal.EmbeddedDuplexChannel; import io.netty.buffer.ByteBuf; @@ -45,7 +48,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import static io.netty.util.ReferenceCountUtil.release; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; @@ -60,6 +65,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -71,6 +77,7 @@ class KeepAliveManagerTest { private final BlockingQueue scheduledTasks = new LinkedBlockingQueue<>(); private final AtomicBoolean failWrite = new AtomicBoolean(); + private final ConnectionObserver connectionObserver = mock(ConnectionObserver.class); private EmbeddedChannel channel; private KeepAliveManager manager; @@ -84,6 +91,7 @@ private void setUp(boolean duplex, boolean allowPingWithoutActiveStreams) { FailWriteHandler failWriteHandler = new FailWriteHandler(); channel = duplex ? new EmbeddedDuplexChannel(true, failWriteHandler, managerHandler) : new EmbeddedChannel(failWriteHandler, managerHandler); + new ConnectionObserverInitializer(connectionObserver, false, false).init(channel); manager = newManager(allowPingWithoutActiveStreams, channel); managerHandler.keepAliveManager(manager); } @@ -138,7 +146,7 @@ void keepAlivePingAckReceived(boolean duplex) { @ValueSource(booleans = {true, false}) void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception { setUp(duplex, false); - addActiveStream(manager); + Http2StreamChannel activeStream = addActiveStream(manager); manager.channelIdle(); Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class)); ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled(); @@ -147,16 +155,20 @@ void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception { assertThat("Ping ack timeout task cancelled.", ackTimeoutTask.promise.isCancelled(), is(false)); verifyChannelCloseOnMissingPingAck(ackTimeoutTask, duplex); + activeStream.closeFuture().await(); + verifyConnectionObserver(TimeoutException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @ValueSource(booleans = {true, false}) void keepAliveMissingPingAck(boolean duplex) throws Exception { setUp(duplex, false); - addActiveStream(manager); + Http2StreamChannel activeStream = addActiveStream(manager); manager.channelIdle(); verifyWrite(instanceOf(Http2PingFrame.class)); verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex); + activeStream.closeFuture().await(); + verifyConnectionObserver(TimeoutException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -168,6 +180,7 @@ void gracefulCloseNoActiveStreams(boolean duplex) throws Exception { sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex); shutdownInputIfDuplexChannel(); channel.closeFuture().await(); + verifyConnectionObserver(null); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -183,6 +196,7 @@ void gracefulCloseWithActiveStreams(boolean duplex) throws Exception { activeStream.close().sync().await(); shutdownInputIfDuplexChannel(); channel.closeFuture().await(); + verifyConnectionObserver(null); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -196,6 +210,7 @@ void gracefulCloseNoActiveStreamsMissingPingAck(boolean duplex) throws Exception verifySecondGoAway(duplex); shutdownInputIfDuplexChannel(); channel.closeFuture().await(); + verifyConnectionObserver(TimeoutException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -209,17 +224,16 @@ void gracefulCloseActiveStreamsMissingPingAck(boolean duplex) throws Exception { pingAckTimeoutTask.runTask(); verifySecondGoAway(duplex); - assertThat("Channel closed.", channel.isOpen(), is(true)); - activeStream.close().sync().await(); - shutdownInputIfDuplexChannel(); channel.closeFuture().await(); + activeStream.closeFuture().await(); + verifyConnectionObserver(TimeoutException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @ValueSource(booleans = {true, false}) void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception { setUp(duplex, false); - addActiveStream(manager); + Http2StreamChannel activeStream = addActiveStream(manager); Http2PingFrame pingFrame = initiateGracefulCloseVerifyGoAwayAndPing(manager); sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex); @@ -228,6 +242,8 @@ void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception { manager.channelIdle(); verifyWrite(instanceOf(Http2PingFrame.class)); verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex); + activeStream.closeFuture().await(); + verifyConnectionObserver(TimeoutException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -291,6 +307,7 @@ void duplexGracefulCloseNoInputShutdown() throws Exception { inputShutdownTimeoutTask.runTask(); closeLatch.await(); channel.closeFuture().await(); + verifyConnectionObserver(TimeoutException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -300,6 +317,7 @@ void failureToWritePingClosesChannel(boolean duplex) throws Exception { failWrite.set(true); manager.channelIdle(); channel.closeFuture().await(); + verifyConnectionObserver(DeliberateException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -312,6 +330,7 @@ void failureToWriteLastGoAwayAfterPingAckTimeoutClosesChannel(boolean duplex) th failWrite.set(true); ackTimeoutTask.runTask(); channel.closeFuture().await(); + verifyConnectionObserver(DeliberateException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -321,6 +340,7 @@ void failureToWriteFirstGoAwayClosesChannel(boolean duplex) throws Exception { failWrite.set(true); initiateGracefulClose(manager); channel.closeFuture().await(); + verifyConnectionObserver(DeliberateException.class); } @ParameterizedTest(name = "{displayName} [{index}] duplex={0}") @@ -331,6 +351,7 @@ void failureToWriteSecondGoAwayClosesChannel(boolean duplex) throws Exception { failWrite.set(true); manager.pingReceived(new DefaultHttp2PingFrame(pingFrame.content(), true)); channel.closeFuture().await(); + verifyConnectionObserver(DeliberateException.class); } private void verifyNoOtherActionPostClose(final KeepAliveManager manager) { @@ -440,6 +461,7 @@ private Http2StreamChannel addActiveStream(final KeepAliveManager manager) { return channel.newSucceededFuture(); }); manager.trackActiveStream(stream); + channel.closeFuture().addListener(f -> stream.close()); return stream; } @@ -482,6 +504,16 @@ private void shutdownInputIfDuplexChannel() throws InterruptedException { } } + private void verifyConnectionObserver(@Nullable Class exceptionClass) { + if (exceptionClass != null) { + verify(connectionObserver).connectionClosed(any(exceptionClass)); + verify(connectionObserver, never()).connectionClosed(); + } else { + verify(connectionObserver).connectionClosed(); + verify(connectionObserver, never()).connectionClosed(any(Throwable.class)); + } + } + private static final class ScheduledTask { final Runnable task; final Promise promise;