Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions docs/reference/modules/transport.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,26 @@ example above:
* `tcp.keep_alive`: Configures the `SO_KEEPALIVE` option for this socket
* `tcp.keep_idle`: Configures the `TCP_KEEPIDLE` option for this socket, which
determines the time in seconds that a connection must be idle before
starting to send TCP keepalive probes.
Only applicable on Linux and Mac, and requires JDK 11 or newer.
Defaults to -1, which does not set this option at the socket level, but
uses default system configuration instead.
starting to send TCP keepalive probes. A value of `-1` means not to set
this option at the socket level but to use the system default instead.
Only applicable on Linux and Mac, and requires Java 11 or newer.
Defaults to `-1`. On applicable configurations, this value is set to
300 seconds (5 minutes) if the system default or user-configured setting
is higher than that. Values above 300 seconds are rejected for this setting.
* `tcp.keep_interval`: Configures the `TCP_KEEPINTVL` option for this socket,
which determines the time in seconds between sending TCP keepalive probes.
Only applicable on Linux and Mac, and requires JDK 11 or newer.
Defaults to -1, which does not set this option at the socket level, but
uses default system configuration instead.
A value of `-1` means not to set this option at the socket level but to
use the system default instead.
Only applicable on Linux and Mac, and requires Java 11 or newer.
Defaults to `-1`. On applicable configurations, this value is set to
300 seconds (5 minutes) if the system default or user-configured setting
is higher than that. Values above 300 seconds are rejected for this setting.
* `tcp.keep_count`: Configures the `TCP_KEEPCNT` option for this socket, which
determines the number of unacknowledged TCP keepalive probes that may be
sent on a connection before it is dropped.
Only applicable on Linux and Mac, and requires JDK 11 or newer.
Defaults to -1, which does not set this option at the socket level, but
uses default system configuration instead.
sent on a connection before it is dropped. A value of `-1` means not to set
this option at the socket level but to use the system default instead.
Only applicable on Linux and Mac, and requires Java 11 or newer.
Defaults to `-1`.
* `tcp.reuse_address`: Configures the `SO_REUSEADDR` option for this socket
* `tcp.send_buffer_size`: Configures the send buffer size of the socket
* `tcp.receive_buffer_size`: Configures the receive buffer size of the socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package org.elasticsearch.core.internal.net;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NetworkChannel;
import java.util.Arrays;

/**
* Utilities for network-related methods.
Expand Down Expand Up @@ -59,4 +64,45 @@ private static <T> SocketOption<T> getExtendedSocketOptionOrNull(String fieldNam
return null;
}
}

/**
 * Best-effort tuning of the TCP keep-alive timing of the given channel. When the channel supports SO_KEEPALIVE
 * and currently has it switched on, the extended socket options TCP_KEEPIDLE and TCP_KEEPINTVL are each lowered
 * to 300 seconds (5 minutes) if their current value is larger than that; smaller values are left untouched.
 * <p>
 * Rationale: the default value for TCP_KEEPIDLE is system dependent, but is typically 2 hours. Such a long idle
 * period can result in firewalls eagerly closing the connection, so a shorter period is used to tell any
 * intermediate devices that the connection remains alive.
 * <p>
 * Exceptions raised while reading or writing the options are not propagated to the caller; with assertions
 * enabled, anything other than an {@link IOException} trips an assertion.
 *
 * @param socketChannel the channel whose keep-alive options are adjusted; must not be {@code null}
 */
public static void setSaneDefaultKeepAliveOptions(NetworkChannel socketChannel) {
    assert socketChannel != null;
    try {
        // Nothing to tune when the channel has no notion of SO_KEEPALIVE at all
        if (socketChannel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE) == false) {
            return;
        }
        final Boolean keepAliveEnabled = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
        if (keepAliveEnabled == null || keepAliveEnabled.booleanValue() == false) {
            return;
        }
        final int upperBoundInSeconds = 300;
        // Either lookup may yield null; the helper below skips null options
        final SocketOption<Integer> keepIdle = NetUtils.getTcpKeepIdleSocketOptionOrNull();
        final SocketOption<Integer> keepInterval = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
        for (SocketOption<Integer> keepAliveOption : Arrays.asList(keepIdle, keepInterval)) {
            setMinValueForSocketOption(socketChannel, keepAliveOption, upperBoundInSeconds);
        }
    } catch (Exception e) {
        // An I/O failure is tolerated silently; any other exception type is unexpected
        assert e instanceof IOException || e instanceof ClosedChannelException :
            "unexpected exception when setting channel option: " + e.getClass() + ": " + e.getMessage();
    }
}

/**
 * Lowers the given integer socket option to {@code minValue} when its current value on the channel is larger.
 * Despite the method name, {@code minValue} therefore acts as a cap: values at or below it are left as they are.
 * Does nothing when {@code option} is {@code null} or not among the channel's supported options.
 *
 * @param socketChannel the channel to read the option from and, if needed, write it to
 * @param option        the socket option to cap, may be {@code null}
 * @param minValue      the largest value the option is allowed to keep
 */
