Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/ru/yandex/clickhouse/ClickHouseStatement.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ru.yandex.clickhouse;

import ru.yandex.clickhouse.response.ClickHouseResponse;
import ru.yandex.clickhouse.response.ClickHouseResponseSummary;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;
import ru.yandex.clickhouse.util.ClickHouseRowBinaryInputStream;
import ru.yandex.clickhouse.util.ClickHouseStreamCallback;
Expand Down Expand Up @@ -105,4 +106,6 @@ ResultSet executeQuery(String sql,
* Returns extended write-API
*/
Writer write();

ClickHouseResponseSummary getResponseSummary();
}
60 changes: 56 additions & 4 deletions src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ru.yandex.clickhouse;

import com.google.common.base.Strings;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
Expand Down Expand Up @@ -38,7 +39,7 @@
import java.util.*;


public class ClickHouseStatementImpl implements ClickHouseStatement {
public class ClickHouseStatementImpl extends ConfigurableApi<ClickHouseStatement> implements ClickHouseStatement {

private static final Logger log = LoggerFactory.getLogger(ClickHouseStatementImpl.class);

Expand All @@ -52,6 +53,8 @@ public class ClickHouseStatementImpl implements ClickHouseStatement {

private ClickHouseRowBinaryInputStream currentRowBinaryResult;

private ClickHouseResponseSummary currentSummary;

private int currentUpdateCount = -1;

private int queryTimeout;
Expand All @@ -78,6 +81,7 @@ public class ClickHouseStatementImpl implements ClickHouseStatement {

public ClickHouseStatementImpl(CloseableHttpClient client, ClickHouseConnection connection,
ClickHouseProperties properties, int resultSetType) {
super(null);
this.client = client;
this.connection = connection;
this.properties = properties == null ? new ClickHouseProperties() : properties;
Expand Down Expand Up @@ -217,7 +221,8 @@ public int executeUpdate(String sql) throws SQLException {
} finally {
StreamUtils.close(is);
}
return 1;

return currentSummary != null ? (int) currentSummary.getWrittenRows() : 1;
}

@Override
Expand Down Expand Up @@ -444,6 +449,11 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface.isAssignableFrom(getClass());
}

@Override
public ClickHouseResponseSummary getResponseSummary() {
return currentSummary;
}

static String clickhousifySql(String sql) {
return addFormatIfAbsent(sql, ClickHouseFormat.TabSeparatedWithNamesAndTypes);
}
Expand Down Expand Up @@ -623,6 +633,13 @@ private InputStream getInputStream(
entity.writeTo(baos);
is = baos.convertToInputStream();
}

// retrieve response summary
if (isQueryParamSet(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, additionalClickHouseDBParams, additionalRequestParams)) {
Header summaryHeader = response.getFirstHeader("X-ClickHouse-Summary");
currentSummary = summaryHeader != null ? Jackson.getObjectMapper().readValue(summaryHeader.getValue(), ClickHouseResponseSummary.class) : null;
}

return is;
} catch (ClickHouseException e) {
throw e;
Expand Down Expand Up @@ -700,6 +717,8 @@ private List<NameValuePair> getUrlQueryParams(
params.put(ClickHouseQueryParam.DATABASE, initialDatabase);
}

params.putAll(getAdditionalDBParams());

if (additionalClickHouseDBParams != null && !additionalClickHouseDBParams.isEmpty()) {
params.putAll(additionalClickHouseDBParams);
}
Expand All @@ -712,6 +731,12 @@ private List<NameValuePair> getUrlQueryParams(
}
}

for (Map.Entry<String, String> entry : getRequestParams().entrySet()) {
if (!Strings.isNullOrEmpty(entry.getValue())) {
result.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
}

if (additionalRequestParams != null) {
for (Map.Entry<String, String> entry : additionalRequestParams.entrySet()) {
if (!Strings.isNullOrEmpty(entry.getValue())) {
Expand All @@ -720,10 +745,31 @@ private List<NameValuePair> getUrlQueryParams(
}
}


return result;
}

private boolean isQueryParamSet(ClickHouseQueryParam param, Map<ClickHouseQueryParam, String> additionalClickHouseDBParams, Map<String, String> additionalRequestParams) {
String value = getQueryParamValue(param, additionalClickHouseDBParams, additionalRequestParams);

return "true".equals(value) || "1".equals(value);
}

private String getQueryParamValue(ClickHouseQueryParam param, Map<ClickHouseQueryParam, String> additionalClickHouseDBParams, Map<String, String> additionalRequestParams) {
if (additionalRequestParams != null && additionalRequestParams.containsKey(param.getKey()) && !Strings.isNullOrEmpty(additionalRequestParams.get(param.getKey())))
return additionalRequestParams.get(param.getKey());

if (getRequestParams().containsKey(param.getKey()) && !Strings.isNullOrEmpty(getRequestParams().get(param.getKey())))
return getRequestParams().get(param.getKey());

if (additionalClickHouseDBParams != null && additionalClickHouseDBParams.containsKey(param) && !Strings.isNullOrEmpty(additionalClickHouseDBParams.get(param)))
return additionalClickHouseDBParams.get(param);

if (getAdditionalDBParams().containsKey(param) && !Strings.isNullOrEmpty(getAdditionalDBParams().get(param)))
return getAdditionalDBParams().get(param);

return properties.asProperties().getProperty(param.getKey());
}

private URI followRedirects(URI uri) throws IOException, URISyntaxException {
if (properties.isCheckForRedirects()) {
int redirects = 0;
Expand Down Expand Up @@ -850,6 +896,12 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException {
HttpResponse response = client.execute(httpPost);
entity = response.getEntity();
checkForErrorAndThrow(entity, response);

// retrieve response summary
if (isQueryParamSet(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, writer.getAdditionalDBParams(), writer.getRequestParams())) {
Header summaryHeader = response.getFirstHeader("X-ClickHouse-Summary");
currentSummary = summaryHeader != null ? Jackson.getObjectMapper().readValue(summaryHeader.getValue(), ClickHouseResponseSummary.class) : null;
}
} catch (ClickHouseException e) {
throw e;
} catch (Exception e) {
Expand Down Expand Up @@ -919,6 +971,6 @@ private Map<ClickHouseQueryParam, String> addQueryIdTo(Map<ClickHouseQueryParam,

@Override
public Writer write() {
return new Writer(this);
return new Writer(this).withDbParams(getAdditionalDBParams()).options(getRequestParams());
}
}
10 changes: 10 additions & 0 deletions src/main/java/ru/yandex/clickhouse/ConfigurableApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public T addDbParam(ClickHouseQueryParam param, String value) {
return (T) this;
}

public T removeDbParam(ClickHouseQueryParam param) {
additionalDBParams.remove(param);
return (T) this;
}

public T withDbParams(Map<ClickHouseQueryParam, String> dbParams) {
this.additionalDBParams = new HashMap<ClickHouseQueryParam, String>();
if (null != dbParams) {
Expand All @@ -49,4 +54,9 @@ public T option(String key, String value) {
return (T) this;
}

public T removeOption(String key) {
additionalRequestParams.remove(key);
return (T) this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ru.yandex.clickhouse.response;

import com.fasterxml.jackson.annotation.JsonProperty;

public class ClickHouseResponseSummary {
final private long readRows; // number of read rows for selects (may be more than rows in result set)
final private long writtenRows; // number of written rows for inserts
final private long readBytes;
final private long writtenBytes;
final private long totalRowsToRead;

public ClickHouseResponseSummary(@JsonProperty("read_rows") long readRows, @JsonProperty("written_rows") long writtenRows, @JsonProperty("read_bytes") long readBytes,
@JsonProperty("written_bytes") long writtenBytes, @JsonProperty("total_rows_to_read") long totalRowsToRead) {
this.readRows = readRows;
this.writtenRows = writtenRows;
this.readBytes = readBytes;
this.writtenBytes = writtenBytes;
this.totalRowsToRead = totalRowsToRead;
}

public long getReadRows() {
return readRows;
}

public long getWrittenRows() {
return writtenRows;
}

public long getReadBytes() {
return readBytes;
}

public long getWrittenBytes() {
return writtenBytes;
}

public long getTotalRowsToRead() {
return totalRowsToRead;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public class ClickHouseProperties {
private Boolean insertDeduplicate;
private Boolean insertDistributedSync;
private Boolean anyJoinDistinctRightTableKeys;

private Boolean sendProgressInHttpHeaders;
private Boolean waitEndOfQuery;

public ClickHouseProperties() {
this(new Properties());
Expand Down Expand Up @@ -162,6 +163,8 @@ public ClickHouseProperties(Properties info) {
this.insertDeduplicate = getSetting(info, ClickHouseQueryParam.INSERT_DEDUPLICATE);
this.insertDistributedSync = getSetting(info, ClickHouseQueryParam.INSERT_DISTRIBUTED_SYNC);
this.anyJoinDistinctRightTableKeys = getSetting(info, ClickHouseQueryParam.ANY_JOIN_DISTINCT_RIGHT_TABLE_KEYS);
this.sendProgressInHttpHeaders = (Boolean)getSetting(info, ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS);
this.waitEndOfQuery = (Boolean)getSetting(info, ClickHouseQueryParam.WAIT_END_OF_QUERY);
}

public Properties asProperties() {
Expand Down Expand Up @@ -226,6 +229,8 @@ public Properties asProperties() {
ret.put(ClickHouseQueryParam.INSERT_DEDUPLICATE.getKey(), insertDeduplicate);
ret.put(ClickHouseQueryParam.INSERT_DISTRIBUTED_SYNC.getKey(), insertDistributedSync);
ret.put(ClickHouseQueryParam.ANY_JOIN_DISTINCT_RIGHT_TABLE_KEYS.getKey(), anyJoinDistinctRightTableKeys);
ret.put(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS.getKey(), sendProgressInHttpHeaders);
ret.put(ClickHouseQueryParam.WAIT_END_OF_QUERY.getKey(), waitEndOfQuery);

return ret.getProperties();
}
Expand Down Expand Up @@ -292,6 +297,8 @@ public ClickHouseProperties(ClickHouseProperties properties) {
setInsertDeduplicate(properties.insertDeduplicate);
setInsertDistributedSync(properties.insertDistributedSync);
setAnyJoinDistinctRightTableKeys(properties.anyJoinDistinctRightTableKeys);
setSendProgressInHttpHeaders(properties.sendProgressInHttpHeaders);
setWaitEndOfQuery(properties.waitEndOfQuery);
}

public Map<ClickHouseQueryParam, String> buildQueryParams(boolean ignoreDatabase){
Expand Down Expand Up @@ -379,6 +386,9 @@ public Map<ClickHouseQueryParam, String> buildQueryParams(boolean ignoreDatabase
params.put(ClickHouseQueryParam.ENABLE_OPTIMIZE_PREDICATE_EXPRESSION, enableOptimizePredicateExpression ? "1" : "0");
}

addQueryParam(sendProgressInHttpHeaders, ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, params);
addQueryParam(waitEndOfQuery, ClickHouseQueryParam.WAIT_END_OF_QUERY, params);

return params;
}

Expand Down Expand Up @@ -907,6 +917,22 @@ public Boolean getAnyJoinDistinctRightTableKeys() {
return anyJoinDistinctRightTableKeys;
}

public Boolean getSendProgressInHttpHeaders() {
return sendProgressInHttpHeaders;
}

public void setSendProgressInHttpHeaders(Boolean sendProgressInHttpHeaders) {
this.sendProgressInHttpHeaders = sendProgressInHttpHeaders;
}

public Boolean getWaitEndOfQuery() {
return waitEndOfQuery;
}

public void setWaitEndOfQuery(Boolean waitEndOfQuery) {
this.waitEndOfQuery = waitEndOfQuery;
}

private static class PropertiesBuilder {
private final Properties properties;
public PropertiesBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public enum ClickHouseQueryParam implements DriverPropertyCreator {

SELECT_SEQUENTIAL_CONSISTENCY("select_sequential_consistency", null, Long.class, ""),

SEND_PROGRESS_IN_HTTP_HEADERS("send_progress_in_http_headers", null, Boolean.class, "Allow to populate summary in ClickHouseStatement with read/written rows/bytes"),

SEND_TIMEOUT("send_timeout", null, Integer.class, ""),

SESSION_CHECK("session_check", false, Boolean.class, ""),
Expand Down Expand Up @@ -253,6 +255,8 @@ public enum ClickHouseQueryParam implements DriverPropertyCreator {
PREFERRED_BLOCK_SIZE_BYTES("preferred_block_size_bytes", null, Long.class, "Adaptively estimates number of required rows in a block."),

ENABLE_OPTIMIZE_PREDICATE_EXPRESSION("enable_optimize_predicate_expression", null, Boolean.class, "See Clickhouse server description for this parameter. Default value is null so that server setting is taken."),

WAIT_END_OF_QUERY("wait_end_of_query", null, Boolean.class, "Buffer the response server-side before sending to client. Useful when using SEND_PROGRESS_IN_HTTP_HEADERS to get accurate stats."),
;

private final String key;
Expand Down
49 changes: 47 additions & 2 deletions src/test/java/ru/yandex/clickhouse/ClickHouseStatementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.URISyntaxException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;

import org.apache.http.impl.client.HttpClientBuilder;
Expand All @@ -13,6 +14,7 @@
import com.google.common.collect.ImmutableMap;

import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -109,7 +111,7 @@ public void testMaxMemoryUsage() throws Exception {
}

@Test
public void testAdditionalRequestParams() throws Exception {
public void testAdditionalRequestParams() {
ClickHouseProperties properties = new ClickHouseProperties();
ClickHouseStatementImpl statement = new ClickHouseStatementImpl(
HttpClientBuilder.create().build(),
Expand All @@ -118,15 +120,58 @@ public void testAdditionalRequestParams() throws Exception {
ResultSet.TYPE_FORWARD_ONLY
);

statement.option("cache_namespace", "aaaa");
URI uri = statement.buildRequestUri(
null,
null,
null,
ImmutableMap.of("cache_namespace", "aaaa"),
null,
false
);
String query = uri.getQuery();
assertTrue(query.contains("cache_namespace=aaaa"), "cache_namespace param is missing in URL");

uri = statement.buildRequestUri(
null,
null,
null,
ImmutableMap.of("cache_namespace", "bbbb"),
false
);
query = uri.getQuery();
assertTrue(query.contains("cache_namespace=bbbb"), "cache_namespace param is missing in URL");

// check that statement level params are given to Writer
assertEquals(statement.write().getRequestParams().get("cache_namespace"), "aaaa");
}

@Test
public void testAdditionalDBParams() {
ClickHouseProperties properties = new ClickHouseProperties();
properties.setMaxThreads(1);

ClickHouseStatementImpl statement = new ClickHouseStatementImpl(
HttpClientBuilder.create().build(),
null,
properties,
ResultSet.TYPE_FORWARD_ONLY
);

URI uri = statement.buildRequestUri(null, null, null, null, false);
assertTrue(uri.getQuery().contains("max_threads=1"));

// override on statement level
statement.addDbParam(ClickHouseQueryParam.MAX_THREADS, "2");

uri = statement.buildRequestUri(null, null, null, null, false);
assertTrue(uri.getQuery().contains("max_threads=2"));

// override on method level
uri = statement.buildRequestUri(null, null, Collections.singletonMap(ClickHouseQueryParam.MAX_THREADS, "3"), null, false);
assertTrue(uri.getQuery().contains("max_threads=3"));

// check that statement level params are given to Writer
assertEquals(statement.write().getAdditionalDBParams().get(ClickHouseQueryParam.MAX_THREADS), "2");
}

@Test
Expand Down
Loading