Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,16 @@ public void testUpdatePartitionedTableWithCdf(String columnMappingMode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled(String columnMappingMode)
public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled()
{
String tableName = "test_updates_to_table_with_many_rows_inserted_in_one_query_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (col1 STRING, updated_column INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 5 WHERE col1 = 'testValue3'");
Expand All @@ -187,17 +187,17 @@ public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled(Strin
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled(String columnMappingMode)
public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled()
{
String tableName = "test_updates_to_partitioned_table_with_many_rows_inserted_in_one_query_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
"USING DELTA " +
"PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES " +
"('testValue1', 1, 'partition1'), " +
Expand All @@ -220,17 +220,17 @@ public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdf
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated(String columnMappingMode)
public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated()
{
String tableName = "test_updates_partitioning_column_in_table_with_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
"USING DELTA " +
"PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES " +
"('testValue1', 1, 'partition1'), " +
Expand Down Expand Up @@ -258,15 +258,14 @@ public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated(Str
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated(String columnMappingMode)
public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated()
{
String tableName = "test_updates_to_table_with_cdf_enabled_later_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (col1 STRING, updated_column INT) " +
"USING DELTA " +
"TBLPROPERTIES ('delta.columnMapping.mode' = '" + columnMappingMode + "') " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1)");
Expand Down Expand Up @@ -452,17 +451,17 @@ public void testMergeDeleteIntoTableWithCdfEnabled(String columnMappingMode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled(String columnMappingMode)
public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled()
Comment thread
ebyhr marked this conversation as resolved.
{
String targetTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_" + randomNameSuffix();
String sourceTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_data_table_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + targetTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");
onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'");
Expand Down Expand Up @@ -514,17 +513,17 @@ public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled(String columnMa
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testDeleteFromNullPartitionWithCdfEnabled(String columnMappingMode)
public void testDeleteFromNullPartitionWithCdfEnabled()
{
String tableName = "test_delete_from_null_partition_with_cdf_enabled" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
"USING DELTA " +
"PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES " +
"('testValue1', 1, 'partition1'), " +
Expand Down Expand Up @@ -553,14 +552,14 @@ public void testDeleteFromNullPartitionWithCdfEnabled(String columnMappingMode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTurningOnAndOffCdfFromTrino(String columnMappingMode)
public void testTurningOnAndOffCdfFromTrino()
{
String tableName = "test_turning_cdf_on_and_off_from_trino" + randomNameSuffix();
try {
onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (col1 VARCHAR, updated_column INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true, column_mapping_mode = '" + columnMappingMode + "')");
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true)");

assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString()).contains("change_data_feed_enabled = true");

Expand Down Expand Up @@ -617,17 +616,17 @@ public void testThatCdfDoesntWorkWhenPropertyIsNotSet()
assertThereIsNoCdfFileGenerated(tableName2, "change_data_feed_enabled = false");
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTrinoCanReadCdfEntriesGeneratedByDelta(String columnMappingMode)
public void testTrinoCanReadCdfEntriesGeneratedByDelta()
{
String targetTableName = "test_trino_can_read_cdf_entries_generated_by_delta_target_" + randomNameSuffix();
String sourceTableName = "test_trino_can_read_cdf_entries_generated_by_delta_source_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + targetTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");
onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'");
Expand Down Expand Up @@ -685,20 +684,20 @@ public void testTrinoCanReadCdfEntriesGeneratedByDelta(String columnMappingMode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testDeltaCanReadCdfEntriesGeneratedByTrino(String columnMappingMode)
public void testDeltaCanReadCdfEntriesGeneratedByTrino()
{
String targetTableName = "test_delta_can_read_cdf_entries_generated_by_trino_target_" + randomNameSuffix();
String sourceTableName = "test_delta_can_read_cdf_entries_generated_by_trino_source_" + randomNameSuffix();
try {
onTrino().executeQuery("CREATE TABLE delta.default." + targetTableName + " (page_id INT, page_url VARCHAR, views INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName +
"', change_data_feed_enabled = true, column_mapping_mode = '" + columnMappingMode + "')");
"', change_data_feed_enabled = true)");

onTrino().executeQuery("CREATE TABLE delta.default." + sourceTableName + " (page_id INT, page_url VARCHAR, views INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName +
"', change_data_feed_enabled = true, column_mapping_mode = '" + columnMappingMode + "')");
"', change_data_feed_enabled = true)");

onTrino().executeQuery("INSERT INTO delta.default." + targetTableName + " VALUES (1, 'pageUrl1', 100), (2, 'pageUrl2', 200), (3, 'pageUrl3', 300)");
onTrino().executeQuery("INSERT INTO delta.default." + targetTableName + " VALUES (4, 'pageUrl4', 400)");
Expand Down