diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTransforms.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTransforms.java index 5a607403719a..6a4597901e82 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTransforms.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTransforms.java @@ -163,7 +163,7 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type sour private static ColumnTransform identity(Type type) { - return new ColumnTransform(type, false, true, Function.identity(), ValueTransform.identity(type)); + return new ColumnTransform(type, false, true, false, Function.identity(), ValueTransform.identity(type)); } @VisibleForTesting @@ -174,6 +174,7 @@ static ColumnTransform bucket(Type type, int count) INTEGER, false, false, + false, block -> bucketBlock(block, count, hasher), (block, position) -> { if (block.isNull(position)) { @@ -230,6 +231,7 @@ private static ColumnTransform yearsFromDate() INTEGER, false, true, + true, block -> transformBlock(DATE, INTEGER, block, transform), ValueTransform.from(DATE, transform)); } @@ -241,6 +243,7 @@ private static ColumnTransform monthsFromDate() INTEGER, false, true, + true, block -> transformBlock(DATE, INTEGER, block, transform), ValueTransform.from(DATE, transform)); } @@ -252,6 +255,7 @@ private static ColumnTransform daysFromDate() INTEGER, false, true, + true, block -> transformBlock(DATE, INTEGER, block, transform), ValueTransform.from(DATE, transform)); } @@ -263,6 +267,7 @@ private static ColumnTransform yearsFromTimestamp() INTEGER, false, true, + true, block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform), ValueTransform.from(TIMESTAMP_MICROS, transform)); } @@ -274,6 +279,7 @@ private static ColumnTransform monthsFromTimestamp() INTEGER, false, true, + true, block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform), ValueTransform.from(TIMESTAMP_MICROS, transform)); } @@ -285,6 +291,7 @@ private static ColumnTransform daysFromTimestamp() INTEGER, false, true, + true, block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform), ValueTransform.from(TIMESTAMP_MICROS, transform)); } @@ -296,6 +303,7 @@ private static ColumnTransform hoursFromTimestamp() INTEGER, false, true, + true, block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform), ValueTransform.from(TIMESTAMP_MICROS, transform)); } @@ -307,6 +315,7 @@ private static ColumnTransform yearsFromTimestampWithTimeZone() INTEGER, false, true, + true, block -> extractTimestampWithTimeZone(block, transform), ValueTransform.fromTimestampTzTransform(transform)); } @@ -318,6 +327,7 @@ private static ColumnTransform monthsFromTimestampWithTimeZone() INTEGER, false, true, + true, block -> extractTimestampWithTimeZone(block, transform), ValueTransform.fromTimestampTzTransform(transform)); } @@ -329,6 +339,7 @@ private static ColumnTransform daysFromTimestampWithTimeZone() INTEGER, false, true, + true, block -> extractTimestampWithTimeZone(block, transform), ValueTransform.fromTimestampTzTransform(transform)); } @@ -340,6 +351,7 @@ private static ColumnTransform hoursFromTimestampWithTimeZone() INTEGER, false, true, + true, block -> extractTimestampWithTimeZone(block, transform), ValueTransform.fromTimestampTzTransform(transform)); } @@ -453,6 +465,7 @@ private static ColumnTransform truncateInteger(int width) INTEGER, false, true, + false, block -> truncateInteger(block, width), (block, position) -> { if (block.isNull(position)) { @@ -487,6 +500,7 @@ private static ColumnTransform truncateBigint(int width) BIGINT, false, true, + false, block -> truncateBigint(block, width), (block, position) -> { if (block.isNull(position)) { @@ -522,6 +536,7 @@ private static ColumnTransform truncateShortDecimal(Type type, int width, Decima type, false, true, + false, block -> truncateShortDecimal(decimal, block, unscaledWidth), (block, position) -> { if (block.isNull(position)) { @@ -559,6 +574,7 @@ private static ColumnTransform truncateLongDecimal(Type type, int width, Decimal type, false, true, + false, block -> truncateLongDecimal(decimal, block, unscaledWidth), (block, position) -> { if (block.isNull(position)) { @@ -606,6 +622,7 @@ private static ColumnTransform truncateVarchar(int width) VARCHAR, false, true, + false, block -> truncateVarchar(block, width), (block, position) -> { if (block.isNull(position)) { @@ -647,6 +664,7 @@ private static ColumnTransform truncateVarbinary(int width) VARBINARY, false, true, + false, block -> truncateVarbinary(block, width), (block, position) -> { if (block.isNull(position)) { @@ -685,6 +703,7 @@ private static ColumnTransform voidTransform(Type type) type, true, true, + false, block -> RunLengthEncodedBlock.create(nullBlock, block.getPositionCount()), (block, position) -> null); } @@ -739,14 +758,16 @@ public static class ColumnTransform private final Type type; private final boolean preservesNonNull; private final boolean monotonic; + private final boolean temporal; private final Function blockTransform; private final ValueTransform valueTransform; - public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, Function blockTransform, ValueTransform valueTransform) + public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, boolean temporal, Function blockTransform, ValueTransform valueTransform) { this.type = requireNonNull(type, "type is null"); this.preservesNonNull = preservesNonNull; this.monotonic = monotonic; + this.temporal = temporal; this.blockTransform = requireNonNull(blockTransform, "transform is null"); this.valueTransform = requireNonNull(valueTransform, "valueTransform is null"); } @@ -769,6 +790,11 @@ public boolean isMonotonic() return monotonic; } + public boolean isTemporal() + { + return temporal; + } + public Function getBlockTransform() { return blockTransform; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index caee663d664f..852f4561c4a3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -22,6 +22,7 @@ import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; import io.trino.plugin.iceberg.IcebergUtil; +import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -48,6 +49,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; +import org.apache.iceberg.types.Types; import java.io.IOException; import java.time.Duration; @@ -56,25 +58,35 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT; import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; +import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.mappedCopy; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA; import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.getStorageSchema; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; import static io.trino.plugin.iceberg.IcebergUtil.commit; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties; +import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata; +import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; +import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.SmallintType.SMALLINT; import static io.trino.spi.type.TimeType.TIME_MICROS; import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; @@ -217,9 +229,46 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName()); SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName); - List columns = definition.getColumns().stream() - .map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType())))) - .collect(toImmutableList()); + + Schema schemaWithTimestampTzPreserved = schemaFromMetadata(mappedCopy( + definition.getColumns(), + column -> { + Type type = typeManager.getType(column.getType()); + if (type instanceof TimestampWithTimeZoneType timestampTzType && timestampTzType.getPrecision() <= 6) { + // For now preserve timestamptz columns so that we can parse partitioning + type = TIMESTAMP_TZ_MICROS; + } + else { + type = typeForMaterializedViewStorageTable(type); + } + return new ColumnMetadata(column.getName(), type); + })); + PartitionSpec partitionSpec = parsePartitionFields(schemaWithTimestampTzPreserved, getPartitioning(definition.getProperties())); + Set temporalPartitioningSources = partitionSpec.fields().stream() + .flatMap(partitionField -> { + Types.NestedField sourceField = schemaWithTimestampTzPreserved.findField(partitionField.sourceId()); + Type sourceType = toTrinoType(sourceField.type(), typeManager); + ColumnTransform columnTransform = getColumnTransform(partitionField, sourceType); + if (!columnTransform.isTemporal()) { + return Stream.of(); + } + return Stream.of(sourceField.name()); + }) + .collect(toImmutableSet()); + + List columns = mappedCopy( + definition.getColumns(), + column -> { + Type type = typeManager.getType(column.getType()); + if (type instanceof TimestampWithTimeZoneType timestampTzType && timestampTzType.getPrecision() <= 6 && temporalPartitioningSources.contains(column.getName())) { + // Apply point-in-time semantics to maintain partitioning capabilities + type = TIMESTAMP_TZ_MICROS; + } + else { + type = typeForMaterializedViewStorageTable(type); + } + return new ColumnMetadata(column.getName(), type); + }); ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session); @@ -258,9 +307,7 @@ private Type typeForMaterializedViewStorageTable(Type type) : VARCHAR; } if (type instanceof TimestampWithTimeZoneType) { - // Iceberg does not store the time zone - // TODO allow temporal partitioning on these columns, or MV property to - // drop zone info and use timestamptz directly + // Iceberg does not store the time zone. return VARCHAR; } if (type instanceof ArrayType arrayType) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java index d43e366806c2..865c1d03690b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java @@ -25,6 +25,7 @@ import io.trino.transaction.TransactionManager; import org.assertj.core.api.Condition; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.Optional; @@ -66,14 +67,15 @@ public void setUp() assertUpdate("CREATE TABLE base_table2 (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_bigint', '_date'])"); assertUpdate("INSERT INTO base_table2 VALUES ('a', 0, DATE '2019-09-08'), ('a', 1, DATE '2019-09-08'), ('a', 0, DATE '2019-09-09')", 3); assertQuery("SELECT count(*) FROM base_table2", "VALUES 3"); + + assertUpdate("CREATE SCHEMA " + storageSchemaName); } @Test public void testShowTables() { - String schema = getSession().getSchema().orElseThrow(); assertUpdate("CREATE MATERIALIZED VIEW materialized_view_show_tables_test AS SELECT * FROM base_table1"); - SchemaTableName storageTableName = getStorageTable("iceberg", schema, "materialized_view_show_tables_test"); + SchemaTableName storageTableName = getStorageTable("materialized_view_show_tables_test"); Set expectedTables = ImmutableSet.of("base_table1", "base_table2", "materialized_view_show_tables_test", storageTableName.getTableName()); Set actualTables = computeActual("SHOW TABLES").getOnlyColumnAsSet().stream() @@ -259,9 +261,8 @@ public void testRefreshDenyPermission() @Test public void testRefreshAllowedWithRestrictedStorageTable() { - String schema = getSession().getSchema().orElseThrow(); assertUpdate("CREATE MATERIALIZED VIEW materialized_view_refresh AS SELECT * FROM base_table1"); - SchemaTableName storageTable = getStorageTable("iceberg", schema, "materialized_view_refresh"); + SchemaTableName storageTable = getStorageTable("materialized_view_refresh"); assertAccessAllowed( "REFRESH MATERIALIZED VIEW materialized_view_refresh", @@ -554,15 +555,13 @@ public void testNestedMaterializedViews() @Test public void testStorageSchemaProperty() { - String catalogName = getSession().getCatalog().orElseThrow(); String schemaName = getSession().getSchema().orElseThrow(); String viewName = "storage_schema_property_test"; - assertUpdate("CREATE SCHEMA IF NOT EXISTS " + catalogName + "." + storageSchemaName); assertUpdate( "CREATE MATERIALIZED VIEW " + viewName + " " + "WITH (storage_schema = '" + storageSchemaName + "') AS " + "SELECT * FROM base_table1"); - SchemaTableName storageTable = getStorageTable(catalogName, schemaName, viewName); + SchemaTableName storageTable = getStorageTable(viewName); assertThat(storageTable.getSchemaName()).isEqualTo(storageSchemaName); assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 6); @@ -594,13 +593,134 @@ public void testStorageSchemaProperty() .hasMessageContaining(format("'iceberg.%s.%s' does not exist", schemaName, viewName)); } - private SchemaTableName getStorageTable(String catalogName, String schemaName, String objectName) + @Test(dataProvider = "testBucketPartitioningDataProvider") + public void testBucketPartitioning(String dataType, String exampleValue) + { + // validate the example value type + assertThat(query("SELECT " + exampleValue)) + .matches("SELECT CAST(%s AS %S)".formatted(exampleValue, dataType)); + + assertUpdate("CREATE MATERIALIZED VIEW test_bucket_partitioning WITH (partitioning=ARRAY['bucket(col, 4)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)" + .formatted(dataType, exampleValue)); + try { + SchemaTableName storageTable = getStorageTable("test_bucket_partitioning"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + storageTable)) + .contains("partitioning = ARRAY['bucket(col, 4)']"); + + assertThat(query("SELECT * FROM test_bucket_partitioning WHERE col = " + exampleValue)) + .matches("SELECT " + exampleValue); + } + finally { + assertUpdate("DROP MATERIALIZED VIEW test_bucket_partitioning"); + } + } + + @DataProvider + public Object[][] testBucketPartitioningDataProvider() + { + // Iceberg supports bucket partitioning on int, long, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, binary + return new Object[][] { + {"integer", "20050909"}, + {"bigint", "200509091331001234"}, + {"decimal(8,5)", "DECIMAL '876.54321'"}, + {"decimal(28,21)", "DECIMAL '1234567.890123456789012345678'"}, + {"date", "DATE '2005-09-09'"}, + {"time(6)", "TIME '13:31:00.123456'"}, + {"timestamp(6)", "TIMESTAMP '2005-09-10 13:31:00.123456'"}, + {"timestamp(6) with time zone", "TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw'"}, + {"varchar", "VARCHAR 'Greetings from Warsaw!'"}, + {"uuid", "UUID '406caec7-68b9-4778-81b2-a12ece70c8b1'"}, + {"varbinary", "X'66696E6465706920726F636B7321'"}, + }; + } + + @Test(dataProvider = "testTruncatePartitioningDataProvider") + public void testTruncatePartitioning(String dataType, String exampleValue) + { + // validate the example value type + assertThat(query("SELECT " + exampleValue)) + .matches("SELECT CAST(%s AS %S)".formatted(exampleValue, dataType)); + + assertUpdate("CREATE MATERIALIZED VIEW test_truncate_partitioning WITH (partitioning=ARRAY['truncate(col, 4)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)" + .formatted(dataType, exampleValue)); + try { + SchemaTableName storageTable = getStorageTable("test_truncate_partitioning"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + storageTable)) + .contains("partitioning = ARRAY['truncate(col, 4)']"); + + assertThat(query("SELECT * FROM test_truncate_partitioning WHERE col = " + exampleValue)) + .matches("SELECT " + exampleValue); + } + finally { + assertUpdate("DROP MATERIALIZED VIEW test_truncate_partitioning"); + } + } + + @DataProvider + public Object[][] testTruncatePartitioningDataProvider() + { + // Iceberg supports truncate partitioning on int, long, decimal, string + return new Object[][] { + {"integer", "20050909"}, + {"bigint", "200509091331001234"}, + {"decimal(8,5)", "DECIMAL '876.54321'"}, + {"decimal(28,21)", "DECIMAL '1234567.890123456789012345678'"}, + {"varchar", "VARCHAR 'Greetings from Warsaw!'"}, + }; + } + + @Test(dataProvider = "testTemporalPartitioningDataProvider") + public void testTemporalPartitioning(String partitioning, String dataType, String exampleValue) + { + // validate the example value type + assertThat(query("SELECT " + exampleValue)) + .matches("SELECT CAST(%s AS %S)".formatted(exampleValue, dataType)); + + assertUpdate("CREATE MATERIALIZED VIEW test_temporal_partitioning WITH (partitioning=ARRAY['%s(col)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)" + .formatted(partitioning, dataType, exampleValue)); + try { + SchemaTableName storageTable = getStorageTable("test_temporal_partitioning"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + storageTable)) + .contains("partitioning = ARRAY['%s(col)']".formatted(partitioning)); + + assertThat(query("SELECT * FROM test_temporal_partitioning WHERE col = " + exampleValue)) + .matches("SELECT " + exampleValue); + } + finally { + assertUpdate("DROP MATERIALIZED VIEW test_temporal_partitioning"); + } + } + + @DataProvider + public Object[][] testTemporalPartitioningDataProvider() + { + return new Object[][] { + {"year", "date", "DATE '2005-09-09'"}, + {"year", "timestamp(6)", "TIMESTAMP '2005-09-10 13:31:00.123456'"}, + {"year", "timestamp(6) with time zone", "TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw'"}, + {"month", "date", "DATE '2005-09-09'"}, + {"month", "timestamp(6)", "TIMESTAMP '2005-09-10 13:31:00.123456'"}, + {"month", "timestamp(6) with time zone", "TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw'"}, + {"day", "date", "DATE '2005-09-09'"}, + {"day", "timestamp(6)", "TIMESTAMP '2005-09-10 13:31:00.123456'"}, + {"day", "timestamp(6) with time zone", "TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw'"}, + {"hour", "timestamp(6)", "TIMESTAMP '2005-09-10 13:31:00.123456'"}, + {"hour", "timestamp(6) with time zone", "TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw'"}, + }; + } + + private SchemaTableName getStorageTable(String materializedViewName) + { + return getStorageTable(getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), materializedViewName); + } + + private SchemaTableName getStorageTable(String catalogName, String schemaName, String materializedViewName) { TransactionManager transactionManager = getQueryRunner().getTransactionManager(); TransactionId transactionId = transactionManager.beginTransaction(false); Session session = getSession().beginTransactionId(transactionId, transactionManager, getQueryRunner().getAccessControl()); Optional materializedView = getQueryRunner().getMetadata() - .getMaterializedView(session, new QualifiedObjectName(catalogName, schemaName, objectName)); + .getMaterializedView(session, new QualifiedObjectName(catalogName, schemaName, materializedViewName)); assertThat(materializedView).isPresent(); return materializedView.get().getStorageTable().get().getSchemaTableName(); } diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java index 76c092e7144a..6ad769ac10b4 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java @@ -279,8 +279,8 @@ private List testTableNameTestData() .add("a\"quote") .add("an'apostrophe") .add("a`backtick`") - .add("a/slash`") - .add("a\\backslash`") + .add("a/slash") + .add("a\\backslash") .add("adigit0") .add("0startwithdigit") .add("[brackets]")