Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator {
/**
* for ConnectionManager
*/
VALIDATE_AFTER_INACTIVITY_MILLIS("validateAfterInactivityMillis", 3 * 1000, "period of inactivity in milliseconds after which persistent connections must be re-validated, this check helps detect connections that have become stale (half-closed) while kept inactive in the pool. "),
TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60 * 1000, ""),
DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500, ""),
MAX_TOTAL("maxTotal", 10000, ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ClickHouseProperties {
private int connectionTimeout;
private int dataTransferTimeout;
private int timeToLiveMillis;
private int validateAfterInactivityMillis;
private int defaultMaxPerRoute;
private int maxTotal;
private int maxRetries;
Expand Down Expand Up @@ -112,6 +113,7 @@ public ClickHouseProperties(Properties info) {
this.connectionTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.CONNECTION_TIMEOUT);
this.dataTransferTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT);
this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS);
this.validateAfterInactivityMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.VALIDATE_AFTER_INACTIVITY_MILLIS);
this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE);
this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL);
this.maxRetries = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_RETRIES);
Expand Down Expand Up @@ -182,6 +184,7 @@ public Properties asProperties() {
ret.put(ClickHouseConnectionSettings.CONNECTION_TIMEOUT.getKey(), String.valueOf(connectionTimeout));
ret.put(ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT.getKey(), String.valueOf(dataTransferTimeout));
ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis));
ret.put(ClickHouseConnectionSettings.VALIDATE_AFTER_INACTIVITY_MILLIS.getKey(), String.valueOf(validateAfterInactivityMillis));
ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute));
ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal));
ret.put(ClickHouseConnectionSettings.MAX_RETRIES.getKey(), String.valueOf(maxRetries));
Expand Down Expand Up @@ -254,6 +257,7 @@ public ClickHouseProperties(ClickHouseProperties properties) {
setConnectionTimeout(properties.connectionTimeout);
setDataTransferTimeout(properties.dataTransferTimeout);
setTimeToLiveMillis(properties.timeToLiveMillis);
setValidateAfterInactivityMillis(properties.validateAfterInactivityMillis);
setDefaultMaxPerRoute(properties.defaultMaxPerRoute);
setMaxTotal(properties.maxTotal);
setMaxRetries(properties.maxRetries);
Expand Down Expand Up @@ -582,6 +586,14 @@ public void setTimeToLiveMillis(int timeToLiveMillis) {
this.timeToLiveMillis = timeToLiveMillis;
}

public int getValidateAfterInactivityMillis() {
return validateAfterInactivityMillis;
}

public void setValidateAfterInactivityMillis(int validateAfterInactivityMillis) {
this.validateAfterInactivityMillis = validateAfterInactivityMillis;
}

public int getDefaultMaxPerRoute() {
return defaultMaxPerRoute;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ private PoolingHttpClientConnectionManager getConnectionManager()
TimeUnit.MILLISECONDS
);

connectionManager.setValidateAfterInactivity(properties.getValidateAfterInactivityMillis());
connectionManager.setDefaultMaxPerRoute(properties.getDefaultMaxPerRoute());
connectionManager.setMaxTotal(properties.getMaxTotal());
connectionManager.setDefaultConnectionConfig(getConnectionConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package ru.yandex.clickhouse.util;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -176,18 +178,22 @@ private static Object[][] provideAuthUserPasswordTestData() {
};
}

private static WireMockServer newServer() {
private static WireMockServer newServer(int delayMillis) {
WireMockServer server = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort());
server.start();
server.stubFor(WireMock.post(WireMock.urlPathMatching("/*"))
.willReturn(WireMock.aResponse().withStatus(200).withHeader("Connection", "Keep-Alive")
.withHeader("Content-Type", "text/plain; charset=UTF-8")
.withHeader("Transfer-Encoding", "chunked").withHeader("Keep-Alive", "timeout=3")
.withBody("OK.........................").withFixedDelay(2)));
.withBody("OK.........................").withFixedDelay(delayMillis)));
return server;
}

private static WireMockServer newServer() {
return newServer(2);
}

private static void shutDownServerWithDelay(final WireMockServer server, final long delayMs) {
new Thread() {
public void run() {
Expand All @@ -203,38 +209,104 @@ public void run() {
}.start();
}

// @Test(groups = "unit", dependsOnMethods = { "testWithRetry" }, expectedExceptions = { NoHttpResponseException.class })
public void testWithoutRetry() throws Exception {
final WireMockServer server = newServer();
@Test(expectedExceptions = { NoHttpResponseException.class })
public void testReproduceFailedToResponseProblem() throws Exception {
final WireMockServer server = newServer(2);

ClickHouseProperties props = new ClickHouseProperties();
// Disable retry when "failed to respond" occurs.
props.setMaxRetries(0);
// Disable validation to reproduce "failed to respond" problem
props.setValidateAfterInactivityMillis(0);
// Ensure there is exactly one TCP connection in connection pool and therefore be re-used between
// multiple http requests.
props.setMaxTotal(1);
props.setDefaultMaxPerRoute(1);

ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201");

shutDownServerWithDelay(server, 500);
try {
// Make the 1st http request to establish one tcp connection and keep it in the pool.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}

// Close the server, now the pooling tcp connection is half closed.
server.shutdownServer();
server.stop();

// The 2nd http request will re-use the pooling tcp connection which is stale
// and "failed to respond" occurs.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}
} finally {
client.close();
}
}

@Test(expectedExceptions = { HttpHostConnectException.class })
public void testEnableValidation() throws Exception {
final WireMockServer server = newServer(2);

ClickHouseProperties props = new ClickHouseProperties();
// Disable retry when "failed to respond" occurs.
props.setMaxRetries(0);
// Disable validation to reproduce "failed to respond" problem
props.setValidateAfterInactivityMillis(1);
// Ensure there is exactly one TCP connection in connection pool and therefore be re-used between
// multiple http requests.
props.setMaxTotal(1);
props.setDefaultMaxPerRoute(1);

ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201");

try {
client.execute(post);
// Make the 1st http request to establish one tcp connection and keep it in the pool.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}

// Sleep a while to wait for the validation reaches inactivity timeout.
Thread.sleep(5);

// Close the server, now the pooling tcp connection is half closed.
server.shutdownServer();
server.stop();

// The 2nd http request re-uses the pooling tcp connection.
// But the validation checks that the connection has been stale, thus a
// new tcp connection is attempted to establish to the closed server
// which leads to HttpHostConnectException.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}
} finally {
client.close();
}
}

// @Test(groups = "unit", expectedExceptions = { HttpHostConnectException.class })
@Test(expectedExceptions = { HttpHostConnectException.class })
public void testWithRetry() throws Exception {
final WireMockServer server = newServer();
final WireMockServer server = newServer(500);

ClickHouseProperties props = new ClickHouseProperties();
// props.setMaxRetries(3);
props.setMaxRetries(3);
ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpContext context = new BasicHttpContext();
context.setAttribute("is_idempotent", Boolean.TRUE);
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%202");
shutDownServerWithDelay(server, 500);

shutDownServerWithDelay(server, 100);

try {
client.execute(post, context);
Expand Down