Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,10 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
ColumnMappingMode columnMappingMode = handle.getColumnMappingMode();
String schemaString = handle.getSchemaString();
List<String> columnNames = handle.getInputColumns().stream().map(DeltaLakeColumnHandle::getBaseColumnName).collect(toImmutableList());
List<String> physicalPartitionNames = handle.getInputColumns().stream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be something we didn't consider in #12638
I don't see at the moment any further related open issues.

.filter(column -> column.getColumnType() == PARTITION_KEY)
.map(DeltaLakeColumnHandle::getBasePhysicalColumnName)
.collect(toImmutableList());
try {
// For CTAS there is no risk of multiple writers racing. Using writer without transaction isolation so we are not limiting support for CTAS to
// filesystems for which we have proper implementations of TransactionLogSynchronizers.
Expand All @@ -1151,7 +1155,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
session,
handle.getComment(),
handle.getProtocolEntry());
appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), columnNames, true);
appendAddFileEntries(transactionLogWriter, dataFileInfos, physicalPartitionNames, columnNames, true);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a couple lines up in the appendTableEntries call, should we use physicalPartitionNames there too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean handle.getPartitionedBy() for partitionColumns in metaData? The current implementation is same as OSS Delta Lake. They store non-physical partition column names in the field.

transactionLogWriter.flush();

if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,14 @@ public void testCreateTableAsSelectWithColumnMappingMode(ColumnMappingMode mode)
" AS SELECT 1 AS a_int, CAST(row(11) AS row(x integer)) AS a_row", 1));
}

@Test(dataProvider = "columnMappingModeDataProvider")
public void testCreatePartitionTableAsSelectWithColumnMappingMode(ColumnMappingMode mode)
{
testCreateTableColumnMappingMode(mode, tableName ->
assertUpdate("CREATE TABLE " + tableName + " WITH (column_mapping_mode='" + mode + "', partitioned_by=ARRAY['a_int'])" +
" AS SELECT 1 AS a_int, CAST(row(11) AS row(x integer)) AS a_row", 1));
}

private void testCreateTableColumnMappingMode(ColumnMappingMode mode, Consumer<String> createTable)
{
String tableName = "test_create_table_column_mapping_" + randomNameSuffix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,29 @@ public void testCreateTableWithNotNullColumn(String mode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "supportedColumnMappingForDmlDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testCreatePartitionTableAsSelect(String mode)
{
String tableName = "test_dl_create_partition_table_as_select_" + randomNameSuffix();

onTrino().executeQuery("" +
"CREATE TABLE delta.default." + tableName + " " +
"WITH (" +
"location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'," +
"column_mapping_mode = '" + mode + "'" +
")" +
"AS SELECT 1 AS id, 'part#1' AS part");
try {
Row expected = row(1, "part#1");
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected);
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected);
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testCreateTableWithComments(String mode)
Expand Down Expand Up @@ -1528,9 +1551,9 @@ public void testDropLastNonPartitionColumnWithColumnMappingMode(String mode)
// TODO https://github.com/delta-io/delta/issues/1929 Delta Lake disallows creating tables with all partitioned column, but allows dropping the non-partition column
onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN data");

assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.hasMessageContaining("Index 0 out of bounds for length 0");
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName).getOnlyValue()).isNull();
Row expected = row("part#1");
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected);
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected);
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
Expand Down