Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KeepAliveManager: report an exception to ConnectionObserver #2614

Merged
merged 3 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -35,6 +37,7 @@

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand All @@ -43,6 +46,7 @@
import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand Down Expand Up @@ -141,17 +145,23 @@ protected void channelIdle(final ChannelHandlerContext ctx, final IdleStateEvent
keepAliveState = scheduler.afterDuration(() -> {
if (keepAliveState != null) {
keepAliveState = State.KEEP_ALIVE_ACK_TIMEDOUT;
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug(
"{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY and " +
"closing the channel with activeStreams={}",
this.channel, NANOSECONDS.toMillis(pingAckTimeoutNanos), activeStreams);
this.channel, timeoutMillis, activeStreams);
final TimeoutException cause = StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for keep-alive PING(ACK)",
KeepAliveManager.class, "keepAlivePingAckTimeout()");
channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR))
.addListener(f -> {
Throwable closeCause = cause;
if (!f.isSuccess()) {
closeCause = addSuppressed(f.cause(), cause);
LOGGER.debug("{} Failed to write the last GO_AWAY after PING(ACK) " +
"timeout, closing the channel", channel, f.cause());
"timeout, closing the channel", channel, closeCause);
}
close0(f.cause());
close0(closeCause);
});
}
}, pingAckTimeoutNanos, NANOSECONDS);
Expand All @@ -178,7 +188,7 @@ void pingReceived(final Http2PingFrame pingFrame) {
LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY, activeStreams={}",
channel, activeStreams);
cancelIfStateIsAFuture(gracefulCloseState);
gracefulCloseWriteSecondGoAway();
gracefulCloseWriteSecondGoAway(null);
} else if (pingAckContent == KEEP_ALIVE_PING_CONTENT) {
LOGGER.trace("{} PING(ACK) received, activeStreams={}", channel, activeStreams);
cancelIfStateIsAFuture(keepAliveState);
Expand Down Expand Up @@ -294,12 +304,18 @@ private void channelHalfShutdown(String side, Predicate<DuplexChannel> otherSide
gracefulCloseState != State.CLOSED) {
// If we have not started the graceful close process, or waiting for ack/read to complete the graceful
// close process just force a close now because we will not read any more data.
LOGGER.debug("{} Observed {} shutdown, graceful close is not started or in progress, must force " +
final String state = gracefulCloseState == null ? "not started" : "in progress";
final IllegalStateException cause = new IllegalStateException("Observed " + side +
" shutdown while graceful closure is " + state);
LOGGER.debug("{} Observed {} shutdown while graceful closure is {}, must force " +
"channel closure with activeStreams={}, gracefulCloseState={}, keepAliveState={}",
channel, side, activeStreams, gracefulCloseState, keepAliveState);
channel.close();
channel, side, state, activeStreams, gracefulCloseState, keepAliveState, cause);
ChannelCloseUtils.close(channel, cause);
}
} else {
LOGGER.debug("{} Observed {} shutdown, closing non-duplex channel with " +
"activeStreams={}, gracefulCloseState={}, keepAliveState={}",
channel, side, activeStreams, gracefulCloseState, keepAliveState);
channel.close();
}
}
Expand Down Expand Up @@ -342,16 +358,19 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) {
// If the PING(ACK) times out we may have under estimated the 2RTT time so we
// optimistically keep the connection open and rely upon higher level timeouts to tear
// down the connection.
LOGGER.debug(
"{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second GO_AWAY",
channel, NANOSECONDS.toMillis(pingAckTimeoutNanos));
gracefulCloseWriteSecondGoAway();
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug("{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second " +
"GO_AWAY and closing the channel with activeStreams={}",
channel, timeoutMillis, activeStreams);
gracefulCloseWriteSecondGoAway(StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for graceful close PING(ACK)",
KeepAliveManager.class, "gracefulClosePingAckTimeout()"));
}, pingAckTimeoutNanos, NANOSECONDS);
}
});
}

private void gracefulCloseWriteSecondGoAway() {
private void gracefulCloseWriteSecondGoAway(@Nullable final Throwable cause) {
assert channel.eventLoop().inEventLoop();

if (gracefulCloseState == State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT) {
Expand All @@ -362,10 +381,11 @@ private void gracefulCloseWriteSecondGoAway() {

channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)).addListener(future -> {
if (!future.isSuccess()) {
LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, future.cause());
close0(future.cause());
} else if (activeStreams == 0) {
close0(null);
final Throwable closeCause = cause == null ? future.cause() : addSuppressed(future.cause(), cause);
LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, closeCause);
close0(closeCause);
} else if (cause != null || activeStreams == 0) {
close0(cause);
}
});
}
Expand All @@ -383,7 +403,7 @@ private void close0(@Nullable Throwable cause) {

if (cause != null) {
// Previous write failed with an exception, close immediately.
channel.close();
ChannelCloseUtils.close(channel, cause);
return;
}
// The way netty H2 stream state machine works, we may trigger stream closures during writes with flushes
Expand Down Expand Up @@ -422,9 +442,12 @@ private void doShutdownOutput() {
if (duplexChannel.isInputShutdown()) {
return;
}
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug("{} Timeout after {}ms waiting for InputShutdown, closing the channel",
channel, NANOSECONDS.toMillis(pingAckTimeoutNanos));
channel.close();
channel, timeoutMillis);
ChannelCloseUtils.close(channel, StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for InputShutdown",
KeepAliveManager.class, "doShutdownOutput()"));
}, pingAckTimeoutNanos, NANOSECONDS);
}
});
Expand All @@ -441,4 +464,22 @@ private void cancelIfStateIsAFuture(@Nullable final Object state) {
}
}
}

