From 8ecc8b244fe740d074775f536876bea6379df70d Mon Sep 17 00:00:00 2001 From: yufei Date: Fri, 30 Jun 2023 15:01:26 -0700 Subject: [PATCH 1/2] Spark 3.2: Output the net changes across snapshots for carryover rows in CDC --- .../TestCreateChangelogViewProcedure.java | 310 ++++++++--------- .../iceberg/spark/ChangelogIterator.java | 4 +- .../iceberg/spark/ComputeUpdateIterator.java | 10 +- .../spark/RemoveCarryoverIterator.java | 35 +- .../spark/RemoveNetCarryoverIterator.java | 129 +++++++ .../CreateChangelogViewProcedure.java | 72 ++-- .../iceberg/spark/TestChangelogIterator.java | 317 ++++++++++-------- 7 files changed, 519 insertions(+), 358 deletions(-) create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index dc12b0145d50..6852bd939ef3 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -36,7 +36,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase { private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); public TestCreateChangelogViewProcedure( - String catalogName, String implementation, Map config) { + String catalogName, String implementation, Map config) { super(catalogName, implementation, config); } @@ -80,17 +80,17 @@ public void testCustomizedViewName() { Snapshot snap2 = table.currentSnapshot(); sql( - "CALL %s.system.create_changelog_view(" - + "table => '%s'," - + "options => map('%s','%s','%s','%s')," - + "changelog_view => '%s')", - catalogName, - tableName, - SparkReadOptions.START_SNAPSHOT_ID, - snap1.snapshotId(), - SparkReadOptions.END_SNAPSHOT_ID, - snap2.snapshotId(), - "cdc_view"); + "CALL %s.system.create_changelog_view(" + + "table => '%s'," + + "options => map('%s','%s','%s','%s')," + + "changelog_view => '%s')", + catalogName, + tableName, + SparkReadOptions.START_SNAPSHOT_ID, + snap1.snapshotId(), + SparkReadOptions.END_SNAPSHOT_ID, + snap2.snapshotId(), + "cdc_view"); long rowCount = sql("select * from %s", "cdc_view").stream().count(); Assert.assertEquals(2, rowCount); @@ -112,19 +112,19 @@ public void testNoSnapshotIdInput() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" + "table => '%s')", - catalogName, tableName, "cdc_view"); + sql( + "CALL %s.system.create_changelog_view(" + "table => '%s')", + catalogName, tableName, "cdc_view"); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap0.snapshotId()), + row(2, "b", INSERT, 1, snap1.snapshotId()), + row(-2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", DELETE, 2, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id", viewName)); } @Test @@ -147,44 +147,44 @@ public void testTimestampsBasedQuery() { long 
afterInsertOverwrite = waitUntilAfter(snap2.timestampMillis()); List returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', " - + "options => map('%s', '%s','%s', '%s'))", - catalogName, - tableName, - SparkReadOptions.START_TIMESTAMP, - beginning, - SparkReadOptions.END_TIMESTAMP, - afterInsertOverwrite); + sql( + "CALL %s.system.create_changelog_view(table => '%s', " + + "options => map('%s', '%s','%s', '%s'))", + catalogName, + tableName, + SparkReadOptions.START_TIMESTAMP, + beginning, + SparkReadOptions.END_TIMESTAMP, + afterInsertOverwrite); assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap0.snapshotId()), + row(2, "b", INSERT, 1, snap1.snapshotId()), + row(-2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", DELETE, 2, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); // query the timestamps starting from the second insert returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', " - + "options => map('%s', '%s', '%s', '%s'))", - catalogName, - tableName, - SparkReadOptions.START_TIMESTAMP, - afterFirstInsert, - SparkReadOptions.END_TIMESTAMP, - afterInsertOverwrite); + sql( + "CALL %s.system.create_changelog_view(table => '%s', " + + "options => map('%s', '%s', '%s', '%s'))", + catalogName, + tableName, + SparkReadOptions.START_TIMESTAMP, + afterFirstInsert, + SparkReadOptions.END_TIMESTAMP, + afterInsertOverwrite); assertEquals( - "Rows should match", - ImmutableList.of( - row(2, "b", INSERT, 0, snap1.snapshotId()), - row(-2, "b", INSERT, 1, snap2.snapshotId()), - row(2, "b", DELETE, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); + "Rows should match", + ImmutableList.of( + row(2, "b", INSERT, 0, snap1.snapshotId()), + row(-2, "b", INSERT, 1, snap2.snapshotId()), + row(2, "b", DELETE, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); } @Test @@ -203,23 +203,23 @@ public void testWithCarryovers() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName, "cdc_view"); + sql( + "CALL %s.system.create_changelog_view(" + + "remove_carryovers => false," + + "table => '%s')", + catalogName, tableName, "cdc_view"); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, _change_type", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap0.snapshotId()), + row(2, "b", INSERT, 1, snap1.snapshotId()), + row(-2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", DELETE, 2, snap2.snapshotId()), + row(2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", INSERT, 2, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, 
_change_type", viewName)); } @Test @@ -237,20 +237,20 @@ public void testUpdate() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))", - catalogName, tableName); + sql( + "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))", + catalogName, tableName); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap1.snapshotId()), - row(2, "b", INSERT, 0, snap1.snapshotId()), - row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()), - row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()), - row(3, "c", INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap1.snapshotId()), + row(2, "b", INSERT, 0, snap1.snapshotId()), + row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()), + row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()), + row(3, "c", INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, data", viewName)); } @Test @@ -266,19 +266,19 @@ public void testUpdateWithIdentifierField() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)", - catalogName, tableName); + sql( + "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)", + catalogName, tableName); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(2, "b", INSERT, 0, snap1.snapshotId()), - row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()), - row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()), - row(3, "c", INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data", viewName)); + "Rows should match", + ImmutableList.of( + row(2, "b", INSERT, 0, snap1.snapshotId()), + row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()), + row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()), + row(3, "c", INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, data", viewName)); } @Test @@ -296,21 +296,21 @@ public void testUpdateWithFilter() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))", - catalogName, tableName); + sql( + "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))", + catalogName, tableName); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap1.snapshotId()), - row(2, "b", INSERT, 0, snap1.snapshotId()), - row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()), - row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())), - // the predicate on partition columns will filter out the insert of (3, 'c') at the planning - // phase - sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap1.snapshotId()), + row(2, "b", INSERT, 0, snap1.snapshotId()), + row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()), + row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())), + // the predicate on partition columns will filter out the insert of (3, 'c') at the planning + // phase + sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName)); } @Test @@ -326,23 
+326,23 @@ public void testUpdateWithMultipleIdentifierColumns() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "identifier_columns => array('id','age')," - + "table => '%s')", - catalogName, tableName); + sql( + "CALL %s.system.create_changelog_view(" + + "identifier_columns => array('id','age')," + + "table => '%s')", + catalogName, tableName); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()), - row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()), - row(2, "e", 12, INSERT, 1, snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()), + row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()), + row(2, "e", 12, INSERT, 1, snap2.snapshotId()), + row(3, "c", 13, INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, data", viewName)); } @Test @@ -359,24 +359,24 @@ public void testRemoveCarryOvers() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "identifier_columns => array('id','age'), " - + "table => '%s')", - catalogName, tableName); + sql( + "CALL %s.system.create_changelog_view(" + + "identifier_columns => array('id','age'), " + + "table => '%s')", + catalogName, tableName); String viewName = (String) returns.get(0)[0]; // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "e", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()), - row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, INSERT, 0, snap1.snapshotId()), + row(2, "e", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()), + row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()), + row(3, "c", 13, INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, data", viewName)); } @Test @@ -393,22 +393,22 @@ public void testRemoveCarryOversWithoutUpdatedRows() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName); + sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName); String viewName = (String) returns.get(0)[0]; // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even // though update-row is not computed assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "e", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, DELETE, 1, snap2.snapshotId()), - row(2, "d", 11, INSERT, 1, 
snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, INSERT, 0, snap1.snapshotId()), + row(2, "e", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, DELETE, 1, snap2.snapshotId()), + row(2, "d", 11, INSERT, 1, snap2.snapshotId()), + row(3, "c", 13, INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, data", viewName)); } @Test @@ -425,26 +425,26 @@ public void testNotRemoveCarryOvers() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName); + sql( + "CALL %s.system.create_changelog_view(" + + "remove_carryovers => false," + + "table => '%s')", + catalogName, tableName); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "e", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, DELETE, 1, snap2.snapshotId()), - row(2, "d", 11, INSERT, 1, snap2.snapshotId()), - // the following two rows are carry-over rows - row(2, "e", 12, DELETE, 1, snap2.snapshotId()), - row(2, "e", 12, INSERT, 1, snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, INSERT, 0, snap1.snapshotId()), + row(2, "e", 12, INSERT, 0, snap1.snapshotId()), + row(2, "b", 11, DELETE, 1, snap2.snapshotId()), + row(2, "d", 11, INSERT, 1, snap2.snapshotId()), + // the following two rows are carry-over rows + row(2, "e", 12, DELETE, 1, snap2.snapshotId()), + row(2, "e", 12, INSERT, 1, snap2.snapshotId()), + row(3, "c", 13, INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName)); } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java index 5f30c5fd4e56..5b3e0071af5b 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java @@ -60,10 +60,10 @@ protected Iterator rowIterator() { * @return a new iterator instance */ public static Iterator computeUpdates( - Iterator rowIterator, StructType rowType, String[] identifierFields) { + Iterator rowIterator, StructType rowType, String[] identifierFields) { Iterator carryoverRemoveIterator = removeCarryovers(rowIterator, rowType); ChangelogIterator changelogIterator = - new ComputeUpdateIterator(carryoverRemoveIterator, rowType, identifierFields); + new ComputeUpdateIterator(carryoverRemoveIterator, rowType, identifierFields); return Iterators.filter(changelogIterator, Objects::nonNull); } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java index 23e6a19a17e7..66a58a44827a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java +++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java @@ -57,7 +57,7 @@ public class ComputeUpdateIterator extends ChangelogIterator { ComputeUpdateIterator(Iterator rowIterator, StructType rowType, String[] identifierFields) { super(rowIterator, rowType); this.identifierFieldIdx = - Arrays.stream(identifierFields).map(rowType::fieldIndex).collect(Collectors.toList()); + Arrays.stream(identifierFields).map(rowType::fieldIndex).collect(Collectors.toList()); this.identifierFields = identifierFields; } @@ -89,10 +89,10 @@ public Row next() { String nextRowChangeType = nextRow.getString(changeTypeIndex()); Preconditions.checkState( - nextRowChangeType.equals(INSERT), - "Cannot compute updates because there are multiple rows with the same identifier" - + " fields([%s]). Please make sure the rows are unique.", - String.join(",", identifierFields)); + nextRowChangeType.equals(INSERT), + "Cannot compute updates because there are multiple rows with the same identifier" + + " fields([%s]). Please make sure the rows are unique.", + String.join(",", identifierFields)); currentRow = modify(currentRow, changeTypeIndex(), UPDATE_BEFORE); cachedRow = modify(nextRow, changeTypeIndex(), UPDATE_AFTER); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java index 70b160e13fee..8395a354edb4 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark; import java.util.Iterator; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; @@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator { RemoveCarryoverIterator(Iterator rowIterator, StructType rowType) { super(rowIterator, rowType); - this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size()); + this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(); } @Override @@ -88,7 +90,7 @@ public Row next() { } // If the current row is a delete row, drain all identical delete rows - if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) { + if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) { cachedDeletedRow = currentRow; deletedRowCount = 1; @@ -97,9 +99,9 @@ public Row next() { // drain all identical delete rows when there is at least one cached delete row and the next // row is the same record while (nextRow != null - && cachedDeletedRow != null - && isSameRecord(cachedDeletedRow, nextRow)) { - if (nextRow.getString(changeTypeIndex()).equals(INSERT)) { + && cachedDeletedRow != null + && isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) { + if (changeType(nextRow).equals(INSERT)) { deletedRowCount--; if (deletedRowCount == 0) { cachedDeletedRow = null; @@ -139,25 +141,8 @@ private boolean hasCachedDeleteRow() { return cachedDeletedRow != null; } - private int[] generateIndicesToIdentifySameRow(int columnSize) { - int[] indices = new int[columnSize - 1]; - for (int i = 0; i < indices.length; i++) { - if (i < changeTypeIndex()) { - indices[i] = i; - } else { - indices[i] = i + 1; - } - } - return indices; - } - - private boolean isSameRecord(Row currentRow, Row nextRow) { - for (int idx : 
indicesToIdentifySameRow) {
-      if (isDifferentValue(currentRow, nextRow, idx)) {
-        return false;
-      }
-    }
-
-    return true;
+  private int[] generateIndicesToIdentifySameRow() {
+    Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex());
+    return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
+  }
 }
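For reference, the new tests drive this iterator through a ChangelogIterator.removeNetCarryovers factory. A minimal sketch of that factory, assuming it mirrors the computeUpdates pattern shown earlier in ChangelogIterator.java (wrap the per-partition iterator, then drop the null placeholders that RemoveNetCarryoverIterator emits for removed rows); the exact hunk is not part of this diff, so treat the signature as an assumption:

  public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, StructType rowType) {
    // next() returns null for rows that cancel out; filter the nulls before Spark sees them
    ChangelogIterator changelogIterator = new RemoveNetCarryoverIterator(rowIterator, rowType);
    return Iterators.filter(changelogIterator, Objects::nonNull);
  }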
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
new file mode 100644
index 000000000000..e4234755cdcf
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It is different from {@link
+ * RemoveCarryoverIterator}, which only removes carry-over rows within a single snapshot. It takes a
+ * row iterator, and assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change order, and change type. The change
+ *       order is a 1-to-1 mapping to snapshot id.
+ * </ul>
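+ *
+ * <p>Under these assumptions, a row whose delete and re-insert land in different snapshots still
+ * cancels out, while a same-valued insert without a matching delete is kept as a net change. For
+ * example, the following pair from the testRemoveNetCarryovers case below is removed:
+ *
+ * <pre>
+ *   (1, "d", "data", INSERT, ordinal=2, snapshotId=2)  -- removed
+ *   (1, "d", "data", DELETE, ordinal=3, snapshotId=3)  -- removed
+ * </pre>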
+ */
+public class RemoveNetCarryoverIterator extends ChangelogIterator {
+
+  private final int[] indicesToIdentifySameRow;
+
+  private Row cachedNextRow;
+  private Row cachedRow;
+  private long cachedRowCount;
+
+  protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+    super(rowIterator, rowType);
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRowCount > 0) {
+      return true;
+    }
+
+    if (cachedNextRow != null) {
+      return true;
+    }
+
+    return rowIterator().hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there are cached rows, return one of them from the beginning
+    if (cachedRowCount > 0) {
+      cachedRowCount--;
+      return cachedRow;
+    }
+
+    cachedRow = getCurrentRow();
+    // return it directly if there are no more rows
+    if (!rowIterator().hasNext()) {
+      return cachedRow;
+    }
+    cachedRowCount = 1;
+
+    cachedNextRow = rowIterator().next();
+
+    // pull rows from the iterator until two consecutive rows are different
+    while (isSameRecord(cachedRow, cachedNextRow, indicesToIdentifySameRow)) {
+      if (oppositeChangeType(cachedRow, cachedNextRow)) {
+        // two rows with opposite change types mean no net change; remove both
+        cachedRowCount--;
+      } else {
+        // two rows with the same change type mean a potential net change; cache the next row
+        cachedRowCount++;
+      }
+
+      // stop pulling rows if there are no more rows or the next row is different
+      if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+        // reset the cached next row if there are no more rows
+        cachedNextRow = null;
+        break;
+      }
+
+      cachedNextRow = rowIterator().next();
+    }
+
+    return null;
+  }
+
+  private Row getCurrentRow() {
+    Row currentRow;
+    if (cachedNextRow != null) {
+      currentRow = cachedNextRow;
+      cachedNextRow = null;
+    } else {
+      currentRow = rowIterator().next();
+    }
+    return currentRow;
+  }
+
+  private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+    return (changeType(nextRow).equals(INSERT) && changeType(currentRow).equals(DELETE))
+        || (changeType(nextRow).equals(DELETE) && changeType(currentRow).equals(INSERT));
+  }
+
+  private int[] generateIndicesToIdentifySameRow() {
+    Set<Integer> metadataColumnIndices =
+        Sets.newHashSet(
+            rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()),
+            rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()),
+            changeTypeIndex());
+    return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 85043d2df3d6..f2fc524bd327 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -81,33 +81,33 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
 
   private static final ProcedureParameter TABLE_PARAM =
-      ProcedureParameter.required("table", DataTypes.StringType);
+        ProcedureParameter.required("table", DataTypes.StringType);
   private static final ProcedureParameter CHANGELOG_VIEW_PARAM =
-      ProcedureParameter.optional("changelog_view", DataTypes.StringType);
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType);
   private static final ProcedureParameter OPTIONS_PARAM =
-      ProcedureParameter.optional("options", STRING_MAP);
+        
ProcedureParameter.optional("options", STRING_MAP); private static final ProcedureParameter COMPUTE_UPDATES_PARAM = - ProcedureParameter.optional("compute_updates", DataTypes.BooleanType); + ProcedureParameter.optional("compute_updates", DataTypes.BooleanType); private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM = - ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType); + ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType); private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM = - ProcedureParameter.optional("identifier_columns", STRING_ARRAY); + ProcedureParameter.optional("identifier_columns", STRING_ARRAY); private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - TABLE_PARAM, - CHANGELOG_VIEW_PARAM, - OPTIONS_PARAM, - COMPUTE_UPDATES_PARAM, - REMOVE_CARRYOVERS_PARAM, - IDENTIFIER_COLUMNS_PARAM, - }; + new ProcedureParameter[] { + TABLE_PARAM, + CHANGELOG_VIEW_PARAM, + OPTIONS_PARAM, + COMPUTE_UPDATES_PARAM, + REMOVE_CARRYOVERS_PARAM, + IDENTIFIER_COLUMNS_PARAM, + }; private static final StructType OUTPUT_TYPE = - new StructType( - new StructField[] { - new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty()) - }); + new StructType( + new StructField[] { + new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty()) + }); public static SparkProcedures.ProcedureBuilder builder() { return new BaseProcedure.Builder() { @@ -157,8 +157,8 @@ public InternalRow[] call(InternalRow args) { private Dataset computeUpdateImages(String[] identifierColumns, Dataset df) { Preconditions.checkArgument( - identifierColumns.length > 0, - "Cannot compute the update images because identifier columns are not set"); + identifierColumns.length > 0, + "Cannot compute the update images because identifier columns are not set"); Column[] repartitionSpec = new Column[identifierColumns.length + 1]; for (int i = 0; i < identifierColumns.length; i++) { @@ -181,10 +181,10 @@ private boolean shouldRemoveCarryoverRows(ProcedureInput input) { private Dataset removeCarryoverRows(Dataset df) { Column[] repartitionSpec = - Arrays.stream(df.columns()) - .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name())) - .map(df::col) - .toArray(Column[]::new); + Arrays.stream(df.columns()) + .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name())) + .map(df::col) + .toArray(Column[]::new); return applyCarryoverRemoveIterator(df, repartitionSpec); } @@ -217,15 +217,15 @@ private Dataset applyChangelogIterator(Dataset df, Column[] repartitio Column[] sortSpec = sortSpec(df, repartitionSpec); StructType schema = df.schema(); String[] identifierFields = - Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new); + Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new); return df.repartition(repartitionSpec) - .sortWithinPartitions(sortSpec) - .mapPartitions( - (MapPartitionsFunction) - rowIterator -> - ChangelogIterator.computeUpdates(rowIterator, schema, identifierFields), - RowEncoder.apply(schema)); + .sortWithinPartitions(sortSpec) + .mapPartitions( + (MapPartitionsFunction) + rowIterator -> + ChangelogIterator.computeUpdates(rowIterator, schema, identifierFields), + RowEncoder.apply(schema)); } private Dataset applyCarryoverRemoveIterator(Dataset df, Column[] repartitionSpec) { @@ -233,11 +233,11 @@ private Dataset applyCarryoverRemoveIterator(Dataset df, Column[] repa StructType schema = df.schema(); return df.repartition(repartitionSpec) - 
.sortWithinPartitions(sortSpec) - .mapPartitions( - (MapPartitionsFunction) - rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema), - RowEncoder.apply(schema)); + .sortWithinPartitions(sortSpec) + .mapPartitions( + (MapPartitionsFunction) + rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema), + RowEncoder.apply(schema)); } private static Column[] sortSpec(Dataset df, Column[] repartitionSpec) { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java index bf98bebb9d50..e99da5077205 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java @@ -43,14 +43,24 @@ public class TestChangelogIterator extends SparkTestHelperBase { private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); private static final StructType SCHEMA = - new StructType( - new StructField[] { - new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("name", DataTypes.StringType, false, Metadata.empty()), - new StructField("data", DataTypes.StringType, true, Metadata.empty()), - new StructField( - MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()) - }); + new StructType( + new StructField[] { + new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("name", DataTypes.StringType, false, Metadata.empty()), + new StructField("data", DataTypes.StringType, true, Metadata.empty()), + new StructField( + MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()), + new StructField( + MetadataColumns.CHANGE_ORDINAL.name(), + DataTypes.IntegerType, + false, + Metadata.empty()), + new StructField( + MetadataColumns.COMMIT_SNAPSHOT_ID.name(), + DataTypes.LongType, + false, + Metadata.empty()) + }); private static final String[] IDENTIFIER_FIELDS = new String[] {"id", "name"}; private enum RowType { @@ -65,9 +75,9 @@ public void testIterator() { List permutations = Lists.newArrayList(); // generate 24 permutations permute( - Arrays.asList(RowType.DELETED, RowType.INSERTED, RowType.CARRY_OVER, RowType.UPDATED), - 0, - permutations); + Arrays.asList(RowType.DELETED, RowType.INSERTED, RowType.CARRY_OVER, RowType.UPDATED), + 0, + permutations); Assert.assertEquals(24, permutations.size()); for (Object[] permutation : permutations) { @@ -84,7 +94,7 @@ private void validate(Object[] permutation) { } Iterator iterator = - ChangelogIterator.computeUpdates(rows.iterator(), SCHEMA, IDENTIFIER_FIELDS); + ChangelogIterator.computeUpdates(rows.iterator(), SCHEMA, IDENTIFIER_FIELDS); List result = Lists.newArrayList(iterator); assertEquals("Rows should match", expectedRows, rowsToJava(result)); } @@ -93,18 +103,18 @@ private List toOriginalRows(RowType rowType, int index) { switch (rowType) { case DELETED: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE}, null)); + new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE, 0, 0}, null)); case INSERTED: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT, 0, 0}, null)); case CARRY_OVER: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE}, null), - new 
GenericRowWithSchema(new Object[] {index, "d", "data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT, 0, 0}, null)); case UPDATED: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT, 0, 0}, null)); default: throw new IllegalArgumentException("Unknown row type: " + rowType); } @@ -114,18 +124,18 @@ private List toExpectedRows(RowType rowType, int order) { switch (rowType) { case DELETED: List rows = Lists.newArrayList(); - rows.add(new Object[] {order, "b", "data", DELETE}); + rows.add(new Object[] {order, "b", "data", DELETE, 0, 0}); return rows; case INSERTED: List insertedRows = Lists.newArrayList(); - insertedRows.add(new Object[] {order, "c", "data", INSERT}); + insertedRows.add(new Object[] {order, "c", "data", INSERT, 0, 0}); return insertedRows; case CARRY_OVER: return Lists.newArrayList(); case UPDATED: return Lists.newArrayList( - new Object[] {order, "a", "data", UPDATE_BEFORE}, - new Object[] {order, "a", "new_data", UPDATE_AFTER}); + new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0}, + new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0}); default: throw new IllegalArgumentException("Unknown row type: " + rowType); } @@ -145,168 +155,205 @@ private void permute(List arr, int start, List pm) { @Test public void testRowsWithNullValue() { final List rowsWithNull = - Lists.newArrayList( - new GenericRowWithSchema(new Object[] {2, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {3, null, null, INSERT}, null), - new GenericRowWithSchema(new Object[] {4, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {4, null, null, INSERT}, null), - // mixed null and non-null value in non-identifier columns - new GenericRowWithSchema(new Object[] {5, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {5, null, "data", INSERT}, null), - // mixed null and non-null value in identifier columns - new GenericRowWithSchema(new Object[] {6, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {6, "name", null, INSERT}, null)); + Lists.newArrayList( + new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 0}, null), + // mixed null and non-null value in non-identifier columns + new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 0}, null), + // mixed null and non-null value in identifier columns + new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 0}, null)); Iterator iterator = - ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS); + ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS); List result = Lists.newArrayList(iterator); assertEquals( - "Rows should match", - Lists.newArrayList( - new Object[] {2, null, null, DELETE}, - new 
Object[] {3, null, null, INSERT}, - new Object[] {5, null, null, UPDATE_BEFORE}, - new Object[] {5, null, "data", UPDATE_AFTER}, - new Object[] {6, null, null, DELETE}, - new Object[] {6, "name", null, INSERT}), - rowsToJava(result)); + "Rows should match", + Lists.newArrayList( + new Object[] {2, null, null, DELETE, 0, 0}, + new Object[] {3, null, null, INSERT, 0, 0}, + new Object[] {5, null, null, UPDATE_BEFORE, 0, 0}, + new Object[] {5, null, "data", UPDATE_AFTER, 0, 0}, + new Object[] {6, null, null, DELETE, 0, 0}, + new Object[] {6, "name", null, INSERT, 0, 0}), + rowsToJava(result)); } @Test public void testUpdatedRowsWithDuplication() { List rowsWithDuplication = - Lists.newArrayList( - // two rows with same identifier fields(id, name) - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT}, null)); + Lists.newArrayList( + // two rows with same identifier fields(id, name) + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null)); Iterator iterator = - ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); + ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); assertThrows( - "Cannot compute updates because there are multiple rows with the same identifier fields([id, name]). Please make sure the rows are unique.", - IllegalStateException.class, - () -> Lists.newArrayList(iterator)); + "Cannot compute updates because there are multiple rows with the same identifier fields([id, name]). 
Please make sure the rows are unique.", + IllegalStateException.class, + () -> Lists.newArrayList(iterator)); // still allow extra insert rows rowsWithDuplication = - Lists.newArrayList( - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT}, null)); + Lists.newArrayList( + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT, 0, 0}, null)); Iterator iterator1 = - ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); + ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); assertEquals( - "Rows should match.", - Lists.newArrayList( - new Object[] {1, "a", "data", UPDATE_BEFORE}, - new Object[] {1, "a", "new_data1", UPDATE_AFTER}, - new Object[] {1, "a", "new_data2", INSERT}), - rowsToJava(Lists.newArrayList(iterator1))); + "Rows should match.", + Lists.newArrayList( + new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0}, + new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0}, + new Object[] {1, "a", "new_data2", INSERT, 0, 0}), + rowsToJava(Lists.newArrayList(iterator1))); } @Test public void testCarryRowsRemoveWithDuplicates() { // assume rows are sorted by id and change type List rowsWithDuplication = - Lists.newArrayList( - // keep all delete rows for id 0 and id 1 since there is no insert row for them - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE}, null), - // the same number of delete and insert rows for id 2 - new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT}, null)); - - Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); - List result = Lists.newArrayList(iterator); - - assertEquals( - "Rows should match.", - Lists.newArrayList( - new Object[] {0, "a", "data", DELETE}, - new Object[] {0, "a", "data", DELETE}, - new Object[] {0, "a", "data", DELETE}, - new Object[] {1, "a", "old_data", DELETE}, - new Object[] {1, "a", "old_data", DELETE}, - new Object[] {3, "a", "new_data", INSERT}), - rowsToJava(result)); + Lists.newArrayList( + // keep all delete rows for id 0 and id 1 since there is no insert row for them + new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 0, 0}, null), + // the same number of delete and insert rows for id 2 + new GenericRowWithSchema(new Object[] {2, "a", 
"data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT, 0, 0}, null)); + + List expectedRows = + Lists.newArrayList( + new Object[] {0, "a", "data", DELETE, 0, 0}, + new Object[] {0, "a", "data", DELETE, 0, 0}, + new Object[] {0, "a", "data", DELETE, 0, 0}, + new Object[] {1, "a", "old_data", DELETE, 0, 0}, + new Object[] {1, "a", "old_data", DELETE, 0, 0}, + new Object[] {3, "a", "new_data", INSERT, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); } @Test public void testCarryRowsRemoveLessInsertRows() { // less insert rows than delete rows List rowsWithDuplication = - Lists.newArrayList( - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT}, null)); - - Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); - List result = Lists.newArrayList(iterator); - - assertEquals( - "Rows should match.", - Lists.newArrayList( - new Object[] {1, "d", "data", DELETE}, new Object[] {2, "d", "data", INSERT}), - rowsToJava(result)); + Lists.newArrayList( + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0, 0}, null)); + + List expectedRows = + Lists.newArrayList( + new Object[] {1, "d", "data", DELETE, 0, 0}, + new Object[] {2, "d", "data", INSERT, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); } @Test public void testCarryRowsRemoveMoreInsertRows() { List rowsWithDuplication = - Lists.newArrayList( - new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - // more insert rows than delete rows, should keep extra insert rows - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null)); + Lists.newArrayList( + new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + // more insert rows than delete rows, should keep extra insert rows + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null)); + + List expectedRows = + 
Lists.newArrayList( + new Object[] {0, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", INSERT, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); + } + @Test + public void testCarryRowsRemoveNoInsertRows() { + // no insert row + List rowsWithDuplication = + Lists.newArrayList( + // next two rows are identical + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null)); + + List expectedRows = + Lists.newArrayList( + new Object[] {1, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", DELETE, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); + } + + private void validateIterators(List rowsWithDuplication, List expectedRows) { Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); + ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); List result = Lists.newArrayList(iterator); - assertEquals( - "Rows should match.", - Lists.newArrayList( - new Object[] {0, "d", "data", DELETE}, new Object[] {1, "d", "data", INSERT}), - rowsToJava(result)); + assertEquals("Rows should match.", expectedRows, rowsToJava(result)); + + iterator = ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA); + result = Lists.newArrayList(iterator); + + assertEquals("Rows should match.", expectedRows, rowsToJava(result)); } @Test - public void testCarryRowsRemoveNoInsertRows() { - // no insert row + public void testRemoveNetCarryovers() { List rowsWithDuplication = - Lists.newArrayList( - // next two rows are identical - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null)); + Lists.newArrayList( + // this row are different from other rows, it is a net change, should be kept + new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null), + // a pair of delete and insert rows, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + // 2 delete rows and 2 insert rows, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), + // a pair of insert and delete rows across snapshots, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 2}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 3}, null), + // extra insert rows, they are net changes, should be kept + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), + // different key, net changes, should be kept + new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 4}, null)); + + List expectedRows = + Lists.newArrayList( + new Object[] {0, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", INSERT, 4, 4}, + new Object[] {1, "d", "data", INSERT, 4, 4}, + new Object[] {2, "d", "data", DELETE, 4, 4}); Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); + ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), 
SCHEMA); List result = Lists.newArrayList(iterator); - assertEquals( - "Duplicate rows should not be removed", - Lists.newArrayList( - new Object[] {1, "d", "data", DELETE}, new Object[] {1, "d", "data", DELETE}), - rowsToJava(result)); + assertEquals("Rows should match.", expectedRows, rowsToJava(result)); } } From 3d138ed737dbf41226655c7f03479f49d641f5be Mon Sep 17 00:00:00 2001 From: yufei Date: Fri, 30 Jun 2023 23:46:52 -0700 Subject: [PATCH 2/2] Fix style issues --- .../TestCreateChangelogViewProcedure.java | 310 +++++++++--------- .../iceberg/spark/ChangelogIterator.java | 4 +- .../iceberg/spark/ComputeUpdateIterator.java | 10 +- .../spark/RemoveCarryoverIterator.java | 4 +- .../CreateChangelogViewProcedure.java | 72 ++-- .../iceberg/spark/TestChangelogIterator.java | 292 ++++++++--------- 6 files changed, 346 insertions(+), 346 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index 6852bd939ef3..dc12b0145d50 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -36,7 +36,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase { private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); public TestCreateChangelogViewProcedure( - String catalogName, String implementation, Map config) { + String catalogName, String implementation, Map config) { super(catalogName, implementation, config); } @@ -80,17 +80,17 @@ public void testCustomizedViewName() { Snapshot snap2 = table.currentSnapshot(); sql( - "CALL %s.system.create_changelog_view(" - + "table => '%s'," - + "options => map('%s','%s','%s','%s')," - + "changelog_view => '%s')", - catalogName, - tableName, - SparkReadOptions.START_SNAPSHOT_ID, - snap1.snapshotId(), - SparkReadOptions.END_SNAPSHOT_ID, - snap2.snapshotId(), - "cdc_view"); + "CALL %s.system.create_changelog_view(" + + "table => '%s'," + + "options => map('%s','%s','%s','%s')," + + "changelog_view => '%s')", + catalogName, + tableName, + SparkReadOptions.START_SNAPSHOT_ID, + snap1.snapshotId(), + SparkReadOptions.END_SNAPSHOT_ID, + snap2.snapshotId(), + "cdc_view"); long rowCount = sql("select * from %s", "cdc_view").stream().count(); Assert.assertEquals(2, rowCount); @@ -112,19 +112,19 @@ public void testNoSnapshotIdInput() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" + "table => '%s')", - catalogName, tableName, "cdc_view"); + sql( + "CALL %s.system.create_changelog_view(" + "table => '%s')", + catalogName, tableName, "cdc_view"); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap0.snapshotId()), + row(2, "b", INSERT, 1, snap1.snapshotId()), + row(-2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", DELETE, 2, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, 
id", viewName)); } @Test @@ -147,44 +147,44 @@ public void testTimestampsBasedQuery() { long afterInsertOverwrite = waitUntilAfter(snap2.timestampMillis()); List returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', " - + "options => map('%s', '%s','%s', '%s'))", - catalogName, - tableName, - SparkReadOptions.START_TIMESTAMP, - beginning, - SparkReadOptions.END_TIMESTAMP, - afterInsertOverwrite); + sql( + "CALL %s.system.create_changelog_view(table => '%s', " + + "options => map('%s', '%s','%s', '%s'))", + catalogName, + tableName, + SparkReadOptions.START_TIMESTAMP, + beginning, + SparkReadOptions.END_TIMESTAMP, + afterInsertOverwrite); assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap0.snapshotId()), + row(2, "b", INSERT, 1, snap1.snapshotId()), + row(-2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", DELETE, 2, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); // query the timestamps starting from the second insert returns = - sql( - "CALL %s.system.create_changelog_view(table => '%s', " - + "options => map('%s', '%s', '%s', '%s'))", - catalogName, - tableName, - SparkReadOptions.START_TIMESTAMP, - afterFirstInsert, - SparkReadOptions.END_TIMESTAMP, - afterInsertOverwrite); + sql( + "CALL %s.system.create_changelog_view(table => '%s', " + + "options => map('%s', '%s', '%s', '%s'))", + catalogName, + tableName, + SparkReadOptions.START_TIMESTAMP, + afterFirstInsert, + SparkReadOptions.END_TIMESTAMP, + afterInsertOverwrite); assertEquals( - "Rows should match", - ImmutableList.of( - row(2, "b", INSERT, 0, snap1.snapshotId()), - row(-2, "b", INSERT, 1, snap2.snapshotId()), - row(2, "b", DELETE, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); + "Rows should match", + ImmutableList.of( + row(2, "b", INSERT, 0, snap1.snapshotId()), + row(-2, "b", INSERT, 1, snap2.snapshotId()), + row(2, "b", DELETE, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); } @Test @@ -203,23 +203,23 @@ public void testWithCarryovers() { Snapshot snap2 = table.currentSnapshot(); List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName, "cdc_view"); + sql( + "CALL %s.system.create_changelog_view(" + + "remove_carryovers => false," + + "table => '%s')", + catalogName, tableName, "cdc_view"); String viewName = (String) returns.get(0)[0]; assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, _change_type", viewName)); + "Rows should match", + ImmutableList.of( + row(1, "a", INSERT, 0, snap0.snapshotId()), + row(2, "b", INSERT, 1, snap1.snapshotId()), + row(-2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", DELETE, 2, snap2.snapshotId()), + row(2, "b", INSERT, 2, snap2.snapshotId()), + row(2, "b", 
@@ -203,23 +203,23 @@ public void testWithCarryovers() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "remove_carryovers => false,"
-                + "table => '%s')",
-            catalogName, tableName, "cdc_view");
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName, "cdc_view");
 
     String viewName = (String) returns.get(0)[0];
 
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", INSERT, 0, snap0.snapshotId()),
-            row(2, "b", INSERT, 1, snap1.snapshotId()),
-            row(-2, "b", INSERT, 2, snap2.snapshotId()),
-            row(2, "b", DELETE, 2, snap2.snapshotId()),
-            row(2, "b", INSERT, 2, snap2.snapshotId()),
-            row(2, "b", INSERT, 2, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
   }
 
   @Test
@@ -237,20 +237,20 @@ public void testUpdate() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
-            catalogName, tableName);
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", INSERT, 0, snap1.snapshotId()),
-            row(2, "b", INSERT, 0, snap1.snapshotId()),
-            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
-            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
-            row(3, "c", INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
   @Test
@@ -266,19 +266,19 @@ public void testUpdateWithIdentifierField() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)",
-            catalogName, tableName);
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)",
+            catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(2, "b", INSERT, 0, snap1.snapshotId()),
-            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
-            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
-            row(3, "c", INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
   @Test
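The two tests above exercise update computation: a DELETE and an INSERT that agree on the identifier columns within one changelog ordinal are rewritten as an UPDATE_BEFORE/UPDATE_AFTER pair. A toy illustration of that pairing rule (not ComputeUpdateIterator itself, whose diff appears further down; the rows and column layout here are made up):

    import java.util.List;

    public class UpdatePairingSketch {
      public static void main(String[] args) {
        // rows already repartitioned by the identifier column (id) and sorted
        List<String[]> sorted =
            List.of(
                new String[] {"2", "b", "DELETE"},
                new String[] {"2", "d", "INSERT"},
                new String[] {"3", "c", "INSERT"});
        for (int i = 0; i < sorted.size(); i++) {
          String[] row = sorted.get(i);
          boolean paired =
              row[2].equals("DELETE")
                  && i + 1 < sorted.size()
                  && sorted.get(i + 1)[2].equals("INSERT")
                  && sorted.get(i + 1)[0].equals(row[0]); // same identifier value
          if (paired) {
            System.out.println(row[0] + "," + row[1] + " -> UPDATE_BEFORE");
            String[] after = sorted.get(++i);
            System.out.println(after[0] + "," + after[1] + " -> UPDATE_AFTER");
          } else {
            System.out.println(row[0] + "," + row[1] + " -> " + row[2]);
          }
        }
        // prints 2,b -> UPDATE_BEFORE, then 2,d -> UPDATE_AFTER, then 3,c -> INSERT,
        // matching the expected rows of testUpdate above
      }
    }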
@@ -296,21 +296,21 @@ public void testUpdateWithFilter() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
-            catalogName, tableName);
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", INSERT, 0, snap1.snapshotId()),
-            row(2, "b", INSERT, 0, snap1.snapshotId()),
-            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
-            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())),
-        // the predicate on partition columns will filter out the insert of (3, 'c') at the planning
-        // phase
-        sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())),
+        // the predicate on partition columns will filter out the insert of (3, 'c') at the planning
+        // phase
+        sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName));
   }
 
   @Test
@@ -326,23 +326,23 @@ public void testUpdateWithMultipleIdentifierColumns() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "identifier_columns => array('id','age'),"
-                + "table => '%s')",
-            catalogName, tableName);
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'),"
+                + "table => '%s')",
+            catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
-            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
-            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
-            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
   @Test
@@ -359,24 +359,24 @@ public void testRemoveCarryOvers() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "identifier_columns => array('id','age'), "
-                + "table => '%s')",
-            catalogName, tableName);
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'), "
+                + "table => '%s')",
+            catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
-            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
-            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
-            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
   @Test
@@ -393,22 +393,22 @@ public void testRemoveCarryOversWithoutUpdatedRows() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName);
+        sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
     // though update-row is not computed
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
-            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
-            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
-            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
   @Test
@@ -425,26 +425,26 @@ public void testNotRemoveCarryOvers() {
     Snapshot snap2 = table.currentSnapshot();
 
     List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "remove_carryovers => false,"
-                + "table => '%s')",
-            catalogName, tableName);
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
 
     String viewName = (String) returns.get(0)[0];
 
     assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
-            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
-            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
-            // the following two rows are carry-over rows
-            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
-            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
-            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName));
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            // the following two rows are carry-over rows
+            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName));
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
index 5b3e0071af5b..5f30c5fd4e56 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
@@ -60,10 +60,10 @@ protected Iterator<Row> rowIterator() {
    * @return a new iterator instance
    */
   public static Iterator<Row> computeUpdates(
-      Iterator<Row> rowIterator, StructType rowType, String[] identifierFields) {
+      Iterator<Row> rowIterator, StructType rowType, String[] identifierFields) {
     Iterator<Row> carryoverRemoveIterator = removeCarryovers(rowIterator, rowType);
     ChangelogIterator changelogIterator =
-        new ComputeUpdateIterator(carryoverRemoveIterator, rowType, identifierFields);
+        new ComputeUpdateIterator(carryoverRemoveIterator, rowType, identifierFields);
 
     return Iterators.filter(changelogIterator, Objects::nonNull);
   }
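The hunk above only touches computeUpdates, but this patch also adds a sibling entry point, removeNetCarryovers, which the new test at the bottom of this patch calls. Its hunk is not shown in this excerpt; inferred from that test and from the computeUpdates pattern above, the factory plausibly looks like the following (a sketch of a method inside ChangelogIterator, not an actual hunk from the patch):

    // Assumed shape: mirrors the removeCarryovers factory, but wraps the new
    // RemoveNetCarryoverIterator so carryovers cancel across snapshots too.
    public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, StructType rowType) {
      ChangelogIterator changelogIterator = new RemoveNetCarryoverIterator(rowIterator, rowType);
      return Iterators.filter(changelogIterator, Objects::nonNull);
    }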
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
index 66a58a44827a..23e6a19a17e7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
@@ -57,7 +57,7 @@ public class ComputeUpdateIterator extends ChangelogIterator {
   ComputeUpdateIterator(Iterator<Row> rowIterator, StructType rowType, String[] identifierFields) {
     super(rowIterator, rowType);
     this.identifierFieldIdx =
-        Arrays.stream(identifierFields).map(rowType::fieldIndex).collect(Collectors.toList());
+        Arrays.stream(identifierFields).map(rowType::fieldIndex).collect(Collectors.toList());
     this.identifierFields = identifierFields;
   }
 
@@ -89,10 +89,10 @@ public Row next() {
       String nextRowChangeType = nextRow.getString(changeTypeIndex());
 
       Preconditions.checkState(
-          nextRowChangeType.equals(INSERT),
-          "Cannot compute updates because there are multiple rows with the same identifier"
-              + " fields([%s]). Please make sure the rows are unique.",
-          String.join(",", identifierFields));
+          nextRowChangeType.equals(INSERT),
+          "Cannot compute updates because there are multiple rows with the same identifier"
+              + " fields([%s]). Please make sure the rows are unique.",
+          String.join(",", identifierFields));
 
       currentRow = modify(currentRow, changeTypeIndex(), UPDATE_BEFORE);
       cachedRow = modify(nextRow, changeTypeIndex(), UPDATE_AFTER);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
index 8395a354edb4..2e90dc7749d1 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
@@ -99,8 +99,8 @@ public Row next() {
     // drain all identical delete rows when there is at least one cached delete row and the next
     // row is the same record
     while (nextRow != null
-        && cachedDeletedRow != null
-        && isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) {
+        && cachedDeletedRow != null
+        && isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) {
       if (changeType(nextRow).equals(INSERT)) {
         deletedRowCount--;
         if (deletedRowCount == 0) {
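The drain loop above counts cached identical DELETEs and cancels one per matching INSERT; whatever count is left over is emitted. A self-contained toy model of that bookkeeping (an illustration of the cancellation rule, not RemoveCarryoverIterator's streaming code):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class CarryoverBookkeepingSketch {
      // Per distinct row content, DELETEs and INSERTs cancel one-for-one;
      // only the surplus on either side survives as a real change.
      static List<String> net(List<String[]> changes) { // each entry: {content, changeType}
        Map<String, Integer> balance = new LinkedHashMap<>();
        for (String[] change : changes) {
          balance.merge(change[0], change[1].equals("DELETE") ? 1 : -1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        balance.forEach(
            (content, count) -> {
              for (int i = 0; i < Math.abs(count); i++) {
                result.add(content + " " + (count > 0 ? "DELETE" : "INSERT"));
              }
            });
        return result;
      }

      public static void main(String[] args) {
        // mirrors testCarryRowsRemoveLessInsertRows further down: two DELETEs and
        // one INSERT of the same row leave a single net DELETE
        System.out.println(
            net(
                List.of(
                    new String[] {"(1, d, data)", "DELETE"},
                    new String[] {"(1, d, data)", "DELETE"},
                    new String[] {"(1, d, data)", "INSERT"},
                    new String[] {"(2, d, data)", "INSERT"})));
        // prints [(1, d, data) DELETE, (2, d, data) INSERT]
      }
    }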
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index f2fc524bd327..85043d2df3d6 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -81,33 +81,33 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
   private static final ProcedureParameter TABLE_PARAM =
-      ProcedureParameter.required("table", DataTypes.StringType);
+      ProcedureParameter.required("table", DataTypes.StringType);
   private static final ProcedureParameter CHANGELOG_VIEW_PARAM =
-      ProcedureParameter.optional("changelog_view", DataTypes.StringType);
+      ProcedureParameter.optional("changelog_view", DataTypes.StringType);
   private static final ProcedureParameter OPTIONS_PARAM =
-      ProcedureParameter.optional("options", STRING_MAP);
+      ProcedureParameter.optional("options", STRING_MAP);
   private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
-      ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
+      ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
   private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
-      ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
+      ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
   private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
-      ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
+      ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
 
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        TABLE_PARAM,
-        CHANGELOG_VIEW_PARAM,
-        OPTIONS_PARAM,
-        COMPUTE_UPDATES_PARAM,
-        REMOVE_CARRYOVERS_PARAM,
-        IDENTIFIER_COLUMNS_PARAM,
-      };
+      new ProcedureParameter[] {
+        TABLE_PARAM,
+        CHANGELOG_VIEW_PARAM,
+        OPTIONS_PARAM,
+        COMPUTE_UPDATES_PARAM,
+        REMOVE_CARRYOVERS_PARAM,
+        IDENTIFIER_COLUMNS_PARAM,
+      };
 
   private static final StructType OUTPUT_TYPE =
-      new StructType(
-          new StructField[] {
-            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
-          });
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
 
   public static SparkProcedures.ProcedureBuilder builder() {
     return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
@@ -157,8 +157,8 @@ public InternalRow[] call(InternalRow args) {
 
   private Dataset<Row> computeUpdateImages(String[] identifierColumns, Dataset<Row> df) {
     Preconditions.checkArgument(
-        identifierColumns.length > 0,
-        "Cannot compute the update images because identifier columns are not set");
+        identifierColumns.length > 0,
+        "Cannot compute the update images because identifier columns are not set");
 
     Column[] repartitionSpec = new Column[identifierColumns.length + 1];
     for (int i = 0; i < identifierColumns.length; i++) {
@@ -181,10 +181,10 @@ private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
 
   private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
     Column[] repartitionSpec =
-        Arrays.stream(df.columns())
-            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
-            .map(df::col)
-            .toArray(Column[]::new);
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
 
     return applyCarryoverRemoveIterator(df, repartitionSpec);
   }
@@ -217,15 +217,15 @@ private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionSpec) {
     Column[] sortSpec = sortSpec(df, repartitionSpec);
     StructType schema = df.schema();
     String[] identifierFields =
-        Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
+        Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
 
     return df.repartition(repartitionSpec)
-        .sortWithinPartitions(sortSpec)
-        .mapPartitions(
-            (MapPartitionsFunction<Row, Row>)
-                rowIterator ->
-                    ChangelogIterator.computeUpdates(rowIterator, schema, identifierFields),
-            RowEncoder.apply(schema));
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(
+            (MapPartitionsFunction<Row, Row>)
+                rowIterator ->
+                    ChangelogIterator.computeUpdates(rowIterator, schema, identifierFields),
+            RowEncoder.apply(schema));
   }
 
   private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> df, Column[] repartitionSpec) {
@@ -233,11 +233,11 @@ private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> df, Column[] repartitionSpec) {
     StructType schema = df.schema();
 
     return df.repartition(repartitionSpec)
-        .sortWithinPartitions(sortSpec)
-        .mapPartitions(
-            (MapPartitionsFunction<Row, Row>)
-                rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema),
-            RowEncoder.apply(schema));
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(
+            (MapPartitionsFunction<Row, Row>)
+                rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema),
+            RowEncoder.apply(schema));
   }
 
   private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
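The body of sortSpec sits outside the hunks above. Judging from its call sites, it presumably appends the _change_type metadata column after the repartition columns, so that rows of the same record group together with DELETE ordered ahead of INSERT. A sketch under that assumption, with imports as in the surrounding file (this is not a hunk from the patch):

    private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
      // assumed: copy the repartition columns and add the change-type column last
      Column[] sortSpec = Arrays.copyOf(repartitionSpec, repartitionSpec.length + 1);
      sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name());
      return sortSpec;
    }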
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
index e99da5077205..0539598f147e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
@@ -43,24 +43,24 @@ public class TestChangelogIterator extends SparkTestHelperBase {
   private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
 
   private static final StructType SCHEMA =
-      new StructType(
-          new StructField[] {
-            new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
-            new StructField("name", DataTypes.StringType, false, Metadata.empty()),
-            new StructField("data", DataTypes.StringType, true, Metadata.empty()),
-            new StructField(
-                MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()),
-            new StructField(
-                MetadataColumns.CHANGE_ORDINAL.name(),
-                DataTypes.IntegerType,
-                false,
-                Metadata.empty()),
-            new StructField(
-                MetadataColumns.COMMIT_SNAPSHOT_ID.name(),
-                DataTypes.LongType,
-                false,
-                Metadata.empty())
-          });
+      new StructType(
+          new StructField[] {
+            new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("name", DataTypes.StringType, false, Metadata.empty()),
+            new StructField("data", DataTypes.StringType, true, Metadata.empty()),
+            new StructField(
+                MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()),
+            new StructField(
+                MetadataColumns.CHANGE_ORDINAL.name(),
+                DataTypes.IntegerType,
+                false,
+                Metadata.empty()),
+            new StructField(
+                MetadataColumns.COMMIT_SNAPSHOT_ID.name(),
+                DataTypes.LongType,
+                false,
+                Metadata.empty())
+          });
   private static final String[] IDENTIFIER_FIELDS = new String[] {"id", "name"};
 
   private enum RowType {
@@ -75,9 +75,9 @@ public void testIterator() {
     List<Object[]> permutations = Lists.newArrayList();
     // generate 24 permutations
     permute(
-        Arrays.asList(RowType.DELETED, RowType.INSERTED, RowType.CARRY_OVER, RowType.UPDATED),
-        0,
-        permutations);
+        Arrays.asList(RowType.DELETED, RowType.INSERTED, RowType.CARRY_OVER, RowType.UPDATED),
+        0,
+        permutations);
     Assert.assertEquals(24, permutations.size());
 
     for (Object[] permutation : permutations) {
@@ -94,7 +94,7 @@ private void validate(Object[] permutation) {
     }
 
     Iterator<Row> iterator =
-        ChangelogIterator.computeUpdates(rows.iterator(), SCHEMA, IDENTIFIER_FIELDS);
+        ChangelogIterator.computeUpdates(rows.iterator(), SCHEMA, IDENTIFIER_FIELDS);
     List<Row> result = Lists.newArrayList(iterator);
     assertEquals("Rows should match", expectedRows, rowsToJava(result));
   }
@@ -103,18 +103,18 @@ private List<Row> toOriginalRows(RowType rowType, int index) {
     switch (rowType) {
       case DELETED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE, 0, 0}, null));
+            new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE, 0, 0}, null));
       case INSERTED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT, 0, 0}, null));
+            new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT, 0, 0}, null));
       case CARRY_OVER:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT, 0, 0}, null));
+            new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT, 0, 0}, null));
       case UPDATED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT, 0, 0}, null));
+            new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT, 0, 0}, null));
       default:
         throw new IllegalArgumentException("Unknown row type: " + rowType);
     }
@@ -134,8 +134,8 @@ private List<Object[]> toExpectedRows(RowType rowType, int order) {
         return Lists.newArrayList();
       case UPDATED:
         return Lists.newArrayList(
-            new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0},
-            new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0});
+            new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0},
+            new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0});
       default:
         throw new IllegalArgumentException("Unknown row type: " + rowType);
     }
@@ -155,97 +155,97 @@ private void permute(List<RowType> arr, int start, List<Object[]> pm) {
 
   @Test
   public void testRowsWithNullValue() {
     final List<Row> rowsWithNull =
-        Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 0}, null),
-            // mixed null and non-null value in non-identifier columns
-            new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 0}, null),
-            // mixed null and non-null value in identifier columns
-            new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 0}, null));
+        Lists.newArrayList(
+            new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 0}, null),
+            // mixed null and non-null value in non-identifier columns
+            new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 0}, null),
+            // mixed null and non-null value in identifier columns
+            new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 0}, null));
 
     Iterator<Row> iterator =
-        ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS);
+        ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS);
     List<Row> result = Lists.newArrayList(iterator);
 
     assertEquals(
-        "Rows should match",
-        Lists.newArrayList(
-            new Object[] {2, null, null, DELETE, 0, 0},
-            new Object[] {3, null, null, INSERT, 0, 0},
-            new Object[] {5, null, null, UPDATE_BEFORE, 0, 0},
-            new Object[] {5, null, "data", UPDATE_AFTER, 0, 0},
-            new Object[] {6, null, null, DELETE, 0, 0},
-            new Object[] {6, "name", null, INSERT, 0, 0}),
-        rowsToJava(result));
+        "Rows should match",
+        Lists.newArrayList(
+            new Object[] {2, null, null, DELETE, 0, 0},
+            new Object[] {3, null, null, INSERT, 0, 0},
+            new Object[] {5, null, null, UPDATE_BEFORE, 0, 0},
+            new Object[] {5, null, "data", UPDATE_AFTER, 0, 0},
+            new Object[] {6, null, null, DELETE, 0, 0},
+            new Object[] {6, "name", null, INSERT, 0, 0}),
+        rowsToJava(result));
   }
"data", UPDATE_AFTER, 0, 0}, + new Object[] {6, null, null, DELETE, 0, 0}, + new Object[] {6, "name", null, INSERT, 0, 0}), + rowsToJava(result)); } @Test public void testUpdatedRowsWithDuplication() { List rowsWithDuplication = - Lists.newArrayList( - // two rows with same identifier fields(id, name) - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null)); + Lists.newArrayList( + // two rows with same identifier fields(id, name) + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null)); Iterator iterator = - ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); + ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); assertThrows( - "Cannot compute updates because there are multiple rows with the same identifier fields([id, name]). Please make sure the rows are unique.", - IllegalStateException.class, - () -> Lists.newArrayList(iterator)); + "Cannot compute updates because there are multiple rows with the same identifier fields([id, name]). Please make sure the rows are unique.", + IllegalStateException.class, + () -> Lists.newArrayList(iterator)); // still allow extra insert rows rowsWithDuplication = - Lists.newArrayList( - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT, 0, 0}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT, 0, 0}, null)); + Lists.newArrayList( + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT, 0, 0}, null)); Iterator iterator1 = - ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); + ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); assertEquals( - "Rows should match.", - Lists.newArrayList( - new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0}, - new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0}, - new Object[] {1, "a", "new_data2", INSERT, 0, 0}), - rowsToJava(Lists.newArrayList(iterator1))); + "Rows should match.", + Lists.newArrayList( + new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0}, + new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0}, + new Object[] {1, "a", "new_data2", INSERT, 0, 0}), + rowsToJava(Lists.newArrayList(iterator1))); } @Test public void testCarryRowsRemoveWithDuplicates() { // assume rows are sorted by id and change type List rowsWithDuplication = - Lists.newArrayList( - // keep all delete rows for id 0 and id 1 since there is no insert row for them - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), - new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 
 
     List<Object[]> expectedRows =
-        Lists.newArrayList(
-            new Object[] {0, "a", "data", DELETE, 0, 0},
-            new Object[] {0, "a", "data", DELETE, 0, 0},
-            new Object[] {0, "a", "data", DELETE, 0, 0},
-            new Object[] {1, "a", "old_data", DELETE, 0, 0},
-            new Object[] {1, "a", "old_data", DELETE, 0, 0},
-            new Object[] {3, "a", "new_data", INSERT, 0, 0});
+        Lists.newArrayList(
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {1, "a", "old_data", DELETE, 0, 0},
+            new Object[] {1, "a", "old_data", DELETE, 0, 0},
+            new Object[] {3, "a", "new_data", INSERT, 0, 0});
 
     validateIterators(rowsWithDuplication, expectedRows);
   }
 
@@ -254,16 +254,16 @@ public void testCarryRowsRemoveWithDuplicates() {
   public void testCarryRowsRemoveLessInsertRows() {
     // less insert rows than delete rows
     List<Row> rowsWithDuplication =
-        Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0, 0}, null));
+        Lists.newArrayList(
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0, 0}, null));
 
     List<Object[]> expectedRows =
-        Lists.newArrayList(
-            new Object[] {1, "d", "data", DELETE, 0, 0},
-            new Object[] {2, "d", "data", INSERT, 0, 0});
+        Lists.newArrayList(
+            new Object[] {1, "d", "data", DELETE, 0, 0},
+            new Object[] {2, "d", "data", INSERT, 0, 0});
 
     validateIterators(rowsWithDuplication, expectedRows);
   }
 
@@ -271,21 +271,21 @@ public void testCarryRowsRemoveLessInsertRows() {
   @Test
   public void testCarryRowsRemoveMoreInsertRows() {
     List<Row> rowsWithDuplication =
-        Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
-            // more insert rows than delete rows, should keep extra insert rows
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null));
+        Lists.newArrayList(
+            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            // more insert rows than delete rows, should keep extra insert rows
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null));
 
     List<Object[]> expectedRows =
-        Lists.newArrayList(
-            new Object[] {0, "d", "data", DELETE, 0, 0},
-            new Object[] {1, "d", "data", INSERT, 0, 0});
+        Lists.newArrayList(
+            new Object[] {0, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", INSERT, 0, 0});
 
     validateIterators(rowsWithDuplication, expectedRows);
   }
 
@@ -294,22 +294,22 @@ public void testCarryRowsRemoveMoreInsertRows() {
   public void testCarryRowsRemoveNoInsertRows() {
     // no insert row
     List<Row> rowsWithDuplication =
-        Lists.newArrayList(
-            // next two rows are identical
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null));
+        Lists.newArrayList(
+            // next two rows are identical
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null));
 
     List<Object[]> expectedRows =
-        Lists.newArrayList(
-            new Object[] {1, "d", "data", DELETE, 0, 0},
-            new Object[] {1, "d", "data", DELETE, 0, 0});
+        Lists.newArrayList(
+            new Object[] {1, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", DELETE, 0, 0});
 
     validateIterators(rowsWithDuplication, expectedRows);
   }
 
   private void validateIterators(List<Row> rowsWithDuplication, List<Object[]> expectedRows) {
     Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
+        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
     List<Row> result = Lists.newArrayList(iterator);
 
     assertEquals("Rows should match.", expectedRows, rowsToJava(result));
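The test below covers the headline change of this patch: carryovers are netted out across snapshots, not just within one. A rough mental model, assuming rows of one key arrive with identical content sorted by changelog ordinal: scan the history and cancel each adjacent DELETE/INSERT or INSERT/DELETE pair; whatever cannot be paired is a net change. A toy version of that rule (an illustration, not the RemoveNetCarryoverIterator implementation):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    public class NetChangeSketch {
      static List<String> net(List<String> history) {
        Deque<String> pending = new ArrayDeque<>();
        for (String change : history) {
          if (!pending.isEmpty() && !pending.peek().equals(change)) {
            pending.pop(); // opposite change types with identical content cancel
          } else {
            pending.push(change);
          }
        }
        return List.copyOf(pending);
      }

      public static void main(String[] args) {
        // the id=1 history from testRemoveNetCarryovers below, ordinals 0..4:
        // D I | D D I I | I | D | I I
        System.out.println(net(List.of("D", "I", "D", "D", "I", "I", "I", "D", "I", "I")));
        // prints [I, I]: only the two extra inserts at ordinal 4 are net changes
      }
    }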
"d", "data", INSERT, 0, 0}, null), - // 2 delete rows and 2 insert rows, should be removed - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), - // a pair of insert and delete rows across snapshots, should be removed - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 2}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 3}, null), - // extra insert rows, they are net changes, should be kept - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), - // different key, net changes, should be kept - new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 4}, null)); + Lists.newArrayList( + // this row are different from other rows, it is a net change, should be kept + new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null), + // a pair of delete and insert rows, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + // 2 delete rows and 2 insert rows, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), + // a pair of insert and delete rows across snapshots, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 2}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 3}, null), + // extra insert rows, they are net changes, should be kept + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), + // different key, net changes, should be kept + new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 4}, null)); List expectedRows = - Lists.newArrayList( - new Object[] {0, "d", "data", DELETE, 0, 0}, - new Object[] {1, "d", "data", INSERT, 4, 4}, - new Object[] {1, "d", "data", INSERT, 4, 4}, - new Object[] {2, "d", "data", DELETE, 4, 4}); + Lists.newArrayList( + new Object[] {0, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", INSERT, 4, 4}, + new Object[] {1, "d", "data", INSERT, 4, 4}, + new Object[] {2, "d", "data", DELETE, 4, 4}); Iterator iterator = - ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA); + ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA); List result = Lists.newArrayList(iterator); assertEquals("Rows should match.", expectedRows, rowsToJava(result));