From ca3793dea73d94d6659a3a2f4f84c0d0a288cfde Mon Sep 17 00:00:00 2001 From: Maxim Lukyanenko Date: Mon, 30 Jan 2023 11:30:54 +0200 Subject: [PATCH 1/2] Move getting supported column types to DeltaLakeColumnHandle --- .../deltalake/AbstractDeltaLakePageSink.java | 18 +++--------------- .../deltalake/DeltaLakeColumnHandle.java | 13 +++++++++++++ .../plugin/deltalake/DeltaLakeMergeSink.java | 12 +----------- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java index b787b19b28da..2e6ef4d0f382 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java @@ -38,7 +38,6 @@ import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorPageSink; import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.Type; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -63,7 +62,6 @@ import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageSize; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.canonicalizeColumnName; import static io.trino.plugin.hive.util.HiveUtil.escapePathName; -import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -160,7 +158,7 @@ public AbstractDeltaLakePageSink( dataColumnHandles.add(column); dataColumnsInputIndex.add(inputIndex); dataColumnNames.add(column.getName()); - dataColumnTypes.add(column.getType()); + dataColumnTypes.add(column.getSupportedType()); break; case SYNTHESIZED: 
processSynthesizedColumn(column); @@ -457,16 +455,6 @@ private FileWriter createParquetFileWriter(String path) try { Closeable rollbackAction = () -> fileSystem.deleteFile(path); - List parquetTypes = dataColumnTypes.stream() - .map(type -> { - if (type instanceof TimestampWithTimeZoneType) { - verify(((TimestampWithTimeZoneType) type).getPrecision() == 3, "Unsupported type: %s", type); - return TIMESTAMP_MILLIS; - } - return type; - }) - .collect(toImmutableList()); - // we use identity column mapping; input page already contains only data columns per // DataLagePageSink.getDataPage() int[] identityMapping = new int[dataColumnTypes.size()]; @@ -474,11 +462,11 @@ private FileWriter createParquetFileWriter(String path) identityMapping[i] = i; } - ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(parquetTypes, dataColumnNames, false, false); + ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(dataColumnTypes, dataColumnNames, false, false); return new ParquetFileWriter( fileSystem.newOutputFile(path), rollbackAction, - parquetTypes, + dataColumnTypes, dataColumnNames, schemaConverter.getMessageType(), schemaConverter.getPrimitiveTypes(), diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java index cd882ff4edc1..904833a63bf4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.Type; import org.openjdk.jol.info.ClassLayout; @@ -24,12 +25,14 @@ import java.util.Optional; import java.util.OptionalInt; +import static 
com.google.common.base.Verify.verify; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType; import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.RowType.field; import static io.trino.spi.type.RowType.rowType; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.Math.toIntExact; @@ -184,4 +187,14 @@ public static DeltaLakeColumnHandle fileModifiedTimeColumnHandle() { return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED); } + + public Type getSupportedType() + { + Type supportedType = getType(); + if (supportedType instanceof TimestampWithTimeZoneType timestamp) { + verify(timestamp.getPrecision() == 3, "Unsupported type: %s", supportedType); + supportedType = TIMESTAMP_MILLIS; + } + return supportedType; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 765ec1eafbee..c5dc7b2a5f3d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -39,7 +39,6 @@ import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.Type; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -61,7 +60,6 @@ import 
java.util.function.Supplier; import java.util.stream.IntStream; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.json.JsonCodec.listJsonCodec; import static io.airlift.slice.Slices.utf8Slice; @@ -76,7 +74,6 @@ import static io.trino.spi.block.ColumnarRow.toColumnarRow; import static io.trino.spi.predicate.Utils.nativeValueToBlock; import static io.trino.spi.type.BigintType.BIGINT; -import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.Math.toIntExact; @@ -356,14 +353,7 @@ private FileWriter createParquetFileWriter(String path, List fileSystem.deleteFile(path); List parquetTypes = dataColumns.stream() - .map(column -> { - Type type = column.getType(); - if (type instanceof TimestampWithTimeZoneType timestamp) { - verify(timestamp.getPrecision() == 3, "Unsupported type: %s", type); - return TIMESTAMP_MILLIS; - } - return type; - }) + .map(DeltaLakeColumnHandle::getSupportedType) .collect(toImmutableList()); List dataColumnNames = dataColumns.stream() From d469d0ce2bfac125a461fa87ead500b463d25917 Mon Sep 17 00:00:00 2001 From: Maxim Lukyanenko Date: Mon, 13 Feb 2023 13:40:55 +0200 Subject: [PATCH 2/2] Support DML operations on Delta tables with `name` column mapping --- .../deltalake/AbstractDeltaLakePageSink.java | 2 +- .../deltalake/DeltaLakeColumnHandle.java | 2 +- .../plugin/deltalake/DeltaLakeMergeSink.java | 8 +- .../plugin/deltalake/DeltaLakeMetadata.java | 51 ++- .../plugin/deltalake/DeltaLakeWriter.java | 4 +- .../TestDeltaLakeColumnMappingMode.java | 306 +++++++++++++++++- 6 files changed, 345 insertions(+), 28 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java index 2e6ef4d0f382..8986eb838c9d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java @@ -157,7 +157,7 @@ public AbstractDeltaLakePageSink( case REGULAR: dataColumnHandles.add(column); dataColumnsInputIndex.add(inputIndex); - dataColumnNames.add(column.getName()); + dataColumnNames.add(column.getPhysicalName()); dataColumnTypes.add(column.getSupportedType()); break; case SYNTHESIZED: diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java index 904833a63bf4..27dcb0405ec2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java @@ -190,7 +190,7 @@ public static DeltaLakeColumnHandle fileModifiedTimeColumnHandle() public Type getSupportedType() { - Type supportedType = getType(); + Type supportedType = getPhysicalType(); if (supportedType instanceof TimestampWithTimeZoneType timestamp) { verify(timestamp.getPrecision() == 3, "Unsupported type: %s", supportedType); supportedType = TIMESTAMP_MILLIS; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index c5dc7b2a5f3d..cbda1bc4cd24 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -352,13 +352,9 @@ private FileWriter createParquetFileWriter(String path, List fileSystem.deleteFile(path); - List parquetTypes = 
dataColumns.stream() - .map(DeltaLakeColumnHandle::getSupportedType) - .collect(toImmutableList()); + List dataColumnNames = dataColumns.stream().map(DeltaLakeColumnHandle::getPhysicalName).collect(toImmutableList()); + List parquetTypes = dataColumns.stream().map(DeltaLakeColumnHandle::getSupportedType).collect(toImmutableList()); - List dataColumnNames = dataColumns.stream() - .map(DeltaLakeColumnHandle::getName) - .collect(toImmutableList()); ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter( parquetTypes, dataColumnNames, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 584251c95c6e..79b6290ed949 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -42,6 +42,7 @@ import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CdfFileEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; +import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -155,6 +156,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.MoreCollectors.onlyElement; import static com.google.common.collect.MoreCollectors.toOptional; import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME; @@ -277,6 
+279,7 @@ public class DeltaLakeMetadata private static final int WRITER_VERSION = 2; // The highest writer version Trino supports writing to private static final int MAX_WRITER_VERSION = 4; + private static final int MAX_DML_WRITER_VERSION = 5; // This constant should be used only for a new table private static final ProtocolEntry DEFAULT_PROTOCOL = new ProtocolEntry(READER_VERSION, WRITER_VERSION); // Matches the dummy column Databricks stores in the metastore @@ -1297,7 +1300,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants"); } checkUnsupportedGeneratedColumns(table.getMetadataEntry()); - checkSupportedWriterVersion(session, table.getSchemaTableName()); + checkUnsupportedColumnMapping(table.getMetadataEntry()); + checkSupportedDmlWriterVersion(session, table); List inputColumns = columns.stream() .map(handle -> (DeltaLakeColumnHandle) handle) @@ -1391,8 +1395,7 @@ public Optional finishInsert( ISOLATION_LEVEL, true)); - // Note: during writes we want to preserve original case of partition columns - List partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns(); + List partitionColumns = getWritePartitionColumnNames(handle.getMetadataEntry().getOriginalPartitionColumns(), handle.getInputColumns()); appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, true); transactionLogWriter.flush(); @@ -1410,6 +1413,22 @@ public Optional finishInsert( return Optional.empty(); } + private static List getWritePartitionColumnNames(List originalPartitionColumns, List dataColumns) + { + return originalPartitionColumns.stream() + .map(columnName -> { + DeltaLakeColumnHandle dataColumn = dataColumns.stream() + .filter(column -> columnName.equalsIgnoreCase(column.getName())) + .collect(onlyElement()); + // Note: during writes we want to preserve original case of partition columns, if the column's name is not 
different from the column's physical name + if (dataColumn.getPhysicalName().equalsIgnoreCase(columnName)) { + return columnName; + } + return dataColumn.getPhysicalName(); + }) + .collect(toImmutableList()); + } + @Override public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -1449,7 +1468,8 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); - checkSupportedWriterVersion(session, handle.getSchemaTableName()); + checkUnsupportedColumnMapping(handle.getMetadataEntry()); + checkSupportedDmlWriterVersion(session, handle); ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); @@ -1536,7 +1556,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(file, writeTimestamp, true)); } - List partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns(); + List partitionColumns = getWritePartitionColumnNames( + handle.getMetadataEntry().getOriginalPartitionColumns(), + ((DeltaLakeMergeTableHandle) tableHandle).getInsertTableHandle().getInputColumns()); appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, true); transactionLogWriter.flush(); @@ -1787,6 +1809,25 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) } } + private void checkUnsupportedColumnMapping(MetadataEntry metadataEntry) + { + ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); + if (!(columnMappingMode == ColumnMappingMode.NONE || columnMappingMode == ColumnMappingMode.NAME)) { + throw new TrinoException(NOT_SUPPORTED, "Writing with column mapping id is not supported"); + } + } + + private void checkSupportedDmlWriterVersion(ConnectorSession session, DeltaLakeTableHandle 
table) + { + SchemaTableName schemaTableName = table.getSchemaTableName(); + int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion(); + ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry()); + if (requiredWriterVersion == MAX_DML_WRITER_VERSION && (columnMappingMode == ColumnMappingMode.NONE || columnMappingMode == ColumnMappingMode.NAME)) { + return; + } + checkSupportedWriterVersion(session, schemaTableName); + } + private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName) { int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index cc38aa719f58..cc7c903d0255 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -173,8 +173,8 @@ public long getRowCount() public DataFileInfo getDataFileInfo() throws IOException { - List dataColumnNames = columnHandles.stream().map(DeltaLakeColumnHandle::getName).collect(toImmutableList()); - List dataColumnTypes = columnHandles.stream().map(DeltaLakeColumnHandle::getType).collect(toImmutableList()); + List dataColumnNames = columnHandles.stream().map(DeltaLakeColumnHandle::getPhysicalName).collect(toImmutableList()); + List dataColumnTypes = columnHandles.stream().map(DeltaLakeColumnHandle::getSupportedType).collect(toImmutableList()); return new DataFileInfo( relativeFilePath, getWrittenBytes(), diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index 712e8e9313ce..a9ec01ac173a 
100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tempto.assertions.QueryAssert.Row; import io.trino.testng.services.Flaky; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.List; @@ -398,17 +399,42 @@ private void testShowStatsOnPartitionedForColumnMappingMode(String mode) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUnsupportedOperationsColumnMappingModeId() { - testUnsupportedOperationsColumnMappingModeName("id"); + String tableName = "test_dl_unsupported_column_mapping_mode_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a_number INT, a_string STRING)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode'='id'," + + " 'delta.minReaderVersion'='2'," + + " 'delta.minWriterVersion'='5')"); + + try { + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 'one'), (2, 'two')")) + .hasMessageContaining("Writing with column mapping id is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." + tableName)) + .hasMessageContaining("Writing with column mapping id is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("UPDATE default." + tableName + " SET a_string = 'test'")) + .hasMessageContaining("Writing with column mapping id is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." 
+ tableName + " EXECUTE OPTIMIZE")) + .hasMessageContaining("Delta Lake writer version 5 which is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN new_col varchar")) + .hasMessageContaining("Delta Lake writer version 5 which is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN a_number TO renamed_column")) + .hasMessageContaining("This connector does not support renaming columns"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN a_number")) + .hasMessageContaining("This connector does not support dropping columns"); + } + finally { + onDelta().executeQuery("DROP TABLE default." + tableName); + } } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUnsupportedOperationsColumnMappingModeName() - { - testUnsupportedOperationsColumnMappingModeName("name"); - } - - private void testUnsupportedOperationsColumnMappingModeName(String mode) { String tableName = "test_dl_unsupported_column_mapping_mode_" + randomNameSuffix(); @@ -418,17 +444,17 @@ private void testUnsupportedOperationsColumnMappingModeName(String mode) " USING delta " + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + " TBLPROPERTIES (" + - " 'delta.columnMapping.mode'='" + mode + "'," + + " 'delta.columnMapping.mode'='name'," + " 'delta.minReaderVersion'='2'," + " 'delta.minWriterVersion'='5')"); try { - assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 'one'), (2, 'two')")) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." 
+ tableName)) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("UPDATE default." + tableName + " SET a_string = 'test'")) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); + assertThat(onTrino().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 'one'), (2, 'two')")) + .updatedRowsCountIsEqualTo(2); + assertThat(onTrino().executeQuery("DELETE FROM default." + tableName)) + .updatedRowsCountIsEqualTo(2); + assertThat(onTrino().executeQuery("UPDATE default." + tableName + " SET a_string = 'test'")) + .updatedRowsCountIsEqualTo(0); assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " EXECUTE OPTIMIZE")) .hasMessageContaining("Delta Lake writer version 5 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN new_col varchar")) @@ -485,4 +511,258 @@ private void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode) onDelta().executeQuery("DROP TABLE default." + tableName); } } + + @DataProvider + public Object[][] testSupportedWritesConfigDataProvider() + { + return new Object[][] { + {"none", false}, + {"none", true}, + {"name", false}, + {"name", true} + }; + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSupportedWritesConfigDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testSupportedNonPartitionedColumnMappingWrites(String columnMappingMode, boolean statsAsJsonEnabled) + { + String tableName = "test_dl_dml_column_mapping_mode_" + columnMappingMode + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." 
+ tableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " delta.checkpointInterval = 1, " + + " delta.checkpoint.writeStatsAsJson = " + statsAsJsonEnabled + ", " + + " delta.checkpoint.writeStatsAsStruct = " + !statsAsJsonEnabled + ", " + + " delta.columnMapping.mode='" + columnMappingMode + "')"); + + try { + String queryTrino = "a_number, a_string, array_col[1].array_struct_element, nested.field1"; + String queryDelta = "a_number, a_string, array_col[0].array_struct_element, nested.field1"; + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + + " VALUES (1, 'first value', ARRAY[ROW('nested 1')], ROW('databricks 1'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))"); + onDelta().executeQuery("INSERT INTO default." + tableName + + " VALUES (3, 'third value', array(struct('nested 3')), struct('databricks 3'))," + + " (4, 'four', array(struct('nested 4')), struct('databricks 4'))"); + assertDeltaTrinoTableEquals(tableName, queryTrino, queryDelta, ImmutableList.of( + row(1, "first value", "nested 1", "databricks 1"), + row(2, "two", "nested 2", "databricks 2"), + row(3, "third value", "nested 3", "databricks 3"), + row(4, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, null, 0.0, null, "1", "4"), + row("a_string", null, null, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_number = a_number + 10 WHERE a_number in (3, 4)"); + onDelta().executeQuery("UPDATE default." 
+ tableName + " SET a_number = a_number + 20 WHERE a_number in (1, 2)"); + assertDeltaTrinoTableEquals(tableName, queryTrino, queryDelta, ImmutableList.of( + row(21, "first value", "nested 1", "databricks 1"), + row(22, "two", "nested 2", "databricks 2"), + row(13, "third value", "nested 3", "databricks 3"), + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, null, 0.0, null, "13", "22"), + row("a_string", null, null, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 22"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 13"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a_number = 21"); + assertDeltaTrinoTableEquals(tableName, queryTrino, queryDelta, ImmutableList.of( + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, null, 0.0, null, "14", "14"), + row("a_string", null, null, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 1.0, null, null))); + } + finally { + onDelta().executeQuery("DROP TABLE default." + tableName); + } + } + + private void assertDeltaTrinoTableEquals(String tableName, String queryTrino, String queryDelta, List expectedRows) + { + assertThat(onDelta().executeQuery("SELECT " + queryDelta + " FROM default." + tableName)) + .containsOnly(expectedRows); + assertThat(onTrino().executeQuery("SELECT " + queryTrino + " FROM delta.default." 
+ tableName)) + .containsOnly(expectedRows); + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSupportedWritesConfigDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testSupportedPartitionedColumnMappingWrites(String columnMappingMode, boolean statsAsJsonEnabled) + { + String tableName = "test_dl_dml_column_mapping_mode_" + columnMappingMode + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " PARTITIONED BY (a_string)" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " delta.checkpointInterval = 1, " + + " delta.checkpoint.writeStatsAsJson = " + statsAsJsonEnabled + ", " + + " delta.checkpoint.writeStatsAsStruct = " + !statsAsJsonEnabled + ", " + + " delta.columnMapping.mode ='" + columnMappingMode + "')"); + + try { + String queryTrino = "a_number, a_string, array_col[1].array_struct_element, nested.field1"; + String queryDelta = "a_number, a_string, array_col[0].array_struct_element, nested.field1"; + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + + " VALUES (1, 'first value', ARRAY[ROW('nested 1')], ROW('databricks 1'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))"); + onDelta().executeQuery("INSERT INTO default." 
+ tableName + + " VALUES (3, 'third value', array(struct('nested 3')), struct('databricks 3'))," + + " (4, 'four', array(struct('nested 4')), struct('databricks 4'))"); + + assertDeltaTrinoTableEquals(tableName, queryTrino, queryDelta, ImmutableList.of( + row(1, "first value", "nested 1", "databricks 1"), + row(2, "two", "nested 2", "databricks 2"), + row(3, "third value", "nested 3", "databricks 3"), + row(4, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, null, 0.0, null, "1", "4"), + row("a_string", null, 4.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_number = a_number + 10 WHERE a_number in (3, 4)"); + onDelta().executeQuery("UPDATE default." + tableName + " SET a_number = a_number + 20 WHERE a_number in (1, 2)"); + assertDeltaTrinoTableEquals(tableName, queryTrino, queryDelta, ImmutableList.of( + row(21, "first value", "nested 1", "databricks 1"), + row(22, "two", "nested 2", "databricks 2"), + row(13, "third value", "nested 3", "databricks 3"), + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, null, 0.0, null, "13", "22"), + row("a_string", null, 4.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 22"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 13"); + onDelta().executeQuery("DELETE FROM default." 
+ tableName + " WHERE a_number = 21"); + assertDeltaTrinoTableEquals(tableName, queryTrino, queryDelta, ImmutableList.of( + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, null, 0.0, null, "14", "14"), + row("a_string", null, 1.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 1.0, null, null))); + } + finally { + onDelta().executeQuery("DROP TABLE default." + tableName); + } + } + + @DataProvider + public Object[][] testSupportedMergeColumnMappingConfigDataProvider() + { + return new Object[][] { + {"none"}, + {"name"}, + }; + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSupportedMergeColumnMappingConfigDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testMergeDeleteWithColumnMapping(String columnMappingMode) + { + String sourceTableName = "test_dl_dml_src_column_mapping_mode_" + columnMappingMode + randomNameSuffix(); + String targetTableName = "test_dl_dml_trg_column_mapping_mode_" + columnMappingMode + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + sourceTableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " PARTITIONED BY (a_string)" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'" + + " TBLPROPERTIES (" + + " delta.columnMapping.mode ='" + columnMappingMode + "')"); + + onDelta().executeQuery("" + + "CREATE TABLE default." 
+ targetTableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " PARTITIONED BY (a_string)" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" + + " TBLPROPERTIES (" + + " delta.columnMapping.mode ='" + columnMappingMode + "')"); + try { + onTrino().executeQuery("INSERT INTO delta.default." + sourceTableName + + " VALUES (1, 'first value', ARRAY[ROW('nested 1')], ROW('databricks 1'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))"); + onDelta().executeQuery("INSERT INTO default." + sourceTableName + + " VALUES (3, 'third value', array(struct('nested 3')), struct('databricks 3'))," + + " (4, 'four', array(struct('nested 4')), struct('databricks 4'))"); + + String queryTrino = "a_number, a_string, array_col[1].array_struct_element, nested.field1"; + String queryDelta = "a_number, a_string, array_col[0].array_struct_element, nested.field1"; + assertDeltaTrinoTableEquals(sourceTableName, queryTrino, queryDelta, ImmutableList.of( + row(1, "first value", "nested 1", "databricks 1"), + row(2, "two", "nested 2", "databricks 2"), + row(3, "third value", "nested 3", "databricks 3"), + row(4, "four", "nested 4", "databricks 4"))); + + onTrino().executeQuery("INSERT INTO delta.default." + targetTableName + + " VALUES (1000, '1000 value', ARRAY[ROW('nested 1000')], ROW('databricks 1000'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))"); + onDelta().executeQuery("INSERT INTO default." 
+ targetTableName + + " VALUES (3000, '3000 value', array(struct('nested 3000')), struct('databricks 3000'))," + + " (4, 'four', array(struct('nested 4')), struct('databricks 4'))"); + + assertDeltaTrinoTableEquals(targetTableName, queryTrino, queryDelta, ImmutableList.of( + row(1000, "1000 value", "nested 1000", "databricks 1000"), + row(2, "two", "nested 2", "databricks 2"), + row(3000, "3000 value", "nested 3000", "databricks 3000"), + row(4, "four", "nested 4", "databricks 4"))); + + onTrino().executeQuery("MERGE INTO delta.default." + targetTableName + " t USING delta.default." + sourceTableName + " s " + + "ON (t.a_number = s.a_number) " + + "WHEN MATCHED " + + " THEN DELETE " + + "WHEN NOT MATCHED " + + " THEN INSERT (a_number, a_string, array_col, nested) VALUES (s.a_number, s.a_string, s.array_col, s.nested)"); + + assertDeltaTrinoTableEquals(targetTableName, queryTrino, queryDelta, ImmutableList.of( + row(1000, "1000 value", "nested 1000", "databricks 1000"), + row(3000, "3000 value", "nested 3000", "databricks 3000"), + row(1, "first value", "nested 1", "databricks 1"), + row(3, "third value", "nested 3", "databricks 3"))); + } + finally { + onDelta().executeQuery("DROP TABLE default." + sourceTableName); + onDelta().executeQuery("DROP TABLE default." + targetTableName); + } + } }