Skip to content

Commit

Permalink
Spark 3.5: Drop the “remove_carryovers” flag for CDC view creation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
flyrain authored Sep 26, 2023
1 parent ad6d21a commit 6b33b82
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}

/**
 * Checks the changelog view produced by {@code create_changelog_view} when it is called with
 * {@code remove_carryovers => false}.
 *
 * <p>The table receives two single-row inserts followed by an INSERT OVERWRITE that writes
 * {@code (-2, 'b')} and two copies of {@code (2, 'b')}. The view is expected to expose one row
 * per change, including the DELETE of {@code (2, 'b')} and both re-inserted copies in the last
 * snapshot.
 */
@Test
public void testWithCarryovers() {
  createTableWithTwoColumns();
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot snap0 = table.currentSnapshot();

  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot snap1 = table.currentSnapshot();

  sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
  table.refresh();
  Snapshot snap2 = table.currentSnapshot();

  // The query has exactly two %s placeholders (catalog and table), so only two arguments are
  // supplied. The view name is taken from the procedure result below rather than passed in.
  List<Object[]> returns =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  String viewName = (String) returns.get(0)[0];
  assertEquals(
      "Rows should match",
      ImmutableList.of(
          row(1, "a", INSERT, 0, snap0.snapshotId()),
          row(2, "b", INSERT, 1, snap1.snapshotId()),
          row(-2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", DELETE, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId())),
      sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
}

@Test
public void testUpdate() {
createTableWithTwoColumns();
Expand Down Expand Up @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

/**
 * Checks that a changelog view created with {@code remove_carryovers => false} still contains
 * the DELETE/INSERT pair for a row that is rewritten unchanged by an INSERT OVERWRITE.
 */
@Test
public void testNotRemoveCarryOvers() {
  createTableWithThreeColumns();

  // First snapshot: three plain inserts.
  sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot insertSnapshot = loadedTable.currentSnapshot();

  // Second snapshot: the overwrite writes (2, 'e', 12) again, making it the carry-over row.
  sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
  loadedTable.refresh();
  Snapshot overwriteSnapshot = loadedTable.currentSnapshot();

  List<Object[]> procedureOutput =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  // The first column of the first result row holds the name of the created view.
  Object[] firstResultRow = procedureOutput.get(0);
  String viewName = (String) firstResultRow[0];

  List<Object[]> actualRows =
      sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName);

  assertEquals(
      "Rows should match",
      ImmutableList.of(
          // changes from the initial insert (ordinal 0)
          row(1, "a", 12, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "b", 11, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 0, insertSnapshot.snapshotId()),
          // changes from the overwrite (ordinal 1)
          row(2, "b", 11, DELETE, 1, overwriteSnapshot.snapshotId()),
          row(2, "d", 11, INSERT, 1, overwriteSnapshot.snapshotId()),
          // carry-over pair: the same row is deleted and re-inserted
          row(2, "e", 12, DELETE, 1, overwriteSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 1, overwriteSnapshot.snapshotId()),
          row(3, "c", 13, INSERT, 1, overwriteSnapshot.snapshotId())),
      actualRows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);

/**
 * Optional boolean procedure parameter that enables or disables the removal of carry-over rows.
 *
 * @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove carry-over
 *     rows. Please query {@link SparkChangelogTable} instead for use cases that need carry-over
 *     rows to be kept.
 */
@Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);

private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
Expand All @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
Expand Down Expand Up @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
} else {
df = removeCarryoverRows(df, netChanges);
}

Expand Down Expand Up @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

/**
 * Reads the {@code remove_carryovers} procedure argument as a boolean, passing {@code true} as
 * the default value to the lookup.
 *
 * @param input the procedure input to read the argument from
 * @return the value returned by the boolean lookup for {@code REMOVE_CARRYOVERS_PARAM}
 */
private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
  boolean defaultValue = true;
  return input.asBoolean(REMOVE_CARRYOVERS_PARAM, defaultValue);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
Expand Down

0 comments on commit 6b33b82

Please sign in to comment.