diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 1f1858547e22..21156358bf54 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -1135,6 +1135,10 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
         ColumnMappingMode columnMappingMode = handle.getColumnMappingMode();
         String schemaString = handle.getSchemaString();
         List<String> columnNames = handle.getInputColumns().stream().map(DeltaLakeColumnHandle::getBaseColumnName).collect(toImmutableList());
+        List<String> physicalPartitionNames = handle.getInputColumns().stream()
+                .filter(column -> column.getColumnType() == PARTITION_KEY)
+                .map(DeltaLakeColumnHandle::getBasePhysicalColumnName)
+                .collect(toImmutableList());
         try {
             // For CTAS there is no risk of multiple writers racing. Using writer without transaction isolation so we are not limiting support for CTAS to
             // filesystems for which we have proper implementations of TransactionLogSynchronizers.
@@ -1151,7 +1155,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
                     session,
                     handle.getComment(),
                     handle.getProtocolEntry());
-            appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), columnNames, true);
+            appendAddFileEntries(transactionLogWriter, dataFileInfos, physicalPartitionNames, columnNames, true);
             transactionLogWriter.flush();
 
             if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) {
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
index 7ead2b5816c2..fecf22c20a9c 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
@@ -1097,6 +1097,14 @@ public void testCreateTableAsSelectWithColumnMappingMode(ColumnMappingMode mode)
                         " AS SELECT 1 AS a_int, CAST(row(11) AS row(x integer)) AS a_row", 1));
     }
 
+    @Test(dataProvider = "columnMappingModeDataProvider")
+    public void testCreatePartitionTableAsSelectWithColumnMappingMode(ColumnMappingMode mode)
+    {
+        testCreateTableColumnMappingMode(mode, tableName ->
+                assertUpdate("CREATE TABLE " + tableName + " WITH (column_mapping_mode='" + mode + "', partitioned_by=ARRAY['a_int'])" +
+                        " AS SELECT 1 AS a_int, CAST(row(11) AS row(x integer)) AS a_row", 1));
+    }
+
     private void testCreateTableColumnMappingMode(ColumnMappingMode mode, Consumer<String> createTable)
     {
         String tableName = "test_create_table_column_mapping_" + randomNameSuffix();
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java
index 8d7653f1f885..5ecd8b52a2af 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java
@@ -441,6 +441,30 @@ public void testCreateTableWithNotNullColumn(String mode)
         }
     }
 
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "supportedColumnMappingForDmlDataProvider")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testCreatePartitionTableAsSelect(String mode)
+    {
+        String tableName = "test_dl_create_partition_table_as_select_" + randomNameSuffix();
+
+        onTrino().executeQuery("" +
+                "CREATE TABLE delta.default." + tableName + " " +
+                "WITH (" +
+                "location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'," +
+                "column_mapping_mode = '" + mode + "'," +
+                "partitioned_by = ARRAY['part']" +
+                ")" +
+                "AS SELECT 1 AS id, 'part#1' AS part");
+        try {
+            Row expected = row(1, "part#1");
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected);
+        }
+        finally {
+            onTrino().executeQuery("DROP TABLE delta.default." + tableName);
+        }
+    }
+
     @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
     @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
     public void testCreateTableWithComments(String mode)
@@ -1528,9 +1552,9 @@ public void testDropLastNonPartitionColumnWithColumnMappingMode(String mode)
             // TODO https://github.com/delta-io/delta/issues/1929 Delta Lake disallows creating tables with all partitioned column, but allows dropping the non-partition column
             onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN data");
 
-            assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
-                    .hasMessageContaining("Index 0 out of bounds for length 0");
-            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName).getOnlyValue()).isNull();
+            Row expected = row("part#1");
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected);
         }
         finally {
             onTrino().executeQuery("DROP TABLE delta.default." + tableName);