diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java index 202574fed..c4451efe7 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java @@ -28,6 +28,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator { KEEP_ALIVE_TIMEOUT("keepAliveTimeout", 30 * 1000, ""), + REUSE_CONNECTIONS("reuseConnections", false, ""), /** * for ConnectionManager diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java index ecf08fa4e..dea27b4f3 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java @@ -93,7 +93,7 @@ public class ClickHouseProperties { private Boolean insertDeduplicate; private Boolean insertDistributedSync; private Boolean anyJoinDistinctRightTableKeys; - + private boolean reuseConnections; public ClickHouseProperties() { this(new Properties()); @@ -108,6 +108,7 @@ public ClickHouseProperties(Properties info) { this.connectionTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.CONNECTION_TIMEOUT); this.dataTransferTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT); this.keepAliveTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.KEEP_ALIVE_TIMEOUT); + this.reuseConnections = (Boolean)getSetting(info, ClickHouseConnectionSettings.REUSE_CONNECTIONS); this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS); this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE); this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL); @@ -172,6 +173,7 @@ public Properties asProperties() { ret.put(ClickHouseConnectionSettings.CONNECTION_TIMEOUT.getKey(), String.valueOf(connectionTimeout)); ret.put(ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT.getKey(), String.valueOf(dataTransferTimeout)); ret.put(ClickHouseConnectionSettings.KEEP_ALIVE_TIMEOUT.getKey(), String.valueOf(keepAliveTimeout)); + ret.put(ClickHouseConnectionSettings.REUSE_CONNECTIONS.getKey(), String.valueOf(reuseConnections)); ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis)); ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute)); ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal)); @@ -239,6 +241,7 @@ public ClickHouseProperties(ClickHouseProperties properties) { setConnectionTimeout(properties.connectionTimeout); setDataTransferTimeout(properties.dataTransferTimeout); setKeepAliveTimeout(properties.keepAliveTimeout); + setReuseConnections(properties.reuseConnections); setTimeToLiveMillis(properties.timeToLiveMillis); setDefaultMaxPerRoute(properties.defaultMaxPerRoute); setMaxTotal(properties.maxTotal); @@ -527,6 +530,14 @@ public void setKeepAliveTimeout(int keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } + public boolean getReuseConnections() { + return reuseConnections; + } + + public void setReuseConnections(boolean reuseConnections) { + this.reuseConnections = reuseConnections; + } + public String getUser() { return user; } diff --git a/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java b/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java index 83047cd06..3416084d5 100644 --- a/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java +++ b/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java @@ -1,5 +1,6 @@ package ru.yandex.clickhouse.util; +import org.apache.http.ConnectionReuseStrategy; import org.apache.http.Header; import org.apache.http.HeaderElement; import org.apache.http.HeaderElementIterator; @@ -13,7 +14,9 @@ import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.NoConnectionReuseStrategy; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultClientConnectionReuseStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeader; @@ -59,6 +62,7 @@ public ClickHouseHttpClientBuilder(ClickHouseProperties properties) { public CloseableHttpClient buildClient() throws Exception { return HttpClientBuilder.create() .setConnectionManager(getConnectionManager()) + .setConnectionReuseStrategy(createReuseStrategy()) .setKeepAliveStrategy(createKeepAliveStrategy()) .setDefaultConnectionConfig(getConnectionConfig()) .setDefaultRequestConfig(getRequestConfig()) @@ -115,25 +119,38 @@ private Collection
getDefaultHeaders() { return headers; } - private ConnectionKeepAliveStrategy createKeepAliveStrategy() { - return new ConnectionKeepAliveStrategy() { + private ConnectionReuseStrategy createReuseStrategy() { + return properties.getReuseConnections() ? new DefaultClientConnectionReuseStrategy() { + @Override - public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) { - // in case of errors keep-alive not always works. close connection just in case + public boolean keepAlive(HttpResponse httpResponse, HttpContext context) { if (httpResponse.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { - return -1; + return false; } + boolean keepAliveHeaderFound = false; HeaderElementIterator it = new BasicHeaderElementIterator( httpResponse.headerIterator(HTTP.CONN_DIRECTIVE)); while (it.hasNext()) { HeaderElement he = it.nextElement(); String param = he.getName(); - //String value = he.getValue(); if (param != null && param.equalsIgnoreCase(HTTP.CONN_KEEP_ALIVE)) { - return properties.getKeepAliveTimeout(); + keepAliveHeaderFound = true; + break; } } - return -1; + if (!keepAliveHeaderFound) { + return false; + } + return super.keepAlive(httpResponse, context); + } + } : new NoConnectionReuseStrategy(); + } + + private ConnectionKeepAliveStrategy createKeepAliveStrategy() { + return new ConnectionKeepAliveStrategy() { + @Override + public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) { + return properties.getKeepAliveTimeout(); } }; }