diff --git a/README.md b/README.md
index c06d9cebd..b7ba58216 100644
--- a/README.md
+++ b/README.md
@@ -1,86 +1,96 @@
# ClickHouse Java Client & JDBC Driver
-[](https://maven-badges.herokuapp.com/maven-central/ru.yandex.clickhouse/clickhouse-jdbc)  [](https://sonarcloud.io/dashboard?id=ClickHouse_clickhouse-jdbc)
+[](https://maven-badges.herokuapp.com/maven-central/com.clickhouse/clickhouse-jdbc)  [](https://sonarcloud.io/dashboard?id=ClickHouse_clickhouse-jdbc)
-Java client and JDBC driver for ClickHouse.
+Java client and JDBC driver for ClickHouse. Java client is async and light weight library for accessing ClickHouse in Java; while JDBC driver is built on top of the Java client with more dependencies and extensions for JDBC-compliance.
## Usage
### Java Client
-Use Java client when you prefer async and more "direct" way to communicate with ClickHouse. JDBC driver is actually a thin wrapper of the Java client.
-
```xml
com.clickhouse
-
- clickhouse-grpc-client
+
+ clickhouse-http-client
0.3.2
```
-Example:
-
-```Java
-// declare a server to connect to
-ClickHouseNode server = ClickHouseNode.of("server.domain", ClickHouseProtocol.GRPC, 9100, "my_db");
-
-// run multiple queries in one go and wait until it's finished
-ClickHouseClient.send(server,
- "create database if not exists test",
- "use test", // change current database from my_db to test
- "create table if not exists test_table(s String) engine=Memory",
- "insert into test_table values('1')('2')('3')",
- "select * from test_table limit 1",
- "truncate table test_table",
- "drop table if exists test_table").get();
-
-// query with named parameters
-try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.GRPC);
- ClickHouseResponse resp = client.connect(server)
- .format(ClickHouseFormat.RowBinaryWithNamesAndTypes).set("send_logs_level", "trace")
- .query("select id, name from some_table where id in :ids and name like :name").params(Arrays.asList(1,2,3), "%key%").execute().get()) {
- // you can also use resp.stream() as well
- for (ClickHouseRecord record : resp.records()) {
- int id = record.getValue(0).asInteger();
- String name = record.getValue(1).asString();
+
+ Expand to see example...
+
+ ```Java
+ // declare a server to connect to
+ ClickHouseNode server = ClickHouseNode.of("server.domain", ClickHouseProtocol.HTTP, 8123, "my_db");
+
+ // run multiple queries in one go and wait until they're completed
+ ClickHouseClient.send(server, "create database if not exists test",
+ "use test", // change current database from my_db to test
+ "create table if not exists test_table(s String) engine=Memory",
+ "insert into test_table values('1')('2')('3')",
+ "select * from test_table limit 1",
+ "truncate table test_table",
+ "drop table if exists test_table").get();
+
+ // query with named parameters
+ try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.GRPC);
+ ClickHouseResponse resp = client.connect(server)
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes).set("send_logs_level", "trace")
+ .query("select id, name from some_table where id in :ids and name like :name").params(Arrays.asList(1,2,3), "%key%").execute().get()) {
+ // you can also use resp.stream() as well
+ for (ClickHouseRecord record : resp.records()) {
+ int id = record.getValue(0).asInteger();
+ String name = record.getValue(1).asString();
+ }
+
+ ClickHouseResponseSummary summary = resp.getSummary();
+ long totalRows = summary.getRows();
}
- ClickHouseResponseSummary summary = resp.getSummary();
- long totalRows = summary.getRows();
-}
+ // load data with custom writer
+ ClickHouseClient.load(server, "target_table", ClickHouseFormat.TabSeparated,
+ ClickHouseCompression.NONE, new ClickHouseWriter() {
+ @Override
+ public void write(OutputStream output) throws IOException {
+ output.write("1\t\\N\n".getBytes());
+ output.write("2\t123".getBytes());
+ }
+ }).get();
+ ```
+
-// load data with custom writer
-ClickHouseClient.load(server, "target_table", ClickHouseFormat.TabSeparated,
- ClickHouseCompression.NONE, new ClickHouseWriter() {
- @Override
- public void write(OutputStream output) throws IOException {
- output.write("1\t\\N\n".getBytes());
- output.write("2\t123".getBytes());
- }
- }).get();
-```
### JDBC Driver
```xml
-
- ru.yandex.clickhouse
+
+ com.clickhouse
clickhouse-jdbc
0.3.2
+
+ http
+
+
+ *
+ *
+
+
```
-URL syntax: `jdbc:clickhouse://:[/[?param1=value1¶m2=value2]]`, e.g. `jdbc:clickhouse://localhost:8123/test?socket_timeout=120000`
+URL Syntax: `jdbc:(clickhouse|ch)[:(grpc|http)]://:[][/[?param1=value1¶m2=value2]]`
+ - `jdbc:ch:grpc://localhost` is same as `jdbc:clickhouse:grpc://localhost:9100`
+ - `jdbc:ch://localhost/test?socket_timeout=120000` is same as `jdbc:clickhouse:http://localhost:8123/test?socket_timeout=120000`
-JDBC Driver Class: `ru.yandex.clickhouse.ClickHouseDriver` (will be changed to `com.clickhouse.jdbc.ClickHouseDriver` starting from 0.4.0)
+JDBC Driver Class: `com.clickhouse.jdbc.ClickHouseDriver` (will remove `ru.yandex.clickhouse.ClickHouseDriver` starting from 0.4.0)
For example:
```java
-String url = "jdbc:clickhouse://localhost:8123/test";
-ClickHouseProperties properties = new ClickHouseProperties();
+String url = "jdbc:ch://localhost/test";
+Properties properties = new Properties();
// set connection options - see more defined in ClickHouseConnectionSettings
properties.setClientName("Agent #1");
...
@@ -96,13 +106,11 @@ additionalDBParams.put(ClickHouseQueryParam.SESSION_ID, "new-session-id");
...
try (ClickHouseConnection conn = dataSource.getConnection();
ClickHouseStatement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(sql, additionalDBParams)) {
+ ResultSet rs = stmt.executeQuery(sql)) {
...
}
```
-Additionally, if you have a few instances, you can use `BalancedClickhouseDataSource`.
-
### Extended API
In order to provide non-JDBC complaint data manipulation functionality, proprietary API exists.
@@ -174,7 +182,7 @@ Java 8 or higher is required in order to use Java client and/or JDBC driver.
| \*String | Y | Y | |
| UUID | Y | Y | |
| AggregatedFunction | N | N | Partially supported |
-| Array | Y | N | |
+| Array | Y | Y | |
| Map | Y | Y | |
| Nested | Y | N | |
| Tuple | Y | N | |
diff --git a/clickhouse-benchmark/pom.xml b/clickhouse-benchmark/pom.xml
index fe975208c..18d27c64c 100644
--- a/clickhouse-benchmark/pom.xml
+++ b/clickhouse-benchmark/pom.xml
@@ -17,7 +17,7 @@
1.4.4
- 2.6.0
+ 2.6.1
UTF-8
1.33
benchmarks
@@ -138,7 +138,7 @@
*:*
- module-info.class
+ **/module-info.class
META-INF/MANIFEST.MF
META-INF/*.SF
META-INF/*.DSA
diff --git a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/ConsumeValueFunction.java b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/ConsumeValueFunction.java
index e5b3f5a36..5a390d360 100644
--- a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/ConsumeValueFunction.java
+++ b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/ConsumeValueFunction.java
@@ -7,5 +7,5 @@
@FunctionalInterface
public interface ConsumeValueFunction {
- void consume(Blackhole blackhole, ResultSet rs, int columnIndex) throws SQLException;
+ void consume(Blackhole blackhole, ResultSet rs, int rowIndex, int columnIndex) throws SQLException;
}
diff --git a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/DriverState.java b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/DriverState.java
index 681695f48..618f382f3 100644
--- a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/DriverState.java
+++ b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/DriverState.java
@@ -15,6 +15,7 @@
import com.clickhouse.benchmark.BaseState;
import com.clickhouse.benchmark.Constants;
import com.clickhouse.benchmark.ServerState;
+// import com.github.housepower.settings.ClickHouseDefines;
@State(Scope.Thread)
public class DriverState extends BaseState {
@@ -49,6 +50,8 @@ public void doSetup(ServerState serverState) throws Exception {
url = String.format(jdbcDriver.getUrlTemplate(), serverState.getHost(),
serverState.getPort(jdbcDriver.getDefaultPort()), serverState.getDatabase(), serverState.getUser(),
serverState.getPassword(), compression);
+ // ClickHouseDefines.WRITE_COMPRESS = false;
+ // ClickHouseDefines.READ_DECOMPRESS = Boolean.parseBoolean(compression);
conn = driver.connect(url, new Properties());
try (Statement s = conn.createStatement()) {
@@ -126,11 +129,11 @@ public boolean usePreparedStatement() {
public ConsumeValueFunction getConsumeFunction(ConsumeValueFunction defaultFunc) {
if ("string".equals(type)) {
- return (b, r, i) -> b.consume(r.getString(i));
+ return (b, r, l, i) -> b.consume(r.getString(i));
} else if ("object".equals(type)) {
- return (b, r, i) -> b.consume(r.getObject(i));
+ return (b, r, l, i) -> b.consume(r.getObject(i));
} else if (defaultFunc == null) {
- return (b, r, i) -> b.consume(i);
+ return (b, r, l, i) -> b.consume(i);
} else {
return defaultFunc;
}
diff --git a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/Query.java b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/Query.java
index 8436ae4cb..ef5f49eb2 100644
--- a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/Query.java
+++ b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/Query.java
@@ -2,157 +2,220 @@
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.Locale;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.infra.Blackhole;
public class Query extends DriverBenchmark {
@Benchmark
- public void selectArrayOfInts(Blackhole blackhole, DriverState state) throws Throwable {
+ public void selectArrayOfUInt16(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getArray(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getArray(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state,
"select range(100, number % 600) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
- public void selectMapOfInts(Blackhole blackhole, DriverState state) throws Throwable {
+ public void selectMapOfInt32(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getObject(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getObject(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state,
"select cast((arrayMap(x->x+1000, range(1, number % 100)), arrayMap(x->x+10000, range(1, number %100))) as Map(Int32, Int32)) as v from numbers(?)",
rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
- public void selectTupleOfInts(Blackhole blackhole, DriverState state) throws Throwable {
+ public void selectTupleOfInt16(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getArray(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getObject(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state,
- "select tuple(range(100, number % 600)) as v from numbers(?)", rows)) {
+ "select tuple(arrayMap(x -> cast(x as Int16), range(100, number % 600))) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectDateTime32(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getTimestamp(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getTimestamp(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state,
"select toDateTime32(1613826920 + number) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectDateTime64(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getTimestamp(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getTimestamp(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state,
"select toDateTime64(1613826920 + number / 1000000000, 9) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectInt8(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getByte(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getByte(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state, "select toInt8(number % 256) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectUInt8(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getShort(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getShort(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state, "select toUInt8(number % 256) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
+ }
+ }
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
+ }
+
+ @Benchmark
+ public void selectUuid(Blackhole blackhole, DriverState state) throws Throwable {
+ int num = state.getRandomNumber();
+ int rows = state.getSampleSize() + num;
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getString(i)));
+ int l = 0;
+ try (Statement stmt = executeQuery(state, "select generateUUIDv4() as v from numbers(?)", rows)) {
+ ResultSet rs = stmt.getResultSet();
+ while (rs.next()) {
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectInt32(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getInt(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getInt(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state, "select toInt32(number) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectString(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getString(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getString(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state, "select toString(number/3) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectUInt64(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getLong(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getLong(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state, "select number as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
@Benchmark
public void selectDecimal64(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
- ConsumeValueFunction func = state.getConsumeFunction((b, r, i) -> b.consume(r.getBigDecimal(i)));
+ ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getBigDecimal(i)));
+ int l = 0;
try (Statement stmt = executeQuery(state, "select toDecimal64(number + number / 10000, 4) as v from numbers(?)",
rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
- func.consume(blackhole, rs, 1);
+ func.consume(blackhole, rs, l++, 1);
}
}
+ if (l != rows) {
+ throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
+ }
}
}
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java
index 4008a86cf..01c191d4e 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java
@@ -97,7 +97,7 @@ public ClickHouseClient build() {
boolean noSelector = nodeSelector == null || nodeSelector == ClickHouseNodeSelector.EMPTY;
int counter = 0;
- for (ClickHouseClient c : ServiceLoader.load(ClickHouseClient.class)) {
+ for (ClickHouseClient c : ServiceLoader.load(ClickHouseClient.class, getClass().getClassLoader())) {
counter++;
if (noSelector || nodeSelector.match(c)) {
client = c;
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java
index 1f4601b50..76c83d976 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java
@@ -3,24 +3,190 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* Extended input stream.
*/
public abstract class ClickHouseInputStream extends InputStream {
- static final class SimpleInputStream extends ClickHouseInputStream {
- private final InputStream in;
+ /**
+ * Empty and read-only byte buffer.
+ */
+ public static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]).asReadOnlyBuffer();
+ static final class BlockingInputStream extends ClickHouseInputStream {
+ private final BlockingQueue queue;
+ private final int timeout;
+
+ // too much to maintain a 2-level buffer for reading?
+ private ByteBuffer buffer;
private boolean closed;
- protected SimpleInputStream(InputStream input) {
- this.in = ClickHouseChecker.nonNull(input, "InputStream");
+ BlockingInputStream(BlockingQueue queue, int timeout) {
+ this.queue = queue;
+ this.timeout = timeout;
+
+ this.buffer = null;
this.closed = false;
}
+ private void ensureOpen() throws IOException {
+ if (closed) {
+ throw new IOException("Stream has been closed");
+ }
+
+ if (buffer == null || (buffer != EMPTY && !buffer.hasRemaining())) {
+ updateBuffer();
+ }
+ }
+
+ private int updateBuffer() throws IOException {
+ try {
+ if (timeout > 0) {
+ buffer = queue.poll(timeout, TimeUnit.MILLISECONDS);
+ if (buffer == null) {
+ throw new IOException(ClickHouseUtils.format("Read timed out after %d ms", timeout));
+ }
+ } else {
+ buffer = queue.take();
+ }
+
+ return buffer.remaining();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Thread was interrupted when getting next buffer from queue", e);
+ }
+ }
+
@Override
public int available() throws IOException {
- return in.available();
+ ensureOpen();
+
+ return buffer.remaining();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // it's caller's responsiblity to consume all data in the queue, which will
+ // unblock writer
+ closed = true;
+ buffer = null;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ ensureOpen();
+
+ if (buffer == EMPTY) {
+ close();
+ throw new EOFException();
+ }
+
+ return buffer.get();
+ }
+
+ @Override
+ public int read() throws IOException {
+ ensureOpen();
+
+ if (buffer == EMPTY) {
+ return -1;
+ }
+
+ return 0xFF & buffer.get();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ ensureOpen();
+
+ int counter = 0;
+ while (len > 0) {
+ if (buffer == EMPTY) {
+ return counter > 0 ? counter : -1;
+ }
+
+ int remain = buffer.remaining();
+ if (remain >= len) {
+ buffer.get(b, off, len);
+ counter += len;
+ len = 0;
+ } else {
+ buffer.get(b, off, remain);
+ counter += remain;
+ off += remain;
+ len -= remain;
+
+ updateBuffer();
+ }
+ }
+
+ return counter;
+ }
+
+ @Override
+ public String readString(int byteLength, Charset charset) throws IOException {
+ ensureOpen();
+
+ if (byteLength < 1) {
+ return "";
+ }
+
+ if (charset == null) {
+ charset = StandardCharsets.UTF_8;
+ }
+
+ if (!buffer.isReadOnly() && byteLength > 8 && buffer.remaining() > byteLength) {
+ int pos = buffer.position();
+ ((Buffer) buffer).position(pos + byteLength);
+ return charset.decode(ByteBuffer.wrap(buffer.array(), pos, byteLength)).toString();
+ }
+
+ return new String(readBytes(byteLength), charset);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ ensureOpen();
+
+ // peforms better but this is a bit tricky
+ if (n == Long.MAX_VALUE) {
+ long counter = buffer.remaining();
+ while (buffer != EMPTY && buffer.limit() > 0) {
+ counter += buffer.limit();
+ updateBuffer();
+ }
+
+ return counter;
+ }
+
+ return super.skip(n);
+ }
+ }
+
+ static final class WrappedInputStream extends ClickHouseInputStream {
+ private final InputStream in;
+
+ private boolean closed;
+
+ WrappedInputStream(InputStream input) {
+ in = ClickHouseChecker.nonNull(input, "InputStream");
+ closed = false;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return !closed ? in.available() : 0;
}
@Override
@@ -41,8 +207,11 @@ public boolean isClosed() {
@Override
public void close() throws IOException {
- in.close();
- closed = true;
+ try {
+ in.close();
+ } finally {
+ closed = true;
+ }
}
@Override
@@ -61,8 +230,25 @@ public long skip(long n) throws IOException {
}
}
+ /**
+ * Wraps the given blocking queue.
+ *
+ * @param queue non-null blocking queue
+ * @param timeout read timeout in milliseconds
+ * @return wrapped input
+ */
+ public static ClickHouseInputStream of(BlockingQueue queue, int timeout) {
+ return new BlockingInputStream(ClickHouseChecker.nonNull(queue, "queue"), timeout);
+ }
+
+ /**
+ * Wraps the given input stream.
+ *
+ * @param input non-null input stream
+ * @return wrapped input
+ */
public static ClickHouseInputStream of(InputStream input) {
- return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input : new SimpleInputStream(input);
+ return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input : new WrappedInputStream(input);
}
/**
@@ -77,7 +263,7 @@ public int readUnsignedByte() throws IOException {
}
/**
- * Reads one single byte from the input stream. This is faster than
+ * Reads one single byte from the input stream. It's supposed to be faster than
* {@link #read()}.
*
* @return byte value if present
@@ -86,6 +272,142 @@ public int readUnsignedByte() throws IOException {
*/
public abstract byte readByte() throws IOException;
+ /**
+ * Reads {@code length} bytes from the input stream. It behaves in the same
+ * way as {@link java.io.DataInput#readFully(byte[])}.
+ *
+ * @param length number of bytes to read
+ * @return byte array and its length should be {@code length}
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public byte[] readBytes(int length) throws IOException {
+ byte[] bytes = new byte[length];
+
+ for (int l = length, c = 0, n = 0; l > 0; l -= n) {
+ n = read(bytes, c, l);
+ if (n != -1) {
+ c += n;
+ } else {
+ try {
+ close();
+ } catch (IOException e) {
+ // ignore
+ }
+
+ throw c == 0 ? new EOFException()
+ : new IOException(ClickHouseUtils
+ .format("Reached end of input stream after reading %d of %d bytes", c, length));
+ }
+ }
+
+ return bytes;
+ }
+
+ /**
+ * Reads string from the input stream. {@link #readVarInt()} will be called
+ * automatically to understand byte length of the string.
+ *
+ * @param charset charset, null is treated as {@link StandardCharsets#UTF_8}
+ * @return non-null string
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public String readString(Charset charset) throws IOException {
+ return readString(readVarInt(), charset);
+ }
+
+ /**
+ * Reads string from the input stream. When {@code byteLength} is zero or
+ * negative number, this method will always return empty string.
+ *
+ * @param byteLength length in byte
+ * @param charset charset, null is treated as {@link StandardCharsets#UTF_8}
+ * @return non-null string
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public String readString(int byteLength, Charset charset) throws IOException {
+ if (byteLength < 1) {
+ return "";
+ }
+
+ return new String(readBytes(byteLength), charset != null ? charset : StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Reads ascii string from input stream. {@link #readVarInt()} will be called
+ * automatically to understand byte length of the string.
+ *
+ * @return non-null ascii string
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public String readAsciiString() throws IOException {
+ return readString(readVarInt(), StandardCharsets.US_ASCII);
+ }
+
+ /**
+ * Reads ascii string from input stream. Similar as
+ * {@code readString(byteLength, StandardCharsets.US_ASCII)}.
+ *
+ * @param byteLength length in byte
+ * @return non-null ascii string
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public String readAsciiString(int byteLength) throws IOException {
+ return readString(byteLength, StandardCharsets.US_ASCII);
+ }
+
+ /**
+ * Reads unicode string from input stream.
+ *
+ * @return non-null unicode string
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public String readUnicodeString() throws IOException {
+ return readString(readVarInt(), StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Reads unicode string from input stream. Similar as
+ * {@code readString(byteLength, null)}.
+ *
+ * @param byteLength length in byte
+ * @return non-null unicode string
+ * @throws IOException when failed to read value from input stream, not able to
+ * retrieve all bytes, or reached end of the stream
+ */
+ public String readUnicodeString(int byteLength) throws IOException {
+ return readString(byteLength, StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Read varint from input stream.
+ *
+ * @return varint
+ * @throws IOException when failed to read value from input stream or reached
+ * end of the stream
+ */
+ public int readVarInt() throws IOException {
+ // https://github.com/ClickHouse/ClickHouse/blob/abe314feecd1647d7c2b952a25da7abf5c19f352/src/IO/VarInt.h#L126
+ long result = 0L;
+ int shift = 0;
+ for (int i = 0; i < 9; i++) {
+ // gets 7 bits from next byte
+ byte b = readByte();
+ result |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ shift += 7;
+ }
+
+ return (int) result;
+ }
+
/**
* Checks if the input stream has been closed or not.
*
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseParameterizedQuery.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseParameterizedQuery.java
index 51278a7da..fd29077a0 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseParameterizedQuery.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseParameterizedQuery.java
@@ -2,16 +2,14 @@
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
* A parameterized query is a parsed query with parameters being extracted for
@@ -30,17 +28,23 @@ public class ClickHouseParameterizedQuery implements Serializable {
* A part of query.
*/
protected static class QueryPart implements Serializable {
- protected final String part;
- protected final int paramIndex;
- protected final String paramName;
- protected final ClickHouseColumn paramType;
+ public final String part;
+ public final int paramIndex;
+ public final String paramName;
+ public final ClickHouseColumn paramType;
- protected QueryPart(String part, int paramIndex, String paramName, String paramType) {
+ protected QueryPart(String part, int paramIndex, String paramName, String paramType,
+ Map map) {
this.part = part;
this.paramIndex = paramIndex;
this.paramName = paramName != null ? paramName : String.valueOf(paramIndex);
- // what should be default? ClickHouseAnyValue(simply convert object to string)?
- this.paramType = paramType != null ? ClickHouseColumn.of("", paramType) : null;
+ if (paramType != null) {
+ this.paramType = ClickHouseColumn.of("", paramType);
+ map.put(paramName, ClickHouseValues.newValue(this.paramType));
+ } else {
+ this.paramType = null;
+ map.putIfAbsent(paramName, null);
+ }
}
@Override
@@ -74,7 +78,8 @@ public boolean equals(Object obj) {
* Substitute named parameters in given SQL.
*
* @param sql SQL containing named parameters
- * @param params mapping between parameter name and correspoding SQL expression
+ * @param params mapping between parameter name and correspoding SQL
+ * expression(NOT raw value)
* @return substituted SQL, or the given sql if one of {@code sql} and
* {@code params} is null or empty
*/
@@ -152,7 +157,7 @@ public static ClickHouseParameterizedQuery of(String query) {
protected final String originalQuery;
private final List parts;
- private final Set names;
+ private final Map names;
private final String lastPart;
/**
@@ -164,7 +169,7 @@ protected ClickHouseParameterizedQuery(String query) {
originalQuery = ClickHouseChecker.nonBlank(query, "query");
parts = new LinkedList<>();
- names = new LinkedHashSet<>();
+ names = new LinkedHashMap<>();
lastPart = parse();
}
@@ -193,8 +198,8 @@ protected void addPart(String part, int paramIndex, String paramName, String par
if (paramName == null) {
paramName = String.valueOf(paramIndex);
}
- parts.add(new QueryPart(part, paramIndex, paramName, paramType));
- names.add(paramName);
+
+ parts.add(new QueryPart(part, paramIndex, paramName, paramType, names));
}
/**
@@ -252,12 +257,12 @@ protected String parse() {
if (builder.length() > 0) {
paramName = builder.toString();
- if (names.add(paramName)) {
+ if (!names.containsKey(paramName)) {
paramIndex++;
}
}
- parts.add(new QueryPart(part, paramIndex, paramName, paramType));
+ parts.add(new QueryPart(part, paramIndex, paramName, paramType, names));
}
}
}
@@ -266,6 +271,19 @@ protected String parse() {
return partIndex < len ? originalQuery.substring(partIndex, len) : null;
}
+ /**
+ * Converts given raw value to SQL expression.
+ *
+ * @param paramName name of the parameter
+ * @param value raw value, could be null
+ * @return non-null SQL expression
+ */
+ protected String toSqlExpression(String paramName, Object value) {
+ ClickHouseValue template = names.get(paramName);
+ return template != null ? template.update(value).toSqlExpression()
+ : ClickHouseValues.convertToSqlExpression(value);
+ }
+
/**
* Applies stringified parameters to the query.
*
@@ -341,7 +359,7 @@ public String apply(Object param, Object... more) {
if (index > 0) {
param = index < len ? more[index - 1] : null;
}
- builder.append(ClickHouseValues.convertToSqlExpression(param));
+ builder.append(toSqlExpression(p.paramName, param));
index++;
}
@@ -370,7 +388,7 @@ public String apply(Object[] values) {
for (QueryPart p : parts) {
builder.append(p.part);
builder.append(
- index < len ? ClickHouseValues.convertToSqlExpression(values[index]) : ClickHouseValues.NULL_EXPR);
+ index < len ? toSqlExpression(p.paramName, values[index]) : ClickHouseValues.NULL_EXPR);
index++;
}
@@ -441,8 +459,16 @@ public String apply(String[] values) {
*
* @return list of named parameters
*/
- public List getNamedParameters() {
- return names.isEmpty() ? Collections.emptyList() : Arrays.asList(names.toArray(new String[0]));
+ public List getParameters() {
+ if (names.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List list = new ArrayList<>(names.size());
+ for (String n : names.keySet()) {
+ list.add(n);
+ }
+ return Collections.unmodifiableList(list);
}
/**
@@ -474,6 +500,20 @@ public List getQueryParts() {
return queryParts;
}
+ /**
+ * Gets parameter templates for converting value to SQL expression.
+ *
+ * @return parameter templates
+ */
+ public ClickHouseValue[] getParameterTemplates() {
+ int i = 0;
+ ClickHouseValue[] tempaltes = new ClickHouseValue[names.size()];
+ for (ClickHouseValue v : names.values()) {
+ tempaltes[i++] = v;
+ }
+ return tempaltes;
+ }
+
/**
* Checks if the query has at least one parameter or not.
*
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java
index eb14a9fd1..8e85458ee 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java
@@ -229,14 +229,6 @@ protected ClickHouseClient getClient() {
return client;
}
- protected ClickHouseParameterizedQuery getPreparedQuery() {
- if (preparedQuery == null) {
- preparedQuery = ClickHouseParameterizedQuery.of(getQuery());
- }
-
- return preparedQuery;
- }
-
/**
* Gets query, either set by {@code query()} or {@code table()}.
*
@@ -367,6 +359,20 @@ public Optional getQueryId() {
return ClickHouseChecker.isNullOrEmpty(queryId) ? Optional.empty() : Optional.of(queryId);
}
+ /**
+ * Gets prepared query, which is a loosely parsed query with the origianl query
+ * and list of parameters.
+ *
+ * @return prepared query
+ */
+ public ClickHouseParameterizedQuery getPreparedQuery() {
+ if (preparedQuery == null) {
+ preparedQuery = ClickHouseParameterizedQuery.of(getQuery());
+ }
+
+ return preparedQuery;
+ }
+
/**
* Gets immutable settings.
*
@@ -731,7 +737,7 @@ public SelfT params(Collection values) {
namedParameters.clear();
if (values != null && !values.isEmpty()) {
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
for (String v : values) {
@@ -763,7 +769,7 @@ public SelfT params(ClickHouseValue value, ClickHouseValue... more) {
namedParameters.clear();
if (value != null) { // it doesn't make sense to pass null as first parameter
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
@@ -800,7 +806,7 @@ public SelfT params(ClickHouseValue[] values) {
namedParameters.clear();
if (values != null && values.length > 0) {
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
for (ClickHouseValue v : values) {
@@ -832,7 +838,7 @@ public SelfT params(String value, String... more) {
namedParameters.clear();
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
namedParameters.put(names.get(index++), value);
@@ -867,7 +873,7 @@ public SelfT params(String[] values) {
namedParameters.clear();
if (values != null && values.length > 0) {
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
for (String v : values) {
@@ -899,7 +905,7 @@ public SelfT params(Object value, Object... more) {
namedParameters.clear();
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
namedParameters.put(names.get(index++), ClickHouseValues.convertToSqlExpression(value));
@@ -934,7 +940,7 @@ public SelfT params(Object[] values) {
namedParameters.clear();
if (values != null && values.length > 0) {
- List names = getPreparedQuery().getNamedParameters();
+ List names = getPreparedQuery().getParameters();
int size = names.size();
int index = 0;
for (Object v : values) {
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java
index da06b8a4e..7e21e9dbc 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java
@@ -99,7 +99,7 @@ private static T findFirstService(Class extends T> serviceInterface) {
T service = null;
- for (T s : ServiceLoader.load(serviceInterface)) {
+ for (T s : ServiceLoader.load(serviceInterface, ClickHouseUtils.class.getClassLoader())) {
if (s != null) {
service = s;
break;
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseValues.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseValues.java
index 746302f84..6f55d18e1 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseValues.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseValues.java
@@ -1,7 +1,6 @@
package com.clickhouse.client;
import java.io.IOException;
-import java.io.InputStream;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/BinaryStreamUtils.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/BinaryStreamUtils.java
index 49d9941ce..e1725e3f7 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/BinaryStreamUtils.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/BinaryStreamUtils.java
@@ -20,6 +20,7 @@
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.TimeZone;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.clickhouse.client.ClickHouseChecker;
import com.clickhouse.client.ClickHouseDataType;
@@ -86,6 +87,18 @@ private static > T toEnum(int value, Class enumType) {
ClickHouseUtils.format("Enum [%s] does not contain value [%d]", enumType, value));
}
+ public static int toInt32(byte[] bytes, int offset) {
+ return (0xFF & bytes[offset]) | ((0xFF & bytes[offset + 1]) << 8) | ((0xFF & bytes[offset + 2]) << 16)
+ | ((0xFF & bytes[offset + 3]) << 24);
+ }
+
+ public static long toInt64(byte[] bytes, int offset) {
+ return (0xFFL & bytes[offset]) | ((0xFFL & bytes[offset + 1]) << 8) | ((0xFFL & bytes[offset + 2]) << 16)
+ | ((0xFFL & bytes[offset + 3]) << 24) | ((0xFFL & bytes[offset + 4]) << 32)
+ | ((0xFFL & bytes[offset + 5]) << 40) | ((0xFFL & bytes[offset + 6]) << 48)
+ | ((0xFFL & bytes[offset + 7]) << 56);
+ }
+
/**
* Reverse the given byte array.
*
@@ -93,11 +106,13 @@ private static > T toEnum(int value, Class enumType) {
* @return same byte array but reserved
*/
public static byte[] reverse(byte[] bytes) {
- if (bytes != null && bytes.length > 1) {
- for (int i = 0, len = bytes.length / 2; i < len; i++) {
+ int l = bytes != null ? bytes.length : 0;
+ if (l > 1) {
+ for (int i = 0, len = l / 2; i < len; i++) {
byte b = bytes[i];
- bytes[i] = bytes[bytes.length - 1 - i];
- bytes[bytes.length - 1 - i] = b;
+ --l;
+ bytes[i] = bytes[l];
+ bytes[l] = b;
}
}
@@ -136,38 +151,6 @@ public static int getVarLongSize(long value) {
return result;
}
- /**
- * Reads {@code length} bytes from given input stream. It behaves in the same
- * way as {@link java.io.DataInput#readFully(byte[])}.
- *
- * @param input non-null input stream
- * @param length number of bytes to read
- * @return byte array and its length should be {@code length}
- * @throws IOException when failed to read value from input stream, not able to
- * retrieve all bytes, or reached end of the stream
- */
- public static byte[] readBytes(ClickHouseInputStream input, int length) throws IOException {
- int count = 0;
- byte[] bytes = new byte[length];
- while (count < length) {
- int n = input.read(bytes, count, length - count);
- if (n < 0) {
- try {
- input.close();
- } catch (IOException e) {
- // ignore
- }
-
- throw count == 0 ? new EOFException()
- : new IOException(ClickHouseUtils
- .format("Reached end of input stream after reading %d of %d bytes", count, length));
- }
- count += n;
- }
-
- return bytes;
- }
-
/**
* Writes bytes into given output stream.
*
@@ -579,7 +562,7 @@ public static void writeNonNull(OutputStream output) throws IOException {
* end of the stream
*/
public static Inet4Address readInet4Address(ClickHouseInputStream input) throws IOException {
- return (Inet4Address) InetAddress.getByAddress(reverse(readBytes(input, 4)));
+ return (Inet4Address) InetAddress.getByAddress(reverse(input.readBytes(4)));
}
/**
@@ -603,7 +586,7 @@ public static void writeInet4Address(OutputStream output, Inet4Address value) th
* end of the stream
*/
public static Inet6Address readInet6Address(ClickHouseInputStream input) throws IOException {
- return Inet6Address.getByAddress(null, readBytes(input, 16), null);
+ return Inet6Address.getByAddress(null, input.readBytes(16), null);
}
/**
@@ -688,8 +671,7 @@ public static void writeUnsignedInt8(OutputStream output, int value) throws IOEx
* end of the stream
*/
public static short readInt16(ClickHouseInputStream input) throws IOException {
- byte[] bytes = readBytes(input, 2);
- return (short) ((0xFF & bytes[0]) | (bytes[1] << 8));
+ return (short) (input.readUnsignedByte() | (input.readByte() << 8));
}
/**
@@ -751,9 +733,8 @@ public static void writeUnsignedInt16(OutputStream output, int value) throws IOE
* end of the stream
*/
public static int readInt32(ClickHouseInputStream input) throws IOException {
- byte[] bytes = readBytes(input, 4);
-
- return (0xFF & bytes[0]) | ((0xFF & bytes[1]) << 8) | ((0xFF & bytes[2]) << 16) | (bytes[3] << 24);
+ return input.readUnsignedByte() | (input.readUnsignedByte() << 8) | (input.readUnsignedByte() << 16)
+ | (input.readByte() << 24);
}
/**
@@ -803,11 +784,7 @@ public static void writeUnsignedInt32(OutputStream output, long value) throws IO
* end of the stream
*/
public static long readInt64(ClickHouseInputStream input) throws IOException {
- byte[] bytes = readBytes(input, 8);
-
- return (0xFFL & bytes[0]) | ((0xFFL & bytes[1]) << 8) | ((0xFFL & bytes[2]) << 16) | ((0xFFL & bytes[3]) << 24)
- | ((0xFFL & bytes[4]) << 32) | ((0xFFL & bytes[5]) << 40) | ((0xFFL & bytes[6]) << 48)
- | ((0xFFL & bytes[7]) << 56);
+ return toInt64(input.readBytes(8), 0);
}
/**
@@ -839,7 +816,7 @@ public static void writeInt64(OutputStream output, long value) throws IOExceptio
* end of the stream
*/
public static BigInteger readUnsignedInt64(ClickHouseInputStream input) throws IOException {
- return new BigInteger(1, reverse(readBytes(input, 8)));
+ return new BigInteger(1, reverse(input.readBytes(8)));
}
/**
@@ -879,7 +856,7 @@ public static void writeUnsignedInt64(OutputStream output, BigInteger value) thr
* end of the stream
*/
public static BigInteger readInt128(ClickHouseInputStream input) throws IOException {
- return new BigInteger(reverse(readBytes(input, 16)));
+ return new BigInteger(reverse(input.readBytes(16)));
}
/**
@@ -903,7 +880,7 @@ public static void writeInt128(OutputStream output, BigInteger value) throws IOE
* end of the stream
*/
public static BigInteger readUnsignedInt128(ClickHouseInputStream input) throws IOException {
- return new BigInteger(1, reverse(readBytes(input, 16)));
+ return new BigInteger(1, reverse(input.readBytes(16)));
}
/**
@@ -928,7 +905,7 @@ public static void writeUnsignedInt128(OutputStream output, BigInteger value) th
* end of the stream
*/
public static BigInteger readInt256(ClickHouseInputStream input) throws IOException {
- return new BigInteger(reverse(readBytes(input, 32)));
+ return new BigInteger(reverse(input.readBytes(32)));
}
/**
@@ -952,7 +929,7 @@ public static void writeInt256(OutputStream output, BigInteger value) throws IOE
* end of the stream
*/
public static BigInteger readUnsignedInt256(ClickHouseInputStream input) throws IOException {
- return new BigInteger(1, reverse(readBytes(input, 32)));
+ return new BigInteger(1, reverse(input.readBytes(32)));
}
/**
@@ -1024,8 +1001,9 @@ public static void writeFloat64(OutputStream output, double value) throws IOExce
* @throws IOException when failed to read value from input stream or reached
* end of the stream
*/
- public static java.util.UUID readUuid(ClickHouseInputStream input) throws IOException {
- return new java.util.UUID(readInt64(input), readInt64(input));
+ public static UUID readUuid(ClickHouseInputStream input) throws IOException {
+ byte[] bytes = input.readBytes(16);
+ return new UUID(toInt64(bytes, 0), toInt64(bytes, 8));
}
/**
@@ -1517,7 +1495,7 @@ public static String readFixedString(ClickHouseInputStream input, int length) th
* end of the stream
*/
public static String readFixedString(ClickHouseInputStream input, int length, Charset charset) throws IOException {
- byte[] bytes = readBytes(input, length);
+ byte[] bytes = input.readBytes(length);
return new String(bytes, charset == null ? StandardCharsets.UTF_8 : charset);
}
@@ -1556,45 +1534,6 @@ public static void writeFixedString(OutputStream output, String value, int lengt
output.write(bytes);
}
- /**
- * Read string from given input stream.
- *
- * @param input non-null input stream
- * @return string value
- * @throws IOException when failed to read value from input stream or reached
- * end of the stream
- */
- public static String readString(ClickHouseInputStream input) throws IOException {
- return readString(input, readVarInt(input), null);
- }
-
- /**
- * Read string from given input stream.
- *
- * @param input non-null input stream
- * @param charset charset used to convert byte array to string, null means UTF-8
- * @return string value
- * @throws IOException when failed to read value from input stream or reached
- * end of the stream
- */
- public static String readString(ClickHouseInputStream input, Charset charset) throws IOException {
- return readString(input, readVarInt(input), charset);
- }
-
- /**
- * Reads fixed string from given input stream.
- *
- * @param input non-null input stream
- * @param length length in byte
- * @param charset charset used to convert byte array to string, null means UTF-8
- * @return string value
- * @throws IOException when failed to read value from input stream or reached
- * end of the stream
- */
- public static String readString(ClickHouseInputStream input, int length, Charset charset) throws IOException {
- return new String(readBytes(input, length), charset == null ? StandardCharsets.UTF_8 : charset);
- }
-
/**
* Reads characters from given reader.
*
@@ -1644,23 +1583,6 @@ public static void writeString(OutputStream output, String value, Charset charse
* @throws IOException when failed to read value from input stream or reached
* end of the stream
*/
- public static int readVarInt(ClickHouseInputStream input) throws IOException {
- // https://github.com/ClickHouse/ClickHouse/blob/abe314feecd1647d7c2b952a25da7abf5c19f352/src/IO/VarInt.h#L126
- long result = 0L;
- int shift = 0;
- for (int i = 0; i < 9; i++) {
- // gets 7 bits from next byte
- int b = input.readUnsignedByte();
- result |= (b & 0x7F) << shift;
- if ((b & 0x80) == 0) {
- break;
- }
- shift += 7;
- }
-
- return (int) result;
- }
-
public static int readVarInt(InputStream input) throws IOException {
// https://github.com/ClickHouse/ClickHouse/blob/abe314feecd1647d7c2b952a25da7abf5c19f352/src/IO/VarInt.h#L126
long result = 0L;
@@ -1697,7 +1619,7 @@ public static int readVarInt(ByteBuffer buffer) {
int shift = 0;
for (int i = 0; i < 9; i++) {
// gets 7 bits from next byte
- int b = buffer.get();
+ byte b = buffer.get();
result |= (b & 0x7F) << shift;
if ((b & 0x80) == 0) {
break;
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseByteValue.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseByteValue.java
index 3e9717612..12005dad5 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseByteValue.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseByteValue.java
@@ -84,7 +84,7 @@ protected ClickHouseByteValue(boolean isNull, byte value) {
protected ClickHouseByteValue set(boolean isNull, byte value) {
this.isNull = isNull;
- this.value = isNull ? 0 : value;
+ this.value = isNull ? (byte) 0 : value;
return this;
}
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java
index e7af11102..5f40235b5 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java
@@ -21,6 +21,7 @@ public static class Builder {
private CompletableFuture content;
private ClickHouseFormat format;
private List columns;
+ private boolean asTempTable;
protected Builder() {
columns = new LinkedList<>();
@@ -88,8 +89,18 @@ public Builder columns(Collection columns) {
return this;
}
+ public Builder asTempTable() {
+ asTempTable = true;
+ return this;
+ }
+
+ public Builder asExternalTable() {
+ asTempTable = false;
+ return this;
+ }
+
public ClickHouseExternalTable build() {
- return new ClickHouseExternalTable(name, content, format, columns);
+ return new ClickHouseExternalTable(name, content, format, columns, asTempTable);
}
}
@@ -101,11 +112,12 @@ public static Builder builder() {
private final CompletableFuture content;
private final ClickHouseFormat format;
private final List columns;
+ private final boolean asTempTable;
private final String structure;
protected ClickHouseExternalTable(String name, CompletableFuture content, ClickHouseFormat format,
- Collection columns) {
+ Collection columns, boolean asTempTable) {
this.name = name == null ? "" : name.trim();
this.content = ClickHouseChecker.nonNull(content, "content");
this.format = format == null ? ClickHouseFormat.TabSeparated : format;
@@ -124,6 +136,8 @@ protected ClickHouseExternalTable(String name, CompletableFuture co
this.columns = Collections.unmodifiableList(list);
this.structure = builder.deleteCharAt(builder.length() - 1).toString();
}
+
+ this.asTempTable = asTempTable;
}
public boolean hasName() {
@@ -153,6 +167,10 @@ public List getColumns() {
return columns;
}
+ public boolean isTempTable() {
+ return asTempTable;
+ }
+
public String getStructure() {
return structure;
}
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java
index 6bb62f356..d563f8096 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java
@@ -3,8 +3,13 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import com.clickhouse.client.ClickHouseChecker;
+import com.clickhouse.client.ClickHouseInputStream;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
@@ -12,15 +17,63 @@
/**
* Reader from clickhouse in lz4.
*/
-public class ClickHouseLZ4InputStream extends InputStream {
+public class ClickHouseLZ4InputStream extends ClickHouseInputStream {
private static final LZ4Factory factory = LZ4Factory.fastestInstance();
static final int MAGIC = 0x82;
private final InputStream stream;
- private byte[] currentBlock;
- private int pointer;
+ private ByteBuffer currentBlock;
+ private boolean closed;
+
+ private boolean checkNext() throws IOException {
+ if (currentBlock == null || !currentBlock.hasRemaining()) {
+ currentBlock = readNextBlock();
+ }
+ return currentBlock != null;
+ }
+
+ // every block is:
+ private ByteBuffer readNextBlock() throws IOException {
+ int read = stream.read();
+ if (read < 0) {
+ return null;
+ }
+
+ byte[] bytes = new byte[16];
+ bytes[0] = (byte) read;
+ // checksum - 16 bytes.
+ readFully(bytes, 1, 15);
+ ClickHouseBlockChecksum expected = ClickHouseBlockChecksum.fromBytes(bytes);
+ // header:
+ // 1 byte - 0x82 (shows this is LZ4)
+ int magic = readUnsignedByteFromInput();
+ if (magic != MAGIC) {
+ throw new IOException("Magic is not correct: " + magic);
+ }
+
+ readFully(bytes, 0, 8);
+ // 4 bytes - size of the compressed data including 9 bytes of the header
+ int compressedSizeWithHeader = BinaryStreamUtils.toInt32(bytes, 0);
+ // 4 bytes - size of uncompressed data
+ int uncompressedSize = BinaryStreamUtils.toInt32(bytes, 4);
+ int compressedSize = compressedSizeWithHeader - 9; // header
+ byte[] block = new byte[compressedSize];
+ // compressed data: compressed_size - 9 байт.
+ readFully(block, 0, block.length);
+
+ ClickHouseBlockChecksum real = ClickHouseBlockChecksum.calculateForBlock((byte) magic, compressedSizeWithHeader,
+ uncompressedSize, block, compressedSize);
+ if (!real.equals(expected)) {
+ throw new IllegalArgumentException("Checksum doesn't match: corrupted data.");
+ }
+
+ byte[] decompressed = new byte[uncompressedSize];
+ LZ4FastDecompressor decompressor = factory.fastDecompressor();
+ decompressor.decompress(block, 0, decompressed, 0, uncompressedSize);
+ return ByteBuffer.wrap(decompressed);
+ }
private void readFully(byte b[], int off, int len) throws IOException {
if (len < 0) {
@@ -30,15 +83,25 @@ private void readFully(byte b[], int off, int len) throws IOException {
while (n < len) {
int count = stream.read(b, off + n, len - n);
if (count < 0) {
+ try {
+ close();
+ } catch (IOException e) {
+ // ignore
+ }
throw new EOFException();
}
n += count;
}
}
- private int readUnsignedByte() throws IOException {
+ private int readUnsignedByteFromInput() throws IOException {
int ch = stream.read();
if (ch < 0) {
+ try {
+ close();
+ } catch (IOException e) {
+ // ignore
+ }
throw new EOFException();
}
return ch;
@@ -46,20 +109,39 @@ private int readUnsignedByte() throws IOException {
public ClickHouseLZ4InputStream(InputStream stream) {
this.stream = ClickHouseChecker.nonNull(stream, "InputStream");
+ this.closed = false;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (!checkNext()) {
+ try {
+ close();
+ } catch (IOException e) {
+ // ignore
+ }
+ throw new EOFException();
+ }
+
+ return currentBlock.get();
}
@Override
public int available() throws IOException {
+ if (closed) {
+ return 0;
+ }
+
int estimated = stream.available();
- if (estimated == 0 && currentBlock != null) {
- estimated = currentBlock.length - pointer;
+ if (estimated == 0 && checkNext()) {
+ estimated = currentBlock.remaining();
}
return estimated;
}
@Override
public int read() throws IOException {
- return checkNext() ? 0xFF & currentBlock[pointer++] : -1;
+ return checkNext() ? 0xFF & currentBlock.get() : -1;
}
@Override
@@ -72,80 +154,63 @@ public int read(byte[] b, int off, int len) throws IOException {
return 0;
}
- if (!checkNext())
+ if (!checkNext()) {
return -1;
+ }
int copied = 0;
- int targetPointer = off;
while (copied != len) {
- int toCopy = Math.min(currentBlock.length - pointer, len - copied);
- System.arraycopy(currentBlock, pointer, b, targetPointer, toCopy);
- targetPointer += toCopy;
- pointer += toCopy;
+ int toCopy = Math.min(currentBlock.remaining(), len - copied);
+ currentBlock.get(b, off, toCopy);
+ off += toCopy;
copied += toCopy;
- if (!checkNext()) { // finished
+
+ if (!checkNext()) {
break;
}
+
}
+
return copied;
}
@Override
public void close() throws IOException {
- stream.close();
- }
-
- private boolean checkNext() throws IOException {
- if (currentBlock == null || pointer == currentBlock.length) {
- currentBlock = readNextBlock();
- pointer = 0;
+ try {
+ stream.close();
+ } finally {
+ closed = true;
}
- return currentBlock != null && pointer < currentBlock.length;
}
- private int readInt() throws IOException {
- byte b1 = (byte) readUnsignedByte();
- byte b2 = (byte) readUnsignedByte();
- byte b3 = (byte) readUnsignedByte();
- byte b4 = (byte) readUnsignedByte();
-
- return b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
+ @Override
+ public boolean isClosed() {
+ return closed;
}
- // every block is:
- private byte[] readNextBlock() throws IOException {
- int read = stream.read();
- if (read < 0)
- return null;
+ @Override
+ public String readString(int byteLength, Charset charset) throws IOException {
+ if (byteLength < 1) {
+ return "";
+ } else if (!checkNext()) {
+ try {
+ close();
+ } catch (IOException e) {
+ // ignore
+ }
+ throw new EOFException();
+ }
- byte[] checksum = new byte[16];
- checksum[0] = (byte) read;
- // checksum - 16 bytes.
- readFully(checksum, 1, 15);
- ClickHouseBlockChecksum expected = ClickHouseBlockChecksum.fromBytes(checksum);
- // header:
- // 1 byte - 0x82 (shows this is LZ4)
- int magic = readUnsignedByte();
- if (magic != MAGIC)
- throw new IOException("Magic is not correct: " + magic);
- // 4 bytes - size of the compressed data including 9 bytes of the header
- int compressedSizeWithHeader = readInt();
- // 4 bytes - size of uncompressed data
- int uncompressedSize = readInt();
- int compressedSize = compressedSizeWithHeader - 9; // header
- byte[] block = new byte[compressedSize];
- // compressed data: compressed_size - 9 байт.
- readFully(block, 0, block.length);
+ if (charset == null) {
+ charset = StandardCharsets.UTF_8;
+ }
- ClickHouseBlockChecksum real = ClickHouseBlockChecksum.calculateForBlock((byte) magic, compressedSizeWithHeader,
- uncompressedSize, block, compressedSize);
- if (!real.equals(expected)) {
- throw new IllegalArgumentException("Checksum doesn't match: corrupted data.");
+ if (byteLength > 8 && currentBlock.remaining() > byteLength) {
+ int pos = currentBlock.position();
+ ((Buffer) currentBlock).position(pos + byteLength);
+ return charset.decode(ByteBuffer.wrap(currentBlock.array(), pos, byteLength)).toString();
}
- byte[] decompressed = new byte[uncompressedSize];
- LZ4FastDecompressor decompressor = factory.fastDecompressor();
- decompressor.decompress(block, 0, decompressed, 0, uncompressedSize);
- return decompressed;
+ return new String(readBytes(byteLength), charset);
}
}
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java
index fcb5790be..de4ca2f02 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java
@@ -1,6 +1,5 @@
package com.clickhouse.client.data;
-import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
@@ -20,171 +19,6 @@
* reader are on two separate threads.
*/
public class ClickHousePipedStream extends OutputStream {
- protected static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
-
- static class Input extends ClickHouseInputStream {
- private final BlockingQueue queue;
- private final int timeout;
-
- // too much to maintain a 2-level buffer for reading?
- private ByteBuffer buffer;
- private boolean closed;
-
- Input(BlockingQueue queue, int timeout) {
- this.queue = queue;
- this.timeout = timeout;
-
- this.buffer = null;
- this.closed = false;
- }
-
- private void ensureOpen() throws IOException {
- if (this.closed) {
- throw new IOException("Stream has been closed");
- }
-
- if (this.buffer == null) {
- updateBuffer();
- }
- }
-
- private int updateBuffer() throws IOException {
- try {
- if (timeout > 0) {
- buffer = queue.poll(timeout, TimeUnit.MILLISECONDS);
- if (buffer == null) {
- throw new IOException(ClickHouseUtils.format("Read timed out after %d ms", timeout));
- }
- } else {
- buffer = queue.take();
- }
-
- return buffer.remaining();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Thread was interrupted when getting next buffer from queue", e);
- }
- }
-
- @Override
- public int available() throws IOException {
- ensureOpen();
-
- if (buffer == EMPTY || buffer.limit() == 0) {
- return 0;
- }
-
- int available = buffer.remaining();
- if (available == 0) {
- available = updateBuffer();
- }
-
- return available;
- }
-
- @Override
- public boolean isClosed() {
- return this.closed;
- }
-
- @Override
- public void close() throws IOException {
- // it's caller's responsiblity to consume all data in the queue, which will
- // unblock writer
- this.closed = true;
- this.buffer = null;
- }
-
- @Override
- public byte readByte() throws IOException {
- ensureOpen();
-
- if (buffer == EMPTY || buffer.limit() == 0) {
- close();
- throw new EOFException();
- }
-
- if (buffer.hasRemaining()) {
- return buffer.get();
- } else {
- updateBuffer();
-
- return readByte();
- }
- }
-
- @Override
- public int read() throws IOException {
- ensureOpen();
-
- if (buffer == EMPTY || buffer.limit() == 0) {
- return -1;
- }
-
- if (buffer.hasRemaining()) {
- return 0xFF & buffer.get();
- } else {
- updateBuffer();
-
- return read();
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- ensureOpen();
-
- if (buffer == EMPTY || buffer.limit() == 0) {
- return -1;
- }
-
- int counter = 0;
- while (len > 0) {
- if (buffer == EMPTY || buffer.limit() == 0) {
- return counter;
- }
-
- int remain = buffer.remaining();
- if (remain > 0) {
- if (remain >= len) {
- buffer.get(b, off, len);
- counter += len;
- len = 0;
- } else {
- buffer.get(b, off, remain);
- counter += remain;
- off += remain;
- len -= remain;
-
- updateBuffer();
- }
- } else {
- updateBuffer();
- }
- }
-
- return counter;
- }
-
- @Override
- public long skip(long n) throws IOException {
- ensureOpen();
-
- // peforms better but this is a bit tricky
- if (n == Long.MAX_VALUE) {
- long counter = buffer.remaining();
- while (buffer != EMPTY && buffer.limit() > 0) {
- counter += buffer.limit();
- updateBuffer();
- }
-
- return counter;
- }
-
- return super.skip(n);
- }
- }
-
protected final BlockingQueue queue;
private final int bufferSize;
@@ -236,7 +70,7 @@ private void updateBuffer() throws IOException {
}
public ClickHouseInputStream getInput() {
- return new Input(queue, timeout);
+ return ClickHouseInputStream.of(queue, timeout);
}
@Override
@@ -247,7 +81,7 @@ public void close() throws IOException {
flush();
- buffer = EMPTY;
+ buffer = ClickHouseInputStream.EMPTY;
try {
if (timeout > 0) {
if (!queue.offer(buffer, timeout, TimeUnit.MILLISECONDS)) {
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseRowBinaryProcessor.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseRowBinaryProcessor.java
index e768d2ec4..79c2c8294 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseRowBinaryProcessor.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseRowBinaryProcessor.java
@@ -293,7 +293,7 @@ private MappedFunctions() {
c.getPrecision()),
ClickHouseDataType.FixedString);
buildMappings(deserializers, serializers,
- (r, f, c, i) -> ClickHouseStringValue.of(r, BinaryStreamUtils.readString(i)),
+ (r, f, c, i) -> ClickHouseStringValue.of(r, i.readUnicodeString()),
(v, f, c, o) -> BinaryStreamUtils.writeString(o, v.asString()), ClickHouseDataType.String);
buildMappings(deserializers, serializers,
(r, f, c, i) -> ClickHouseUuidValue.of(r, BinaryStreamUtils.readUuid(i)),
@@ -521,7 +521,7 @@ protected List readColumns() throws IOException {
int size = 0;
try {
- size = BinaryStreamUtils.readVarInt(input);
+ size = input.readVarInt();
} catch (EOFException e) {
// no result returned
return Collections.emptyList();
@@ -529,12 +529,13 @@ protected List readColumns() throws IOException {
String[] names = new String[ClickHouseChecker.between(size, "size", 0, Integer.MAX_VALUE)];
for (int i = 0; i < size; i++) {
- names[i] = BinaryStreamUtils.readString(input);
+ names[i] = input.readUnicodeString();
}
List columns = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- columns.add(ClickHouseColumn.of(names[i], BinaryStreamUtils.readString(input)));
+ // a bit risky here - what if ClickHouse support user type?
+ columns.add(ClickHouseColumn.of(names[i], input.readAsciiString()));
}
return columns;
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseShortValue.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseShortValue.java
index 5a8fe9f2a..8efddd6f8 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseShortValue.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseShortValue.java
@@ -84,7 +84,7 @@ protected ClickHouseShortValue(boolean isNull, short value) {
protected ClickHouseShortValue set(boolean isNull, short value) {
this.isNull = isNull;
- this.value = isNull ? 0 : value;
+ this.value = isNull ? (short) 0 : value;
return this;
}
diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseTupleValue.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseTupleValue.java
index a75f6a04b..9694d285d 100644
--- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseTupleValue.java
+++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseTupleValue.java
@@ -81,8 +81,7 @@ public Object[] asArray() {
return ClickHouseValues.EMPTY_OBJECT_ARRAY;
}
- List
@@ -192,10 +192,10 @@
javax/**
okio/**
org/**
+ **/module-info.class
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native-image/**
- META-INF/versions/**
@@ -252,11 +252,11 @@
io/grpc/netty/**
javax/**
org/**
+ **/module-info.class
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native/**
META-INF/native-image/**
- META-INF/versions/**
diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java
index 779451288..7d79ffe0d 100644
--- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java
+++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java
@@ -57,19 +57,19 @@ protected static Compression getResultCompression(ClickHouseConfig config) {
CompressionAlgorithm algorithm = CompressionAlgorithm.DEFLATE;
CompressionLevel level = CompressionLevel.COMPRESSION_MEDIUM;
switch (config.getDecompressAlgorithmForClientRequest()) {
- case NONE:
- algorithm = CompressionAlgorithm.NO_COMPRESSION;
- break;
- case DEFLATE:
- break;
- case GZIP:
- algorithm = CompressionAlgorithm.GZIP;
- break;
- // case STREAM_GZIP:
- default:
- log.warn("Unsupported algorithm [%s], change to [%s]", config.getDecompressAlgorithmForClientRequest(),
- algorithm);
- break;
+ case NONE:
+ algorithm = CompressionAlgorithm.NO_COMPRESSION;
+ break;
+ case DEFLATE:
+ break;
+ case GZIP:
+ algorithm = CompressionAlgorithm.GZIP;
+ break;
+ // case STREAM_GZIP:
+ default:
+ log.warn("Unsupported algorithm [%s], change to [%s]", config.getDecompressAlgorithmForClientRequest(),
+ algorithm);
+ break;
}
int l = config.getDecompressLevelForClientRequest();
@@ -269,7 +269,8 @@ protected CompletableFuture executeSync(ClickHouseRequest>
sealedRequest.getSettings(), result);
return result.hasException()
- ? failedResponse(ClickHouseException.of(result.getException().getDisplayText(), server))
+ ? failedResponse(new ClickHouseException(result.getException().getCode(),
+ result.getException().getDisplayText(), server))
: CompletableFuture.completedFuture(response);
} catch (IOException e) {
throw new CompletionException(ClickHouseException.of(e, server));
diff --git a/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java b/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java
index d44e8a9c4..a45edbbb9 100644
--- a/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java
+++ b/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java
@@ -277,6 +277,19 @@ public void testReadWriteDateTimeTypes() throws Exception {
}
}
+ @Test(groups = "integration")
+ public void testDropNonExistDb() throws Exception {
+ ClickHouseNode server = getServer(ClickHouseProtocol.GRPC);
+
+ try {
+ ClickHouseClient.send(server, "drop database non_exist_db").get();
+ Assert.fail("Exception is excepted");
+ } catch (ExecutionException e) {
+ ClickHouseException ce = (ClickHouseException) e.getCause();
+ Assert.assertEquals(ce.getErrorCode(), 81);
+ }
+ }
+
@Test(groups = "integration")
public void testReadWriteDomains() throws Exception {
ClickHouseNode server = getServer(ClickHouseProtocol.GRPC);
diff --git a/clickhouse-http-client/pom.xml b/clickhouse-http-client/pom.xml
index 368647e93..36e0dbea7 100644
--- a/clickhouse-http-client/pom.xml
+++ b/clickhouse-http-client/pom.xml
@@ -116,10 +116,10 @@
**/darwin/**
**/linux/**
**/win32/**
+ **/module-info.class
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native-image/**
- META-INF/versions/**
diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java
index 7e166b451..f362066e1 100644
--- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java
+++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java
@@ -20,6 +20,7 @@
import com.clickhouse.client.ClickHouseCompression;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseInputStream;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseUtils;
@@ -231,28 +232,25 @@ protected OutputStream getRequestOutputStream(OutputStream out) throws IOExcepti
return out;
}
- protected InputStream getResponseInputStream(InputStream in) throws IOException {
- if (!config.isCompressServerResponse()) {
- return in;
+ protected ClickHouseInputStream getResponseInputStream(InputStream in) throws IOException {
+ if (config.isCompressServerResponse()) {
+ // TODO support more algorithms
+ ClickHouseCompression algorithm = config.getCompressAlgorithmForServerResponse();
+ switch (algorithm) {
+ case GZIP:
+ in = ClickHouseInputStream.of(new GZIPInputStream(in));
+ break;
+ case LZ4:
+ in = new ClickHouseLZ4InputStream(in);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported compression algorithm: " + algorithm);
+ }
}
- // TODO support more algorithms
- ClickHouseCompression algorithm = config.getCompressAlgorithmForServerResponse();
- switch (algorithm) {
- case GZIP:
- in = new GZIPInputStream(in);
- break;
- case LZ4:
- in = new ClickHouseLZ4InputStream(in);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported compression algorithm: " + algorithm);
- }
- return in;
+ return in instanceof ClickHouseInputStream ? (ClickHouseInputStream) in : ClickHouseInputStream.of(in);
}
- protected abstract String getResponseHeader(String header, String defaultValue);
-
/**
* Creates a merged map.
*
diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java
index 19d5a03db..5abbc1259 100644
--- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java
+++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java
@@ -1,9 +1,8 @@
package com.clickhouse.client.http;
-import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.Serializable;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
@@ -32,7 +31,7 @@ private static long getLongValue(Map map, String key) {
}
private final ClickHouseHttpConnection connection;
- private final InputStream input;
+ private final ClickHouseInputStream input;
protected final String serverDisplayName;
protected final String queryId;
@@ -55,7 +54,8 @@ protected ClickHouseConfig getConfig(ClickHouseRequest> request) {
return config;
}
- public ClickHouseHttpResponse(ClickHouseHttpConnection connection, InputStream input) {
+ public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInputStream input,
+ String serverDisplayName, String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) {
if (connection == null || input == null) {
throw new IllegalArgumentException("Non-null connection and input stream are required");
}
@@ -63,13 +63,12 @@ public ClickHouseHttpResponse(ClickHouseHttpConnection connection, InputStream i
this.connection = connection;
this.input = input;
- this.serverDisplayName = connection.getResponseHeader("X-ClickHouse-Server-Display-Name",
- connection.server.getHost());
- // queryId, format and timeZone are only available for queries
- this.queryId = connection.getResponseHeader("X-ClickHouse-Query-Id", "");
+ this.serverDisplayName = !ClickHouseChecker.isNullOrEmpty(serverDisplayName) ? serverDisplayName
+ : connection.server.getHost();
+ this.queryId = !ClickHouseChecker.isNullOrEmpty(queryId) ? queryId : "";
// {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
Map map = (Map) ClickHouseUtils
- .parseJson(connection.getResponseHeader("X-ClickHouse-Summary", "{}"));
+ .parseJson(!ClickHouseChecker.isNullOrEmpty(summary) ? summary : "{}");
// discard those X-ClickHouse-Progress headers
this.summary = new ClickHouseResponseSummary(
new ClickHouseResponseSummary.Progress(getLongValue(map, "read_rows"), getLongValue(map, "read_bytes"),
@@ -77,32 +76,15 @@ public ClickHouseHttpResponse(ClickHouseHttpConnection connection, InputStream i
getLongValue(map, "written_bytes")),
null);
- if (ClickHouseChecker.isNullOrEmpty(this.queryId)) {
- this.format = connection.config.getFormat();
- this.timeZone = connection.config.getServerTimeZone();
- // better to close input stream since there's no response to read?
- // input.close();
- } else {
- String value = connection.getResponseHeader("X-ClickHouse-Format", "");
- this.format = !ClickHouseChecker.isNullOrEmpty(value) ? ClickHouseFormat.valueOf(value)
- : connection.config.getFormat();
- value = connection.getResponseHeader("X-ClickHouse-Timezone", "");
- this.timeZone = !ClickHouseChecker.isNullOrEmpty(value) ? TimeZone.getTimeZone(value)
- : connection.config.getServerTimeZone();
- }
+ this.format = format != null ? format : connection.config.getFormat();
+ this.timeZone = timeZone != null ? timeZone : connection.config.getServerTimeZone();
closed = false;
}
@Override
public byte readByte() throws IOException {
- int v = input.read();
- if (v == -1) {
- close();
- throw new EOFException();
- }
-
- return (byte) v;
+ return input.readByte();
}
@Override
@@ -158,4 +140,14 @@ public int read(byte[] b, int off, int len) throws IOException {
public long skip(long n) throws IOException {
return input.skip(n);
}
+
+ @Override
+ public byte[] readBytes(int length) throws IOException {
+ return input.readBytes(length);
+ }
+
+ @Override
+ public String readString(int byteLength, Charset charset) throws IOException {
+ return input.readString(byteLength, charset);
+ }
}
diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/DefaultHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/DefaultHttpConnection.java
index 905d0611b..afa4369d4 100644
--- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/DefaultHttpConnection.java
+++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/DefaultHttpConnection.java
@@ -1,5 +1,7 @@
package com.clickhouse.client.http;
+import com.clickhouse.client.ClickHouseChecker;
+import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseSslContextProvider;
@@ -23,6 +25,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.Map.Entry;
@@ -35,6 +38,33 @@ public class DefaultHttpConnection extends ClickHouseHttpConnection {
private final HttpURLConnection conn;
+ private ClickHouseHttpResponse buildResponse() throws IOException {
+ // X-ClickHouse-Server-Display-Name: xxx
+ // X-ClickHouse-Query-Id: xxx
+ // X-ClickHouse-Format: RowBinaryWithNamesAndTypes
+ // X-ClickHouse-Timezone: UTC
+ // X-ClickHouse-Summary:
+ // {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ String displayName = getResponseHeader("X-ClickHouse-Server-Display-Name", server.getHost());
+ String queryId = getResponseHeader("X-ClickHouse-Query-Id", "");
+ String summary = getResponseHeader("X-ClickHouse-Summary", "{}");
+
+ ClickHouseFormat format = config.getFormat();
+ TimeZone timeZone = config.getServerTimeZone();
+ // queryId, format and timeZone are only available for queries
+ if (!ClickHouseChecker.isNullOrEmpty(queryId)) {
+ String value = getResponseHeader("X-ClickHouse-Format", "");
+ format = !ClickHouseChecker.isNullOrEmpty(value) ? ClickHouseFormat.valueOf(value)
+ : format;
+ value = getResponseHeader("X-ClickHouse-Timezone", "");
+ timeZone = !ClickHouseChecker.isNullOrEmpty(value) ? TimeZone.getTimeZone(value)
+ : timeZone;
+ }
+
+ return new ClickHouseHttpResponse(this, getResponseInputStream(conn.getInputStream()),
+ displayName, queryId, summary, format, timeZone);
+ }
+
private HttpURLConnection newConnection(String url, boolean post) throws IOException {
HttpURLConnection newConn = (HttpURLConnection) new URL(url).openConnection();
@@ -64,6 +94,11 @@ private HttpURLConnection newConnection(String url, boolean post) throws IOExcep
return newConn;
}
+ private String getResponseHeader(String header, String defaultValue) {
+ String value = conn.getHeaderField(header);
+ return value != null ? value : defaultValue;
+ }
+
private void setHeaders(HttpURLConnection conn, Map headers) {
headers = mergeHeaders(headers);
@@ -106,12 +141,6 @@ protected boolean isReusable() {
return false;
}
- @Override
- protected String getResponseHeader(String header, String defaultValue) {
- String value = conn.getHeaderField(header);
- return value != null ? value : defaultValue;
- }
-
@Override
protected ClickHouseHttpResponse post(String sql, InputStream data, List tables,
Map headers) throws IOException {
@@ -168,14 +197,7 @@ protected ClickHouseHttpResponse post(String sql, InputStream data, List {
+ // An immutable ByteBuffer sentinel to mark that the last byte was received.
+ private static final List LAST_LIST = List.of(ClickHouseInputStream.EMPTY);
+
+ private final BlockingQueue buffers;
+ private final ClickHouseInputStream in;
+ private final AtomicBoolean subscribed;
+
+ ClickHouseResponseHandler(int bufferSize, int timeout) {
+ buffers = new LinkedBlockingDeque<>();
+ in = ClickHouseInputStream.of(buffers, timeout);
+ subscribed = new AtomicBoolean();
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ try {
+ if (!subscribed.compareAndSet(false, true)) {
+ s.cancel();
+ } else {
+ if (in.isClosed()) {
+ s.cancel();
+ return;
+ }
+ s.request(Long.MAX_VALUE);
+ }
+ } catch (Throwable t) {
+ try {
+ in.close();
+ } catch (IOException x) {
+ // ignore
+ } finally {
+ onError(t);
+ }
+ }
+ }
+
+ @Override
+ public void onNext(List item) {
+ try {
+ if (!buffers.addAll(item)) {
+ // should never happen
+ throw new IllegalStateException("Queue is full");
+ }
+ } catch (Throwable t) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ // ignore
+ } finally {
+ onError(t);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ buffers.offer(ClickHouseInputStream.EMPTY);
+ }
+
+ @Override
+ public void onComplete() {
+ onNext(LAST_LIST);
+ }
+
+ @Override
+ public CompletionStage getBody() {
+ return CompletableFuture.completedStage(in);
+ }
+}
diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/DefaultHttpConnection.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/DefaultHttpConnection.java
new file mode 100644
index 000000000..099593c5f
--- /dev/null
+++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/DefaultHttpConnection.java
@@ -0,0 +1,247 @@
+package com.clickhouse.client.http;
+
+import com.clickhouse.client.ClickHouseChecker;
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseSslContextProvider;
+import com.clickhouse.client.data.ClickHouseExternalTable;
+import com.clickhouse.client.data.ClickHousePipedStream;
+import com.clickhouse.client.http.config.ClickHouseHttpOption;
+import com.clickhouse.client.logging.Logger;
+import com.clickhouse.client.logging.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpClient.Redirect;
+import java.net.http.HttpClient.Version;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import javax.net.ssl.SSLContext;
+
+public class DefaultHttpConnection extends ClickHouseHttpConnection {
+ private static final Logger log = LoggerFactory.getLogger(DefaultHttpConnection.class);
+
+ private final HttpClient httpClient;
+
+ private ClickHouseHttpResponse buildResponse(HttpResponse r) throws IOException {
+ HttpHeaders headers = r.headers();
+ String displayName = headers.firstValue("X-ClickHouse-Server-Display-Name").orElse(server.getHost());
+ String queryId = headers.firstValue("X-ClickHouse-Query-Id").orElse("");
+ String summary = headers.firstValue("X-ClickHouse-Summary").orElse("{}");
+
+ ClickHouseFormat format = config.getFormat();
+ TimeZone timeZone = config.getServerTimeZone();
+ // queryId, format and timeZone are only available for queries
+ if (!ClickHouseChecker.isNullOrEmpty(queryId)) {
+ String value = headers.firstValue("X-ClickHouse-Format").orElse("");
+ format = !ClickHouseChecker.isNullOrEmpty(value) ? ClickHouseFormat.valueOf(value)
+ : format;
+ value = headers.firstValue("X-ClickHouse-Timezone").orElse("");
+ timeZone = !ClickHouseChecker.isNullOrEmpty(value) ? TimeZone.getTimeZone(value)
+ : timeZone;
+ }
+
+ return new ClickHouseHttpResponse(this, getResponseInputStream(checkResponse(r).body()),
+ displayName, queryId, summary, format, timeZone);
+ }
+
+ private HttpResponse checkResponse(HttpResponse r) throws IOException {
+ if (r.statusCode() != HttpURLConnection.HTTP_OK) {
+ // TODO get exception from response header, for example:
+ // X-ClickHouse-Exception-Code: 47
+ StringBuilder builder = new StringBuilder();
+ try (Reader reader = new BufferedReader(
+ new InputStreamReader(getResponseInputStream(r.body()), StandardCharsets.UTF_8))) {
+ int c = 0;
+ while ((c = reader.read()) != -1) {
+ builder.append((char) c);
+ }
+ } catch (IOException e) {
+ log.warn("Error while reading error message", e);
+ }
+
+ throw new IOException(builder.toString());
+ }
+
+ return r;
+ }
+
+ private HttpRequest newRequest(String url) {
+ return HttpRequest.newBuilder()
+ .uri(URI.create(url))
+ .timeout(Duration.ofMillis(config.getSocketTimeout())).build();
+ }
+
+ protected DefaultHttpConnection(ClickHouseNode server, ClickHouseRequest> request) throws IOException {
+ super(server, request);
+
+ HttpClient.Builder builder = HttpClient.newBuilder()
+ .connectTimeout(Duration.ofMillis(config.getConnectionTimeout()))
+ .followRedirects(Redirect.ALWAYS)
+ .version(Version.HTTP_1_1);
+ if (config.isAsync()) {
+ builder.executor(ClickHouseClient.getExecutorService());
+ }
+ if (config.isSsl()) {
+ builder.sslContext(ClickHouseSslContextProvider.getProvider().getSslContext(SSLContext.class, config)
+ .orElse(null));
+ }
+
+ httpClient = builder.build();
+ }
+
+ @Override
+ protected boolean isReusable() {
+ return true;
+ }
+
+ private ClickHouseHttpResponse postStream(HttpRequest.Builder reqBuilder, String boundary, String sql,
+ InputStream data, List tables) throws IOException {
+ ClickHousePipedStream stream = new ClickHousePipedStream(config.getMaxBufferSize(),
+ config.getMaxQueuedBuffers(), config.getSocketTimeout());
+ reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInput));
+ // running in async is necessary to avoid deadlock of the piped stream
+ CompletableFuture> f = httpClient.sendAsync(reqBuilder.build(),
+ responseInfo -> new ClickHouseResponseHandler(config.getMaxBufferSize(),
+ config.getSocketTimeout()));
+ try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
+ if (boundary != null) {
+ String line = "\r\n--" + boundary + "\r\n";
+ writer.write(line);
+ writer.write("Content-Disposition: form-data; name=\"query\"\r\n\r\n");
+ writer.write(sql);
+
+ for (ClickHouseExternalTable t : tables) {
+ String tableName = t.getName();
+ StringBuilder builder = new StringBuilder();
+ builder.append(line).append("Content-Disposition: form-data; name=\"").append(tableName)
+ .append("_format\"\r\n\r\n").append(t.getFormat().name());
+ builder.append(line).append("Content-Disposition: form-data; name=\"").append(tableName)
+ .append("_structure\"\r\n\r\n").append(t.getStructure());
+ builder.append(line).append("Content-Disposition: form-data; name=\"").append(tableName)
+ .append("\"; filename=\"").append(tableName).append("\"\r\n")
+ .append("Content-Type: application/octet-stream\r\n")
+ .append("Content-Transfer-Encoding: binary\r\n\r\n");
+ writer.write(builder.toString());
+ writer.flush();
+
+ pipe(t.getContent(), stream, DEFAULT_BUFFER_SIZE);
+ }
+
+ writer.write("\r\n--" + boundary + "--\r\n");
+ writer.flush();
+ } else {
+ writer.write(sql);
+ writer.flush();
+
+ if (data.available() > 0) {
+ // append \n
+ if (sql.charAt(sql.length() - 1) != '\n') {
+ stream.write(10);
+ }
+
+ pipe(data, stream, DEFAULT_BUFFER_SIZE);
+ }
+ }
+ }
+
+ HttpResponse r;
+ try {
+ r = f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Thread was interrupted when posting request or receiving response", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Failed to post request", e);
+ }
+
+ return buildResponse(r);
+ }
+
+ private ClickHouseHttpResponse postString(HttpRequest.Builder reqBuilder, String sql) throws IOException {
+ reqBuilder.POST(HttpRequest.BodyPublishers.ofString(sql));
+ HttpResponse r;
+ try {
+ CompletableFuture> f = httpClient.sendAsync(reqBuilder.build(),
+ responseInfo -> new ClickHouseResponseHandler(config.getMaxBufferSize(),
+ config.getSocketTimeout()));
+ r = f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Thread was interrupted when posting request or receiving response", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Failed to post query", e);
+ }
+ return buildResponse(r);
+ }
+
+ @Override
+ protected ClickHouseHttpResponse post(String sql, InputStream data, List tables,
+ Map headers) throws IOException {
+ HttpRequest.Builder reqBuilder = HttpRequest.newBuilder()
+ .uri(URI.create(url))
+ .timeout(Duration.ofMillis(config.getSocketTimeout()));
+ String boundary = null;
+ if (tables != null && !tables.isEmpty()) {
+ boundary = UUID.randomUUID().toString();
+ reqBuilder.setHeader("Content-Type", "multipart/form-data; boundary=" + boundary);
+ } else {
+ reqBuilder.setHeader("Content-Type", "text/plain; charset=UTF-8");
+ }
+
+ headers = mergeHeaders(headers);
+ if (headers != null && !headers.isEmpty()) {
+ for (Entry header : headers.entrySet()) {
+ reqBuilder.setHeader(header.getKey(), header.getValue());
+ }
+ }
+
+ return boundary != null || data != null ? postStream(reqBuilder, boundary, sql, data, tables)
+ : postString(reqBuilder, sql);
+ }
+
+ @Override
+ public boolean ping(int timeout) {
+ String response = (String) config.getOption(ClickHouseHttpOption.DEFAULT_RESPONSE);
+ try {
+ HttpResponse r = httpClient.send(newRequest(getBaseUrl() + "ping"),
+ HttpResponse.BodyHandlers.ofString());
+ if (r.statusCode() != HttpURLConnection.HTTP_OK) {
+ throw new IOException(r.body());
+ }
+
+ return response.equals(r.body());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ log.debug("Failed to ping server: ", e.getMessage());
+ }
+
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/clickhouse-http-client/src/main/java11/module-info.java b/clickhouse-http-client/src/main/java11/module-info.java
new file mode 100644
index 000000000..7d1cb83e4
--- /dev/null
+++ b/clickhouse-http-client/src/main/java11/module-info.java
@@ -0,0 +1,12 @@
+module com.clickhouse.client.http {
+ exports com.clickhouse.client.http;
+ exports com.clickhouse.client.http.config;
+
+ provides com.clickhouse.client.ClickHouseClient with com.clickhouse.client.http.ClickHouseHttpClient;
+
+ requires java.net.http;
+
+ requires static com.google.gson;
+
+ requires transitive com.clickhouse.client;
+}
diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java
index 17eb4f37f..73af1b33e 100644
--- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java
+++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java
@@ -198,7 +198,8 @@ public void testPost() throws Exception {
try (ClickHouseClient client = ClickHouseClient.builder()
.defaultCredentials(ClickHouseCredentials.fromUserAndPassword("foo", "bar")).build()) {
// why no detailed error message for this: "select 1,2"
- try (ClickHouseResponse resp = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+ try (ClickHouseResponse resp = client.connect(server).compressServerResponse(false)
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select 1,2").execute().get()) {
int count = 0;
for (ClickHouseRecord r : resp.records()) {
diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java
index e72fb6c2a..6026c29ff 100644
--- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java
+++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java
@@ -35,11 +35,6 @@ public boolean ping(int timeout) {
@Override
public void close() throws Exception {
}
-
- @Override
- protected String getResponseHeader(String header, String defaultValue) {
- return defaultValue;
- }
}
@Test(groups = { "unit" })
diff --git a/clickhouse-jdbc/pom.xml b/clickhouse-jdbc/pom.xml
index 9a2957f9d..43c272497 100644
--- a/clickhouse-jdbc/pom.xml
+++ b/clickhouse-jdbc/pom.xml
@@ -238,6 +238,7 @@
+ com.clickhouse.jdbc
${spec.title}
${spec.version}
@@ -261,11 +262,11 @@
**/darwin/**
**/linux/**
**/win32/**
+ **/module-info.class
META-INF/DEPENDENCIES
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native-image/**
- META-INF/versions/**
META-INF/*.xml
@@ -303,6 +304,7 @@
+ com.clickhouse.jdbc
${spec.title}
${spec.version}
@@ -320,11 +322,11 @@
**/darwin/**
**/linux/**
**/win32/**
+ **/module-info.class
META-INF/DEPENDENCIES
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native-image/**
- META-INF/versions/**
META-INF/*.xml
@@ -358,6 +360,7 @@
+ com.clickhouse.jdbc
${spec.title}
${spec.version}
@@ -387,11 +390,11 @@
**/darwin/**
**/linux/**
**/win32/**
+ **/module-info.class
META-INF/DEPENDENCIES
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native-image/**
- META-INF/versions/**
META-INF/*.xml
@@ -415,6 +418,7 @@
+ com.clickhouse.jdbc
${spec.title}
${spec.version}
@@ -458,11 +462,11 @@
**/darwin/**
**/linux/**
**/win32/**
+ **/module-info.class
META-INF/DEPENDENCIES
META-INF/MANIFEST.MF
META-INF/maven/**
META-INF/native-image/**
- META-INF/versions/**
META-INF/*.xml
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/AbstractResultSet.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/AbstractResultSet.java
index d4be1244e..b1ca6c1c3 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/AbstractResultSet.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/AbstractResultSet.java
@@ -18,7 +18,7 @@
import java.sql.Time;
import java.sql.Timestamp;
-public abstract class AbstractResultSet extends Wrapper implements ResultSet {
+public abstract class AbstractResultSet extends JdbcWrapper implements ResultSet {
protected void ensureOpen() throws SQLException {
if (isClosed()) {
throw SqlExceptionUtils.clientError("Cannot operate on a closed ResultSet");
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseConnection.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseConnection.java
index 648054c69..327b80f16 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseConnection.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseConnection.java
@@ -8,12 +8,62 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Optional;
import java.util.TimeZone;
+import com.clickhouse.client.ClickHouseColumn;
+import com.clickhouse.client.ClickHouseConfig;
+import com.clickhouse.client.ClickHouseValue;
+import com.clickhouse.client.ClickHouseValues;
import com.clickhouse.client.ClickHouseVersion;
+import com.clickhouse.client.data.ClickHouseSimpleResponse;
+import com.clickhouse.jdbc.parser.ClickHouseSqlStatement;
public interface ClickHouseConnection extends Connection {
+ // The name of the application currently utilizing the connection
+ static final String PROP_APPLICATION_NAME = "ApplicationName";
+ static final String PROP_CUSTOM_HTTP_HEADERS = "CustomHttpHeaders";
+ static final String PROP_CUSTOM_HTTP_PARAMS = "CustomHttpParameters";
+ // The name of the user that the application using the connection is performing
+ // work for. This may not be the same as the user name that was used in
+ // establishing the connection.
+ // private static final String PROP_CLIENT_USER = "ClientUser";
+ // The hostname of the computer the application using the connection is running
+ // on.
+ // private static final String PROP_CLIENT_HOST = "ClientHostname";
+
+ @Override
+ default ClickHouseArray createArrayOf(String typeName, Object[] elements) throws SQLException {
+ ClickHouseColumn column = ClickHouseColumn.of("", typeName);
+ ClickHouseValue v = ClickHouseValues.newValue(column).update(elements);
+ ClickHouseResultSet rs = new ClickHouseResultSet("", "", createStatement(),
+ ClickHouseSimpleResponse.of(Collections.singletonList(column),
+ new Object[][] { new Object[] { v.asObject() } }));
+ rs.next();
+ return new ClickHouseArray(rs, 1);
+ }
+
+ @Override
+ default ClickHouseBlob createBlob() throws SQLException {
+ return new ClickHouseBlob();
+ }
+
+ @Override
+ default ClickHouseClob createClob() throws SQLException {
+ return new ClickHouseClob();
+ }
+
+ @Override
+ default ClickHouseStruct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return new ClickHouseStruct(typeName, attributes);
+ }
+
+ @Override
+ default ClickHouseXml createSQLXML() throws SQLException {
+ return new ClickHouseXml();
+ }
+
@Override
default ClickHouseStatement createStatement() throws SQLException {
return createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
@@ -130,11 +180,40 @@ default PreparedStatement prepareStatement(String sql, int resultSetType, int re
*/
TimeZone getServerTimeZone();
+ /**
+ * Gets server version.
+ *
+ * @return non-null server version
+ */
ClickHouseVersion getServerVersion();
+ /**
+ * Gets URI of the connection.
+ *
+ * @return URI of the connection
+ */
URI getUri();
- boolean isJdbcCompliant();
+ /**
+ * Gets JDBC-specific configuration.
+ *
+ * @return non-null JDBC-specific configuration
+ */
+ JdbcConfig getJdbcConfig();
+ /**
+ * Creates a new query ID.
+ *
+ * @return universal unique query ID
+ */
String newQueryId();
+
+ /**
+ * Parses the given sql.
+ *
+ * @param sql sql to parse
+ * @param config configuration which might be used for parsing, could be null
+ * @return non-null parsed sql statements
+ */
+ ClickHouseSqlStatement[] parse(String sql, ClickHouseConfig config);
}
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDataSource.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDataSource.java
index 2c4344cdd..956311a33 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDataSource.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDataSource.java
@@ -4,6 +4,7 @@
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.config.ClickHouseDefaults;
+import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
import com.clickhouse.jdbc.internal.ClickHouseJdbcUrlParser;
import com.clickhouse.jdbc.internal.ClickHouseJdbcUrlParser.ConnectionInfo;
@@ -15,41 +16,32 @@
import java.util.Properties;
import java.util.logging.Logger;
-public class ClickHouseDataSource extends Wrapper implements DataSource {
+public class ClickHouseDataSource extends JdbcWrapper implements DataSource {
private final String url;
- protected final ClickHouseDriver driver = new ClickHouseDriver();
-
- protected final Properties properties;
- protected final ClickHouseNode server;
- protected final URI uri;
+ protected final ClickHouseDriver driver;
+ protected final ConnectionInfo connInfo;
protected PrintWriter printWriter;
protected int loginTimeoutSeconds = 0;
- public ClickHouseDataSource(String url) {
+ public ClickHouseDataSource(String url) throws SQLException {
this(url, new Properties());
}
- public ClickHouseDataSource(String url, Properties properties) {
+ public ClickHouseDataSource(String url, Properties properties) throws SQLException {
if (url == null) {
throw new IllegalArgumentException("Incorrect ClickHouse jdbc url. It must be not null");
}
this.url = url;
- try {
- ConnectionInfo connInfo = ClickHouseJdbcUrlParser.parse(url, properties);
- this.properties = connInfo.getProperties();
- this.server = connInfo.getServer();
- this.uri = connInfo.getUri();
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
+ this.driver = new ClickHouseDriver();
+ this.connInfo = ClickHouseJdbcUrlParser.parse(url, properties);
}
@Override
public ClickHouseConnection getConnection() throws SQLException {
- return driver.connect(url, properties);
+ return new ClickHouseConnectionImpl(connInfo);
}
@Override
@@ -62,31 +54,32 @@ public ClickHouseConnection getConnection(String username, String password) thro
password = "";
}
- Properties props = new Properties(properties);
+ Properties props = new Properties(connInfo.getProperties());
props.setProperty(ClickHouseDefaults.USER.getKey(), username);
props.setProperty(ClickHouseDefaults.PASSWORD.getKey(), password);
return driver.connect(url, props);
}
public String getHost() {
- return server.getHost();
+ return connInfo.getServer().getHost();
}
public int getPort() {
- return server.getPort();
+ return connInfo.getServer().getPort();
}
public String getDatabase() {
- return server.getDatabase().orElse((String) ClickHouseDefaults.DATABASE.getEffectiveDefaultValue());
+ return connInfo.getServer().getDatabase()
+ .orElse((String) ClickHouseDefaults.DATABASE.getEffectiveDefaultValue());
}
// public String getUrl() {
// return url;
// }
- public Properties getProperties() {
- return properties;
- }
+ // public Properties getProperties() {
+ // return connInfo.getProperties();
+ // }
@Override
public PrintWriter getLogWriter() throws SQLException {
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java
index 6aee3f297..4d354300f 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java
@@ -24,7 +24,7 @@
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;
-public class ClickHouseDatabaseMetaData extends Wrapper implements DatabaseMetaData {
+public class ClickHouseDatabaseMetaData extends JdbcWrapper implements DatabaseMetaData {
private static final Logger log = LoggerFactory.getLogger(ClickHouseDatabaseMetaData.class);
private static final String DATABASE_NAME = "ClickHouse";
@@ -657,12 +657,13 @@ public int getMaxUserNameLength() throws SQLException {
@Override
public int getDefaultTransactionIsolation() throws SQLException {
- return connection.isJdbcCompliant() ? Connection.TRANSACTION_READ_COMMITTED : Connection.TRANSACTION_NONE;
+ return connection.getJdbcConfig().isJdbcCompliant() ? Connection.TRANSACTION_READ_COMMITTED
+ : Connection.TRANSACTION_NONE;
}
@Override
public boolean supportsTransactions() throws SQLException {
- return connection.isJdbcCompliant();
+ return connection.getJdbcConfig().isJdbcCompliant();
}
@Override
@@ -674,7 +675,7 @@ public boolean supportsTransactionIsolationLevel(int level) throws SQLException
throw SqlExceptionUtils.clientError("Unknown isolation level: " + level);
}
- return connection.isJdbcCompliant();
+ return connection.getJdbcConfig().isJdbcCompliant();
}
@Override
@@ -684,7 +685,7 @@ public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQ
@Override
public boolean supportsDataManipulationTransactionsOnly() throws SQLException {
- return connection.isJdbcCompliant();
+ return connection.getJdbcConfig().isJdbcCompliant();
}
@Override
@@ -753,7 +754,7 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam
params.put("table", ClickHouseChecker.isNullOrEmpty(tableNamePattern) ? "'%'"
: ClickHouseValues.convertToQuotedString(tableNamePattern));
params.put("types", builder.toString());
- String sql = JdbcParameterizedQuery
+ String sql = ClickHouseParameterizedQuery
.apply("select null as TABLE_CAT, t.database as TABLE_SCHEM, t.name as TABLE_NAME, "
+ "case when t.engine like '%Log' then 'LOG TABLE' "
+ "when t.engine in ('Buffer', 'Memory', 'Set') then 'MEMORY TABLE' "
@@ -807,7 +808,7 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa
params.put("defaultNullable", String.valueOf(DatabaseMetaData.typeNullable));
params.put("defaultNonNull", String.valueOf(DatabaseMetaData.typeNoNulls));
params.put("defaultType", String.valueOf(Types.OTHER));
- String sql = JdbcParameterizedQuery
+ String sql = ClickHouseParameterizedQuery
.apply("select null as TABLE_CAT, database as TABLE_SCHEM, table as TABLE_NAME, "
+ "name as COLUMN_NAME, toInt32(:defaultType) as DATA_TYPE, type as TYPE_NAME, toInt32(0) as COLUMN_SIZE, "
+ "0 as BUFFER_LENGTH, toInt32(null) as DECIMAL_DIGITS, 10 as NUM_PREC_RADIX, "
@@ -924,43 +925,43 @@ private Object[] toTypeRow(String typeName, String aliasTo) {
: DatabaseMetaData.typePredBasic;
int money = 0;
switch (type) {
- case Date:
- case Date32:
- case DateTime:
- case DateTime32:
- case DateTime64:
- case Enum:
- case Enum8:
- case Enum16:
- case String:
- case FixedString:
- case UUID:
- prefix = "'";
- suffix = "'";
- break;
- case Array:
- case Nested:
- case Ring:
- case Polygon:
- case MultiPolygon:
- prefix = "[";
- suffix = "]";
- nullable = DatabaseMetaData.typeNoNulls;
- break;
- case AggregateFunction:
- case Tuple:
- case Point:
- prefix = "(";
- suffix = ")";
- nullable = DatabaseMetaData.typeNoNulls;
- break;
- case Map:
- prefix = "{";
- suffix = "}";
- nullable = DatabaseMetaData.typeNoNulls;
- break;
- default:
- break;
+ case Date:
+ case Date32:
+ case DateTime:
+ case DateTime32:
+ case DateTime64:
+ case Enum:
+ case Enum8:
+ case Enum16:
+ case String:
+ case FixedString:
+ case UUID:
+ prefix = "'";
+ suffix = "'";
+ break;
+ case Array:
+ case Nested:
+ case Ring:
+ case Polygon:
+ case MultiPolygon:
+ prefix = "[";
+ suffix = "]";
+ nullable = DatabaseMetaData.typeNoNulls;
+ break;
+ case AggregateFunction:
+ case Tuple:
+ case Point:
+ prefix = "(";
+ suffix = ")";
+ nullable = DatabaseMetaData.typeNoNulls;
+ break;
+ case Map:
+ prefix = "{";
+ suffix = "}";
+ nullable = DatabaseMetaData.typeNoNulls;
+ break;
+ default:
+ break;
}
return new Object[] { typeName,
JdbcTypeMapping.toJdbcType(ClickHouseColumn.of("", type, false, false, new String[0])),
@@ -1005,21 +1006,21 @@ public ResultSet getIndexInfo(String catalog, String schema, String table, boole
+ "NON_UNIQUE UInt8, INDEX_QUALIFIER Nullable(String), INDEX_NAME Nullable(String), "
+ "TYPE Int16, ORDINAL_POSITION Int16, COLUMN_NAME Nullable(String), ASC_OR_DESC Nullable(String), "
+ "CARDINALITY Int64, PAGES Int64, FILTER_CONDITION Nullable(String)"),
- query(JdbcParameterizedQuery.apply(
+ query(ClickHouseParameterizedQuery.apply(
"select null as TABLE_CAT, database as TABLE_SCHEM, table as TABLE_NAME, toUInt8(0) as NON_UNIQUE, "
+ "null as INDEX_QUALIFIER, null as INDEX_NAME, toInt16(:statIndex) as TYPE, "
+ "toInt16(0) as ORDINAL_POSITION, null as COLUMN_NAME, null as ASC_OR_DESC, "
+ "sum(rows) as CARDINALITY, uniqExact(name) as PAGES, null as FILTER_CONDITION from system.parts "
+ "where active = 1 and database like :database and table like :table group by database, table",
params), true),
- query(JdbcParameterizedQuery.apply(
+ query(ClickHouseParameterizedQuery.apply(
"select null as TABLE_CAT, database as TABLE_SCHEM, table as TABLE_NAME, toUInt8(1) as NON_UNIQUE, "
+ "type as INDEX_QUALIFIER, name as INDEX_NAME, toInt16(:otherIndex) as TYPE, "
+ "toInt16(1) as ORDINAL_POSITION, expr as COLUMN_NAME, null as ASC_OR_DESC, "
+ "0 as CARDINALITY, 0 as PAGES, null as FILTER_CONDITION "
+ "from system.data_skipping_indices where database like :database and table like :table",
params), true),
- query(JdbcParameterizedQuery.apply(
+ query(ClickHouseParameterizedQuery.apply(
"select null as TABLE_CAT, database as TABLE_SCHEM, table as TABLE_NAME, toUInt8(1) as NON_UNIQUE, "
+ "null as INDEX_QUALIFIER, name as INDEX_NAME, toInt16(:otherIndex) as TYPE, "
+ "column_position as ORDINAL_POSITION, column as COLUMN_NAME, null as ASC_OR_DESC, "
@@ -1201,9 +1202,9 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce
ClickHouseChecker.isNullOrEmpty(schemaPattern) ? "'%'"
: ClickHouseValues.convertToQuotedString(schemaPattern));
return new CombinedResultSet(
- query(JdbcParameterizedQuery.apply("select name as TABLE_SCHEM, null as TABLE_CATALOG "
+ query(ClickHouseParameterizedQuery.apply("select name as TABLE_SCHEM, null as TABLE_CATALOG "
+ "from system.databases where name like :pattern order by name", params)),
- query(JdbcParameterizedQuery.apply(
+ query(ClickHouseParameterizedQuery.apply(
"select concat('jdbc(''', name, ''')') as TABLE_SCHEM, null as TABLE_CATALOG "
+ "from jdbc('', 'SHOW DATASOURCES') where TABLE_SCHEM like :pattern order by name",
params), true));
@@ -1221,14 +1222,25 @@ public boolean autoCommitFailureClosesAllResultSets() throws SQLException {
@Override
public ResultSet getClientInfoProperties() throws SQLException {
- // TODO Auto-generated method stub
- return null;
+ ClickHouseParameterizedQuery q = ClickHouseParameterizedQuery
+ .of("select :name as NAME, toInt32(0) as MAX_LEN, :default as DEFAULT_VALUE, :desc as DESCRIPTION");
+ StringBuilder builder = new StringBuilder();
+ builder.append(q.apply(ClickHouseConnection.PROP_APPLICATION_NAME,
+ connection.getClientInfo(ClickHouseConnection.PROP_APPLICATION_NAME), "Application name"))
+ .append(" union all ");
+ builder.append(q.apply(ClickHouseConnection.PROP_CUSTOM_HTTP_HEADERS,
+ connection.getClientInfo(ClickHouseConnection.PROP_CUSTOM_HTTP_HEADERS), "Custom HTTP headers"))
+ .append(" union all ");
+ builder.append(q.apply(ClickHouseConnection.PROP_CUSTOM_HTTP_PARAMS,
+ connection.getClientInfo(ClickHouseConnection.PROP_CUSTOM_HTTP_PARAMS),
+ "Customer HTTP query parameters"));
+ return query(builder.toString());
}
@Override
public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern)
throws SQLException {
- String sql = JdbcParameterizedQuery.apply(
+ String sql = ClickHouseParameterizedQuery.apply(
"select null as FUNCTION_CAT, null as FUNCTION_SCHEM, name as FUNCTION_NAME,\n"
+ "concat('case-', case_insensitive ? 'in' : '', 'sensitive function', is_aggregate ? ' for aggregation' : '') as REMARKS,"
+ "1 as FUNCTION_TYPE, name as SPECIFIC_NAME from system.functions\n"
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDriver.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDriver.java
index 4d4fef910..d58440b09 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDriver.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDriver.java
@@ -66,7 +66,7 @@ public class ClickHouseDriver implements Driver {
Map