@@ -108,6 +108,67 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS parquet_table");
}

@Test
public void testCoalesceDelete() throws Exception {
createAndInitUnpartitionedTable();

Employee[] employees = new Employee[100];
for (int index = 0; index < 100; index++) {
employees[index] = new Employee(index, "hr");
}
append(tableName, employees);
append(tableName, employees);
append(tableName, employees);
append(tableName, employees);

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
DELETE_DISTRIBUTION_MODE,
DistributionMode.RANGE.modeName());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
});

Table table = validationCatalog.loadTable(tableIdent);
Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);

if (mode(table) == COPY_ON_WRITE) {
// CoW DELETE requests the remaining records to be range distributed by `_file`, `_pos`
// every task has data for each of 200 reducers
// AQE detects that all shuffle blocks are small and processes them in 1 task
// otherwise, there would be 200 tasks writing to the table
validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
} else {
// MoR DELETE requests the deleted records to be range distributed by partition and `_file`
// each task contains only 1 file and therefore writes only 1 shuffle block
// that means 4 shuffle blocks are distributed among 200 reducers
// AQE detects that all 4 shuffle blocks are small and processes them in 1 task
// otherwise, there would be 4 tasks processing 1 shuffle block each
validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
}

Assert.assertEquals(
"Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
}

@Test
public void testSkewDelete() throws Exception {
createAndInitPartitionedTable();
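Note on the tablePropsAsString(tableProps) call used in these tests: it is assumed to serialize the property map into the comma-separated 'key'='value' list that ALTER TABLE ... SET TBLPROPERTIES (...) expects, so SPLIT_OPEN_FILE_COST and DELETE_DISTRIBUTION_MODE (which should resolve to read.split.open-file-cost and write.delete.distribution-mode) end up as ordinary Iceberg table properties. A minimal sketch under that assumption, not the helper's actual implementation:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TablePropsAsStringSketch {
  // Assumed behavior of tablePropsAsString: render {k1=v1, k2=v2} as 'k1'='v1','k2'='v2'
  // so the result can be spliced into ALTER TABLE ... SET TBLPROPERTIES (...).
  static String tablePropsAsString(Map<String, String> tableProps) {
    return tableProps.entrySet().stream()
        .map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue()))
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    Map<String, String> tableProps = new LinkedHashMap<>();
    tableProps.put("read.split.open-file-cost", String.valueOf(Integer.MAX_VALUE));
    tableProps.put("write.delete.distribution-mode", "range");
    // expected output:
    // 'read.split.open-file-cost'='2147483647','write.delete.distribution-mode'='range'
    System.out.println(tablePropsAsString(tableProps));
  }
}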
@@ -97,6 +97,75 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS source");
}

@Test
public void testCoalesceMerge() {
createAndInitTable("id INT, salary INT, dep STRING");

String[] records = new String[100];
for (int index = 0; index < 100; index++) {
records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", index);
}
append(tableName, records);
append(tableName, records);
append(tableName, records);
append(tableName, records);

// set the open file cost large enough to produce a separate scan task per file
// disable any write distribution
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
MERGE_DISTRIBUTION_MODE,
DistributionMode.NONE.modeName());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

spark.range(0, 100).createOrReplaceTempView("source");

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// disable broadcast joins to make sure the join triggers a shuffle
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
() -> {
sql(
"MERGE INTO %s t USING source "
+ "ON t.id = source.id "
+ "WHEN MATCHED THEN "
+ " UPDATE SET salary = -1 ",
commitTarget());
});

Table table = validationCatalog.loadTable(tableIdent);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);

if (mode(table) == COPY_ON_WRITE) {
// CoW MERGE would perform a join on `id`
// every task has data for each of 200 reducers
// AQE detects that all shuffle blocks are small and processes them in 1 task
// otherwise, there would be 200 tasks writing to the table
validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
} else {
// MoR MERGE would perform a join on `id`
// every task has data for each of 200 reducers
// AQE detects that all shuffle blocks are small and processes them in 1 task
// otherwise, there would be 200 tasks writing to the table
validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
}

Assert.assertEquals(
"Row count must match",
400L,
scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget()));
}

@Test
public void testSkewMerge() {
createAndInitTable("id INT, salary INT, dep STRING");
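Note on withSQLConf(...): the helper presumably applies the given Spark SQL conf overrides only for the duration of the supplied action and then restores the previous values, so the 200 shuffle partitions, the disabled broadcast join, and the 256MB advisory partition size affect only the statement under test. A rough sketch of that behavior against a plain SparkSession named spark; the real helper lives in the test base class and may differ:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.SparkSession;

public class WithSqlConfSketch {
  // Apply conf overrides around an action and restore the prior effective values afterwards.
  static void withSQLConf(SparkSession spark, Map<String, String> conf, Runnable action) {
    Map<String, String> previous = new HashMap<>();
    conf.forEach(
        (key, value) -> {
          // record the currently effective value (null if the key resolves to nothing)
          previous.put(key, spark.conf().get(key, null));
          spark.conf().set(key, value);
        });
    try {
      action.run();
    } finally {
      previous.forEach(
          (key, value) -> {
            if (value == null) {
              spark.conf().unset(key);
            } else {
              spark.conf().set(key, value);
            }
          });
    }
  }
}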
@@ -102,6 +102,69 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS deleted_employee");
}

@Test
public void testCoalesceUpdate() {
createAndInitTable("id INT, dep STRING");

String[] records = new String[100];
for (int index = 0; index < 100; index++) {
records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
}
append(tableName, records);
append(tableName, records);
append(tableName, records);
append(tableName, records);

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
UPDATE_DISTRIBUTION_MODE,
DistributionMode.RANGE.modeName());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
});

Table table = validationCatalog.loadTable(tableIdent);
Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);

if (mode(table) == COPY_ON_WRITE) {
// CoW UPDATE requests the updated records to be range distributed by `_file`, `_pos`
// every task has data for each of 200 reducers
// AQE detects that all shuffle blocks are small and processes them in 1 task
// otherwise, there would be 200 tasks writing to the table
validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
} else {
// MoR UPDATE requests the deleted records to be range distributed by partition and `_file`
// each task contains only 1 file and therefore writes only 1 shuffle block
// that means 4 shuffle blocks are distributed among 200 reducers
// AQE detects that all 4 shuffle blocks are small and processes them in 1 task
// otherwise, there would be 4 tasks processing 1 shuffle block each
validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
}

Assert.assertEquals(
"Row count must match",
200L,
scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
}

@Test
public void testSkewUpdate() {
createAndInitTable("id INT, dep STRING");
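Note on validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1") and its delete-file counterpart: the assertion presumably reads the commit summary of the latest snapshot and compares the recorded count against the expected value, which is how these tests prove AQE coalesced the write down to a single data file (CoW) or a single delete file (MoR). A hypothetical sketch of such a check using the public Snapshot.summary() map; the real helper in the test base class may accept richer expectations:

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.junit.Assert;

public class SnapshotSummarySketch {
  // Assumed shape of the validateProperty(...) helper: compare one summary entry to an expected value.
  static void validateProperty(Snapshot snapshot, String property, String expectedValue) {
    Map<String, String> summary = snapshot.summary();
    Assert.assertEquals(
        "Snapshot summary property '" + property + "' must match",
        expectedValue,
        summary.get(property));
  }
}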