From feb8f3071393a2ce0dd9f06642c8493bb9b86ad7 Mon Sep 17 00:00:00 2001 From: Armin Date: Sat, 23 Sep 2017 23:59:10 +0200 Subject: [PATCH 1/3] #26701 retry in org.elasticsearch.transport.MockTcpTransport#connectToChannels --- .../transport/MockTcpTransport.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index bbfccb8229ad1..16002f6da2c50 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.net.ConnectException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -181,14 +182,28 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p final MockChannel[] mockChannels = new MockChannel[1]; final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here boolean success = false; - final MockSocket socket = new MockSocket(); + MockSocket socket = new MockSocket(); try { final InetSocketAddress address = node.getAddress().address(); // we just use a single connections configureSocket(socket); final TimeValue connectTimeout = profile.getConnectTimeout(); try { - socket.connect(address, Math.toIntExact(connectTimeout.millis())); + int retries = 0; + while (true) { + ++retries; + try { + socket.connect(address, Math.toIntExact(connectTimeout.millis())); + break; + } catch (final ConnectException ex) { + if (retries > 10) { + throw ex; + } + IOUtils.close(socket); + socket = new MockSocket(); + configureSocket(socket); + } + } } catch (SocketTimeoutException ex) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", ex); } From f79f00857f4f45c87d62bcee2528bb9ecd01ca83 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 25 Sep 2017 10:01:36 +0200 Subject: [PATCH 2/3] #27071 Stop lingering on closed client connections in MockTcpTransport --- .../transport/MockTcpTransport.java | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 16002f6da2c50..19ead898997db 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.transport; -import java.net.ConnectException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -118,12 +117,12 @@ protected InetSocketAddress getLocalAddress(MockChannel mockChannel) { @Override protected MockChannel bind(final String name, InetSocketAddress address) throws IOException { MockServerSocket socket = new MockServerSocket(); - socket.bind(address); socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings); if (tcpReceiveBufferSize.getBytes() > 0) { socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); } + socket.bind(address); MockChannel serverMockChannel = new MockChannel(socket, name); CountDownLatch started = new CountDownLatch(1); executor.execute(new AbstractRunnable() { @@ -182,28 +181,14 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p final MockChannel[] mockChannels = new MockChannel[1]; final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here boolean success = false; - MockSocket socket = new MockSocket(); + final MockSocket socket = new MockSocket(); try { final InetSocketAddress address = node.getAddress().address(); // we just use a single connections configureSocket(socket); final TimeValue connectTimeout = profile.getConnectTimeout(); try { - int retries = 0; - while (true) { - ++retries; - try { - socket.connect(address, Math.toIntExact(connectTimeout.millis())); - break; - } catch (final ConnectException ex) { - if (retries > 10) { - throw ex; - } - IOUtils.close(socket); - socket = new MockSocket(); - configureSocket(socket); - } - } + socket.connect(address, Math.toIntExact(connectTimeout.millis())); } catch (SocketTimeoutException ex) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", ex); } @@ -318,6 +303,7 @@ public void accept(Executor executor) throws IOException { MockChannel incomingChannel = null; try { configureSocket(incomingSocket); + incomingSocket.setSoLinger(true, 0); synchronized (this) { if (isOpen.get()) { incomingChannel = new MockChannel(incomingSocket, From 1ff8d6e86aa594093150b5a59965fd234b155a1e Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 25 Sep 2017 20:33:42 +0200 Subject: [PATCH 3/3] #26701 Added option to RST instead of FIN to TcpTransport#closeChannels --- .../org/elasticsearch/transport/TcpTransport.java | 13 +++++++------ .../elasticsearch/transport/TCPTransportTests.java | 2 +- .../transport/netty4/Netty4Transport.java | 7 ++++++- .../elasticsearch/transport/MockTcpTransport.java | 12 +++++++++--- .../elasticsearch/transport/nio/NioTransport.java | 9 +++++++-- 5 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 1af4f101e04a2..e9de9aaa3c819 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -442,7 +442,7 @@ public Channel channel(TransportRequestOptions.Type type) { public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false); + closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true); } finally { transportService.onConnectionClosed(this); } @@ -640,7 +640,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n protected final void closeChannelWhileHandlingExceptions(final Channel channel) { if (isOpen(channel)) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e) { logger.warn("failed to close channel", e); } @@ -902,7 +902,7 @@ protected final void doStop() { // first stop to accept any incoming connections so nobody can connect to this transport for (Map.Entry> entry : serverChannels.entrySet()) { try { - closeChannels(entry.getValue(), true); + closeChannels(entry.getValue(), true, true); } catch (Exception e) { logger.debug( (Supplier) () -> new ParameterizedMessage( @@ -975,7 +975,7 @@ protected void onException(Channel channel, Exception e) { @Override protected void innerInnerOnResponse(Channel channel) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e1) { logger.debug("failed to close httpOnTransport channel", e1); } @@ -984,7 +984,7 @@ protected void innerInnerOnResponse(Channel channel) { @Override protected void innerOnFailure(Exception e) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e1) { e.addSuppressed(e1); logger.debug("failed to close httpOnTransport channel", e1); @@ -1021,8 +1021,9 @@ protected void innerOnFailure(Exception e) { * * @param channels the channels to close * @param blocking whether the channels should be closed synchronously + * @param closingTransport whether we abort the connection on RST instead of FIN */ - protected abstract void closeChannels(List channels, boolean blocking) throws IOException; + protected abstract void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException; /** * Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index c386b2865afa3..55457cc8ae431 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -191,7 +191,7 @@ protected Object bind(String name, InetSocketAddress address) throws IOException } @Override - protected void closeChannels(List channel, boolean blocking) throws IOException { + protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException { } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d92066e1fcc63..a196976dc125e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -331,7 +331,12 @@ protected void sendMessage(Channel channel, BytesReference reference, ActionList } @Override - protected void closeChannels(final List channels, boolean blocking) throws IOException { + protected void closeChannels(final List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (Channel channel : channels) { + channel.config().setOption(ChannelOption.SO_LINGER, 0); + } + } if (blocking) { Netty4Utils.closeChannels(channels); } else { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 19ead898997db..085469059bf61 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -242,8 +242,15 @@ protected void sendMessage(MockChannel mockChannel, BytesReference reference, Ac } @Override - protected void closeChannels(List channel, boolean blocking) throws IOException { - IOUtils.close(channel); + protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (MockChannel channel : channels) { + if (channel.activeChannel != null) { + channel.activeChannel.setSoLinger(true, 0); + } + } + } + IOUtils.close(channels); } @Override @@ -303,7 +310,6 @@ public void accept(Executor executor) throws IOException { MockChannel incomingChannel = null; try { configureSocket(incomingSocket); - incomingSocket.setSoLinger(true, 0); synchronized (this) { if (isOpen.get()) { incomingChannel = new MockChannel(incomingSocket, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 5a9dba29f5e57..b22feb5697695 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio; +import java.net.StandardSocketOptions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -28,7 +29,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -99,7 +99,12 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th } @Override - protected void closeChannels(List channels, boolean blocking) throws IOException { + protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (NioChannel channel : channels) { + channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0); + } + } ArrayList futures = new ArrayList<>(channels.size()); for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) {