From 74983033be5d0133ccd6191877233d10b4253548 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 29 Apr 2025 17:03:43 -0700 Subject: [PATCH 1/4] transport: pass network channel exceptions to close listeners Previously, exceptions encountered on a netty channel were caught and logged at some level, but not passed to the TcpChannel or Transport.Connection close listeners. This limited observability. This change implements this exception reporting and passing, with TcpChannel.onException and NodeChannels.closeAndFail reporting exceptions and their close listeners receiving them. Some test infrastructure (FakeTcpChannel) and assertions in close listener onFailure methods have been updated. Closes: ES-11644 --- .../transport/netty4/Netty4TcpChannel.java | 17 +++++++- .../transport/netty4/Netty4Transport.java | 27 +++++++++---- .../common/network/CloseableChannel.java | 3 ++ .../org/elasticsearch/tasks/TaskManager.java | 4 +- .../transport/CloseableConnection.java | 6 +++ .../transport/InboundHandler.java | 1 + .../transport/OutboundHandler.java | 3 ++ .../elasticsearch/transport/TcpChannel.java | 7 ++++ .../elasticsearch/transport/TcpTransport.java | 39 +++++++++++++++++-- .../elasticsearch/transport/Transport.java | 6 +-- .../transport/InboundHandlerTests.java | 9 +++++ .../transport/OutboundHandlerTests.java | 23 +++++++++++ .../transport/TcpTransportTests.java | 6 ++- .../transport/FakeTcpChannel.java | 13 ++++++- 14 files changed, 145 insertions(+), 19 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 420fe16dcb689..b76201093daa2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -34,6 +34,7 @@ public class Netty4TcpChannel implements TcpChannel { private final ListenableFuture closeContext = new ListenableFuture<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; + private volatile Exception channelError = null; Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) { this.channel = channel; @@ -41,8 +42,17 @@ public class Netty4TcpChannel implements TcpChannel { this.profile = profile; this.connectContext = new ListenableFuture<>(); this.rstOnClose = rstOnClose; - addListener(this.channel.closeFuture(), closeContext); addListener(connectFuture, connectContext); + // The netty closeFuture is programmed to never return an exception. + // This listener takes close-causing exceptions reported during the + // channel lifetime, and reports them to the closeListener. + addListener(this.channel.closeFuture(), ActionListener.running(() -> { + if (channelError != null) { + closeContext.onFailure(channelError); + } else { + closeContext.onResponse(null); + } + })); } @Override @@ -95,6 +105,11 @@ public void addConnectListener(ActionListener listener) { connectContext.addListener(listener); } + @Override + public void onException(Exception e) { + channelError = e; + } + @Override public ChannelStats getChannelStats() { return stats; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d8b02a0e9a0df..9180df0be82d0 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -28,6 +28,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -281,6 +282,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile rstOnClose, connectFuture ); + addClosedExceptionLogger(nettyChannel); channel.attr(CHANNEL_KEY).set(nettyChannel); return nettyChannel; @@ -312,7 +314,6 @@ protected class ClientChannelInitializer extends ChannelInitializer { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); setupPipeline(ch, false); @@ -320,6 +321,12 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + if (cause instanceof Error) { + channel.onException(new Exception(cause)); + } else { + channel.onException((Exception) cause); + } ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -337,10 +344,10 @@ protected ServerChannelInitializer(String name) { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture()); + addClosedExceptionLogger(nettyTcpChannel); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); setupPipeline(ch, isRemoteClusterServerChannel); serverAcceptedChannel(nettyTcpChannel); @@ -348,6 +355,12 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + if (cause instanceof Error) { + channel.onException(new Exception(cause)); + } else { + channel.onException((Exception) cause); + } ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -383,12 +396,12 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster ); } - private static void addClosedExceptionLogger(Channel channel) { - Netty4Utils.addListener(channel.closeFuture(), channelFuture -> { - if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { - logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); + private static void addClosedExceptionLogger(Netty4TcpChannel channel) { + channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> { + if (logger.isDebugEnabled()) { + logger.debug(format("exception while closing channel: %s", channel), e); } - }); + })); } @ChannelHandler.Sharable diff --git a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java index 0ca3182854f81..a7e24ada01dba 100644 --- a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java +++ b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable { * channel. If the channel is already closed when the listener is added the listener will immediately be * executed by the thread that is attempting to add the listener. * + * When the close completes but an exception prompted the closure, the exception will be passed to the + * listener's onFailure method. + * * @param listener to be executed */ void addCloseListener(ActionListener listener); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 7e193567dc9fe..d1cf4bd799e4b 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu return curr; }); if (tracker.registered.compareAndSet(false, true)) { - channel.addCloseListener(ActionListener.wrap(r -> { + channel.addCloseListener(ActionListener.running(() -> { final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel); assert removedTracker == tracker; onChannelClosed(tracker); - }, e -> { assert false : new AssertionError("must not be here", e); })); + })); } return tracker; } diff --git a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java index 6148ed6121125..61d7869aec326 100644 --- a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java @@ -48,6 +48,12 @@ public void close() { } } + public void closeAndFail(Exception e) { + if (closed.compareAndSet(false, true)) { + closeContext.onFailure(e); + } + } + @Override public void onRemoved() { if (removed.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index ee4e5e31c584a..268139a836c73 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message) () -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel", e ); + channel.onException(e); channel.close(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index e198b8bd19bcc..a33a257f4785f 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -168,6 +168,7 @@ void sendResponse( ), ex ); + channel.onException(ex); channel.close(); } else { sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex); @@ -204,6 +205,7 @@ void sendErrorResponse( } catch (Exception sendException) { sendException.addSuppressed(error); logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException); + channel.onException(sendException); channel.close(); } } @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) { } }); } catch (RuntimeException ex) { + channel.onException(ex); Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel)); throw ex; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index 6e39d388a40b9..d760ce670a41a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel { */ void addConnectListener(ActionListener listener); + /** + * Report an exception on this channel + * + * @param e the exception + */ + void onException(Exception e); + /** * Returns stats about this channel */ diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 9e2fbe737a81d..70818d37d16ac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) { @Override public void close() { + handleClose(null); + } + + @Override + public void closeAndFail(Exception e) { + handleClose(e); + } + + private void handleClose(Exception e) { if (isClosing.compareAndSet(false, true)) { try { boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false; CloseableChannel.closeChannels(channels, block); } finally { // Call the super method to trigger listeners - super.close(); + if (e == null) { + super.close(); + } else { + super.closeAndFail(e); + } } } } @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle } } finally { if (closeChannel) { + channel.onException(e); CloseableChannel.closeChannel(channel); } } @@ -1120,7 +1134,17 @@ public void onResponse(Void v) { nodeChannels.channels.forEach(ch -> { // Mark the channel init time ch.getChannelStats().markAccessed(relativeMillisTime); - ch.addCloseListener(ActionListener.running(nodeChannels::close)); + ch.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + nodeChannels.close(); + } + + @Override + public void onFailure(Exception e) { + nodeChannels.closeAndFail(e); + } + }); }); keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime)); @@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) { @Override public void onFailure(Exception e) { - assert false : e; // never called + long closeTimeMillis = threadPool.relativeTimeInMillis(); + logger.info( + () -> format( + "closed transport connection [{}] to [{}] with age [{}ms], exception:", + connectionId, + node, + closeTimeMillis - openTimeMillis + ), + e + ); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index a1d35ce3f255a..13b2752c929bb 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp TransportException; /** - * The listener's {@link ActionListener#onResponse(Object)} method will be called when this - * connection is closed. No implementations currently throw an exception during close, so - * {@link ActionListener#onFailure(Exception)} will not be called. + * The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method + * will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called + * when the connection has successfully closed, but an exception has prompted the close. * * @param listener to be called */ diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index a683c2332c451..f899fffe7a5d4 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; @@ -238,6 +239,9 @@ public void testClosesChannelOnErrorInHandshake() throws Exception { final AtomicBoolean isClosed = new AtomicBoolean(); channel.addCloseListener(ActionListener.running(() -> assertTrue(isClosed.compareAndSet(false, true)))); + PlainActionFuture closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); + final TransportVersion remoteVersion = TransportVersionUtils.randomVersionBetween( random(), TransportVersionUtils.getFirstVersion(), @@ -255,6 +259,11 @@ public void testClosesChannelOnErrorInHandshake() throws Exception { requestHeader.headers = Tuple.tuple(Map.of(), Map.of()); handler.inboundMessage(channel, requestMessage); assertTrue(isClosed.get()); + assertTrue(closeListener.isDone()); + try { + closeListener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} assertNull(channel.getMessageCaptor().get()); mockLog.assertAllExpectationsMatched(); } diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 8d18841f8e18e..443f25e15dfcd 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -66,6 +66,7 @@ public class OutboundHandlerTests extends ESTestCase { private InboundPipeline pipeline; private OutboundHandler handler; private FakeTcpChannel channel; + private PlainActionFuture closeListener; private DiscoveryNode node; private Compression.Scheme compressionScheme; @@ -73,6 +74,8 @@ public class OutboundHandlerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address()); + closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); TransportAddress transportAddress = buildNewFakeTransportAddress(); node = DiscoveryNodeUtils.create("", transportAddress); StatsTracker statsTracker = new StatsTracker(); @@ -378,6 +381,7 @@ public void onResponseSent(long requestId, String action, Exception error) { assertThat(rme.getCause().getMessage(), equalTo("simulated cbe")); } assertTrue(channel.isOpen()); + assertFalse(closeListener.isDone()); } public void testFailToSendResponseThenFailToSendError() { @@ -388,6 +392,8 @@ public void sendMessage(BytesReference reference, ActionListener listener) throw new IllegalStateException("pipe broken"); } }; + closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); TransportVersion version = TransportVersionUtils.randomVersion(); String action = randomAlphaOfLength(10); long requestId = randomLongBetween(0, 300); @@ -431,6 +437,11 @@ public void writeTo(StreamOutput out) { assertNull(channel.getMessageCaptor().get()); assertNull(channel.getListenerCaptor().get()); assertFalse(channel.isOpen()); + assertTrue(closeListener.isDone()); + try { + closeListener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} } public void testFailToSendHandshakeResponse() { @@ -473,6 +484,11 @@ public void onResponseSent(long requestId, String action, Exception error) { assertEquals(action, actionRef.get()); assertTrue(response.released.get()); assertFalse(channel.isOpen()); + assertTrue(closeListener.isDone()); + try { + closeListener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} } public void testFailToSendErrorResponse() { @@ -483,6 +499,8 @@ public void sendMessage(BytesReference reference, ActionListener listener) throw new IllegalStateException("pipe broken"); } }; + closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); TransportVersion version = TransportVersionUtils.randomVersion(); String action = randomAlphaOfLength(10); long requestId = randomLongBetween(0, 300); @@ -515,6 +533,11 @@ public void onResponseSent(long requestId, String action, Exception error) { assertFalse(channel.isOpen()); assertNull(channel.getMessageCaptor().get()); assertNull(channel.getListenerCaptor().get()); + assertTrue(closeListener.isDone()); + try { + closeListener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} } /** diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 7099c33dda75f..80982c4f80d59 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; /** Unit tests for {@link TcpTransport} */ @@ -606,7 +605,10 @@ private void testExceptionHandling( if (expectClosed) { assertTrue(listener.isDone()); - assertThat(listener.actionGet(), nullValue()); + try { + listener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} } else { assertFalse(listener.isDone()); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index c4c7a4f16da84..098bcc893964c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -29,6 +29,8 @@ public class FakeTcpChannel implements TcpChannel { private final AtomicReference messageCaptor; private final AtomicReference> listenerCaptor; + private volatile Exception channelError = null; + public FakeTcpChannel() { this(false, "profile", new AtomicReference<>()); } @@ -94,10 +96,19 @@ public void addConnectListener(ActionListener listener) { @Override public void close() { if (closed.compareAndSet(false, true)) { - closeContext.onResponse(null); + if (channelError != null) { + closeContext.onFailure(channelError); + } else { + closeContext.onResponse(null); + } } } + @Override + public void onException(Exception e) { + channelError = e; + } + @Override public void addCloseListener(ActionListener listener) { closeContext.addListener(listener); From 3ffdba7c7b53a9218e06bb429d126c579b0d3e12 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Thu, 8 May 2025 15:07:18 -0700 Subject: [PATCH 2/4] Addressed review feedback: - renamed channelError and onException - removed dead channel error logging code - used expectThrows pattern in tests - added closeListener test - added assert to onFailure branch of netty channel future/listener adapter - de-duplicated throwable/exception adapting code - log transport errors at debug --- .../transport/netty4/Netty4TcpChannel.java | 30 ++++++++++------- .../transport/netty4/Netty4Transport.java | 32 ++++++------------- .../transport/InboundHandler.java | 2 +- .../transport/OutboundHandler.java | 6 ++-- .../elasticsearch/transport/TcpChannel.java | 4 +-- .../elasticsearch/transport/TcpTransport.java | 4 +-- .../ClusterConnectionManagerTests.java | 4 +++ .../transport/InboundHandlerTests.java | 5 +-- .../transport/OutboundHandlerTests.java | 15 ++------- .../transport/TcpTransportTests.java | 5 +-- .../transport/FakeTcpChannel.java | 10 +++--- 11 files changed, 50 insertions(+), 67 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index b76201093daa2..e5b5a2609eb9e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -34,7 +34,8 @@ public class Netty4TcpChannel implements TcpChannel { private final ListenableFuture closeContext = new ListenableFuture<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; - private volatile Exception channelError = null; + // Exception causing a close, reported to the closeContext listener + private volatile Exception closeException = null; Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) { this.channel = channel; @@ -43,16 +44,21 @@ public class Netty4TcpChannel implements TcpChannel { this.connectContext = new ListenableFuture<>(); this.rstOnClose = rstOnClose; addListener(connectFuture, connectContext); - // The netty closeFuture is programmed to never return an exception. - // This listener takes close-causing exceptions reported during the - // channel lifetime, and reports them to the closeListener. - addListener(this.channel.closeFuture(), ActionListener.running(() -> { - if (channelError != null) { - closeContext.onFailure(channelError); - } else { - closeContext.onResponse(null); + addListener(this.channel.closeFuture(), new ActionListener<>() { + @Override + public void onResponse(Void ignored) { + if (closeException != null) { + closeContext.onFailure(closeException); + } else { + closeContext.onResponse(null); + } + } + + @Override + public void onFailure(Exception e) { + assert false : new AssertionError("netty channel closeFuture should never report a failure"); } - })); + }); } @Override @@ -106,8 +112,8 @@ public void addConnectListener(ActionListener listener) { } @Override - public void onException(Exception e) { - channelError = e; + public void setCloseException(Exception e) { + closeException = e; } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 9180df0be82d0..b70a9207758a4 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -28,7 +28,6 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -55,7 +54,6 @@ import java.util.Map; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE; import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED; @@ -282,7 +280,6 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile rstOnClose, connectFuture ); - addClosedExceptionLogger(nettyChannel); channel.attr(CHANNEL_KEY).set(nettyChannel); return nettyChannel; @@ -310,6 +307,14 @@ protected void stopInternal() { }, serverBootstraps::clear, () -> clientBootstrap = null); } + private Exception exceptionFromThrowable(Throwable cause) { + if (cause instanceof Error) { + return new Exception(cause); + } else { + return (Exception) cause; + } + } + protected class ClientChannelInitializer extends ChannelInitializer { @Override @@ -322,11 +327,7 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); - if (cause instanceof Error) { - channel.onException(new Exception(cause)); - } else { - channel.onException((Exception) cause); - } + channel.setCloseException(exceptionFromThrowable(cause)); ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -347,7 +348,6 @@ protected void initChannel(Channel ch) throws Exception { assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture()); - addClosedExceptionLogger(nettyTcpChannel); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); setupPipeline(ch, isRemoteClusterServerChannel); serverAcceptedChannel(nettyTcpChannel); @@ -356,11 +356,7 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); - if (cause instanceof Error) { - channel.onException(new Exception(cause)); - } else { - channel.onException((Exception) cause); - } + channel.setCloseException(exceptionFromThrowable(cause)); ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -396,14 +392,6 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster ); } - private static void addClosedExceptionLogger(Netty4TcpChannel channel) { - channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> { - if (logger.isDebugEnabled()) { - logger.debug(format("exception while closing channel: %s", channel), e); - } - })); - } - @ChannelHandler.Sharable private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter { diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 268139a836c73..80a1f8f63e656 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -390,7 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message) () -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel", e ); - channel.onException(e); + channel.setCloseException(e); channel.close(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index a33a257f4785f..cab15fffa3fd0 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -168,7 +168,7 @@ void sendResponse( ), ex ); - channel.onException(ex); + channel.setCloseException(ex); channel.close(); } else { sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex); @@ -205,7 +205,7 @@ void sendErrorResponse( } catch (Exception sendException) { sendException.addSuppressed(error); logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException); - channel.onException(sendException); + channel.setCloseException(sendException); channel.close(); } } @@ -433,7 +433,7 @@ private void maybeLogSlowMessage(boolean success) { } }); } catch (RuntimeException ex) { - channel.onException(ex); + channel.setCloseException(ex); Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel)); throw ex; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index d760ce670a41a..00014e715afa2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -67,11 +67,11 @@ public interface TcpChannel extends CloseableChannel { void addConnectListener(ActionListener listener); /** - * Report an exception on this channel + * Report a close-causing exception on this channel * * @param e the exception */ - void onException(Exception e); + void setCloseException(Exception e); /** * Returns stats about this channel diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 70818d37d16ac..42cf791e5869b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -773,7 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle } } finally { if (closeChannel) { - channel.onException(e); + channel.setCloseException(e); CloseableChannel.closeChannel(channel); } } @@ -1206,7 +1206,7 @@ public void onResponse(Void ignored) { @Override public void onFailure(Exception e) { long closeTimeMillis = threadPool.relativeTimeInMillis(); - logger.info( + logger.debug( () -> format( "closed transport connection [{}] to [{}] with age [{}ms], exception:", connectionId, diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index 966da27bbce50..0a446e999c3c3 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -55,6 +55,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -107,6 +108,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti DiscoveryNode node = DiscoveryNodeUtils.create("", new TransportAddress(InetAddress.getLoopbackAddress(), 0)); Transport.Connection connection = new TestConnect(node); + PlainActionFuture closeListener = new PlainActionFuture<>(); + connection.addCloseListener(closeListener); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -137,6 +140,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti connection.close(); } assertTrue(connection.isClosed()); + assertThat(closeListener.actionGet(), nullValue()); assertEquals(0, connectionManager.size()); assertEquals(1, nodeConnectedCount.get()); assertEquals(1, nodeDisconnectedCount.get()); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index f899fffe7a5d4..c4e85e1d35d72 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -260,10 +260,7 @@ public void testClosesChannelOnErrorInHandshake() throws Exception { handler.inboundMessage(channel, requestMessage); assertTrue(isClosed.get()); assertTrue(closeListener.isDone()); - try { - closeListener.get(); - assert false : "channel should have an exception reported"; - } catch (Exception e) {} + expectThrows(Exception.class, () -> closeListener.get()); assertNull(channel.getMessageCaptor().get()); mockLog.assertAllExpectationsMatched(); } diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 443f25e15dfcd..e80b88c095f85 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -438,10 +438,7 @@ public void writeTo(StreamOutput out) { assertNull(channel.getListenerCaptor().get()); assertFalse(channel.isOpen()); assertTrue(closeListener.isDone()); - try { - closeListener.get(); - assert false : "channel should have an exception reported"; - } catch (Exception e) {} + expectThrows(Exception.class, () -> closeListener.get()); } public void testFailToSendHandshakeResponse() { @@ -485,10 +482,7 @@ public void onResponseSent(long requestId, String action, Exception error) { assertTrue(response.released.get()); assertFalse(channel.isOpen()); assertTrue(closeListener.isDone()); - try { - closeListener.get(); - assert false : "channel should have an exception reported"; - } catch (Exception e) {} + expectThrows(Exception.class, () -> closeListener.get()); } public void testFailToSendErrorResponse() { @@ -534,10 +528,7 @@ public void onResponseSent(long requestId, String action, Exception error) { assertNull(channel.getMessageCaptor().get()); assertNull(channel.getListenerCaptor().get()); assertTrue(closeListener.isDone()); - try { - closeListener.get(); - assert false : "channel should have an exception reported"; - } catch (Exception e) {} + expectThrows(Exception.class, () -> closeListener.get()); } /** diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 80982c4f80d59..65fa39b87178f 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -605,10 +605,7 @@ private void testExceptionHandling( if (expectClosed) { assertTrue(listener.isDone()); - try { - listener.get(); - assert false : "channel should have an exception reported"; - } catch (Exception e) {} + expectThrows(Exception.class, () -> listener.get()); } else { assertFalse(listener.isDone()); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index 098bcc893964c..e648d7baecf99 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -29,7 +29,7 @@ public class FakeTcpChannel implements TcpChannel { private final AtomicReference messageCaptor; private final AtomicReference> listenerCaptor; - private volatile Exception channelError = null; + private volatile Exception closeException = null; public FakeTcpChannel() { this(false, "profile", new AtomicReference<>()); @@ -96,8 +96,8 @@ public void addConnectListener(ActionListener listener) { @Override public void close() { if (closed.compareAndSet(false, true)) { - if (channelError != null) { - closeContext.onFailure(channelError); + if (closeException != null) { + closeContext.onFailure(closeException); } else { closeContext.onResponse(null); } @@ -105,8 +105,8 @@ public void close() { } @Override - public void onException(Exception e) { - channelError = e; + public void setCloseException(Exception e) { + closeException = e; } @Override From 86342af15d323344446ba0b7355fcea96103ae1b Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Fri, 9 May 2025 16:18:16 -0700 Subject: [PATCH 3/4] Addressed next round of review feedback: - added test to check for close logging, with and without exception - fixed up close logger formatting - javadoc fixup for channel exception field - further refactored remaining code with throwable -> exception in Netty4Transport --- .../transport/netty4/ESLoggingHandlerIT.java | 51 ++++++++++++++++++- .../transport/netty4/Netty4TcpChannel.java | 4 +- .../transport/netty4/Netty4Transport.java | 8 +-- .../elasticsearch/transport/TcpTransport.java | 2 +- 4 files changed, 56 insertions(+), 9 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java index 0ddf0e5513feb..1bb8f7c0188d4 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java @@ -11,14 +11,21 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportLogger; +import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; @ESIntegTestCase.ClusterScope(numDataNodes = 2, scope = ESIntegTestCase.Scope.TEST) public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { @@ -96,7 +103,7 @@ public void testConnectionLogging() throws IOException { "close connection log", TcpTransport.class.getCanonicalName(), Level.DEBUG, - ".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*" + ".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\]$" ) ); @@ -105,4 +112,46 @@ public void testConnectionLogging() throws IOException { mockLog.assertAllExpectationsMatched(); } + + @TestLogging( + value = "org.elasticsearch.transport.TcpTransport:DEBUG", + reason = "to ensure we log exception disconnect events on DEBUG level" + ) + public void testExceptionalDisconnectLogging() throws Exception { + mockLog.addExpectation( + new MockLog.PatternSeenEventExpectation( + "exceptional close connection log", + TcpTransport.class.getCanonicalName(), + Level.DEBUG, + ".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\], exception:.*" + ) + ); + + final String nodeName = internalCluster().startNode(); + + final CountDownLatch latch = new CountDownLatch(1); + String masterNode = internalCluster().getMasterName(); + ConnectionManager connManager = internalCluster().getInstance(TransportService.class, masterNode).getConnectionManager(); + connManager.addListener(new TransportConnectionListener() { + @Override + public void onConnectionClosed(Transport.Connection conn) { + conn.addCloseListener(new ActionListener<>() { + @Override + public void onResponse(Void ignored) {} + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }); + } + }); + + int failAttempts = 0; + do { + internalCluster().restartNode(nodeName); + } while (latch.await(500, TimeUnit.MILLISECONDS) == false && failAttempts++ < 10); + + mockLog.assertAllExpectationsMatched(); + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index e5b5a2609eb9e..9b8fd6ff2d116 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -34,7 +34,9 @@ public class Netty4TcpChannel implements TcpChannel { private final ListenableFuture closeContext = new ListenableFuture<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; - // Exception causing a close, reported to the closeContext listener + /** + * Exception causing a close, reported to the {@link #closeContext} listener + */ private volatile Exception closeException = null; Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index b70a9207758a4..14607f563f1c6 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -307,7 +307,7 @@ protected void stopInternal() { }, serverBootstraps::clear, () -> clientBootstrap = null); } - private Exception exceptionFromThrowable(Throwable cause) { + static Exception exceptionFromThrowable(Throwable cause) { if (cause instanceof Error) { return new Exception(cause); } else { @@ -399,11 +399,7 @@ private static class ServerChannelExceptionHandler extends ChannelInboundHandler public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ExceptionsHelper.maybeDieOnAnotherThread(cause); Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get(); - if (cause instanceof Error) { - onServerException(serverChannel, new Exception(cause)); - } else { - onServerException(serverChannel, (Exception) cause); - } + onServerException(serverChannel, exceptionFromThrowable(cause)); } } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 42cf791e5869b..1c7a29e4919ed 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1208,7 +1208,7 @@ public void onFailure(Exception e) { long closeTimeMillis = threadPool.relativeTimeInMillis(); logger.debug( () -> format( - "closed transport connection [{}] to [{}] with age [{}ms], exception:", + "closed transport connection [%d] to [%s] with age [%dms], exception:", connectionId, node, closeTimeMillis - openTimeMillis From d8208105f114df23c765b2f8a8f1decbe22bd881 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 20 May 2025 16:00:58 -0700 Subject: [PATCH 4/4] Addressed last review feedback: - removed complicated do... while logic around network exceptions on close --- .../transport/netty4/ESLoggingHandlerIT.java | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java index 1bb8f7c0188d4..9d054839849de 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java @@ -11,21 +11,14 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ESNetty4IntegTestCase; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportLogger; -import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; @ESIntegTestCase.ClusterScope(numDataNodes = 2, scope = ESIntegTestCase.Scope.TEST) public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { @@ -128,29 +121,7 @@ public void testExceptionalDisconnectLogging() throws Exception { ); final String nodeName = internalCluster().startNode(); - - final CountDownLatch latch = new CountDownLatch(1); - String masterNode = internalCluster().getMasterName(); - ConnectionManager connManager = internalCluster().getInstance(TransportService.class, masterNode).getConnectionManager(); - connManager.addListener(new TransportConnectionListener() { - @Override - public void onConnectionClosed(Transport.Connection conn) { - conn.addCloseListener(new ActionListener<>() { - @Override - public void onResponse(Void ignored) {} - - @Override - public void onFailure(Exception e) { - latch.countDown(); - } - }); - } - }); - - int failAttempts = 0; - do { - internalCluster().restartNode(nodeName); - } while (latch.await(500, TimeUnit.MILLISECONDS) == false && failAttempts++ < 10); + internalCluster().restartNode(nodeName); mockLog.assertAllExpectationsMatched(); }