diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
index 70b160e13fee..2e90dc7749d1 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark;

 import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;

@@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {

   RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
     super(rowIterator, rowType);
-    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size());
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
   }

   @Override
@@ -88,7 +90,7 @@ public Row next() {
       }

       // If the current row is a delete row, drain all identical delete rows
-      if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+      if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
         cachedDeletedRow = currentRow;
         deletedRowCount = 1;

@@ -98,8 +100,8 @@ public Row next() {
       // row is the same record
       while (nextRow != null
           && cachedDeletedRow != null
-          && isSameRecord(cachedDeletedRow, nextRow)) {
-        if (nextRow.getString(changeTypeIndex()).equals(INSERT)) {
+          && isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) {
+        if (changeType(nextRow).equals(INSERT)) {
           deletedRowCount--;
           if (deletedRowCount == 0) {
             cachedDeletedRow = null;
@@ -139,25 +141,8 @@ private boolean hasCachedDeleteRow() {
     return cachedDeletedRow != null;
   }

-  private int[] generateIndicesToIdentifySameRow(int columnSize) {
-    int[] indices = new int[columnSize - 1];
-    for (int i = 0; i < indices.length; i++) {
-      if (i < changeTypeIndex()) {
-        indices[i] = i;
-      } else {
-        indices[i] = i + 1;
-      }
-    }
-    return indices;
-  }
-
-  private boolean isSameRecord(Row currentRow, Row nextRow) {
-    for (int idx : indicesToIdentifySameRow) {
-      if (isDifferentValue(currentRow, nextRow, idx)) {
-        return false;
-      }
-    }
-
-    return true;
+  private int[] generateIndicesToIdentifySameRow() {
+    Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex());
+    return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
new file mode 100644
index 000000000000..e4234755cdcf
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It is different from {@link
+ * RemoveCarryoverIterator}, which only removes carry-over rows within a single snapshot. It takes
+ * a row iterator, and assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all data columns.
+ *   <li>The row iterator is sorted by all data columns, change order, and change type. The
+ *       change order maps 1-to-1 to the snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends ChangelogIterator {
+
+  private final int[] indicesToIdentifySameRow;
+
+  private Row cachedNextRow;
+  private Row cachedRow;
+  private long cachedRowCount;
+
+  protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+    super(rowIterator, rowType);
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRowCount > 0) {
+      return true;
+    }
+
+    if (cachedNextRow != null) {
+      return true;
+    }
+
+    return rowIterator().hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there are cached rows, return one of them from the beginning
+    if (cachedRowCount > 0) {
+      cachedRowCount--;
+      return cachedRow;
+    }
+
+    cachedRow = getCurrentRow();
+    // return it directly if there are no more rows
+    if (!rowIterator().hasNext()) {
+      return cachedRow;
+    }
+    cachedRowCount = 1;
+
+    cachedNextRow = rowIterator().next();
+
+    // pull rows from the iterator until two consecutive rows are different
+    while (isSameRecord(cachedRow, cachedNextRow, indicesToIdentifySameRow)) {
+      if (oppositeChangeType(cachedRow, cachedNextRow)) {
+        // two rows with opposite change types mean no net change, remove both
+        cachedRowCount--;
+      } else {
+        // two rows with the same change type mean a potential net change, cache the next row
+        cachedRowCount++;
+      }
+
+      // stop pulling rows if there are no more rows or the next row is different
+      if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+        // reset the cached next row if there are no more rows
+        cachedNextRow = null;
+        break;
+      }
+
+      cachedNextRow = rowIterator().next();
+    }
+
+    return null;
+  }
+
+  private Row getCurrentRow() {
+    Row currentRow;
+    if (cachedNextRow != null) {
+      currentRow = cachedNextRow;
+      cachedNextRow = null;
+    } else {
+      currentRow = rowIterator().next();
+    }
+    return currentRow;
+  }
+
+  private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+    return (changeType(nextRow).equals(INSERT) && changeType(currentRow).equals(DELETE))
+        || (changeType(nextRow).equals(DELETE) && changeType(currentRow).equals(INSERT));
+  }
+
+  private int[] generateIndicesToIdentifySameRow() {
+    Set<Integer> metadataColumnIndices =
+        Sets.newHashSet(
+            rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()),
+            rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()),
+            changeTypeIndex());
+    return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
+  }
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
index bf98bebb9d50..0539598f147e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
@@ -49,7 +49,17 @@ public class TestChangelogIterator extends SparkTestHelperBase {
             new StructField("name", DataTypes.StringType, false, Metadata.empty()),
             new StructField("data", DataTypes.StringType, true, Metadata.empty()),
             new StructField(
-                MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty())
+                MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()),
+            new StructField(
+                MetadataColumns.CHANGE_ORDINAL.name(),
+                DataTypes.IntegerType,
+                false,
+                Metadata.empty()),
+            new StructField(
+                MetadataColumns.COMMIT_SNAPSHOT_ID.name(),
+                DataTypes.LongType,
+                false,
+                Metadata.empty())
           });

   private static final String[] IDENTIFIER_FIELDS = new String[] {"id", "name"};
@@ -93,18 +103,18 @@ private List<Row> toOriginalRows(RowType rowType, int index) {
     switch (rowType) {
       case DELETED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE}, null));
+            new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE, 0, 0}, null));
       case INSERTED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT}, null));
+            new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT, 0, 0}, null));
       case CARRY_OVER:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT}, null));
+            new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT, 0, 0}, null));
       case UPDATED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT}, null));
+            new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT, 0, 0}, null));
       default:
         throw new IllegalArgumentException("Unknown row type: " + rowType);
     }
@@ -114,18 +124,18 @@ private List<Object[]> toExpectedRows(RowType rowType, int order) {
     switch (rowType) {
       case DELETED:
         List<Object[]> rows = Lists.newArrayList();
-        rows.add(new Object[] {order, "b", "data", DELETE});
+        rows.add(new Object[] {order, "b", "data", DELETE, 0, 0});
         return rows;
       case INSERTED:
         List<Object[]> insertedRows = Lists.newArrayList();
-        insertedRows.add(new Object[] {order, "c", "data", INSERT});
+        insertedRows.add(new Object[] {order, "c", "data", INSERT, 0, 0});
         return insertedRows;
       case CARRY_OVER:
         return Lists.newArrayList();
       case UPDATED:
         return Lists.newArrayList(
-            new Object[] {order, "a", "data", UPDATE_BEFORE},
-            new Object[] {order, "a", "new_data", UPDATE_AFTER});
+            new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0},
+            new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0});
       default:
         throw new IllegalArgumentException("Unknown row type: " + rowType);
     }
@@ -146,16 +156,16 @@ private void permute(List arr, int start, List pm) {
   public void testRowsWithNullValue() {
     final List<Row> rowsWithNull =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {2, null, null, DELETE}, null),
-            new GenericRowWithSchema(new Object[] {3, null, null, INSERT}, null),
-            new GenericRowWithSchema(new Object[] {4, null, null, DELETE}, null),
-            new GenericRowWithSchema(new Object[] {4, null, null, INSERT}, null),
+            new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 0}, null),
             // mixed null and non-null value in non-identifier columns
-            new GenericRowWithSchema(new Object[] {5, null, null, DELETE}, null),
-            new GenericRowWithSchema(new Object[] {5, null, "data", INSERT}, null),
+            new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 0}, null),
             // mixed null and non-null value in identifier columns
-            new GenericRowWithSchema(new Object[] {6, null, null, DELETE}, null),
-            new GenericRowWithSchema(new Object[] {6, "name", null, INSERT}, null));
+            new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 0}, null));

     Iterator<Row> iterator =
         ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS);
@@ -164,12 +174,12 @@ public void testRowsWithNullValue() {
     assertEquals(
         "Rows should match",
         Lists.newArrayList(
-            new Object[] {2, null, null, DELETE},
-            new Object[] {3, null, null, INSERT},
-            new Object[] {5, null, null, UPDATE_BEFORE},
-            new Object[] {5, null, "data", UPDATE_AFTER},
-            new Object[] {6, null, null, DELETE},
-            new Object[] {6, "name", null, INSERT}),
+            new Object[] {2, null, null, DELETE, 0, 0},
+            new Object[] {3, null, null, INSERT, 0, 0},
+            new Object[] {5, null, null, UPDATE_BEFORE, 0, 0},
+            new Object[] {5, null, "data", UPDATE_AFTER, 0, 0},
+            new Object[] {6, null, null, DELETE, 0, 0},
+            new Object[] {6, "name", null, INSERT, 0, 0}),
         rowsToJava(result));
   }

@@ -178,10 +188,10 @@ public void testUpdatedRowsWithDuplication() {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
            // two rows with same identifier fields(id, name)
-            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT}, null));
+            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null));

     Iterator<Row> iterator =
         ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS);
@@ -194,9 +204,9 @@ public void testUpdatedRowsWithDuplication() {
     // still allow extra insert rows
     rowsWithDuplication =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT}, null));
+            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT, 0, 0}, null));

     Iterator<Row> iterator1 =
         ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS);
@@ -204,9 +214,9 @@ public void testUpdatedRowsWithDuplication() {
     assertEquals(
         "Rows should match.",
         Lists.newArrayList(
-            new Object[] {1, "a", "data", UPDATE_BEFORE},
-            new Object[] {1, "a", "new_data1", UPDATE_AFTER},
-            new Object[] {1, "a", "new_data2", INSERT}),
+            new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0},
+            new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0},
+            new Object[] {1, "a", "new_data2", INSERT, 0, 0}),
         rowsToJava(Lists.newArrayList(iterator1)));
   }

@@ -216,32 +226,28 @@ public void testCarryRowsRemoveWithDuplicates() {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
             // keep all delete rows for id 0 and id 1 since there is no insert row for them
-            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE}, null),
+            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 0, 0}, null),
             // the same number of delete and insert rows for id 2
-            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT}, null));
-
-    Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
-    List<Row> result = Lists.newArrayList(iterator);
+            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT, 0, 0}, null));

-    assertEquals(
-        "Rows should match.",
+    List<Object[]> expectedRows =
         Lists.newArrayList(
-            new Object[] {0, "a", "data", DELETE},
-            new Object[] {0, "a", "data", DELETE},
-            new Object[] {0, "a", "data", DELETE},
-            new Object[] {1, "a", "old_data", DELETE},
-            new Object[] {1, "a", "old_data", DELETE},
-            new Object[] {3, "a", "new_data", INSERT}),
-        rowsToJava(result));
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {1, "a", "old_data", DELETE, 0, 0},
+            new Object[] {1, "a", "old_data", DELETE, 0, 0},
+            new Object[] {3, "a", "new_data", INSERT, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
   }

   @Test
@@ -249,45 +255,39 @@ public void testCarryRowsRemoveLessInsertRows() {
     // less insert rows than delete rows
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT}, null));
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0, 0}, null));

-    Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
-    List<Row> result = Lists.newArrayList(iterator);
-
-    assertEquals(
-        "Rows should match.",
+    List<Object[]> expectedRows =
         Lists.newArrayList(
-            new Object[] {1, "d", "data", DELETE}, new Object[] {2, "d", "data", INSERT}),
-        rowsToJava(result));
+            new Object[] {1, "d", "data", DELETE, 0, 0},
+            new Object[] {2, "d", "data", INSERT, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
   }

   @Test
   public void testCarryRowsRemoveMoreInsertRows() {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null),
+            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
             // more insert rows than delete rows, should keep extra insert rows
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null));
-
-    Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
-    List<Row> result = Lists.newArrayList(iterator);
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null));

-    assertEquals(
-        "Rows should match.",
+    List<Object[]> expectedRows =
         Lists.newArrayList(
-            new Object[] {0, "d", "data", DELETE}, new Object[] {1, "d", "data", INSERT}),
-        rowsToJava(result));
+            new Object[] {0, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", INSERT, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
   }

   @Test
@@ -296,17 +296,64 @@ public void testCarryRowsRemoveNoInsertRows() {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
             // next two rows are identical
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null));
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null));

+    List<Object[]> expectedRows =
+        Lists.newArrayList(
+            new Object[] {1, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", DELETE, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
+  }
+
+  private void validateIterators(List<Row> rowsWithDuplication, List<Object[]> expectedRows) {
     Iterator<Row> iterator =
         ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
     List<Row> result = Lists.newArrayList(iterator);

-    assertEquals(
-        "Duplicate rows should not be removed",
+    assertEquals("Rows should match.", expectedRows, rowsToJava(result));
+
+    iterator = ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA);
+    result = Lists.newArrayList(iterator);
+
+    assertEquals("Rows should match.", expectedRows, rowsToJava(result));
+  }
+
+  @Test
+  public void testRemoveNetCarryovers() {
+    List<Row> rowsWithDuplication =
         Lists.newArrayList(
-            new Object[] {1, "d", "data", DELETE}, new Object[] {1, "d", "data", DELETE}),
-        rowsToJava(result));
+            // this row is different from the other rows, it is a net change, should be kept
+            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null),
+            // a pair of delete and insert rows, should be removed
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+            // 2 delete rows and 2 insert rows, should be removed
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null),
+            // a pair of insert and delete rows across snapshots, should be removed
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 2}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 3}, null),
+            // extra insert rows, they are net changes, should be kept
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null),
+            // different key, net changes, should be kept
+            new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 4}, null));
+
+    List<Object[]> expectedRows =
+        Lists.newArrayList(
+            new Object[] {0, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", INSERT, 4, 4},
+            new Object[] {1, "d", "data", INSERT, 4, 4},
+            new Object[] {2, "d", "data", DELETE, 4, 4});
+
+    Iterator<Row> iterator =
+        ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA);
+    List<Row> result = Lists.newArrayList(iterator);
+
+    assertEquals("Rows should match.", expectedRows, rowsToJava(result));
   }
 }
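
Note on the cancel-and-count rule: the behavior that RemoveNetCarryoverIterator.next() implements can be modelled outside of Spark. The sketch below is only an illustration of that rule under the iterator's ordering assumption (all entries for the same record arrive adjacent); the ChangeEntry class and netChanges method are hypothetical helpers invented for this note, not part of the patch, and the real iterator compares full rows while ignoring the change-type, change-ordinal, and commit-snapshot-id columns, then hands back null for netted-out rows so a downstream filter can drop them.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NetCarryoverSketch {

  // Simplified stand-in for a changelog row: a record key plus its change type.
  static final class ChangeEntry {
    final String key;
    final String changeType;

    ChangeEntry(String key, String changeType) {
      this.key = key;
      this.changeType = changeType;
    }

    @Override
    public String toString() {
      return key + ":" + changeType;
    }
  }

  // Assumes the input is already sorted so that all entries for the same key are adjacent,
  // mirroring the sort order the iterator expects from the changelog scan.
  static List<ChangeEntry> netChanges(List<ChangeEntry> sorted) {
    List<ChangeEntry> result = new ArrayList<>();
    int i = 0;
    while (i < sorted.size()) {
      ChangeEntry current = sorted.get(i);
      int count = 1;
      int j = i + 1;
      while (j < sorted.size() && sorted.get(j).key.equals(current.key)) {
        if (sorted.get(j).changeType.equals(current.changeType)) {
          count++; // same change type: potentially a net change, keep counting
        } else {
          count--; // opposite change type: a DELETE/INSERT pair nets out
        }
        j++;
        if (count <= 0) {
          break; // everything seen so far cancelled; restart from the next entry
        }
      }
      for (int k = 0; k < count; k++) {
        result.add(current); // whatever is left over is a net change
      }
      i = j;
    }
    return result;
  }

  public static void main(String[] args) {
    List<ChangeEntry> sorted =
        Arrays.asList(
            new ChangeEntry("a", "DELETE"),
            new ChangeEntry("b", "DELETE"),
            new ChangeEntry("b", "INSERT"), // cancels the DELETE above
            new ChangeEntry("c", "INSERT"),
            new ChangeEntry("c", "INSERT")); // both inserts survive as net changes
    System.out.println(netChanges(sorted)); // [a:DELETE, c:INSERT, c:INSERT]
  }
}

Running main prints [a:DELETE, c:INSERT, c:INSERT]: the DELETE/INSERT pair for key b nets out, while the unmatched DELETE for a and the two surplus INSERTs for c survive, which is the same behavior testRemoveNetCarryovers exercises against the real iterator at a larger scale.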