private static void setMinValueForSocketOption(NetworkChannel socketChannel, SocketOption<Integer> option, int minValue) {
    if (option == null || socketChannel.supportedOptions().contains(option) == false) {
        return;
    }
    try {
        final Integer configuredValue = socketChannel.getOption(option);
        final boolean exceedsCap = configuredValue != null && configuredValue.intValue() > minValue;
        if (exceedsCap) {
            socketChannel.setOption(option, minValue);
        }
    } catch (Exception e) {
        // Getting an exception here should be ok when concurrently closing the channel
        // An UnsupportedOperationException or IllegalArgumentException, however, should not happen
        assert e instanceof IOException || e instanceof ClosedChannelException :
            "unexpected exception when setting channel option: " + e.getClass() + ": " + e.getMessage();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ private void configureSocket(Socket socket, boolean isConnectComplete) throws IO
}
}
}
NetUtils.setSaneDefaultKeepAliveOptions(socket.getChannel());
socket.setTcpNoDelay(socketConfig.tcpNoDelay());
int tcpSendBufferSize = socketConfig.tcpSendBufferSize();
if (tcpSendBufferSize > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public void setUpHandler() throws IOException {
SocketChannel rawChannel = mock(SocketChannel.class);
when(rawChannel.finishConnect()).thenReturn(true);
NioSocketChannel channel = new NioSocketChannel(rawChannel);
when(rawChannel.socket()).thenReturn(mock(Socket.class));
Socket socket = mock(Socket.class);
when(rawChannel.socket()).thenReturn(socket);
when(socket.getChannel()).thenReturn(rawChannel);
context = new DoNotRegisterSocketContext(channel, selector, channelExceptionHandler, readWriteHandler);
channel.setContext(context);
handler.handleRegistration(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void setup() throws Exception {
});
rawSocket = mock(Socket.class);
when(rawChannel.socket()).thenReturn(rawSocket);
when(rawSocket.getChannel()).thenReturn(rawChannel);
}

public void testIOExceptionSetIfEncountered() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
* local buffer with a defined size.
*/
@SuppressForbidden(reason = "Channel#write")
public class CopyBytesSocketChannel extends NioSocketChannel {
public class CopyBytesSocketChannel extends Netty4NioSocketChannel {

private static final int MAX_BYTES_PER_WRITE = StrictMath.toIntExact(ByteSizeValue.parseBytesSizeValue(
System.getProperty("es.transport.buffer.size", "1m"), "es.transport.buffer.size").getBytes());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.channels.SocketChannel;

/**
 * A {@link NioSocketChannel} whose only addition is a {@code public} declaration of {@link #javaChannel()},
 * so that callers holding this channel type can obtain the underlying {@link SocketChannel}.
 */
public class Netty4NioSocketChannel extends NioSocketChannel {

    /**
     * Creates a channel using the no-argument constructor of {@link NioSocketChannel}.
     */
    public Netty4NioSocketChannel() {
    }

    /**
     * Creates a channel by handing both arguments to the matching {@link NioSocketChannel} constructor.
     *
     * @param parent the parent channel passed through to the superclass
     * @param socket the socket channel passed through to the superclass
     */
    public Netty4NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
    }

    /**
     * Returns exactly what the superclass implementation returns; overridden only to declare it {@code public}.
     */
    @Override
    public SocketChannel javaChannel() {
        return super.javaChannel();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.monitor.jvm.JvmInfo;

Expand Down Expand Up @@ -68,7 +67,7 @@ public static Class<? extends Channel> getChannelType() {
if (ALLOCATOR instanceof NoDirectBuffers) {
return CopyBytesSocketChannel.class;
} else {
return NioSocketChannel.class;
return Netty4NioSocketChannel.class;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.Netty4NioSocketChannel;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;

Expand Down Expand Up @@ -143,31 +143,30 @@ private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGro
bootstrap.group(sharedGroup.getLowLevelGroup());

// NettyAllocator will return the channel type designed to work with the configured allocator
assert Netty4NioSocketChannel.class.isAssignableFrom(NettyAllocator.getChannelType());
bootstrap.channel(NettyAllocator.getChannelType());
bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());

bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
// Netty logs a warning if it can't set the option, so try this only on supported platforms
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
}
// Note that Netty logs a warning if it can't set the option
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
}
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
}
}
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
}
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
}
}
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
}
}
}
Expand Down Expand Up @@ -215,26 +214,24 @@ private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupF
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
if (profileSettings.tcpKeepAlive) {
// Netty logs a warning if it can't set the option, so try this only on supported platforms
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
if (profileSettings.tcpKeepIdle >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
}
// Note that Netty logs a warning if it can't set the option
if (profileSettings.tcpKeepIdle >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
}
if (profileSettings.tcpKeepInterval >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
}

}
if (profileSettings.tcpKeepInterval >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
}
if (profileSettings.tcpKeepCount >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
}

}
if (profileSettings.tcpKeepCount >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
}
}
}
Expand Down Expand Up @@ -281,7 +278,6 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOExceptio
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
throw new IOException(connectFuture.cause());
}
addClosedExceptionLogger(channel);

Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
channel.attr(CHANNEL_KEY).set(nettyChannel);
Expand Down Expand Up @@ -311,6 +307,11 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
if (ch instanceof Netty4NioSocketChannel) {
NetUtils.setSaneDefaultKeepAliveOptions(((Netty4NioSocketChannel) ch).javaChannel());
}
ch.pipeline().addLast("logging", new ESLoggingHandler());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
Expand All @@ -334,6 +335,10 @@ protected ServerChannelInitializer(String name) {
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
if (ch instanceof Netty4NioSocketChannel) {
NetUtils.setSaneDefaultKeepAliveOptions(((Netty4NioSocketChannel) ch).javaChannel());
}
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
Expand Down
Loading