From a44e2f80e488d39f77029aad7b768c0a227a34d1 Mon Sep 17 00:00:00 2001 From: "praveenkrishna.d" Date: Mon, 24 Jun 2024 20:03:29 +0530 Subject: [PATCH 1/2] Throw TrinoException for Kudu failures Inserting/Merging/Updating invalid data in Kudu doesn't throw a Kudu specific exception so we wrap all the exceptions as Trino specific exceptions --- .../io/trino/plugin/kudu/KuduPageSink.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java index 29dc98d338af..adf0063c05fb 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorMergeSink; import io.trino.spi.connector.ConnectorPageSink; @@ -47,6 +48,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -138,8 +140,8 @@ public CompletableFuture appendPage(Page page) } return NOT_BLOCKED; } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, e); } } @@ -228,8 +230,8 @@ public void storeMergedRows(Page page) try { operationApplier.applyOperationAsync(delete); } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new 
TrinoException(GENERIC_INTERNAL_ERROR, e); } } @@ -248,14 +250,14 @@ public void storeMergedRows(Page page) try { operationApplier.applyOperationAsync(insert); } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, e); } } } } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, e); } } From 592b7512af779b2afec64f6057df622d2294ffb3 Mon Sep 17 00:00:00 2001 From: Pascal Gasp Date: Tue, 12 Dec 2023 14:23:21 +0100 Subject: [PATCH 2/2] Add date data type support for Kudu Co-authored-by: Pascal Gasp --- docs/src/main/sphinx/connector/kudu.md | 4 +- .../io/trino/plugin/kudu/KuduPageSink.java | 4 ++ .../java/io/trino/plugin/kudu/TypeHelper.java | 13 ++++- .../plugin/kudu/TestKuduConnectorTest.java | 55 +++++++++---------- .../plugin/kudu/TestKuduTypeMapping.java | 45 +++++++++++++++ 5 files changed, 87 insertions(+), 34 deletions(-) diff --git a/docs/src/main/sphinx/connector/kudu.md b/docs/src/main/sphinx/connector/kudu.md index 9882ec2287ef..71b3bd207a9e 100644 --- a/docs/src/main/sphinx/connector/kudu.md +++ b/docs/src/main/sphinx/connector/kudu.md @@ -217,6 +217,8 @@ this table: - `VARCHAR` * - `BINARY` - `VARBINARY` +* - `DATE` + - `DATE` * - `UNIXTIME_MICROS` - `TIMESTAMP(3)` ::: @@ -266,7 +268,7 @@ this table: - `BINARY` - * - `DATE` - - `STRING` + - `DATE` - * - `TIMESTAMP(3)` - `UNIXTIME_MICROS` diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java index adf0063c05fb..0c71ee27c2e4 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java @@ -63,6 +63,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import 
static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.kudu.util.DateUtil.epochDaysToSqlDate; public class KuduPageSink implements ConnectorPageSink, ConnectorMergeSink @@ -152,6 +153,9 @@ private void appendColumn(PartialRow row, Page page, int position, int channel, if (block.isNull(position)) { row.setNull(destChannel); } + else if (DATE.equals(type)) { + row.addDate(destChannel, epochDaysToSqlDate(INTEGER.getInt(block, position))); + } else if (TIMESTAMP_MILLIS.equals(type)) { row.addLong(destChannel, truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position))); } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java index 88763e9525e8..48d5a707b45c 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java @@ -84,7 +84,7 @@ public static org.apache.kudu.Type toKuduClientType(Type type) return org.apache.kudu.Type.BINARY; } if (type == DateType.DATE) { - return org.apache.kudu.Type.STRING; + return org.apache.kudu.Type.DATE; } if (type.equals(TIMESTAMP_MILLIS)) { return org.apache.kudu.Type.UNIXTIME_MICROS; @@ -116,15 +116,16 @@ private static Type fromKuduClientType(org.apache.kudu.Type ktype, ColumnTypeAtt return DoubleType.DOUBLE; case DECIMAL: return DecimalType.createDecimalType(attributes.getPrecision(), attributes.getScale()); - // TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009 case STRING: return VarcharType.VARCHAR; case BINARY: return VarbinaryType.VARBINARY; + case DATE: + return DateType.DATE; case UNIXTIME_MICROS: return TIMESTAMP_MILLIS; + // TODO: add support for varchar types: https://github.com/trinodb/trino/issues/11009 case VARCHAR: - case DATE: break; } throw new IllegalStateException("Kudu type not implemented for " + ktype); @@ -166,6 +167,9 @@ public static 
Object getJavaValue(Type type, Object nativeValue) if (type instanceof VarbinaryType) { return ((Slice) nativeValue).toByteBuffer(); } + if (type.equals(DateType.DATE)) { + return nativeValue; + } if (type.equals(TIMESTAMP_MILLIS)) { // Kudu's native format is in microseconds return nativeValue; @@ -204,6 +208,9 @@ public static long getLong(Type type, RowResult row, int field) } throw new IllegalStateException("getLong not supported for long decimal: " + type); } + if (type.equals(DateType.DATE)) { + return row.getInt(field); + } if (type.equals(TIMESTAMP_MILLIS)) { return truncateEpochMicrosToMillis(row.getLong(field)); } diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java index 28c33f135629..62f436be508a 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java @@ -173,7 +173,7 @@ protected MaterializedResult getDescribeOrdersResult() .row("custkey", "bigint", extra, "") .row("orderstatus", "varchar", extra, "") .row("totalprice", "double", extra, "") - .row("orderdate", "varchar", extra, "") + .row("orderdate", "date", extra, "") .row("orderpriority", "varchar", extra, "") .row("clerk", "varchar", extra, "") .row("shippriority", "integer", extra, "") @@ -198,7 +198,7 @@ public void testShowCreateTable() " custkey bigint COMMENT '' WITH (nullable = true),\n" + " orderstatus varchar COMMENT '' WITH (nullable = true),\n" + " totalprice double COMMENT '' WITH (nullable = true),\n" + - " orderdate varchar COMMENT '' WITH (nullable = true),\n" + + " orderdate date COMMENT '' WITH (nullable = true),\n" + " orderpriority varchar COMMENT '' WITH (nullable = true),\n" + " clerk varchar COMMENT '' WITH (nullable = true),\n" + " shippriority integer COMMENT '' WITH (nullable = true),\n" + @@ -601,8 +601,6 @@ public void 
testInsertHighestUnicodeCharacter() public void testInsertNegativeDate() { // TODO Remove this overriding test once kudu connector can create tables with default partitions - // TODO Update this test once kudu connector supports DATE type: https://github.com/trinodb/trino/issues/11009 - // DATE type is not supported by Kudu connector try (TestTable table = new TestTable(getQueryRunner()::execute, "insert_date", "(dt DATE WITH (primary_key=true)) " + "WITH (partition_by_hash_columns = ARRAY['dt'], partition_by_hash_buckets = 2)")) { @@ -613,7 +611,7 @@ public void testInsertNegativeDate() @Override protected String errorMessageForInsertNegativeDate(String date) { - return "Insert query has mismatched column types: Table: \\[varchar\\], Query: \\[date\\]"; + return "Date value <-719893>} is out of range '0001-01-01':'9999-12-31'"; } @Test @@ -731,37 +729,24 @@ public void testWrittenStats() @Test @Override - public void testCreateTableAsSelectNegativeDate() + public void testVarcharCastToDateInPredicate() { - // Map date column type to varchar - String tableName = "negative_date_" + randomNameSuffix(); + assertThatThrownBy(super::testVarcharCastToDateInPredicate) + .hasStackTraceContaining("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions"); - try { - assertUpdate(format("CREATE TABLE %s AS SELECT DATE '-0001-01-01' AS dt", tableName), 1); - assertQuery("SELECT * FROM " + tableName, "VALUES '-0001-01-01'"); - assertQuery(format("SELECT * FROM %s WHERE dt = '-0001-01-01'", tableName), "VALUES '-0001-01-01'"); - } - finally { - assertUpdate("DROP TABLE IF EXISTS " + tableName); - } + abort("TODO: implement the test for Kudu"); } @Test @Override + @SuppressWarnings("deprecation") public void testDateYearOfEraPredicate() { - assertThatThrownBy(super::testDateYearOfEraPredicate) - .hasStackTraceContaining("Cannot apply operator: varchar = date"); - } - - @Test - @Override - public void testVarcharCastToDateInPredicate() - { - 
assertThatThrownBy(super::testVarcharCastToDateInPredicate) - .hasStackTraceContaining("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions"); - - abort("TODO: implement the test for Kudu"); + // Override because the connector throws an exception instead of an empty result when the value is out of supported range + assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE '1997-09-14'", "VALUES DATE '1997-09-14'"); + // TODO Replace failure with a TrinoException + assertThat(query("SELECT * FROM orders WHERE orderdate = DATE '-1996-09-14'")) + .nonTrinoExceptionFailure().hasMessageContaining("integer value out of range for Type: date column: -1448295"); } @Test @@ -1011,13 +996,17 @@ protected Optional filterDataMappingSmokeTestData(DataMapp return Optional.of(dataMappingTestSetup.asUnsupported()); } - if (typeName.equals("date") // date gets stored as varchar - || typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416) + if (typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416) || (typeName.startsWith("char") && dataMappingTestSetup.getSampleValueLiteral().contains(" "))) { // TODO: https://github.com/trinodb/trino/issues/3597 // TODO this should either work or fail cleanly return Optional.empty(); } + if (typeName.equals("date") && dataMappingTestSetup.getSampleValueLiteral().equals("DATE '1582-10-05'")) { + // Kudu connector returns +10 days during julian->gregorian switch. The test case exists in TestKuduTypeMapping.testDate(). 
+ return Optional.empty(); + } + return Optional.of(dataMappingTestSetup); } @@ -1065,6 +1054,12 @@ protected void verifyColumnNameLengthFailurePermissible(Throwable e) assertThat(e).hasMessageContaining("invalid column name: identifier"); } + @Override + protected String errorMessageForCreateTableAsSelectNegativeDate(String date) + { + return ".*Date value <-719893>} is out of range '0001-01-01':'9999-12-31'.*"; + } + private void assertTableProperty(String tableProperties, String key, String regexValue) { assertThat(Pattern.compile(key + "\\s*=\\s*" + regexValue + ",?\\s+").matcher(tableProperties).find()) diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java index 1ffcf5aab226..57060be2b200 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java @@ -28,11 +28,13 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.function.Function; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.DecimalType.createDecimalType; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; @@ -42,6 +44,7 @@ import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.String.format; import static java.time.ZoneOffset.UTC; final class TestKuduTypeMapping @@ -244,6 +247,48 @@ void testVarbinary() .execute(getQueryRunner(), trinoCreateAndInsert("test_varbinary")); } + @Test + void 
testDate() + { + testDate(UTC); + testDate(jvmZone); + // using two non-JVM zones + testDate(vilnius); + testDate(kathmandu); + testDate(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()); + } + + private void testDate(ZoneId sessionZone) + { + Session session = Session.builder(getSession()) + .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId())) + .build(); + + dateTest(inputLiteral -> format("DATE %s", inputLiteral)) + .execute(getQueryRunner(), session, trinoCreateAsSelect(session, "test_date")) + .execute(getQueryRunner(), session, trinoCreateAsSelect("test_date")) + .execute(getQueryRunner(), session, trinoCreateAndInsert(session, "test_date")) + .execute(getQueryRunner(), session, trinoCreateAndInsert("test_date")); + } + + private static SqlDataTypeTest dateTest(Function inputLiteralFactory) + { + return SqlDataTypeTest.create() + .addRoundTrip("date", "NULL", DATE, "CAST(NULL AS DATE)") + .addRoundTrip("date", inputLiteralFactory.apply("'0001-01-01'"), DATE, "DATE '0001-01-01'") // min value in Kudu + .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-04'"), DATE, "DATE '1582-10-04'") // before julian->gregorian switch + .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-05'"), DATE, "DATE '1582-10-15'") // begin julian->gregorian switch + .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-14'"), DATE, "DATE '1582-10-24'") // end julian->gregorian switch + .addRoundTrip("date", inputLiteralFactory.apply("'1952-04-03'"), DATE, "DATE '1952-04-03'") // before epoch + .addRoundTrip("date", inputLiteralFactory.apply("'1970-01-01'"), DATE, "DATE '1970-01-01'") + .addRoundTrip("date", inputLiteralFactory.apply("'1970-02-03'"), DATE, "DATE '1970-02-03'") + .addRoundTrip("date", inputLiteralFactory.apply("'1983-04-01'"), DATE, "DATE '1983-04-01'") + .addRoundTrip("date", inputLiteralFactory.apply("'1983-10-01'"), DATE, "DATE '1983-10-01'") + .addRoundTrip("date", inputLiteralFactory.apply("'2017-07-01'"), DATE, "DATE '2017-07-01'") // 
summer on northern hemisphere (possible DST) + .addRoundTrip("date", inputLiteralFactory.apply("'2017-01-01'"), DATE, "DATE '2017-01-01'") // winter on northern hemisphere (possible DST on southern hemisphere) + .addRoundTrip("date", inputLiteralFactory.apply("'9999-12-31'"), DATE, "DATE '9999-12-31'"); // max value in Kudu + } + @Test void testTimestamp() {