private static final class StacklessTimeoutException extends TimeoutException {
private static final long serialVersionUID = -8647261218787418981L;

private StacklessTimeoutException(final String message) {
super(message);
}

@Override
public Throwable fillInStackTrace() {
// Don't fill in the stacktrace to reduce performance overhead
return this;
}

static StacklessTimeoutException newInstance(final String message, final Class<?> clazz, final String method) {
return ThrowableUtils.unknownStackTrace(new StacklessTimeoutException(message), clazz, method);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.netty.internal.ConnectionObserverInitializer;
import io.servicetalk.transport.netty.internal.EmbeddedDuplexChannel;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -45,7 +48,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import static io.netty.util.ReferenceCountUtil.release;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
Expand All @@ -60,6 +65,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -71,6 +77,7 @@ class KeepAliveManagerTest {

private final BlockingQueue<ScheduledTask> scheduledTasks = new LinkedBlockingQueue<>();
private final AtomicBoolean failWrite = new AtomicBoolean();
private final ConnectionObserver connectionObserver = mock(ConnectionObserver.class);
private EmbeddedChannel channel;
private KeepAliveManager manager;

Expand All @@ -84,6 +91,7 @@ private void setUp(boolean duplex, boolean allowPingWithoutActiveStreams) {
FailWriteHandler failWriteHandler = new FailWriteHandler();
channel = duplex ? new EmbeddedDuplexChannel(true, failWriteHandler, managerHandler)
: new EmbeddedChannel(failWriteHandler, managerHandler);
new ConnectionObserverInitializer(connectionObserver, false, false).init(channel);
manager = newManager(allowPingWithoutActiveStreams, channel);
managerHandler.keepAliveManager(manager);
}
Expand Down Expand Up @@ -138,7 +146,7 @@ void keepAlivePingAckReceived(boolean duplex) {
@ValueSource(booleans = {true, false})
void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception {
setUp(duplex, false);
addActiveStream(manager);
Http2StreamChannel activeStream = addActiveStream(manager);
manager.channelIdle();
Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class));
ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled();
Expand All @@ -147,16 +155,20 @@ void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception {
assertThat("Ping ack timeout task cancelled.", ackTimeoutTask.promise.isCancelled(), is(false));

verifyChannelCloseOnMissingPingAck(ackTimeoutTask, duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
@ValueSource(booleans = {true, false})
void keepAliveMissingPingAck(boolean duplex) throws Exception {
setUp(duplex, false);
addActiveStream(manager);
Http2StreamChannel activeStream = addActiveStream(manager);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -168,6 +180,7 @@ void gracefulCloseNoActiveStreams(boolean duplex) throws Exception {
sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex);
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(null);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -183,6 +196,7 @@ void gracefulCloseWithActiveStreams(boolean duplex) throws Exception {
activeStream.close().sync().await();
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(null);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -196,6 +210,7 @@ void gracefulCloseNoActiveStreamsMissingPingAck(boolean duplex) throws Exception
verifySecondGoAway(duplex);
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -209,17 +224,16 @@ void gracefulCloseActiveStreamsMissingPingAck(boolean duplex) throws Exception {
pingAckTimeoutTask.runTask();
verifySecondGoAway(duplex);

assertThat("Channel closed.", channel.isOpen(), is(true));
activeStream.close().sync().await();
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
@ValueSource(booleans = {true, false})
void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception {
setUp(duplex, false);
addActiveStream(manager);
Http2StreamChannel activeStream = addActiveStream(manager);
Http2PingFrame pingFrame = initiateGracefulCloseVerifyGoAwayAndPing(manager);

sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex);
Expand All @@ -228,6 +242,8 @@ void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception {
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand Down Expand Up @@ -291,6 +307,7 @@ void duplexGracefulCloseNoInputShutdown() throws Exception {
inputShutdownTimeoutTask.runTask();
closeLatch.await();
channel.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -300,6 +317,7 @@ void failureToWritePingClosesChannel(boolean duplex) throws Exception {
failWrite.set(true);
manager.channelIdle();
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -312,6 +330,7 @@ void failureToWriteLastGoAwayAfterPingAckTimeoutClosesChannel(boolean duplex) th
failWrite.set(true);
ackTimeoutTask.runTask();
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -321,6 +340,7 @@ void failureToWriteFirstGoAwayClosesChannel(boolean duplex) throws Exception {
failWrite.set(true);
initiateGracefulClose(manager);
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -331,6 +351,7 @@ void failureToWriteSecondGoAwayClosesChannel(boolean duplex) throws Exception {
failWrite.set(true);
manager.pingReceived(new DefaultHttp2PingFrame(pingFrame.content(), true));
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

private void verifyNoOtherActionPostClose(final KeepAliveManager manager) {
Expand Down Expand Up @@ -440,6 +461,7 @@ private Http2StreamChannel addActiveStream(final KeepAliveManager manager) {
return channel.newSucceededFuture();
});
manager.trackActiveStream(stream);
channel.closeFuture().addListener(f -> stream.close());
return stream;
}

Expand Down Expand Up @@ -482,6 +504,16 @@ private void shutdownInputIfDuplexChannel() throws InterruptedException {
}
}

private void verifyConnectionObserver(@Nullable Class<? extends Throwable> exceptionClass) {
if (exceptionClass != null) {
verify(connectionObserver).connectionClosed(any(exceptionClass));
verify(connectionObserver, never()).connectionClosed();
} else {
verify(connectionObserver).connectionClosed();
verify(connectionObserver, never()).connectionClosed(any(Throwable.class));
}
}

private static final class ScheduledTask {
final Runnable task;
final Promise<?> promise;
Expand Down