Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<jackson-databind.version>2.9.10.8</jackson-databind.version>
<guava.version>29.0-jre</guava.version>
<jaxb.version>2.3.1</jaxb.version>
<jdk.version>1.7</jdk.version>
<jdk.version>1.8</jdk.version>
<testcontainers.version>1.15.1</testcontainers.version>
<testng.version>6.14.3</testng.version>
<mockito.version>1.10.19</mockito.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public enum ClickHouseFormat {
Native,
Null,
XML,
CapnProto;
CapnProto,
Parquet,
ORC
;

public static boolean containsFormat(String statement) {
if (statement == null || statement.isEmpty()) {
Expand Down
13 changes: 11 additions & 2 deletions src/test/java/ru/yandex/clickhouse/ClickHouseContainerForTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package ru.yandex.clickhouse;

import java.time.Duration;

import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;

Expand All @@ -23,6 +23,7 @@ public class ClickHouseContainerForTest {

static {
String imageTag = System.getProperty("clickhouseVersion");

if (imageTag == null || (imageTag = imageTag.trim()).isEmpty()) {
clickhouseVersion = imageTag = "";
} else {
Expand All @@ -33,7 +34,15 @@ public class ClickHouseContainerForTest {
}
imageTag = ":" + imageTag;
}
clickhouseContainer = new GenericContainer<>("yandex/clickhouse-server" + imageTag)

final String imageNameWithTag = "yandex/clickhouse-server" + imageTag;

clickhouseContainer = new GenericContainer<>( new ImageFromDockerfile()
.withDockerfileFromBuilder(builder ->
builder
.from( imageNameWithTag )
.run("apt-get update && apt-get install tzdata")
))
.withExposedPorts(HTTP_PORT, NATIVE_PORT, MYSQL_PORT)
.withClasspathResourceMapping(
"ru/yandex/clickhouse/users.d",
Expand Down
164 changes: 163 additions & 1 deletion src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.util.ClickHouseVersionNumberUtil;

import java.io.*;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -216,4 +218,164 @@ public void CSVInsertCompressedIntoTable() throws SQLException, IOException {
Assert.assertEquals(rs.getLong("uniq"), 1);
}

@Test
public void ORCInsertCompressedIntoTable() throws SQLException {
// clickhouse-client -q "select number int, toString(number) str, 1/number flt, toDecimal64( 1/(number+1) , 9) dcml,
// toDateTime('2020-01-01 00:00:00') + number time from numbers(100) format ORC"|gzip > test_sample.orc.gz

String version = connection.getServerVersion();
if (version.compareTo("20.8") < 0) {
return;
}

connection.createStatement().execute("DROP TABLE IF EXISTS test.orc_stream_compressed");
connection.createStatement().execute(
"CREATE TABLE test.orc_stream_compressed (int Int64, str String, flt Float64, " +
"dcml Decimal64(9), time DateTime) ENGINE = Log();"
);

InputStream inputStream = StreamSQLTest.class.getResourceAsStream("/data_samples/test_sample.orc.gz");

connection.createStatement().
write()
.table("test.orc_stream_compressed")
.format(ClickHouseFormat.ORC)
.dataCompression(ClickHouseCompression.gzip)
.data(inputStream)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"SELECT count() AS cnt, " +
"sum(int) sum_int, " +
"round(sum(flt),2) AS sum_flt, " +
"uniqExact(str) uniq_str, " +
"max(dcml) max_dcml, " +
"min(time) min_time, " +
"max(time) max_time " +
"FROM test.orc_stream_compressed");
Assert.assertTrue(rs.next());
Assert.assertEquals(rs.getInt("cnt"), 100);
Assert.assertEquals(rs.getLong("sum_int"), 4950);
Assert.assertEquals(rs.getFloat("sum_flt"), Float.POSITIVE_INFINITY);
Assert.assertEquals(rs.getLong("uniq_str"), 100);
Assert.assertEquals(rs.getBigDecimal("max_dcml"), new BigDecimal("1.000000000"));
Assert.assertEquals(rs.getString("min_time"), "2020-01-01 00:00:00");
Assert.assertEquals(rs.getString("max_time"), "2020-01-01 00:01:39");
}

@Test
public void ORCInsertCompressedIntoTable1() throws SQLException {
// clickhouse-client -q "select number int, toString(number) str, 1/number flt, toDecimal64( 1/(number+1) , 9) dcml,
// toDateTime('2020-01-01 00:00:00') + number time from numbers(100) format ORC"|gzip > test_sample.orc.gz

String version = connection.getServerVersion();
if (version.compareTo("20.8") < 0) {
return;
}

connection.createStatement().execute("DROP TABLE IF EXISTS test.orc1_stream_compressed");
connection.createStatement().execute(
"CREATE TABLE test.orc1_stream_compressed (int Int64, str String, flt Float64, " +
"dcml Decimal64(9), time DateTime) ENGINE = Log();"
);

InputStream inputStream = StreamSQLTest.class.getResourceAsStream("/data_samples/test_sample.orc.gz");

connection.createStatement().
write()
.sql("insert into test.orc1_stream_compressed format ORC")
.dataCompression(ClickHouseCompression.gzip)
.data(inputStream)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"select * from test.orc1_stream_compressed where int=42");
Assert.assertTrue(rs.next());
Assert.assertEquals(rs.getInt("int"), 42);
Assert.assertEquals(rs.getString("str"), "42");
Assert.assertTrue( Math.abs(rs.getFloat("flt") - 0.023809524) < 0.0001);
Assert.assertTrue( Math.abs(rs.getFloat("dcml") - 0.023255813) < 0.0001);
Assert.assertEquals(rs.getString("time"), "2020-01-01 00:00:42");
}

@Test
public void ParquetInsertCompressedIntoTable() throws SQLException {
// clickhouse-client -q "select number int, toString(number) str, 1/number flt, toDecimal64( 1/(number+1) , 9) dcml,
// toDateTime('2020-01-01 00:00:00') + number time from numbers(100) format Parquet"|gzip > test_sample.parquet.gz

String version = connection.getServerVersion();
if (version.compareTo("20.8") < 0) {
return;
}

connection.createStatement().execute("DROP TABLE IF EXISTS test.parquet_stream_compressed");
connection.createStatement().execute(
"CREATE TABLE test.parquet_stream_compressed (int Int64, str String, flt Float64, " +
"dcml Decimal64(9), time DateTime) ENGINE = Log();"
);

InputStream inputStream = StreamSQLTest.class.getResourceAsStream("/data_samples/test_sample.parquet.gz");

connection.createStatement().
write()
.table("test.parquet_stream_compressed")
.format(ClickHouseFormat.Parquet)
.dataCompression(ClickHouseCompression.gzip)
.data(inputStream)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"SELECT count() AS cnt, " +
"sum(int) sum_int, " +
"round(sum(flt),2) AS sum_flt, " +
"uniqExact(str) uniq_str, " +
"max(dcml) max_dcml, " +
"min(time) min_time, " +
"max(time) max_time " +
"FROM test.parquet_stream_compressed");
Assert.assertTrue(rs.next());
Assert.assertEquals(rs.getInt("cnt"), 100);
Assert.assertEquals(rs.getLong("sum_int"), 4950);
Assert.assertEquals(rs.getFloat("sum_flt"), Float.POSITIVE_INFINITY);
Assert.assertEquals(rs.getLong("uniq_str"), 100);
Assert.assertEquals(rs.getBigDecimal("max_dcml"), new BigDecimal("1.000000000"));
Assert.assertEquals(rs.getString("min_time"), "2020-01-01 00:00:00");
Assert.assertEquals(rs.getString("max_time"), "2020-01-01 00:01:39");
}

@Test
public void ParquetInsertCompressedIntoTable1() throws SQLException {
// clickhouse-client -q "select number int, toString(number) str, 1/number flt, toDecimal64( 1/(number+1) , 9) dcml,
// toDateTime('2020-01-01 00:00:00') + number time from numbers(100) format Parquet"|gzip > test_sample.parquet.gz

String version = connection.getServerVersion();
if (version.compareTo("20.8") < 0) {
return;
}

connection.createStatement().execute("DROP TABLE IF EXISTS test.parquet1_stream_compressed");
connection.createStatement().execute(
"CREATE TABLE test.parquet1_stream_compressed (int Int64, str String, flt Float64, " +
"dcml Decimal64(9), time DateTime) ENGINE = Log();"
);

InputStream inputStream = StreamSQLTest.class.getResourceAsStream("/data_samples/test_sample.parquet.gz");

connection.createStatement().
write()
.sql("insert into test.parquet1_stream_compressed format Parquet")
.dataCompression(ClickHouseCompression.gzip)
.data(inputStream)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"select * from test.parquet1_stream_compressed where int=42");
Assert.assertTrue(rs.next());
Assert.assertEquals(rs.getInt("int"), 42);
Assert.assertEquals(rs.getString("str"), "42");
Assert.assertTrue( Math.abs(rs.getFloat("flt") - 0.023809524) < 0.0001);
Assert.assertTrue( Math.abs(rs.getFloat("dcml") - 0.023255813) < 0.0001);
Assert.assertEquals(rs.getString("time"), "2020-01-01 00:00:42");
}

}
Binary file not shown.
Binary file not shown.