diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/StandardColumnMappings.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/StandardColumnMappings.java
index 9b47d7f37480..e7dabe2d4d1c 100644
--- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/StandardColumnMappings.java
+++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/StandardColumnMappings.java
@@ -509,14 +509,7 @@ public static ColumnMapping timestampColumnMappingUsingSqlTimestampWithRounding(
checkArgument(timestampType.getPrecision() <= TimestampType.MAX_SHORT_PRECISION, "Precision is out of range: %s", timestampType.getPrecision());
return ColumnMapping.longMapping(
timestampType,
- (resultSet, columnIndex) -> {
- LocalDateTime localDateTime = resultSet.getTimestamp(columnIndex).toLocalDateTime();
- int roundedNanos = toIntExact(round(localDateTime.getNano(), 9 - timestampType.getPrecision()));
- LocalDateTime rounded = localDateTime
- .withNano(0)
- .plusNanos(roundedNanos);
- return toTrinoTimestamp(timestampType, rounded);
- },
+ timestampWithRoundingReadFunction(timestampType),
timestampWriteFunctionUsingSqlTimestamp(timestampType),
// NOTE: pushdown is disabled because the values stored in remote system might not match the values as
// read by Trino due to rounding. This can lead to incorrect results if operations are pushed down to
@@ -524,6 +517,32 @@ public static ColumnMapping timestampColumnMappingUsingSqlTimestampWithRounding(
DISABLE_PUSHDOWN);
}
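+ // Deprecated to discourage new usages: with rounding, the remote values may differ from what Trino
+ // reads, so full pushdown can act on values Trino would never return (see NOTE below)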
+ @Deprecated
+ public static ColumnMapping timestampColumnMappingUsingSqlTimestampWithRoundingFullPushdown(TimestampType timestampType)
+ {
+ // TODO: https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#timestampspec implies Druid supports nanosecond precision
+ checkArgument(timestampType.getPrecision() <= TimestampType.MAX_SHORT_PRECISION, "Precision is out of range: %s", timestampType.getPrecision());
+ return ColumnMapping.longMapping(
+ timestampType,
+ timestampWithRoundingReadFunction(timestampType),
+ timestampWriteFunctionUsingSqlTimestamp(timestampType),
+ // NOTE: as with the DISABLE_PUSHDOWN variant above, the values stored in the remote system might not
+ // match the values read by Trino due to rounding.
+ FULL_PUSHDOWN);
+ }
+
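+ // Reads a java.sql.Timestamp and rounds its fractional second to the precision of the mapped Trino type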
+ private static LongReadFunction timestampWithRoundingReadFunction(TimestampType timestampType)
+ {
+ return (resultSet, columnIndex) -> {
+ LocalDateTime localDateTime = resultSet.getTimestamp(columnIndex).toLocalDateTime();
+ int roundedNanos = toIntExact(round(localDateTime.getNano(), 9 - timestampType.getPrecision()));
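+ // re-attaching the rounded fraction via plusNanos lets a round-up at the top of the second carry over correctly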
+ LocalDateTime rounded = localDateTime
+ .withNano(0)
+ .plusNanos(roundedNanos);
+ return toTrinoTimestamp(timestampType, rounded);
+ };
+ }
+
public static ColumnMapping timestampColumnMapping(TimestampType timestampType)
{
if (timestampType.getPrecision() <= TimestampType.MAX_SHORT_PRECISION) {
diff --git a/plugin/trino-druid/pom.xml b/plugin/trino-druid/pom.xml
index 326e14c42b99..e635a6c9a4da 100644
--- a/plugin/trino-druid/pom.xml
+++ b/plugin/trino-druid/pom.xml
@@ -38,6 +38,11 @@
<artifactId>units</artifactId>
</dependency>

+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -159,6 +164,13 @@
<scope>test</scope>
</dependency>

+ <dependency>
+ <groupId>org.freemarker</groupId>
+ <artifactId>freemarker</artifactId>
+ <version>2.3.31</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
@@ -182,5 +194,6 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
diff --git a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java
index fc3eb7c059ee..e2de8a2700ef 100644
--- a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java
+++ b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java
@@ -79,7 +79,7 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timeColumnMappingUsingSqlTime;
-import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMappingUsingSqlTimestampWithRounding;
+import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMappingUsingSqlTimestampWithRoundingFullPushdown;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryColumnMapping;
@@ -107,12 +107,12 @@
public class DruidJdbcClient
extends BaseJdbcClient
{
+ // All the datasources in Druid are created under schema "druid"
+ private static final String DRUID_SCHEMA = "druid";
// Druid maintains its datasources related metadata by setting the catalog name as "druid"
// Note that while a user may name the catalog name as something else, metadata queries made
// to druid will always have the TABLE_CATALOG set to DRUID_CATALOG
private static final String DRUID_CATALOG = "druid";
- // All the datasources in Druid are created under schema "druid"
- public static final String DRUID_SCHEMA = "druid";
@Inject
public DruidJdbcClient(BaseJdbcConfig config, ConnectionFactory connectionFactory, IdentifierMapping identifierMapping)
@@ -248,8 +248,10 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
return Optional.of(timeColumnMappingUsingSqlTime());
case Types.TIMESTAMP:
- // TODO Consider using `StandardColumnMappings.timestampColumnMapping`
- return Optional.of(timestampColumnMappingUsingSqlTimestampWithRounding(TIMESTAMP_MILLIS));
+ // TODO: use timestampColumnMapping once https://issues.apache.org/jira/browse/CALCITE-1630 is resolved
+ // https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#timestampspec indicates Druid supports
+ // nanosecond precision at ingestion time; query-time precision is not clearly documented, so keep MILLISECOND for now
+ return Optional.of(timestampColumnMappingUsingSqlTimestampWithRoundingFullPushdown(TIMESTAMP_MILLIS));
}
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java
index 8264a2cdd976..9da3393dc84a 100644
--- a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java
@@ -13,12 +13,16 @@
*/
package io.trino.plugin.druid;
+import com.google.common.collect.ImmutableMap;
+import io.trino.Session;
import io.trino.plugin.jdbc.BaseJdbcConnectorTest;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
import io.trino.sql.planner.assertions.PlanMatchPattern;
import io.trino.sql.planner.plan.AggregationNode;
+import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TopNNode;
@@ -35,6 +39,7 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree;
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
+import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -275,4 +280,122 @@ public void testLimitPushDown()
"LIMIT 30"))
.isNotFullyPushedDown(joinOverTableScans);
}
+
+ @Test
+ public void testPredicatePushdown()
+ {
+ assertThat(query("SELECT * FROM orders where __time > DATE'1970-01-01'")).isFullyPushedDown();
+ assertThat(query("SELECT * FROM orders where totalprice > 0")).isFullyPushedDown();
+ assertThat(query("SELECT * FROM orders where comment = ''")).isFullyPushedDown();
+
+ // varchar equality
+ assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name = 'ROMANIA'"))
+ .matches("VALUES (BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25)))")
+ .isFullyPushedDown();
+
+ // varchar range
+ assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name BETWEEN 'POLAND' AND 'RPA'"))
+ .matches("VALUES (BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25)))")
+ .isNotFullyPushedDown(FilterNode.class);
+
+ // varchar IN without domain compaction
+ assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name IN ('POLAND', 'ROMANIA', 'VIETNAM')"))
+ .matches("VALUES " +
+ "(BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25))), " +
+ "(BIGINT '2', BIGINT '21', CAST('VIETNAM' AS varchar(25)))")
+ .isFullyPushedDown();
+
+ // varchar IN with small compaction threshold
+ assertThat(query(
+ Session.builder(getSession())
+ .setCatalogSessionProperty("postgresql", "domain_compaction_threshold", "1")
+ .build(),
+ "SELECT regionkey, nationkey, name FROM nation WHERE name IN ('POLAND', 'ROMANIA', 'VIETNAM')"))
+ .matches("VALUES " +
+ "(BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25))), " +
+ "(BIGINT '2', BIGINT '21', CAST('VIETNAM' AS varchar(25)))")
+ // Filter node is retained as no constraint is pushed into the connector.
+ // The compacted domain is a range predicate which can give wrong results
+ // if pushed down, since Druid's sort ordering for letters can differ from Trino's
+ .isNotFullyPushedDown(
+ node(
+ FilterNode.class,
+ // verify that no constraint is applied by the connector
+ tableScan(
+ tableHandle -> ((JdbcTableHandle) tableHandle).getConstraint().isAll(),
+ TupleDomain.all(),
+ ImmutableMap.of())));
+
+ // varchar different case
+ assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name = 'romania'"))
+ .returnsEmptyResult()
+ .isFullyPushedDown();
+
+ // bigint equality
+ assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE nationkey = 19"))
+ .matches("VALUES (BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25)))")
+ .isFullyPushedDown();
+
+ // bigint equality with small compaction threshold
+ assertThat(query(
+ Session.builder(getSession())
+ .setCatalogSessionProperty("postgresql", "domain_compaction_threshold", "1")
+ .build(),
+ "SELECT regionkey, nationkey, name FROM nation WHERE nationkey IN (19, 21)"))
+ .matches("VALUES " +
+ "(BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25))), " +
+ "(BIGINT '2', BIGINT '21', CAST('VIETNAM' AS varchar(25)))")
+ .isNotFullyPushedDown(FilterNode.class);
+
+ // bigint range, with decimal to bigint simplification
+ assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE nationkey BETWEEN 18.5 AND 19.5"))
+ .matches("VALUES (BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar(25)))")
+ .isFullyPushedDown();
+
+ // date equality
+ assertThat(query("SELECT orderkey FROM orders WHERE orderdate = DATE '1992-09-29'"))
+ .matches("VALUES BIGINT '1250', 34406, 38436, 57570")
+ .isFullyPushedDown();
+
+ // predicate over aggregation key (likely to be optimized before being pushed down into the connector)
+ assertThat(query("SELECT * FROM (SELECT regionkey, sum(nationkey) FROM nation GROUP BY regionkey) WHERE regionkey = 3"))
+ .matches("VALUES (BIGINT '3', BIGINT '77')")
+ .isFullyPushedDown();
+
+ // predicate over aggregation result
+ assertThat(query("SELECT regionkey, sum(nationkey) FROM nation GROUP BY regionkey HAVING sum(nationkey) = 77"))
+ .matches("VALUES (BIGINT '3', BIGINT '77')")
+ .isFullyPushedDown();
+
+ // predicate over TopN result
+ assertThat(query("" +
+ "SELECT orderkey " +
+ "FROM (SELECT * FROM orders ORDER BY orderdate DESC, orderkey ASC LIMIT 10)" +
+ "WHERE orderdate = DATE '1998-08-01'"))
+ .matches("VALUES BIGINT '27588', 22403, 37735")
+ .ordered()
+ .isFullyPushedDown();
+
+ assertThat(query("" +
+ "SELECT custkey " +
+ "FROM (SELECT SUM(totalprice) as sum, custkey, COUNT(*) as cnt FROM orders GROUP BY custkey order by sum desc limit 10) " +
+ "WHERE cnt > 30"))
+ .matches("VALUES BIGINT '643', 898")
+ .ordered()
+ .isFullyPushedDown();
+
+ // predicate over join
+ Session joinPushdownEnabled = joinPushdownEnabled(getSession());
+ assertThat(query(joinPushdownEnabled, "SELECT c.name, n.name FROM customer c JOIN nation n ON c.custkey = n.nationkey WHERE acctbal > 8000"))
+ .isFullyPushedDown();
+
+ // varchar predicate over join
+ assertThat(query(joinPushdownEnabled, "SELECT c.name, n.name FROM customer c JOIN nation n ON c.custkey = n.nationkey WHERE address = 'TcGe5gaZNgVePxU5kRrvXBfkasDTea'"))
+ .isFullyPushedDown();
+ assertThat(query(joinPushdownEnabled, "SELECT c.name, n.name FROM customer c JOIN nation n ON c.custkey = n.nationkey WHERE address < 'TcGe5gaZNgVePxU5kRrvXBfkasDTea'"))
+ .isNotFullyPushedDown(
+ node(JoinNode.class,
+ anyTree(node(TableScanNode.class)),
+ anyTree(node(TableScanNode.class))));
+ }
}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/DruidCreateAndInsertDataSetup.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/DruidCreateAndInsertDataSetup.java
new file mode 100644
index 000000000000..e5a05bb63ea1
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/DruidCreateAndInsertDataSetup.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.druid;
+
+import io.airlift.log.Logger;
+import io.trino.plugin.druid.ingestion.IndexTaskBuilder;
+import io.trino.plugin.druid.ingestion.TimestampSpec;
+import io.trino.testing.datatype.ColumnSetup;
+import io.trino.testing.datatype.DataSetup;
+import io.trino.testing.sql.SqlExecutor;
+import io.trino.testing.sql.TestTable;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class DruidCreateAndInsertDataSetup
+ implements DataSetup
+{
+ private static final Logger log = Logger.get(DruidCreateAndInsertDataSetup.class);
+ private final SqlExecutor sqlExecutor;
+ private final TestingDruidServer druidServer;
+ private final String dataSourceNamePrefix;
+
+ public DruidCreateAndInsertDataSetup(SqlExecutor sqlExecutor, TestingDruidServer druidServer, String dataSourceNamePrefix)
+ {
+ this.sqlExecutor = sqlExecutor;
+ this.druidServer = druidServer;
+ this.dataSourceNamePrefix = dataSourceNamePrefix;
+ }
+
+ @Override
+ public TestTable setupTestTable(List<ColumnSetup> inputs)
+ {
+ TestTable testTable = new TestTable(this.sqlExecutor, this.dataSourceNamePrefix, false);
+ try {
+ ingestData(testTable, inputs);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to ingest data into datasource " + testTable.getName(), e);
+ }
+ return testTable;
+ }
+
+ private void ingestData(TestTable testTable, List<ColumnSetup> inputs)
+ throws Exception
+ {
+ IndexTaskBuilder builder = new IndexTaskBuilder();
+ builder.setDatasource(testTable.getName());
+ TimestampSpec timestampSpec = getTimestampSpec(inputs);
+ builder.setTimestampSpec(timestampSpec);
+
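+ // Inputs other than the timestamp become Druid dimensions named col_0, col_1, ... (the names getPredicate() in DruidSqlDataTypeTest relies on)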
+ List<ColumnSetup> normalInputs = inputs.stream().filter(input -> !isTimestampDimension(input)).collect(Collectors.toList());
+ for (int index = 0; index < normalInputs.size(); index++) {
+ builder.addColumn(format("col_%s", index), normalInputs.get(index).getDeclaredType().orElse("string"));
+ }
+
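+ // All input literals are written as a single TSV row; the index task maps them to columns positionally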
+ String dataFilePath = format("%s.tsv", testTable.getName());
+ writeTsvFile(dataFilePath, inputs);
+
+ String indexTask = builder.build();
+ log.info(indexTask);
+ this.druidServer.ingestDataWithoutTaskFile(indexTask, dataFilePath, testTable.getName());
+ }
+
+ private TimestampSpec getTimestampSpec(List<ColumnSetup> inputs)
+ {
+ List<ColumnSetup> timestampInputs = inputs.stream().filter(this::isTimestampDimension).collect(Collectors.toList());
+
+ if (timestampInputs.size() > 1) {
+ throw new UnsupportedOperationException("Druid only allows one timestamp field");
+ }
+
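+ // The TSV always leads with a synthetic timestamp column that Druid ingests as __time; "auto" accepts ISO or millis values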
+ return new TimestampSpec("dummy_druid_ts", "auto");
+ }
+
+ private boolean isTimestampDimension(ColumnSetup input)
+ {
+ // TODO: support more types
+ return input.getDeclaredType()
+ .map(type -> type.startsWith("timestamp"))
+ .orElse(false);
+ }
+
+ private void writeTsvFile(String dataFilePath, List<ColumnSetup> inputs)
+ throws IOException
+ {
+ String tsvFileLocation = format("%s/%s", druidServer.getHostWorkingDirectory(), dataFilePath);
+ File file = new File(tsvFileLocation);
+ try (BufferedWriter bw = new BufferedWriter(new FileWriter(file))) {
+ bw.write(inputs.stream().map(ColumnSetup::getInputLiteral).collect(Collectors.joining("\t")));
+ }
+ }
+}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/DruidSqlDataTypeTest.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/DruidSqlDataTypeTest.java
new file mode 100644
index 000000000000..b7d862c66309
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/DruidSqlDataTypeTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.druid;
+
+import io.trino.Session;
+import io.trino.spi.type.Type;
+import io.trino.sql.query.QueryAssertions;
+import io.trino.sql.query.QueryAssertions.QueryAssert;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.datatype.ColumnSetup;
+import io.trino.testing.datatype.DataSetup;
+import io.trino.testing.sql.TestTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+import static org.assertj.core.api.Assertions.assertThat;
+
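+/**
+ * Round-trip type test helper for Druid, modeled on io.trino.testing.datatype.SqlDataTypeTest.
+ * A typical usage sketch (mirroring TestDruidTypeMapping):
+ * <pre>{@code
+ * DruidSqlDataTypeTest.create()
+ *         .addRoundTrip("long", "1000000000", BIGINT, "cast(1000000000 AS BIGINT)")
+ *         .execute(queryRunner, dataSetup);
+ * }</pre>
+ */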
+public final class DruidSqlDataTypeTest
+{
+ public static DruidSqlDataTypeTest create()
+ {
+ return new DruidSqlDataTypeTest();
+ }
+
+ private final List<TestCase> testCases = new ArrayList<>();
+
+ private DruidSqlDataTypeTest() {}
+
+ public DruidSqlDataTypeTest addRoundTrip(String literal)
+ {
+ return addRoundTrip(literal, literal);
+ }
+
+ public DruidSqlDataTypeTest addRoundTrip(String inputLiteral, String expectedLiteral)
+ {
+ testCases.add(new TestCase(Optional.empty(), inputLiteral, Optional.empty(), expectedLiteral));
+ return this;
+ }
+
+ public DruidSqlDataTypeTest addRoundTrip(String inputType, String literal, Type expectedType)
+ {
+ return addRoundTrip(inputType, literal, expectedType, literal);
+ }
+
+ public DruidSqlDataTypeTest addRoundTrip(String inputType, String inputLiteral, Type expectedType, String expectedLiteral)
+ {
+ testCases.add(new TestCase(Optional.of(inputType), inputLiteral, Optional.of(expectedType), expectedLiteral));
+ return this;
+ }
+
+ public DruidSqlDataTypeTest execute(QueryRunner queryRunner, DataSetup dataSetup)
+ {
+ return execute(queryRunner, queryRunner.getDefaultSession(), dataSetup);
+ }
+
+ public DruidSqlDataTypeTest execute(QueryRunner queryRunner, Session session, DataSetup dataSetup)
+ {
+ checkState(!testCases.isEmpty(), "No test cases");
+ try (TestTable testTable = dataSetup.setupTestTable(unmodifiableList(testCases))) {
+ verifySelect(queryRunner, session, testTable);
+ verifyPredicate(queryRunner, session, testTable);
+ }
+ return this;
+ }
+
+ private void verifySelect(QueryRunner queryRunner, Session session, TestTable testTable)
+ {
+ @SuppressWarnings("resource") // Closing QueryAssertions would close the QueryRunner
+ QueryAssertions queryAssertions = new QueryAssertions(queryRunner);
+
+ QueryAssert assertion = assertThat(queryAssertions.query(session, "SELECT * FROM " + testTable.getName()));
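+ // Evaluate the expected literals through Trino itself so expected values and types come from the engine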
+ MaterializedResult expected = queryRunner.execute(session, testCases.stream()
+ .map(TestCase::getExpectedLiteral)
+ .collect(joining(",", "VALUES (", ")")));
+
+ // Verify types if specified
+ for (int column = 0; column < testCases.size(); column++) {
+ TestCase testCase = testCases.get(column);
+ if (testCase.getExpectedType().isPresent()) {
+ Type expectedType = testCase.getExpectedType().get();
+ assertion.outputHasType(column, expectedType);
+ assertThat(expected.getTypes())
+ .as("Expected literal type (check consistency of expected type and expected literal)")
+ .element(column).isEqualTo(expectedType);
+ }
+ }
+
+ assertion.matches(expected);
+ }
+
+ private void verifyPredicate(QueryRunner queryRunner, Session session, TestTable testTable)
+ {
+ String queryWithAll = "SELECT 'all found' FROM " + testTable.getName() + " WHERE " +
+ IntStream.range(0, testCases.size())
+ .mapToObj(this::getPredicate)
+ .collect(joining(" AND "));
+
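+ // Fast path: a single query ANDing every predicate; if it matches, each individual predicate held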
+ MaterializedResult result = queryRunner.execute(session, queryWithAll);
+ if (result.getOnlyColumnAsSet().equals(Set.of("all found"))) {
+ return;
+ }
+
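+ // Otherwise re-check each predicate on its own so the failure pinpoints the offending column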
+ @SuppressWarnings("resource") // Closing QueryAssertions would close the QueryRunner
+ QueryAssertions queryAssertions = new QueryAssertions(queryRunner);
+
+ for (int column = 0; column < testCases.size(); column++) {
+ String query = "SELECT 'found' FROM " + testTable.getName() + " WHERE " + getPredicate(column);
+ assertThat(queryAssertions.query(session, query))
+ .matches("VALUES 'found'");
+ }
+ }
+
+ private String getPredicate(int column)
+ {
+ if (column == 0) {
+ return format("__time IS NOT DISTINCT FROM %s", testCases.get(column).getExpectedLiteral());
+ }
+ return format("col_%s IS NOT DISTINCT FROM %s", column - 1, testCases.get(column).getExpectedLiteral());
+ }
+
+ private static class TestCase
+ implements ColumnSetup
+ {
+ private final Optional<String> declaredType;
+ private final String inputLiteral;
+ private final Optional<Type> expectedType;
+ private final String expectedLiteral;
+
+ public TestCase(Optional<String> declaredType, String inputLiteral, Optional<Type> expectedType, String expectedLiteral)
+ {
+ this.declaredType = requireNonNull(declaredType, "declaredType is null");
+ this.expectedType = requireNonNull(expectedType, "expectedType is null");
+ this.inputLiteral = requireNonNull(inputLiteral, "inputLiteral is null");
+ this.expectedLiteral = requireNonNull(expectedLiteral, "expectedLiteral is null");
+ }
+
+ @Override
+ public Optional<String> getDeclaredType()
+ {
+ return declaredType;
+ }
+
+ @Override
+ public String getInputLiteral()
+ {
+ return inputLiteral;
+ }
+
+ public Optional<Type> getExpectedType()
+ {
+ return expectedType;
+ }
+
+ public String getExpectedLiteral()
+ {
+ return expectedLiteral;
+ }
+ }
+}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidTypeMapping.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidTypeMapping.java
new file mode 100644
index 000000000000..10ba20f78442
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidTypeMapping.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.druid;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.datatype.DataSetup;
+import io.trino.testing.sql.TrinoSqlExecutor;
+import org.testng.annotations.Test;
+
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+
+public class TestDruidTypeMapping
+ extends AbstractTestQueryFramework
+{
+ private static final String DRUID_DOCKER_IMAGE = "apache/druid:0.18.0";
+ protected TestingDruidServer druidServer;
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ this.druidServer = new TestingDruidServer(DRUID_DOCKER_IMAGE);
+ return DruidQueryRunner.createDruidQueryRunnerTpch(druidServer, ImmutableMap.of(), ImmutableList.of());
+ }
+
+ @Test
+ public void testTimestampAndDouble()
+ {
+ DruidSqlDataTypeTest.create()
+ .addRoundTrip("timestamp(3)", "2020-01-01 00:00:00.000", TIMESTAMP_MILLIS, "TIMESTAMP '2020-01-01 00:00:00.000'")
+ .addRoundTrip("double", "1.0E100", DOUBLE, "1.0E100")
+ .addRoundTrip("double", "123.456E10", DOUBLE, "123.456E10")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_double"));
+ }
+
+ @Test
+ public void testTimestampAndVarchar()
+ {
+ DruidSqlDataTypeTest.create()
+ .addRoundTrip("timestamp(3)", "2020-01-01 00:00:00.000", TIMESTAMP_MILLIS, "TIMESTAMP '2020-01-01 00:00:00.000'")
+ .addRoundTrip("string", "dummy_varchar", VARCHAR, "cast('dummy_varchar' AS VARCHAR)")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_timestamp_varchar"));
+ }
+
+ @Test
+ public void testTimestampAndLong()
+ {
+ DruidSqlDataTypeTest.create()
+ .addRoundTrip("timestamp(3)", "2020-01-01 00:00:00.000", TIMESTAMP_MILLIS, "TIMESTAMP '2020-01-01 00:00:00.000'")
+ .addRoundTrip("long", "1000000000", BIGINT, "cast(1000000000 AS BIGINT)")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_timestamp_long"));
+ }
+
+ private DataSetup druidCreateAndInsert(String dataSourceName)
+ {
+ return new DruidCreateAndInsertDataSetup(new TrinoSqlExecutor(getQueryRunner()), this.druidServer, dataSourceName);
+ }
+}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestingDruidServer.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestingDruidServer.java
index bf1acb411296..2fa4a115a613 100644
--- a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestingDruidServer.java
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestingDruidServer.java
@@ -17,6 +17,7 @@
import com.google.common.io.Closer;
import com.google.common.io.MoreFiles;
import com.google.common.io.Resources;
+import io.airlift.log.Logger;
import io.trino.testing.assertions.Assert;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -51,6 +52,7 @@
public class TestingDruidServer
implements Closeable
{
+ private static final Logger log = Logger.get(TestingDruidServer.class);
private final String hostWorkingDirectory;
private final GenericContainer<?> broker;
private final GenericContainer<?> coordinator;
@@ -77,7 +79,7 @@ public TestingDruidServer(String dockerImageName)
// Cannot use Files.createTempDirectory() because on Mac by default it uses
// /var/folders/ which is not visible to Docker for Mac
hostWorkingDirectory = Files.createDirectory(
- Paths.get("/tmp/docker-tests-files-" + randomUUID().toString()))
+ Paths.get("/tmp/docker-tests-files-" + randomUUID().toString()))
.toAbsolutePath().toString();
File f = new File(hostWorkingDirectory);
// Enable read/write/exec access for the services running in containers
@@ -238,13 +240,28 @@ void ingestData(String datasource, String indexTaskFile, String dataFilePath)
middleManager.withCopyFileToContainer(forHostPath(dataFilePath),
getMiddleManagerContainerPathForDataFile(dataFilePath));
String indexTask = Resources.toString(getResource(indexTaskFile), Charset.defaultCharset());
+ ingest(indexTask, datasource);
+ }
+
+ void ingestDataWithoutTaskFile(String indexTask, String dataFilePath, String datasource)
+ throws IOException, InterruptedException
+ {
+ middleManager.withCopyFileToContainer(forHostPath(dataFilePath),
+ getMiddleManagerContainerPathForDataFile(dataFilePath));
+ ingest(indexTask, datasource);
+ }
+
+ private void ingest(String indexTask, String datasource)
+ throws IOException, InterruptedException
+ {
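+ // Submit the index task JSON to the coordinator/Overlord task endpoint, then assert the datasource is loaded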
Request.Builder requestBuilder = new Request.Builder();
requestBuilder.addHeader("content-type", "application/json;charset=utf-8")
.url("http://localhost:" + getCoordinatorOverlordPort() + "/druid/indexer/v1/task")
.post(RequestBody.create(null, indexTask));
Request ingestionRequest = requestBuilder.build();
- try (Response ignored = httpClient.newCall(ingestionRequest).execute()) {
+
+ try (Response taskIdResponse = httpClient.newCall(ingestionRequest).execute()) {
+ log.info(taskIdResponse.body().string());
Assert.assertTrue(checkDatasourceAvailable(datasource), "Datasource " + datasource + " not loaded");
}
}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/ColumnSpec.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/ColumnSpec.java
new file mode 100644
index 000000000000..5d5202a0907b
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/ColumnSpec.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.druid.ingestion;
+
+public class ColumnSpec
+{
+ private final String name;
+ private final String type;
+
+ public ColumnSpec(String name, String type)
+ {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/IndexTaskBuilder.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/IndexTaskBuilder.java
new file mode 100644
index 000000000000..4011d9122e74
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/IndexTaskBuilder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.druid.ingestion;
+
+import com.google.common.io.Resources;
+import freemarker.template.Template;
+import io.airlift.log.Logger;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+
+import static com.google.common.io.Resources.getResource;
+
+/**
+ * Builds the JSON request body for a Druid index task; the output follows the
+ * same format as the index task files under the test resources directory.
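+ *
+ * <p>A typical usage sketch:
+ * <pre>{@code
+ * String task = new IndexTaskBuilder()
+ *         .setDatasource("my_datasource")
+ *         .setTimestampSpec(new TimestampSpec("dummy_druid_ts", "auto"))
+ *         .addColumn("col_0", "string")
+ *         .build();
+ * }</pre>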
+ */
+
+public class IndexTaskBuilder
+{
+ private static final Logger log = Logger.get(IndexTaskBuilder.class);
+ private final ArrayList<ColumnSpec> columns;
+ private String datasource;
+ private TimestampSpec timestampSpec;
+
+ public IndexTaskBuilder()
+ {
+ this.columns = new ArrayList<>();
+ }
+
+ public IndexTaskBuilder addColumn(String name, String type)
+ {
+ columns.add(new ColumnSpec(name, type));
+ return this;
+ }
+
+ public String getDatasource()
+ {
+ return datasource;
+ }
+
+ public IndexTaskBuilder setDatasource(String datasource)
+ {
+ this.datasource = datasource;
+ return this;
+ }
+
+ public TimestampSpec getTimestampSpec()
+ {
+ return timestampSpec;
+ }
+
+ public IndexTaskBuilder setTimestampSpec(TimestampSpec timestampSpec)
+ {
+ this.timestampSpec = timestampSpec;
+ return this;
+ }
+
+ public ArrayList<ColumnSpec> getColumns()
+ {
+ return columns;
+ }
+
+ public String build()
+ {
+ try {
+ String templateContent = Resources.toString(getResource("ingestion-index.tpl"), Charset.defaultCharset());
+ Template template = new Template("ingestion-task", new StringReader(templateContent));
+ StringWriter writer = new StringWriter();
+ template.process(this, writer);
+ return writer.toString();
+ }
+ catch (Exception ex) {
+ log.error(ex);
+ throw new RuntimeException("Failed to build the Druid index task from the template", ex);
+ }
+ }
+}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/TestIndexTaskBuilder.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/TestIndexTaskBuilder.java
new file mode 100644
index 000000000000..21026ecc6f6b
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/TestIndexTaskBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.druid.ingestion;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+public class TestIndexTaskBuilder
+{
+ @Test
+ public void testIngestionSpec()
+ throws JsonProcessingException
+ {
+ String expected = "{\"type\":\"index\",\"spec\":{\"dataSchema\":{\"dataSource\":\"test_datasource\",\"parser\":{\"type\":\"string\",\"parseSpec\":{\"format\":\"tsv\",\"timestampSpec\":{\"column\":\"dummy_druid_ts\",\"format\":\"auto\"},\"columns\":[\"dummy_druid_ts\",\"col_0\",\"col_1\"],\"dimensionsSpec\":{\"dimensions\":[{\"name\":\"col_0\",\"type\":\"string\"},{\"name\":\"col_1\",\"type\":\"long\"}]}}},\"granularitySpec\":{\"type\":\"uniform\",\"intervals\":[\"1992-01-02/2028-12-01\"],\"segmentGranularity\":\"year\",\"queryGranularity\":\"day\"}},\"ioConfig\":{\"type\":\"index\",\"firehose\":{\"type\":\"local\",\"baseDir\":\"/opt/druid/var/\",\"filter\":\"test_datasource.tsv\"},\"appendToExisting\":false},\"tuningConfig\":{\"type\":\"index\",\"maxRowsPerSegment\":5000000,\"maxRowsInMemory\":250000,\"segmentWriteOutMediumFactory\":{\"type\":\"offHeapMemory\"}}}}";
+ IndexTaskBuilder builder = new IndexTaskBuilder();
+ builder.setDatasource("test_datasource");
+ builder.addColumn("col_0", "string");
+ builder.addColumn("col_1", "long");
+ builder.setTimestampSpec(new TimestampSpec("dummy_druid_ts", "auto"));
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode jsonNode = objectMapper.readValue(builder.build(), JsonNode.class);
+
+ assertThat(jsonNode.toString()).isEqualTo(expected);
+ }
+}
diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/TimestampSpec.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/TimestampSpec.java
new file mode 100644
index 000000000000..153ec81226b2
--- /dev/null
+++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/ingestion/TimestampSpec.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.druid.ingestion;
+
+public class TimestampSpec
+{
+ private final String column;
+ private final String format;
+
+ public TimestampSpec(String column, String format)
+ {
+ this.column = column;
+ this.format = format;
+ }
+
+ public String getColumn()
+ {
+ return column;
+ }
+
+ public String getFormat()
+ {
+ return format;
+ }
+}
diff --git a/plugin/trino-druid/src/test/resources/ingestion-index.tpl b/plugin/trino-druid/src/test/resources/ingestion-index.tpl
new file mode 100644
index 000000000000..5fe10e5bdb0d
--- /dev/null
+++ b/plugin/trino-druid/src/test/resources/ingestion-index.tpl
@@ -0,0 +1,59 @@
+ {
+ "type": "index",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "${datasource}",
+ "parser": {
+ "type": "string",
+ "parseSpec": {
+ "format": "tsv",
+ "timestampSpec": {
+ "column": "${timestampSpec.column}",
+ "format": "${timestampSpec.format}"
+ },
+ "columns": [
+ "${timestampSpec.column}",
+ <#list columns as column>
+ "${column.name}"<#sep>, #sep>
+ </#list>
+ ],
+ "dimensionsSpec": {
+ "dimensions": [
+ <#list columns as column>
+ {
+ "name": "${column.name}",
+ "type": "${column.type}"
+ }<#sep>,
+ </#list>
+ ]
+ }
+ }
+ },
+ "granularitySpec": {
+ "type": "uniform",
+ "intervals": [
+ "1992-01-02/2028-12-01"
+ ],
+ "segmentGranularity": "year",
+ "queryGranularity": "day"
+ }
+ },
+ "ioConfig": {
+ "type": "index",
+ "firehose": {
+ "type": "local",
+ "baseDir": "/opt/druid/var/",
+ "filter": "${datasource}.tsv"
+ },
+ "appendToExisting": false
+ },
+ "tuningConfig": {
+ "type": "index",
+ "maxRowsPerSegment": 5000000,
+ "maxRowsInMemory": 250000,
+ "segmentWriteOutMediumFactory": {
+ "type": "offHeapMemory"
+ }
+ }
+ }
+ }
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/sql/TestTable.java b/testing/trino-testing/src/main/java/io/trino/testing/sql/TestTable.java
index 5343b83b0a25..1dfef46b49e9 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/sql/TestTable.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/sql/TestTable.java
@@ -36,35 +36,49 @@ public class TestTable
private final SqlExecutor sqlExecutor;
private final String name;
+ private final boolean supportSqlCreateAndDrop;
public TestTable(SqlExecutor sqlExecutor, String namePrefix, String tableDefinition)
{
- this(sqlExecutor, namePrefix, tableDefinition, ImmutableList.of());
+ this(sqlExecutor, namePrefix, tableDefinition, true);
+ }
+
+ public TestTable(SqlExecutor sqlExecutor, String namePrefix, boolean supportSqlCreateAndDrop)
+ {
+ this(sqlExecutor, namePrefix, "dummy table definition", ImmutableList.of(), supportSqlCreateAndDrop);
+ }
+
+ public TestTable(SqlExecutor sqlExecutor, String namePrefix, String tableDefinition, boolean supportSqlCreateAndDrop)
+ {
+ this(sqlExecutor, namePrefix, tableDefinition, ImmutableList.of(), supportSqlCreateAndDrop);
}
public TestTable(SqlExecutor sqlExecutor, String namePrefix, String tableDefinition, List<String> rowsToInsert)
{
+ this(sqlExecutor, namePrefix, tableDefinition, rowsToInsert, true);
+ }
+
+ public TestTable(SqlExecutor sqlExecutor, String namePrefix, String tableDefinition, List<String> rowsToInsert, boolean supportSqlCreateAndDrop)
+ {
+ this.supportSqlCreateAndDrop = supportSqlCreateAndDrop;
this.sqlExecutor = sqlExecutor;
- this.name = namePrefix + randomTableSuffix();
- sqlExecutor.execute(format("CREATE TABLE %s %s", name, tableDefinition));
- try {
- for (String row : rowsToInsert) {
- // some databases do not support multi value insert statement
- sqlExecutor.execute(format("INSERT INTO %s VALUES (%s)", name, row));
+ this.name = namePrefix + "_" + randomTableSuffix();
+ if (supportSqlCreateAndDrop) {
+ sqlExecutor.execute(format("CREATE TABLE %s %s", name, tableDefinition));
+ try {
+ for (String row : rowsToInsert) {
+ // some databases do not support multi value insert statement
+ sqlExecutor.execute(format("INSERT INTO %s VALUES (%s)", name, row));
+ }
}
- }
- catch (Exception e) {
- try (TestTable ignored = this) {
- throw e;
+ catch (Exception e) {
+ try (TestTable ignored = this) {
+ throw e;
+ }
}
}
}
- public String getName()
- {
- return name;
- }
-
public static TestTable fromColumns(SqlExecutor sqlExecutor, String namePrefix, Map<String, List<String>> columns)
{
return fromColumns(
@@ -116,15 +130,22 @@ public static TestTable fromColumnValueProviders(SqlExecutor sqlExecutor, String
return new TestTable(sqlExecutor, namePrefix, tableDefinition, rows.build());
}
- @Override
- public void close()
- {
- sqlExecutor.execute("DROP TABLE " + name);
- }
-
public static String randomTableSuffix()
{
String randomSuffix = Long.toString(abs(random.nextLong()), MAX_RADIX);
return randomSuffix.substring(0, min(RANDOM_SUFFIX_LENGTH, randomSuffix.length()));
}
+
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public void close()
+ {
+ if (supportSqlCreateAndDrop) {
+ sqlExecutor.execute("DROP TABLE " + name);
+ }
+ }
}