diff --git a/src/main/java/ru/yandex/clickhouse/ClickHouseStatement.java b/src/main/java/ru/yandex/clickhouse/ClickHouseStatement.java index 262257a3d..1773698ae 100644 --- a/src/main/java/ru/yandex/clickhouse/ClickHouseStatement.java +++ b/src/main/java/ru/yandex/clickhouse/ClickHouseStatement.java @@ -1,6 +1,7 @@ package ru.yandex.clickhouse; import ru.yandex.clickhouse.response.ClickHouseResponse; +import ru.yandex.clickhouse.response.ClickHouseResponseSummary; import ru.yandex.clickhouse.settings.ClickHouseQueryParam; import ru.yandex.clickhouse.util.ClickHouseRowBinaryInputStream; import ru.yandex.clickhouse.util.ClickHouseStreamCallback; @@ -105,4 +106,6 @@ ResultSet executeQuery(String sql, * Returns extended write-API */ Writer write(); + + ClickHouseResponseSummary getResponseSummary(); } diff --git a/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java b/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java index 3df0ea2cb..a6ce8afc1 100644 --- a/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java +++ b/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java @@ -1,6 +1,7 @@ package ru.yandex.clickhouse; import com.google.common.base.Strings; +import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; @@ -38,7 +39,7 @@ import java.util.*; -public class ClickHouseStatementImpl implements ClickHouseStatement { +public class ClickHouseStatementImpl extends ConfigurableApi implements ClickHouseStatement { private static final Logger log = LoggerFactory.getLogger(ClickHouseStatementImpl.class); @@ -52,6 +53,8 @@ public class ClickHouseStatementImpl implements ClickHouseStatement { private ClickHouseRowBinaryInputStream currentRowBinaryResult; + private ClickHouseResponseSummary currentSummary; + private int currentUpdateCount = -1; private int queryTimeout; @@ -78,6 +81,7 @@ public class ClickHouseStatementImpl implements ClickHouseStatement { public ClickHouseStatementImpl(CloseableHttpClient client, ClickHouseConnection connection, ClickHouseProperties properties, int resultSetType) { + super(null); this.client = client; this.connection = connection; this.properties = properties == null ? new ClickHouseProperties() : properties; @@ -217,7 +221,8 @@ public int executeUpdate(String sql) throws SQLException { } finally { StreamUtils.close(is); } - return 1; + + return currentSummary != null ? (int) currentSummary.getWrittenRows() : 1; } @Override @@ -444,6 +449,11 @@ public boolean isWrapperFor(Class iface) throws SQLException { return iface.isAssignableFrom(getClass()); } + @Override + public ClickHouseResponseSummary getResponseSummary() { + return currentSummary; + } + static String clickhousifySql(String sql) { return addFormatIfAbsent(sql, ClickHouseFormat.TabSeparatedWithNamesAndTypes); } @@ -623,6 +633,13 @@ private InputStream getInputStream( entity.writeTo(baos); is = baos.convertToInputStream(); } + + // retrieve response summary + if (isQueryParamSet(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, additionalClickHouseDBParams, additionalRequestParams)) { + Header summaryHeader = response.getFirstHeader("X-ClickHouse-Summary"); + currentSummary = summaryHeader != null ? Jackson.getObjectMapper().readValue(summaryHeader.getValue(), ClickHouseResponseSummary.class) : null; + } + return is; } catch (ClickHouseException e) { throw e; @@ -700,6 +717,8 @@ private List getUrlQueryParams( params.put(ClickHouseQueryParam.DATABASE, initialDatabase); } + params.putAll(getAdditionalDBParams()); + if (additionalClickHouseDBParams != null && !additionalClickHouseDBParams.isEmpty()) { params.putAll(additionalClickHouseDBParams); } @@ -712,6 +731,12 @@ private List getUrlQueryParams( } } + for (Map.Entry entry : getRequestParams().entrySet()) { + if (!Strings.isNullOrEmpty(entry.getValue())) { + result.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); + } + } + if (additionalRequestParams != null) { for (Map.Entry entry : additionalRequestParams.entrySet()) { if (!Strings.isNullOrEmpty(entry.getValue())) { @@ -720,10 +745,31 @@ private List getUrlQueryParams( } } - return result; } + private boolean isQueryParamSet(ClickHouseQueryParam param, Map additionalClickHouseDBParams, Map additionalRequestParams) { + String value = getQueryParamValue(param, additionalClickHouseDBParams, additionalRequestParams); + + return "true".equals(value) || "1".equals(value); + } + + private String getQueryParamValue(ClickHouseQueryParam param, Map additionalClickHouseDBParams, Map additionalRequestParams) { + if (additionalRequestParams != null && additionalRequestParams.containsKey(param.getKey()) && !Strings.isNullOrEmpty(additionalRequestParams.get(param.getKey()))) + return additionalRequestParams.get(param.getKey()); + + if (getRequestParams().containsKey(param.getKey()) && !Strings.isNullOrEmpty(getRequestParams().get(param.getKey()))) + return getRequestParams().get(param.getKey()); + + if (additionalClickHouseDBParams != null && additionalClickHouseDBParams.containsKey(param) && !Strings.isNullOrEmpty(additionalClickHouseDBParams.get(param))) + return additionalClickHouseDBParams.get(param); + + if (getAdditionalDBParams().containsKey(param) && !Strings.isNullOrEmpty(getAdditionalDBParams().get(param))) + return getAdditionalDBParams().get(param); + + return properties.asProperties().getProperty(param.getKey()); + } + private URI followRedirects(URI uri) throws IOException, URISyntaxException { if (properties.isCheckForRedirects()) { int redirects = 0; @@ -850,6 +896,12 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException { HttpResponse response = client.execute(httpPost); entity = response.getEntity(); checkForErrorAndThrow(entity, response); + + // retrieve response summary + if (isQueryParamSet(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, writer.getAdditionalDBParams(), writer.getRequestParams())) { + Header summaryHeader = response.getFirstHeader("X-ClickHouse-Summary"); + currentSummary = summaryHeader != null ? Jackson.getObjectMapper().readValue(summaryHeader.getValue(), ClickHouseResponseSummary.class) : null; + } } catch (ClickHouseException e) { throw e; } catch (Exception e) { @@ -919,6 +971,6 @@ private Map addQueryIdTo(Map dbParams) { this.additionalDBParams = new HashMap(); if (null != dbParams) { @@ -49,4 +54,9 @@ public T option(String key, String value) { return (T) this; } + public T removeOption(String key) { + additionalRequestParams.remove(key); + return (T) this; + } + } diff --git a/src/main/java/ru/yandex/clickhouse/response/ClickHouseResponseSummary.java b/src/main/java/ru/yandex/clickhouse/response/ClickHouseResponseSummary.java new file mode 100644 index 000000000..5b0702758 --- /dev/null +++ b/src/main/java/ru/yandex/clickhouse/response/ClickHouseResponseSummary.java @@ -0,0 +1,40 @@ +package ru.yandex.clickhouse.response; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ClickHouseResponseSummary { + final private long readRows; // number of read rows for selects (may be more than rows in result set) + final private long writtenRows; // number of written rows for inserts + final private long readBytes; + final private long writtenBytes; + final private long totalRowsToRead; + + public ClickHouseResponseSummary(@JsonProperty("read_rows") long readRows, @JsonProperty("written_rows") long writtenRows, @JsonProperty("read_bytes") long readBytes, + @JsonProperty("written_bytes") long writtenBytes, @JsonProperty("total_rows_to_read") long totalRowsToRead) { + this.readRows = readRows; + this.writtenRows = writtenRows; + this.readBytes = readBytes; + this.writtenBytes = writtenBytes; + this.totalRowsToRead = totalRowsToRead; + } + + public long getReadRows() { + return readRows; + } + + public long getWrittenRows() { + return writtenRows; + } + + public long getReadBytes() { + return readBytes; + } + + public long getWrittenBytes() { + return writtenBytes; + } + + public long getTotalRowsToRead() { + return totalRowsToRead; + } +} diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java index c99014c82..14ce0774f 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java @@ -94,7 +94,8 @@ public class ClickHouseProperties { private Boolean insertDeduplicate; private Boolean insertDistributedSync; private Boolean anyJoinDistinctRightTableKeys; - + private Boolean sendProgressInHttpHeaders; + private Boolean waitEndOfQuery; public ClickHouseProperties() { this(new Properties()); @@ -162,6 +163,8 @@ public ClickHouseProperties(Properties info) { this.insertDeduplicate = getSetting(info, ClickHouseQueryParam.INSERT_DEDUPLICATE); this.insertDistributedSync = getSetting(info, ClickHouseQueryParam.INSERT_DISTRIBUTED_SYNC); this.anyJoinDistinctRightTableKeys = getSetting(info, ClickHouseQueryParam.ANY_JOIN_DISTINCT_RIGHT_TABLE_KEYS); + this.sendProgressInHttpHeaders = (Boolean)getSetting(info, ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS); + this.waitEndOfQuery = (Boolean)getSetting(info, ClickHouseQueryParam.WAIT_END_OF_QUERY); } public Properties asProperties() { @@ -226,6 +229,8 @@ public Properties asProperties() { ret.put(ClickHouseQueryParam.INSERT_DEDUPLICATE.getKey(), insertDeduplicate); ret.put(ClickHouseQueryParam.INSERT_DISTRIBUTED_SYNC.getKey(), insertDistributedSync); ret.put(ClickHouseQueryParam.ANY_JOIN_DISTINCT_RIGHT_TABLE_KEYS.getKey(), anyJoinDistinctRightTableKeys); + ret.put(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS.getKey(), sendProgressInHttpHeaders); + ret.put(ClickHouseQueryParam.WAIT_END_OF_QUERY.getKey(), waitEndOfQuery); return ret.getProperties(); } @@ -292,6 +297,8 @@ public ClickHouseProperties(ClickHouseProperties properties) { setInsertDeduplicate(properties.insertDeduplicate); setInsertDistributedSync(properties.insertDistributedSync); setAnyJoinDistinctRightTableKeys(properties.anyJoinDistinctRightTableKeys); + setSendProgressInHttpHeaders(properties.sendProgressInHttpHeaders); + setWaitEndOfQuery(properties.waitEndOfQuery); } public Map buildQueryParams(boolean ignoreDatabase){ @@ -379,6 +386,9 @@ public Map buildQueryParams(boolean ignoreDatabase params.put(ClickHouseQueryParam.ENABLE_OPTIMIZE_PREDICATE_EXPRESSION, enableOptimizePredicateExpression ? "1" : "0"); } + addQueryParam(sendProgressInHttpHeaders, ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, params); + addQueryParam(waitEndOfQuery, ClickHouseQueryParam.WAIT_END_OF_QUERY, params); + return params; } @@ -907,6 +917,22 @@ public Boolean getAnyJoinDistinctRightTableKeys() { return anyJoinDistinctRightTableKeys; } + public Boolean getSendProgressInHttpHeaders() { + return sendProgressInHttpHeaders; + } + + public void setSendProgressInHttpHeaders(Boolean sendProgressInHttpHeaders) { + this.sendProgressInHttpHeaders = sendProgressInHttpHeaders; + } + + public Boolean getWaitEndOfQuery() { + return waitEndOfQuery; + } + + public void setWaitEndOfQuery(Boolean waitEndOfQuery) { + this.waitEndOfQuery = waitEndOfQuery; + } + private static class PropertiesBuilder { private final Properties properties; public PropertiesBuilder() { diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseQueryParam.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseQueryParam.java index c1d1a991a..4d6e57baa 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseQueryParam.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseQueryParam.java @@ -217,6 +217,8 @@ public enum ClickHouseQueryParam implements DriverPropertyCreator { SELECT_SEQUENTIAL_CONSISTENCY("select_sequential_consistency", null, Long.class, ""), + SEND_PROGRESS_IN_HTTP_HEADERS("send_progress_in_http_headers", null, Boolean.class, "Allow to populate summary in ClickHouseStatement with read/written rows/bytes"), + SEND_TIMEOUT("send_timeout", null, Integer.class, ""), SESSION_CHECK("session_check", false, Boolean.class, ""), @@ -253,6 +255,8 @@ public enum ClickHouseQueryParam implements DriverPropertyCreator { PREFERRED_BLOCK_SIZE_BYTES("preferred_block_size_bytes", null, Long.class, "Adaptively estimates number of required rows in a block."), ENABLE_OPTIMIZE_PREDICATE_EXPRESSION("enable_optimize_predicate_expression", null, Boolean.class, "See Clickhouse server description for this parameter. Default value is null so that server setting is taken."), + + WAIT_END_OF_QUERY("wait_end_of_query", null, Boolean.class, "Buffer the response server-side before sending to client. Useful when using SEND_PROGRESS_IN_HTTP_HEADERS to get accurate stats."), ; private final String key; diff --git a/src/test/java/ru/yandex/clickhouse/ClickHouseStatementTest.java b/src/test/java/ru/yandex/clickhouse/ClickHouseStatementTest.java index 36762fa19..c45738411 100644 --- a/src/test/java/ru/yandex/clickhouse/ClickHouseStatementTest.java +++ b/src/test/java/ru/yandex/clickhouse/ClickHouseStatementTest.java @@ -5,6 +5,7 @@ import java.net.URISyntaxException; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Collections; import java.util.Properties; import org.apache.http.impl.client.HttpClientBuilder; @@ -13,6 +14,7 @@ import com.google.common.collect.ImmutableMap; import ru.yandex.clickhouse.settings.ClickHouseProperties; +import ru.yandex.clickhouse.settings.ClickHouseQueryParam; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -109,7 +111,7 @@ public void testMaxMemoryUsage() throws Exception { } @Test - public void testAdditionalRequestParams() throws Exception { + public void testAdditionalRequestParams() { ClickHouseProperties properties = new ClickHouseProperties(); ClickHouseStatementImpl statement = new ClickHouseStatementImpl( HttpClientBuilder.create().build(), @@ -118,15 +120,58 @@ public void testAdditionalRequestParams() throws Exception { ResultSet.TYPE_FORWARD_ONLY ); + statement.option("cache_namespace", "aaaa"); URI uri = statement.buildRequestUri( null, null, null, - ImmutableMap.of("cache_namespace", "aaaa"), + null, false ); String query = uri.getQuery(); assertTrue(query.contains("cache_namespace=aaaa"), "cache_namespace param is missing in URL"); + + uri = statement.buildRequestUri( + null, + null, + null, + ImmutableMap.of("cache_namespace", "bbbb"), + false + ); + query = uri.getQuery(); + assertTrue(query.contains("cache_namespace=bbbb"), "cache_namespace param is missing in URL"); + + // check that statement level params are given to Writer + assertEquals(statement.write().getRequestParams().get("cache_namespace"), "aaaa"); + } + + @Test + public void testAdditionalDBParams() { + ClickHouseProperties properties = new ClickHouseProperties(); + properties.setMaxThreads(1); + + ClickHouseStatementImpl statement = new ClickHouseStatementImpl( + HttpClientBuilder.create().build(), + null, + properties, + ResultSet.TYPE_FORWARD_ONLY + ); + + URI uri = statement.buildRequestUri(null, null, null, null, false); + assertTrue(uri.getQuery().contains("max_threads=1")); + + // override on statement level + statement.addDbParam(ClickHouseQueryParam.MAX_THREADS, "2"); + + uri = statement.buildRequestUri(null, null, null, null, false); + assertTrue(uri.getQuery().contains("max_threads=2")); + + // override on method level + uri = statement.buildRequestUri(null, null, Collections.singletonMap(ClickHouseQueryParam.MAX_THREADS, "3"), null, false); + assertTrue(uri.getQuery().contains("max_threads=3")); + + // check that statement level params are given to Writer + assertEquals(statement.write().getAdditionalDBParams().get(ClickHouseQueryParam.MAX_THREADS), "2"); } @Test diff --git a/src/test/java/ru/yandex/clickhouse/integration/ResultSummaryTest.java b/src/test/java/ru/yandex/clickhouse/integration/ResultSummaryTest.java new file mode 100644 index 000000000..79ad4661a --- /dev/null +++ b/src/test/java/ru/yandex/clickhouse/integration/ResultSummaryTest.java @@ -0,0 +1,136 @@ +package ru.yandex.clickhouse.integration; + +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; +import ru.yandex.clickhouse.*; +import ru.yandex.clickhouse.settings.ClickHouseQueryParam; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.AssertJUnit.assertTrue; + +public class ResultSummaryTest { + private ClickHouseConnection connection; + + @BeforeTest + public void setUp() throws Exception { + connection = ClickHouseContainerForTest.newDataSource().getConnection(); + connection.createStatement().execute("CREATE DATABASE IF NOT EXISTS test"); + } + + @AfterTest + public void tearDown() throws Exception { + connection.createStatement().execute("DROP DATABASE IF EXISTS test"); + } + + @Test + public void select() throws Exception { + ClickHouseStatement st = connection.createStatement(); + st.executeQuery("SELECT * FROM numbers(10)", Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertTrue(st.getResponseSummary().getReadRows() >= 10); + assertTrue(st.getResponseSummary().getReadBytes() > 0); + } + + @Test + public void largeSelect() throws Exception { + ClickHouseStatement st = connection.createStatement(); + st.executeQuery("SELECT * FROM numbers(10000000)", Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertTrue(st.getResponseSummary().getReadRows() < 10000000); + assertTrue(st.getResponseSummary().getReadBytes() > 0); + } + + @Test + public void largeSelectWaitEndOfQuery() throws Exception { + ClickHouseStatement st = connection.createStatement(); + st.executeQuery("SELECT * FROM numbers(10000000)", largeSelectWaitEndOfQueryParams()); + + assertTrue(st.getResponseSummary().getReadRows() >= 10000000); + assertTrue(st.getResponseSummary().getReadBytes() > 0); + } + + private Map largeSelectWaitEndOfQueryParams() { + Map res = new HashMap<>(); + res.put(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true"); + res.put(ClickHouseQueryParam.WAIT_END_OF_QUERY, "true"); + return res; + } + + @Test + public void selectWithoutParam() throws Exception { + ClickHouseStatement st = connection.createStatement(); + st.executeQuery("SELECT * FROM numbers(10)", Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertTrue(st.getResponseSummary().getReadRows() >= 10); + assertTrue(st.getResponseSummary().getReadBytes() > 0); + } + + @Test + public void insertSingle() throws Exception { + createInsertTestTable(); + + ClickHousePreparedStatement ps = (ClickHousePreparedStatement) connection.prepareStatement("INSERT INTO test.insert_test VALUES(?)"); + ps.setLong(1, 1); + ps.executeQuery(Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertEquals(ps.getResponseSummary().getWrittenRows(), 1); + assertTrue(ps.getResponseSummary().getWrittenBytes() > 0); + } + + @Test + public void insertBatch() throws Exception { + createInsertTestTable(); + + ClickHousePreparedStatement ps = (ClickHousePreparedStatement) connection.prepareStatement("INSERT INTO test.insert_test VALUES(?)"); + for (long i = 0; i < 10; i++) { + ps.setLong(1, i); + ps.addBatch(); + } + ps.executeBatch(Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertEquals(ps.getResponseSummary().getWrittenRows(), 10); + assertTrue(ps.getResponseSummary().getWrittenBytes() > 0); + } + + @Test + public void insertSelect() throws Exception { + createInsertTestTable(); + + ClickHousePreparedStatement ps = (ClickHousePreparedStatement) connection.prepareStatement("INSERT INTO test.insert_test SELECT number FROM numbers(10)"); + ps.executeQuery(Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertEquals(ps.getResponseSummary().getWrittenRows(), 10); + assertTrue(ps.getResponseSummary().getWrittenBytes() > 0); + } + + @Test + public void insertLargeSelect() throws Exception { + createInsertTestTable(); + + ClickHousePreparedStatement ps = (ClickHousePreparedStatement) connection.prepareStatement("INSERT INTO test.insert_test SELECT number FROM numbers(10000000)"); + ps.executeQuery(Collections.singletonMap(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, "true")); + + assertEquals(ps.getResponseSummary().getWrittenRows(), 10000000); + assertTrue(ps.getResponseSummary().getWrittenBytes() > 0); + } + + @Test + public void noSummary() throws Exception { + ClickHouseStatement st = connection.createStatement(); + st.executeQuery("SELECT * FROM numbers(10)"); + + assertNull(st.getResponseSummary()); + } + + private void createInsertTestTable() throws SQLException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.insert_test"); + connection.createStatement().execute("CREATE TABLE IF NOT EXISTS test.insert_test (value UInt32) ENGINE = TinyLog"); + } +}