diff --git a/presto-main/pom.xml b/presto-main/pom.xml
index 62a3290c3c61..944ec4e6faf8 100644
--- a/presto-main/pom.xml
+++ b/presto-main/pom.xml
@@ -308,23 +308,6 @@
reactor-netty-http
-
- io.micrometer
- micrometer-core
- 1.11.0
-
-
- org.hdrhistogram
- HdrHistogram
-
-
-
-
- io.micrometer
- micrometer-registry-jmx
- 1.11.0
-
-
io.projectreactor
reactor-core
diff --git a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java
index 1813c5eb0699..075934ece22c 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java
@@ -84,6 +84,8 @@
import com.facebook.presto.server.protocol.QueryBlockingRateLimiter;
import com.facebook.presto.server.protocol.QueuedStatementResource;
import com.facebook.presto.server.protocol.RetryCircuitBreaker;
+import com.facebook.presto.server.remotetask.HttpClientConnectionPoolStats;
+import com.facebook.presto.server.remotetask.HttpClientStats;
import com.facebook.presto.server.remotetask.HttpRemoteTaskFactory;
import com.facebook.presto.server.remotetask.ReactorNettyHttpClient;
import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig;
@@ -277,6 +279,10 @@ protected void setup(Binder binder)
ReactorNettyHttpClientConfig reactorNettyHttpClientConfig = buildConfigObject(ReactorNettyHttpClientConfig.class);
if (reactorNettyHttpClientConfig.isReactorNettyHttpClientEnabled()) {
binder.bind(ReactorNettyHttpClient.class).in(Scopes.SINGLETON);
+ binder.bind(HttpClientStats.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(HttpClientStats.class).withGeneratedName();
+ binder.bind(HttpClientConnectionPoolStats.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(HttpClientConnectionPoolStats.class).withGeneratedName();
binder.bind(HttpClient.class).annotatedWith(ForScheduler.class).to(ReactorNettyHttpClient.class);
}
else {
diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpClientConnectionPoolStats.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpClientConnectionPoolStats.java
new file mode 100644
index 000000000000..91513980c3ba
--- /dev/null
+++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpClientConnectionPoolStats.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.remotetask;
+
+import com.facebook.airlift.stats.DistributionStat;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.weakref.jmx.Managed;
+import org.weakref.jmx.Nested;
+import reactor.netty.resources.ConnectionPoolMetrics;
+import reactor.netty.resources.ConnectionProvider;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Singleton
+public class HttpClientConnectionPoolStats
+ implements ConnectionProvider.MeterRegistrar
+{
+ private final ConcurrentHashMap poolMetrics = new ConcurrentHashMap<>();
+
+ private final DistributionStat activeConnections = new DistributionStat();
+ private final DistributionStat totalConnections = new DistributionStat();
+ private final DistributionStat idleConnections = new DistributionStat();
+ private final DistributionStat pendingAcquires = new DistributionStat();
+ private final DistributionStat maxConnections = new DistributionStat();
+ private final DistributionStat maxPendingAcquires = new DistributionStat();
+
+ @Inject
+ public HttpClientConnectionPoolStats()
+ {
+ scheduleStatsExport();
+ }
+
+ @Override
+ public void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics)
+ {
+ poolMetrics.put(createPoolKey(poolName, remoteAddress), metrics);
+ }
+
+ private static String createPoolKey(String poolName, SocketAddress remoteAddress)
+ {
+ return poolName + ":" + formatSocketAddress(remoteAddress);
+ }
+
+ private static String formatSocketAddress(SocketAddress socketAddress)
+ {
+ if (socketAddress != null) {
+ if (socketAddress instanceof InetSocketAddress) {
+ InetSocketAddress address = (InetSocketAddress) socketAddress;
+ return address.getHostString().replace(".", "_");
+ }
+ else {
+ return socketAddress.toString().replace(".", "_");
+ }
+ }
+ return "UNKNOWN";
+ }
+
+ private void scheduleStatsExport()
+ {
+ Executors.newSingleThreadScheduledExecutor()
+ .scheduleAtFixedRate(
+ () -> {
+ for (ConnectionPoolMetrics metrics : poolMetrics.values()) {
+ activeConnections.add(metrics.acquiredSize());
+ totalConnections.add(metrics.allocatedSize());
+ idleConnections.add(metrics.idleSize());
+ pendingAcquires.add(metrics.pendingAcquireSize());
+ maxConnections.add(metrics.maxAllocatedSize());
+ maxPendingAcquires.add(metrics.maxPendingAcquireSize());
+ }
+ },
+ 0,
+ 1,
+ TimeUnit.SECONDS);
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getActiveConnections()
+ {
+ return activeConnections;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getTotalConnections()
+ {
+ return totalConnections;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getIdleConnections()
+ {
+ return idleConnections;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getPendingAcquires()
+ {
+ return pendingAcquires;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getMaxConnections()
+ {
+ return maxConnections;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getMaxPendingAcquires()
+ {
+ return maxPendingAcquires;
+ }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpClientStats.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpClientStats.java
new file mode 100644
index 000000000000..6d5af54e8063
--- /dev/null
+++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpClientStats.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.remotetask;
+
+import com.facebook.airlift.stats.CounterStat;
+import com.facebook.airlift.stats.DistributionStat;
+import com.facebook.airlift.stats.TimeStat;
+import com.google.inject.Singleton;
+import org.weakref.jmx.Managed;
+import org.weakref.jmx.Nested;
+import reactor.netty.http.client.ContextAwareHttpClientMetricsRecorder;
+import reactor.util.context.ContextView;
+
+import java.net.SocketAddress;
+import java.time.Duration;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+@Singleton
+public class HttpClientStats
+ extends ContextAwareHttpClientMetricsRecorder
+{
+ // HTTP level metrics
+ private final TimeStat responseTime = new TimeStat();
+ private final TimeStat dataReceivedTime = new TimeStat();
+ private final TimeStat dataSentTime = new TimeStat();
+ private final CounterStat errorsCount = new CounterStat();
+ private final CounterStat bytesReceived = new CounterStat();
+ private final CounterStat bytesSent = new CounterStat();
+ private final DistributionStat payloadSize = new DistributionStat();
+
+ // Channel level metrics
+ private final TimeStat connectTime = new TimeStat();
+ private final TimeStat tlsHandshakeTime = new TimeStat();
+ private final TimeStat resolveAddressTime = new TimeStat();
+ private final CounterStat channelErrorsCount = new CounterStat();
+ private final CounterStat channelBytesReceived = new CounterStat();
+ private final CounterStat channelBytesSent = new CounterStat();
+ private final CounterStat connectionsOpened = new CounterStat();
+ private final CounterStat connectionsClosed = new CounterStat();
+
+ // HTTP level metrics recording
+
+ /**
+ * Records the time that is spent in consuming incoming data
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux
+ * @param remoteAddress The remote peer
+ * @param uri The requested URI
+ * @param method The HTTP method
+ * @param status The HTTP status
+ * @param time The time in nanoseconds that is spent in consuming incoming data
+ */
+ @Override
+ public void recordDataReceivedTime(
+ ContextView contextView,
+ SocketAddress remoteAddress,
+ String uri,
+ String method,
+ String status,
+ Duration time)
+ {
+ dataReceivedTime.add(time.toMillis(), MILLISECONDS);
+ }
+
+ /**
+ * Records the time that is spent in sending outgoing data
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux
+ * @param remoteAddress The remote peer
+ * @param uri The requested URI
+ * @param method The HTTP method
+ * @param time The time in nanoseconds that is spent in sending outgoing data
+ */
+ @Override
+ public void recordDataSentTime(
+ ContextView contextView,
+ SocketAddress remoteAddress,
+ String uri,
+ String method,
+ Duration time)
+ {
+ dataSentTime.add(time.toMillis(), MILLISECONDS);
+ }
+
+ /**
+ * Records the total time for the request/response
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux
+ * @param remoteAddress The remote peer
+ * @param uri The requested URI
+ * @param method The HTTP method
+ * @param status The HTTP status
+ * @param time The total time in nanoseconds for the request/response
+ */
+ @Override
+ public void recordResponseTime(
+ ContextView contextView,
+ SocketAddress remoteAddress,
+ String uri,
+ String method,
+ String status,
+ Duration time)
+ {
+ responseTime.add(time.toMillis(), MILLISECONDS);
+ }
+
+ /**
+ * Increments the number of the errors that are occurred
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux
+ * @param remoteAddress The remote peer
+ * @param uri The requested URI
+ */
+ @Override
+ public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress, String uri)
+ {
+ errorsCount.update(1);
+ }
+
+ /**
+ * Records the amount of the data that is received, in bytes
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux
+ * @param remoteAddress The remote peer
+ * @param uri The requested URI
+ * @param bytes The amount of the data that is received, in bytes
+ */
+ @Override
+ public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, String uri, long bytes)
+ {
+ bytesReceived.update(bytes);
+ }
+
+ /**
+ * Records the amount of the data that is sent, in bytes
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux
+ * @param remoteAddress The remote peer
+ * @param uri The requested URI
+ * @param bytes The amount of the data that is sent, in bytes
+ */
+ @Override
+ public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, String uri, long bytes)
+ {
+ bytesSent.update(bytes);
+ payloadSize.add(bytes);
+ }
+
+ // Channel level metrics recording
+
+ /**
+ * Increments the number of the errors that are occurred
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline
+ * @param remoteAddress The remote peer
+ */
+ @Override
+ public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress)
+ {
+ channelErrorsCount.update(1);
+ }
+
+ /**
+ * Records the time that is spent for connecting to the remote address Relevant only when on the
+ * client
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline
+ * @param remoteAddress The remote peer
+ * @param time The time in nanoseconds that is spent for connecting to the remote address
+ * @param status The status of the operation
+ */
+ @Override
+ public void recordConnectTime(
+ ContextView contextView,
+ SocketAddress remoteAddress,
+ Duration time,
+ String status)
+ {
+ connectTime.add(time.toMillis(), MILLISECONDS);
+ }
+
+ /**
+ * Records the amount of the data that is received, in bytes
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline
+ * @param remoteAddress The remote peer
+ * @param bytes The amount of the data that is received, in bytes
+ */
+ @Override
+ public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, long bytes)
+ {
+ channelBytesReceived.update(bytes);
+ }
+
+ /**
+ * Records the amount of the data that is sent, in bytes
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline
+ * @param remoteAddress The remote peer
+ * @param bytes The amount of the data that is sent, in bytes
+ */
+ @Override
+ public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, long bytes)
+ {
+ channelBytesSent.update(bytes);
+ }
+
+ /**
+ * Records the time that is spent for TLS handshake
+ *
+ * @param contextView The current {@link ContextView} associated with the Mono/Flux pipeline
+ * @param remoteAddress The remote peer
+ * @param time The time in nanoseconds that is spent for TLS handshake
+ * @param status The status of the operation
+ */
+ @Override
+ public void recordTlsHandshakeTime(
+ ContextView contextView,
+ SocketAddress remoteAddress,
+ Duration time,
+ String status)
+ {
+ tlsHandshakeTime.add(time.toMillis(), MILLISECONDS);
+ }
+
+ /**
+ * Records the time that is spent for resolving the remote address Relevant only when on the
+ * client
+ *
+ * @param remoteAddress The remote peer
+ * @param time the time in nanoseconds that is spent for resolving to the remote address
+ * @param status the status of the operation
+ */
+ @Override
+ public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status)
+ {
+ resolveAddressTime.add(time.toMillis(), MILLISECONDS);
+ }
+
+ /**
+ * Records a just accepted server connection
+ *
+ * @param localAddress the server local address
+ * @since 1.0.15
+ */
+ @Override
+ public void recordServerConnectionOpened(SocketAddress localAddress)
+ {
+ connectionsOpened.update(1);
+ }
+
+ /**
+ * Records a just disconnected server connection
+ *
+ * @param localAddress the server local address
+ * @since 1.0.15
+ */
+ @Override
+ public void recordServerConnectionClosed(SocketAddress localAddress)
+ {
+ connectionsClosed.update(1);
+ }
+
+ // JMX exposed metrics
+
+ @Managed
+ @Nested
+ public TimeStat getResponseTime()
+ {
+ return responseTime;
+ }
+
+ @Managed
+ @Nested
+ public TimeStat getDataReceivedTime()
+ {
+ return dataReceivedTime;
+ }
+
+ @Managed
+ @Nested
+ public TimeStat getDataSentTime()
+ {
+ return dataSentTime;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getErrorsCount()
+ {
+ return errorsCount;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getBytesReceived()
+ {
+ return bytesReceived;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getBytesSent()
+ {
+ return bytesSent;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getPayloadSize()
+ {
+ return payloadSize;
+ }
+
+ @Managed
+ @Nested
+ public TimeStat getConnectTime()
+ {
+ return connectTime;
+ }
+
+ @Managed
+ @Nested
+ public TimeStat getTlsHandshakeTime()
+ {
+ return tlsHandshakeTime;
+ }
+
+ @Managed
+ @Nested
+ public TimeStat getResolveAddressTime()
+ {
+ return resolveAddressTime;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getChannelErrorsCount()
+ {
+ return channelErrorsCount;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getChannelBytesReceived()
+ {
+ return channelBytesReceived;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getChannelBytesSent()
+ {
+ return channelBytesSent;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getConnectionsOpened()
+ {
+ return connectionsOpened;
+ }
+
+ @Managed
+ @Nested
+ public CounterStat getConnectionsClosed()
+ {
+ return connectionsClosed;
+ }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java
index bd071f81ffd4..a018bebaaa8b 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java
@@ -26,8 +26,6 @@
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
-import io.micrometer.core.instrument.Metrics;
-import io.micrometer.jmx.JmxMeterRegistry;
import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.Epoll;
import io.netty.handler.codec.http.HttpHeaders;
@@ -39,7 +37,6 @@
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
-import reactor.netty.channel.MicrometerChannelMetricsRecorder;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
@@ -64,11 +61,10 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import static com.facebook.airlift.security.pem.PemReader.loadPrivateKey;
import static com.facebook.airlift.security.pem.PemReader.readCertificateChain;
-import static io.micrometer.core.instrument.Clock.SYSTEM;
-import static io.micrometer.jmx.JmxConfig.DEFAULT;
import static io.netty.handler.ssl.ApplicationProtocolConfig.Protocol.ALPN;
import static io.netty.handler.ssl.ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT;
import static io.netty.handler.ssl.ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE;
@@ -91,10 +87,14 @@ public class ReactorNettyHttpClient
private final Duration requestTimeout;
private HttpClient httpClient;
+ private final HttpClientConnectionPoolStats connectionPoolStats;
+ private final HttpClientStats httpClientStats;
@Inject
- public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config)
+ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config, HttpClientConnectionPoolStats connectionPoolStats, HttpClientStats httpClientStats)
{
+ this.connectionPoolStats = connectionPoolStats;
+ this.httpClientStats = httpClientStats;
SslContext sslContext = null;
if (config.isHttpsEnabled()) {
try {
@@ -151,6 +151,11 @@ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config)
*/
ConnectionProvider pool = ConnectionProvider.builder("shared-pool")
.maxConnections(config.getMaxConnections())
+ .fifo()
+ .maxIdleTime(java.time.Duration.of(config.getMaxIdleTime().toMillis(), MILLIS))
+ .evictInBackground(java.time.Duration.of(config.getEvictBackgroundTime().toMillis(), MILLIS))
+ .pendingAcquireTimeout(java.time.Duration.of(config.getPendingAcquireTimeout().toMillis(), MILLIS))
+ .metrics(true, () -> connectionPoolStats)
.allocationStrategy((Http2AllocationStrategy.builder()
.maxConnections(config.getMaxConnections())
.maxConcurrentStreams(config.getMaxStreamPerChannel())
@@ -159,10 +164,6 @@ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config)
LoopResources loopResources = LoopResources.create("event-loop", config.getSelectorThreadCount(), config.getEventLoopThreadCount(), true, false);
- // Add the JMX MeterRegistry to the global Metrics registry
- JmxMeterRegistry jmxMeterRegistry = new JmxMeterRegistry(DEFAULT, SYSTEM);
- Metrics.addRegistry(jmxMeterRegistry);
-
// Create HTTP/2 client
SslContext finalSslContext = sslContext;
this.httpClient = HttpClient
@@ -170,10 +171,16 @@ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config)
.create(pool)
.protocol(HttpProtocol.H2, HttpProtocol.HTTP11)
.runOn(loopResources, true)
- .http2Settings(settings -> settings.maxConcurrentStreams(config.getMaxStreamPerChannel()))
+ .http2Settings(settings -> {
+ settings.maxConcurrentStreams(config.getMaxStreamPerChannel());
+ settings.initialWindowSize((int) (config.getMaxInitialWindowSize().toBytes()));
+ settings.maxFrameSize((int) (config.getMaxFrameSize().toBytes()));
+ })
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) config.getConnectTimeout().getValue())
- // Track the metrics for all the tcp connections
- .metrics(true, () -> new MicrometerChannelMetricsRecorder("reactor.netty.http.client", "tcp", false));
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.TCP_NODELAY, true)
+ // Track HTTP client metrics
+ .metrics(true, () -> httpClientStats, Function.identity());
if (config.isHttpsEnabled()) {
if (finalSslContext == null) {
diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java
index ab068ca31988..63ccc174b364 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java
@@ -15,11 +15,13 @@
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
+import com.facebook.airlift.units.DataSize;
import com.facebook.airlift.units.Duration;
import jakarta.validation.constraints.Min;
import java.util.Optional;
+import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.SECONDS;
public class ReactorNettyHttpClientConfig
@@ -33,6 +35,11 @@ public class ReactorNettyHttpClientConfig
private int eventLoopThreadCount = Runtime.getRuntime().availableProcessors();
private Duration connectTimeout = new Duration(10, SECONDS);
private Duration requestTimeout = new Duration(10, SECONDS);
+ private Duration maxIdleTime = new Duration(45, SECONDS);
+ private Duration evictBackgroundTime = new Duration(15, SECONDS);
+ private Duration pendingAcquireTimeout = new Duration(2, SECONDS);
+ private DataSize maxInitialWindowSize = new DataSize(25, MEGABYTE);
+ private DataSize maxFrameSize = new DataSize(8, MEGABYTE);
private String keyStorePath;
private String keyStorePassword;
private String trustStorePath;
@@ -154,6 +161,66 @@ public ReactorNettyHttpClientConfig setRequestTimeout(Duration requestTimeout)
return this;
}
+ public Duration getMaxIdleTime()
+ {
+ return maxIdleTime;
+ }
+
+ @Config("reactor.max-idle-time")
+ public ReactorNettyHttpClientConfig setMaxIdleTime(Duration maxIdleTime)
+ {
+ this.maxIdleTime = maxIdleTime;
+ return this;
+ }
+
+ public Duration getEvictBackgroundTime()
+ {
+ return evictBackgroundTime;
+ }
+
+ @Config("reactor.evict-background-time")
+ public ReactorNettyHttpClientConfig setEvictBackgroundTime(Duration evictBackgroundTime)
+ {
+ this.evictBackgroundTime = evictBackgroundTime;
+ return this;
+ }
+
+ public Duration getPendingAcquireTimeout()
+ {
+ return pendingAcquireTimeout;
+ }
+
+ @Config("reactor.pending-acquire-timeout")
+ public ReactorNettyHttpClientConfig setPendingAcquireTimeout(Duration pendingAcquireTimeout)
+ {
+ this.pendingAcquireTimeout = pendingAcquireTimeout;
+ return this;
+ }
+
+ public DataSize getMaxInitialWindowSize()
+ {
+ return maxInitialWindowSize;
+ }
+
+ @Config("reactor.max-initial-window-size")
+ public ReactorNettyHttpClientConfig setMaxInitialWindowSize(DataSize maxInitialWindowSize)
+ {
+ this.maxInitialWindowSize = maxInitialWindowSize;
+ return this;
+ }
+
+ public DataSize getMaxFrameSize()
+ {
+ return maxFrameSize;
+ }
+
+ @Config("reactor.max-frame-size")
+ public ReactorNettyHttpClientConfig setMaxFrameSize(DataSize maxFrameSize)
+ {
+ this.maxFrameSize = maxFrameSize;
+ return this;
+ }
+
public String getKeyStorePath()
{
return keyStorePath;
diff --git a/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClient.java b/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClient.java
index a49b69f6f864..061615cf37fc 100644
--- a/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClient.java
+++ b/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClient.java
@@ -20,6 +20,8 @@
import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.airlift.units.Duration;
import com.facebook.presto.execution.TaskStatus;
+import com.facebook.presto.server.remotetask.HttpClientConnectionPoolStats;
+import com.facebook.presto.server.remotetask.HttpClientStats;
import com.facebook.presto.server.remotetask.ReactorNettyHttpClient;
import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig;
import com.facebook.presto.server.smile.AdaptingJsonResponseHandler;
@@ -88,7 +90,7 @@ public static void setUp()
ReactorNettyHttpClientConfig reactorNettyHttpClientConfig = new ReactorNettyHttpClientConfig()
.setRequestTimeout(new Duration(30, TimeUnit.SECONDS))
.setConnectTimeout(new Duration(30, TimeUnit.SECONDS));
- reactorNettyHttpClient = new ReactorNettyHttpClient(reactorNettyHttpClientConfig);
+ reactorNettyHttpClient = new ReactorNettyHttpClient(reactorNettyHttpClientConfig, new HttpClientConnectionPoolStats(), new HttpClientStats());
}
@AfterClass
diff --git a/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java b/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java
index 60d78542769d..29d913fb3ada 100644
--- a/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java
+++ b/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.remotetask;
+import com.facebook.airlift.units.DataSize;
import com.facebook.airlift.units.Duration;
import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig;
import com.google.common.collect.ImmutableMap;
@@ -23,6 +24,7 @@
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.SECONDS;
public class TestReactorNettyHttpClientConfig
@@ -40,6 +42,11 @@ public void testDefaults()
.setEventLoopThreadCount(Runtime.getRuntime().availableProcessors())
.setConnectTimeout(new Duration(10, SECONDS))
.setRequestTimeout(new Duration(10, SECONDS))
+ .setMaxIdleTime(new Duration(45, SECONDS))
+ .setEvictBackgroundTime(new Duration(15, SECONDS))
+ .setPendingAcquireTimeout(new Duration(2, SECONDS))
+ .setMaxInitialWindowSize(new DataSize(25, MEGABYTE)) // 25MB
+ .setMaxFrameSize(new DataSize(8, MEGABYTE)) // 8MB
.setKeyStorePath(null)
.setKeyStorePassword(null)
.setTrustStorePath(null)
@@ -59,6 +66,11 @@ public void testExplicitPropertyMappings()
.put("reactor.event-loop-thread-count", "150")
.put("reactor.connect-timeout", "2s")
.put("reactor.request-timeout", "1s")
+ .put("reactor.max-idle-time", "120s")
+ .put("reactor.evict-background-time", "120s")
+ .put("reactor.pending-acquire-timeout", "10s")
+ .put("reactor.max-initial-window-size", "10MB")
+ .put("reactor.max-frame-size", "4MB")
.put("reactor.keystore-path", "/var/abc/def/presto.jks")
.put("reactor.truststore-path", "/var/abc/def/presto.jks")
.put("reactor.keystore-password", "password")
@@ -75,6 +87,11 @@ public void testExplicitPropertyMappings()
.setEventLoopThreadCount(150)
.setConnectTimeout(new Duration(2, SECONDS))
.setRequestTimeout(new Duration(1, SECONDS))
+ .setMaxIdleTime(new Duration(120, SECONDS))
+ .setEvictBackgroundTime(new Duration(120, SECONDS))
+ .setPendingAcquireTimeout(new Duration(10, SECONDS))
+ .setMaxInitialWindowSize(new DataSize(10, MEGABYTE)) // 10MB
+ .setMaxFrameSize(new DataSize(4, MEGABYTE)) // 4MB
.setKeyStorePath("/var/abc/def/presto.jks")
.setTrustStorePath("/var/abc/def/presto.jks")
.setKeyStorePassword("password")