diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index dc12b0145d50..0a4a1073c3f9 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.junit.Assert.assertThrows; + import java.util.List; import java.util.Map; import org.apache.iceberg.ChangelogOperation; @@ -45,13 +47,13 @@ public void removeTable() { sql("DROP TABLE IF EXISTS %s", tableName); } - public void createTableWith2Columns() { + public void createTableWithTwoColumns() { sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1); sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName); } - private void createTableWith3Columns() { + private void createTableWithThreeColumns() { sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg", tableName); sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1); sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName); @@ -65,7 +67,7 @@ private void createTableWithIdentifierField() { @Test public void testCustomizedViewName() { - createTableWith2Columns(); + createTableWithTwoColumns(); sql("INSERT INTO %s VALUES (1, 'a')", tableName); sql("INSERT INTO %s VALUES (2, 'b')", tableName); @@ -98,7 +100,7 @@ public void testCustomizedViewName() { @Test public void testNoSnapshotIdInput() { - createTableWith2Columns(); + createTableWithTwoColumns(); sql("INSERT INTO %s VALUES (1, 'a')", tableName); Table table = validationCatalog.loadTable(tableIdent); Snapshot snap0 = table.currentSnapshot(); @@ -129,7 +131,7 @@ public void testNoSnapshotIdInput() { @Test public void testTimestampsBasedQuery() { - createTableWith2Columns(); + createTableWithTwoColumns(); long beginning = System.currentTimeMillis(); sql("INSERT INTO %s VALUES (1, 'a')", tableName); @@ -189,7 +191,7 @@ public void testTimestampsBasedQuery() { @Test public void testWithCarryovers() { - createTableWith2Columns(); + createTableWithTwoColumns(); sql("INSERT INTO %s VALUES (1, 'a')", tableName); Table table = validationCatalog.loadTable(tableIdent); Snapshot snap0 = table.currentSnapshot(); @@ -224,7 +226,7 @@ public void testWithCarryovers() { @Test public void testUpdate() { - createTableWith2Columns(); + createTableWithTwoColumns(); sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName); sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName); @@ -283,7 +285,7 @@ public void testUpdateWithIdentifierField() { @Test public void testUpdateWithFilter() { - createTableWith2Columns(); + createTableWithTwoColumns(); sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName); sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName); @@ -315,7 +317,7 @@ public void testUpdateWithFilter() { @Test public void testUpdateWithMultipleIdentifierColumns() { - createTableWith3Columns(); + createTableWithThreeColumns(); sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName); Table table = validationCatalog.loadTable(tableIdent); @@ -347,7 +349,7 @@ public void testUpdateWithMultipleIdentifierColumns() { @Test public void testRemoveCarryOvers() { 
- createTableWith3Columns(); + createTableWithThreeColumns(); sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); Table table = validationCatalog.loadTable(tableIdent); @@ -381,7 +383,7 @@ public void testRemoveCarryOvers() { @Test public void testRemoveCarryOversWithoutUpdatedRows() { - createTableWith3Columns(); + createTableWithThreeColumns(); sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); Table table = validationCatalog.loadTable(tableIdent); @@ -411,9 +413,74 @@ public void testRemoveCarryOversWithoutUpdatedRows() { sql("select * from %s order by _change_ordinal, id, data", viewName)); } + @Test + public void testNetChangesWithRemoveCarryOvers() { + // partitioned by id + createTableWithThreeColumns(); + + // insert rows: (1, 'a', 12) (2, 'b', 11) (2, 'e', 12) + sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Snapshot snap1 = table.currentSnapshot(); + + // delete rows: (2, 'b', 11) (2, 'e', 12) + // insert rows: (3, 'c', 13) (2, 'd', 11) (2, 'e', 12) + sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName); + table.refresh(); + Snapshot snap2 = table.currentSnapshot(); + + // delete rows: (2, 'd', 11) (2, 'e', 12) (3, 'c', 13) + // insert rows: (3, 'c', 15) (2, 'e', 12) + sql("INSERT OVERWRITE %s VALUES (3, 'c', 15), (2, 'e', 12)", tableName); + table.refresh(); + Snapshot snap3 = table.currentSnapshot(); + + // test with all snapshots + List returns = + sql( + "CALL %s.system.create_changelog_view(table => '%s', net_changes => true)", + catalogName, tableName); + + String viewName = (String) returns.get(0)[0]; + + assertEquals( + "Rows should match", + ImmutableList.of( + row(1, "a", 12, INSERT, 0, snap1.snapshotId()), + row(3, "c", 15, INSERT, 2, snap3.snapshotId()), + row(2, "e", 12, INSERT, 2, snap3.snapshotId())), + sql("select * from %s order by _change_ordinal, data", viewName)); + + // test with snap2 and snap3 + sql( + "CALL %s.system.create_changelog_view(table => '%s', " + + "options => map('start-snapshot-id','%s'), " + + "net_changes => true)", + catalogName, tableName, snap1.snapshotId()); + + assertEquals( + "Rows should match", + ImmutableList.of( + row(2, "b", 11, DELETE, 0, snap2.snapshotId()), + row(3, "c", 15, INSERT, 1, snap3.snapshotId())), + sql("select * from %s order by _change_ordinal, data", viewName)); + } + + @Test + public void testNetChangesWithComputeUpdates() { + createTableWithTwoColumns(); + assertThrows( + "Should fail because net_changes is not supported with computing updates", + IllegalArgumentException.class, + () -> + sql( + "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)", + catalogName, tableName)); + } + @Test public void testNotRemoveCarryOvers() { - createTableWith3Columns(); + createTableWithThreeColumns(); sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); Table table = validationCatalog.loadTable(tableIdent); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java index 5f30c5fd4e56..cc44b1f3992c 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java @@ -20,8 +20,10 @@ import java.util.Iterator; import java.util.Objects; 
+import java.util.Set; import org.apache.iceberg.ChangelogOperation; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; @@ -35,9 +37,11 @@ public abstract class ChangelogIterator implements Iterator { private final Iterator rowIterator; private final int changeTypeIndex; + private final StructType rowType; protected ChangelogIterator(Iterator rowIterator, StructType rowType) { this.rowIterator = rowIterator; + this.rowType = rowType; this.changeTypeIndex = rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name()); } @@ -45,6 +49,16 @@ protected int changeTypeIndex() { return changeTypeIndex; } + protected StructType rowType() { + return rowType; + } + + protected String changeType(Row row) { + String changeType = row.getString(changeTypeIndex()); + Preconditions.checkNotNull(changeType, "Change type should not be null"); + return changeType; + } + protected Iterator rowIterator() { return rowIterator; } @@ -79,7 +93,35 @@ public static Iterator removeCarryovers(Iterator rowIterator, StructTy return Iterators.filter(changelogIterator, Objects::nonNull); } + public static Iterator removeNetCarryovers(Iterator rowIterator, StructType rowType) { + ChangelogIterator changelogIterator = new RemoveNetCarryoverIterator(rowIterator, rowType); + return Iterators.filter(changelogIterator, Objects::nonNull); + } + + protected boolean isSameRecord(Row currentRow, Row nextRow, int[] indicesToIdentifySameRow) { + for (int idx : indicesToIdentifySameRow) { + if (isDifferentValue(currentRow, nextRow, idx)) { + return false; + } + } + + return true; + } + protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) { return !Objects.equals(nextRow.get(idx), currentRow.get(idx)); } + + protected static int[] generateIndicesToIdentifySameRow( + int totalColumnCount, Set metadataColumnIndices) { + int[] indices = new int[totalColumnCount - metadataColumnIndices.size()]; + + for (int i = 0, j = 0; i < indices.length; i++) { + if (!metadataColumnIndices.contains(i)) { + indices[j] = i; + j++; + } + } + return indices; + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java index 23e6a19a17e7..6951c33e51aa 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java @@ -81,15 +81,13 @@ public Row next() { // either a cached record which is not an UPDATE or the next record in the iterator. Row currentRow = currentRow(); - if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) { + if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) { Row nextRow = rowIterator().next(); cachedRow = nextRow; if (sameLogicalRow(currentRow, nextRow)) { - String nextRowChangeType = nextRow.getString(changeTypeIndex()); - Preconditions.checkState( - nextRowChangeType.equals(INSERT), + changeType(nextRow).equals(INSERT), "Cannot compute updates because there are multiple rows with the same identifier" + " fields([%s]). 
Please make sure the rows are unique.", String.join(",", identifierFields)); @@ -118,7 +116,7 @@ private Row modify(Row row, int valueIndex, Object value) { } private boolean cachedUpdateRecord() { - return cachedRow != null && cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER); + return cachedRow != null && changeType(cachedRow).equals(UPDATE_AFTER); } private Row currentRow() { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java index 70b160e13fee..2e90dc7749d1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark; import java.util.Iterator; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; @@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator { RemoveCarryoverIterator(Iterator rowIterator, StructType rowType) { super(rowIterator, rowType); - this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size()); + this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(); } @Override @@ -88,7 +90,7 @@ public Row next() { } // If the current row is a delete row, drain all identical delete rows - if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) { + if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) { cachedDeletedRow = currentRow; deletedRowCount = 1; @@ -98,8 +100,8 @@ public Row next() { // row is the same record while (nextRow != null && cachedDeletedRow != null - && isSameRecord(cachedDeletedRow, nextRow)) { - if (nextRow.getString(changeTypeIndex()).equals(INSERT)) { + && isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) { + if (changeType(nextRow).equals(INSERT)) { deletedRowCount--; if (deletedRowCount == 0) { cachedDeletedRow = null; @@ -139,25 +141,8 @@ private boolean hasCachedDeleteRow() { return cachedDeletedRow != null; } - private int[] generateIndicesToIdentifySameRow(int columnSize) { - int[] indices = new int[columnSize - 1]; - for (int i = 0; i < indices.length; i++) { - if (i < changeTypeIndex()) { - indices[i] = i; - } else { - indices[i] = i + 1; - } - } - return indices; - } - - private boolean isSameRecord(Row currentRow, Row nextRow) { - for (int idx : indicesToIdentifySameRow) { - if (isDifferentValue(currentRow, nextRow, idx)) { - return false; - } - } - - return true; + private int[] generateIndicesToIdentifySameRow() { + Set metadataColumnIndices = Sets.newHashSet(changeTypeIndex()); + return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices); } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java new file mode 100644 index 000000000000..e4234755cdcf --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.util.Iterator; +import java.util.Set; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; + +/** + * This class computes the net changes across multiple snapshots. It is different from {@link + * RemoveCarryoverIterator}, which only removes carry-over rows within a single snapshot. It takes a + * row iterator, and assumes the following: + * + *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change ordinal, and change type. The change
+ *       ordinal maps 1-to-1 to the snapshot id.
+ * </ul>
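+ * <p>For example (mirroring {@code testRemoveNetCarryovers} below), for a single record the
+ * ordered stream {@code INSERT (ordinal 2), DELETE (ordinal 3)} is a pair of opposite change
+ * types that nets to nothing, so both rows are dropped, while {@code INSERT (ordinal 4),
+ * INSERT (ordinal 4)} has no opposite counterpart and both rows are kept as net changes.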
+ */ +public class RemoveNetCarryoverIterator extends ChangelogIterator { + + private final int[] indicesToIdentifySameRow; + + private Row cachedNextRow; + private Row cachedRow; + private long cachedRowCount; + + protected RemoveNetCarryoverIterator(Iterator rowIterator, StructType rowType) { + super(rowIterator, rowType); + this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(); + } + + @Override + public boolean hasNext() { + if (cachedRowCount > 0) { + return true; + } + + if (cachedNextRow != null) { + return true; + } + + return rowIterator().hasNext(); + } + + @Override + public Row next() { + // if there are cached rows, return one of them from the beginning + if (cachedRowCount > 0) { + cachedRowCount--; + return cachedRow; + } + + cachedRow = getCurrentRow(); + // return it directly if there are no more rows + if (!rowIterator().hasNext()) { + return cachedRow; + } + cachedRowCount = 1; + + cachedNextRow = rowIterator().next(); + + // pull rows from the iterator until two consecutive rows are different + while (isSameRecord(cachedRow, cachedNextRow, indicesToIdentifySameRow)) { + if (oppositeChangeType(cachedRow, cachedNextRow)) { + // two rows with opposite change types mean no net change; remove both + cachedRowCount--; + } else { + // two rows with the same change type are potential net changes; cache the next row + cachedRowCount++; + } + + // stop pulling rows if there are no more rows or the next row is different + if (cachedRowCount <= 0 || !rowIterator().hasNext()) { + // reset the cached next row if there are no more rows + cachedNextRow = null; + break; + } + + cachedNextRow = rowIterator().next(); + } + + return null; + } + + private Row getCurrentRow() { + Row currentRow; + if (cachedNextRow != null) { + currentRow = cachedNextRow; + cachedNextRow = null; + } else { + currentRow = rowIterator().next(); + } + return currentRow; + } + + private boolean oppositeChangeType(Row currentRow, Row nextRow) { + return (changeType(nextRow).equals(INSERT) && changeType(currentRow).equals(DELETE)) + || (changeType(nextRow).equals(DELETE) && changeType(currentRow).equals(INSERT)); + } + + private int[] generateIndicesToIdentifySameRow() { + Set metadataColumnIndices = + Sets.newHashSet( + rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()), + rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()), + changeTypeIndex()); + return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 85043d2df3d6..259254aa2d51 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -21,11 +21,14 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.ChangelogIterator; import 
org.apache.iceberg.spark.source.SparkChangelogTable; import org.apache.spark.api.java.function.MapPartitionsFunction; @@ -88,10 +91,22 @@ public class CreateChangelogViewProcedure extends BaseProcedure { ProcedureParameter.optional("options", STRING_MAP); private static final ProcedureParameter COMPUTE_UPDATES_PARAM = ProcedureParameter.optional("compute_updates", DataTypes.BooleanType); + + /** + * Enable or disable removing carry-over rows. + * + * @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove carry-over + * rows. Please query {@link SparkChangelogTable} instead for use cases that do not remove + * carry-over rows. + */ + @Deprecated private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM = ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType); + private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM = ProcedureParameter.optional("identifier_columns", STRING_ARRAY); + private static final ProcedureParameter NET_CHANGES = + ProcedureParameter.optional("net_changes", DataTypes.BooleanType); private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { @@ -101,6 +116,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure { COMPUTE_UPDATES_PARAM, REMOVE_CARRYOVERS_PARAM, IDENTIFIER_COLUMNS_PARAM, + NET_CHANGES, }; private static final StructType OUTPUT_TYPE = @@ -142,10 +158,13 @@ public InternalRow[] call(InternalRow args) { Identifier changelogTableIdent = changelogTableIdent(tableIdent); Dataset df = loadRows(changelogTableIdent, options(input)); + boolean netChanges = input.asBoolean(NET_CHANGES, false); + if (shouldComputeUpdateImages(input)) { + Preconditions.checkArgument(!netChanges, "Net changes are not supported with update images"); df = computeUpdateImages(identifierColumns(input, tableIdent), df); } else if (shouldRemoveCarryoverRows(input)) { - df = removeCarryoverRows(df); + df = removeCarryoverRows(df, netChanges); } String viewName = viewName(input, tableIdent.name()); @@ -164,6 +183,7 @@ private Dataset computeUpdateImages(String[] identifierColumns, Dataset removeCarryoverRows(Dataset df) { + private Dataset removeCarryoverRows(Dataset df, boolean netChanges) { + Predicate columnsToKeep; + if (netChanges) { + Set metadataColumn = + Sets.newHashSet( + MetadataColumns.CHANGE_TYPE.name(), + MetadataColumns.CHANGE_ORDINAL.name(), + MetadataColumns.COMMIT_SNAPSHOT_ID.name()); + + columnsToKeep = column -> !metadataColumn.contains(column); + } else { + columnsToKeep = column -> !column.equals(MetadataColumns.CHANGE_TYPE.name()); + } + Column[] repartitionSpec = - Arrays.stream(df.columns()) - .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name())) - .map(df::col) - .toArray(Column[]::new); - return applyCarryoverRemoveIterator(df, repartitionSpec); + Arrays.stream(df.columns()).filter(columnsToKeep).map(df::col).toArray(Column[]::new); + return applyCarryoverRemoveIterator(df, repartitionSpec, netChanges); } private String[] identifierColumns(ProcedureInput input, Identifier tableIdent) { @@ -214,7 +244,7 @@ private String viewName(ProcedureInput input, String tableName) { } private Dataset applyChangelogIterator(Dataset df, Column[] repartitionSpec) { - Column[] sortSpec = sortSpec(df, repartitionSpec); + Column[] sortSpec = sortSpec(df, repartitionSpec, false); StructType schema = df.schema(); String[] identifierFields = Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new); @@ -228,22 +258,33 @@ private Dataset applyChangelogIterator(Dataset df, 
Column[] repartitio RowEncoder.apply(schema)); } - private Dataset applyCarryoverRemoveIterator(Dataset df, Column[] repartitionSpec) { - Column[] sortSpec = sortSpec(df, repartitionSpec); + private Dataset applyCarryoverRemoveIterator( + Dataset df, Column[] repartitionSpec, boolean netChanges) { + Column[] sortSpec = sortSpec(df, repartitionSpec, netChanges); StructType schema = df.schema(); return df.repartition(repartitionSpec) .sortWithinPartitions(sortSpec) .mapPartitions( (MapPartitionsFunction) - rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema), + rowIterator -> + netChanges + ? ChangelogIterator.removeNetCarryovers(rowIterator, schema) + : ChangelogIterator.removeCarryovers(rowIterator, schema), RowEncoder.apply(schema)); } - private static Column[] sortSpec(Dataset df, Column[] repartitionSpec) { - Column[] sortSpec = new Column[repartitionSpec.length + 1]; + private static Column[] sortSpec(Dataset df, Column[] repartitionSpec, boolean netChanges) { + Column changeType = df.col(MetadataColumns.CHANGE_TYPE.name()); + Column changeOrdinal = df.col(MetadataColumns.CHANGE_ORDINAL.name()); + Column[] extraColumns = + netChanges ? new Column[] {changeOrdinal, changeType} : new Column[] {changeType}; + + Column[] sortSpec = new Column[repartitionSpec.length + extraColumns.length]; + System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length); - sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name()); + System.arraycopy(extraColumns, 0, sortSpec, repartitionSpec.length, extraColumns.length); + return sortSpec; } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java index bf98bebb9d50..0539598f147e 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java @@ -49,7 +49,17 @@ public class TestChangelogIterator extends SparkTestHelperBase { new StructField("name", DataTypes.StringType, false, Metadata.empty()), new StructField("data", DataTypes.StringType, true, Metadata.empty()), new StructField( - MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()) + MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()), + new StructField( + MetadataColumns.CHANGE_ORDINAL.name(), + DataTypes.IntegerType, + false, + Metadata.empty()), + new StructField( + MetadataColumns.COMMIT_SNAPSHOT_ID.name(), + DataTypes.LongType, + false, + Metadata.empty()) }); private static final String[] IDENTIFIER_FIELDS = new String[] {"id", "name"}; @@ -93,18 +103,18 @@ private List toOriginalRows(RowType rowType, int index) { switch (rowType) { case DELETED: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE}, null)); + new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE, 0, 0}, null)); case INSERTED: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT, 0, 0}, null)); case CARRY_OVER: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] 
{index, "d", "data", INSERT, 0, 0}, null)); case UPDATED: return Lists.newArrayList( - new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {index, "a", "new_data", INSERT, 0, 0}, null)); default: throw new IllegalArgumentException("Unknown row type: " + rowType); } @@ -114,18 +124,18 @@ private List toExpectedRows(RowType rowType, int order) { switch (rowType) { case DELETED: List rows = Lists.newArrayList(); - rows.add(new Object[] {order, "b", "data", DELETE}); + rows.add(new Object[] {order, "b", "data", DELETE, 0, 0}); return rows; case INSERTED: List insertedRows = Lists.newArrayList(); - insertedRows.add(new Object[] {order, "c", "data", INSERT}); + insertedRows.add(new Object[] {order, "c", "data", INSERT, 0, 0}); return insertedRows; case CARRY_OVER: return Lists.newArrayList(); case UPDATED: return Lists.newArrayList( - new Object[] {order, "a", "data", UPDATE_BEFORE}, - new Object[] {order, "a", "new_data", UPDATE_AFTER}); + new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0}, + new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0}); default: throw new IllegalArgumentException("Unknown row type: " + rowType); } @@ -146,16 +156,16 @@ private void permute(List arr, int start, List pm) { public void testRowsWithNullValue() { final List rowsWithNull = Lists.newArrayList( - new GenericRowWithSchema(new Object[] {2, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {3, null, null, INSERT}, null), - new GenericRowWithSchema(new Object[] {4, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {4, null, null, INSERT}, null), + new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 0}, null), // mixed null and non-null value in non-identifier columns - new GenericRowWithSchema(new Object[] {5, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {5, null, "data", INSERT}, null), + new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 0}, null), // mixed null and non-null value in identifier columns - new GenericRowWithSchema(new Object[] {6, null, null, DELETE}, null), - new GenericRowWithSchema(new Object[] {6, "name", null, INSERT}, null)); + new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 0}, null)); Iterator iterator = ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS); @@ -164,12 +174,12 @@ public void testRowsWithNullValue() { assertEquals( "Rows should match", Lists.newArrayList( - new Object[] {2, null, null, DELETE}, - new Object[] {3, null, null, INSERT}, - new Object[] {5, null, null, UPDATE_BEFORE}, - new Object[] {5, null, "data", UPDATE_AFTER}, - new Object[] {6, null, null, DELETE}, - new Object[] {6, "name", null, INSERT}), + new Object[] {2, null, null, DELETE, 0, 0}, + new Object[] {3, null, null, INSERT, 0, 0}, + new Object[] {5, null, null, UPDATE_BEFORE, 0, 0}, + new Object[] {5, null, "data", UPDATE_AFTER, 0, 0}, + new Object[] {6, 
null, null, DELETE, 0, 0}, + new Object[] {6, "name", null, INSERT, 0, 0}), rowsToJava(result)); } @@ -178,10 +188,10 @@ public void testUpdatedRowsWithDuplication() { List rowsWithDuplication = Lists.newArrayList( // two rows with same identifier fields(id, name) - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 0, 0}, null)); Iterator iterator = ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); @@ -194,9 +204,9 @@ public void testUpdatedRowsWithDuplication() { // still allow extra insert rows rowsWithDuplication = Lists.newArrayList( - new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT}, null)); + new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data1", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data2", INSERT, 0, 0}, null)); Iterator iterator1 = ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS); @@ -204,9 +214,9 @@ public void testUpdatedRowsWithDuplication() { assertEquals( "Rows should match.", Lists.newArrayList( - new Object[] {1, "a", "data", UPDATE_BEFORE}, - new Object[] {1, "a", "new_data1", UPDATE_AFTER}, - new Object[] {1, "a", "new_data2", INSERT}), + new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0}, + new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0}, + new Object[] {1, "a", "new_data2", INSERT, 0, 0}), rowsToJava(Lists.newArrayList(iterator1))); } @@ -216,32 +226,28 @@ public void testCarryRowsRemoveWithDuplicates() { List rowsWithDuplication = Lists.newArrayList( // keep all delete rows for id 0 and id 1 since there is no insert row for them - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE}, null), + new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 0, 0}, null), // the same number of delete and insert rows for id 2 - new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, null), - new 
GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT}, null)); - - Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); - List result = Lists.newArrayList(iterator); + new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT, 0, 0}, null)); - assertEquals( - "Rows should match.", + List expectedRows = Lists.newArrayList( - new Object[] {0, "a", "data", DELETE}, - new Object[] {0, "a", "data", DELETE}, - new Object[] {0, "a", "data", DELETE}, - new Object[] {1, "a", "old_data", DELETE}, - new Object[] {1, "a", "old_data", DELETE}, - new Object[] {3, "a", "new_data", INSERT}), - rowsToJava(result)); + new Object[] {0, "a", "data", DELETE, 0, 0}, + new Object[] {0, "a", "data", DELETE, 0, 0}, + new Object[] {0, "a", "data", DELETE, 0, 0}, + new Object[] {1, "a", "old_data", DELETE, 0, 0}, + new Object[] {1, "a", "old_data", DELETE, 0, 0}, + new Object[] {3, "a", "new_data", INSERT, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); } @Test @@ -249,45 +255,39 @@ public void testCarryRowsRemoveLessInsertRows() { // less insert rows than delete rows List rowsWithDuplication = Lists.newArrayList( - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT}, null)); + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0, 0}, null)); - Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); - List result = Lists.newArrayList(iterator); - - assertEquals( - "Rows should match.", + List expectedRows = Lists.newArrayList( - new Object[] {1, "d", "data", DELETE}, new Object[] {2, "d", "data", INSERT}), - rowsToJava(result)); + new Object[] {1, "d", "data", DELETE, 0, 0}, + new Object[] {2, "d", "data", INSERT, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); } @Test public void testCarryRowsRemoveMoreInsertRows() { List rowsWithDuplication = Lists.newArrayList( - new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), + new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), // more insert rows than delete rows, should keep extra insert rows - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new 
GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, null)); - - Iterator iterator = - ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); - List result = Lists.newArrayList(iterator); + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null)); - assertEquals( - "Rows should match.", + List expectedRows = Lists.newArrayList( - new Object[] {0, "d", "data", DELETE}, new Object[] {1, "d", "data", INSERT}), - rowsToJava(result)); + new Object[] {0, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", INSERT, 0, 0}); + + validateIterators(rowsWithDuplication, expectedRows); } @Test @@ -296,17 +296,64 @@ public void testCarryRowsRemoveNoInsertRows() { List rowsWithDuplication = Lists.newArrayList( // next two rows are identical - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null), - new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, null)); + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null)); + + List expectedRows = + Lists.newArrayList( + new Object[] {1, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", DELETE, 0, 0}); + validateIterators(rowsWithDuplication, expectedRows); + } + + private void validateIterators(List rowsWithDuplication, List expectedRows) { Iterator iterator = ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA); List result = Lists.newArrayList(iterator); - assertEquals( - "Duplicate rows should not be removed", + assertEquals("Rows should match.", expectedRows, rowsToJava(result)); + + iterator = ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA); + result = Lists.newArrayList(iterator); + + assertEquals("Rows should match.", expectedRows, rowsToJava(result)); + } + + @Test + public void testRemoveNetCarryovers() { + List rowsWithDuplication = Lists.newArrayList( - new Object[] {1, "d", "data", DELETE}, new Object[] {1, "d", "data", DELETE}), - rowsToJava(result)); + // this row are different from other rows, it is a net change, should be kept + new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null), + // a pair of delete and insert rows, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null), + // 2 delete rows and 2 insert rows, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null), + // a pair of insert and delete rows across snapshots, should be removed + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 2}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 3}, null), + // extra insert rows, they are net changes, should be kept + new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null), + new GenericRowWithSchema(new Object[] {1, "d", "data", 
INSERT, 4, 4}, null), + // different key, net changes, should be kept + new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 4}, null)); + + List expectedRows = + Lists.newArrayList( + new Object[] {0, "d", "data", DELETE, 0, 0}, + new Object[] {1, "d", "data", INSERT, 4, 4}, + new Object[] {1, "d", "data", INSERT, 4, 4}, + new Object[] {2, "d", "data", DELETE, 4, 4}); + + Iterator iterator = + ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA); + List result = Lists.newArrayList(iterator); + + assertEquals("Rows should match.", expectedRows, rowsToJava(result)); } }
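
A minimal standalone sketch of the new API surface, for readers of this patch. It is illustrative only: the class name NetCarryoverExample and the two-column row layout are invented for the example, while removeNetCarryovers, the metadata column names, and the required partition/sort contract come from the changes above.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.iceberg.ChangelogOperation;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.spark.ChangelogIterator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class NetCarryoverExample {

  public static void main(String[] args) {
    String insert = ChangelogOperation.INSERT.name();
    String delete = ChangelogOperation.DELETE.name();

    // data columns first, then the three metadata columns that the iterator
    // excludes when deciding whether two rows describe the same record
    StructType schema =
        new StructType(
            new StructField[] {
              new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
              new StructField("data", DataTypes.StringType, true, Metadata.empty()),
              new StructField(
                  MetadataColumns.CHANGE_TYPE.name(),
                  DataTypes.StringType,
                  false,
                  Metadata.empty()),
              new StructField(
                  MetadataColumns.CHANGE_ORDINAL.name(),
                  DataTypes.IntegerType,
                  false,
                  Metadata.empty()),
              new StructField(
                  MetadataColumns.COMMIT_SNAPSHOT_ID.name(),
                  DataTypes.LongType,
                  false,
                  Metadata.empty())
            });

    // rows are already partitioned and sorted the way the procedure guarantees:
    // by all data columns, then change ordinal, then change type
    List<Row> rows =
        Arrays.asList(
            // (1, "a") is inserted at ordinal 0 and deleted at ordinal 1: an
            // opposite-type pair with no net effect, so both rows are dropped
            new GenericRowWithSchema(new Object[] {1, "a", insert, 0, 0L}, null),
            new GenericRowWithSchema(new Object[] {1, "a", delete, 1, 1L}, null),
            // (2, "b") is only inserted: a genuine net change, so it is kept
            new GenericRowWithSchema(new Object[] {2, "b", insert, 1, 1L}, null));

    Iterator<Row> netChanges = ChangelogIterator.removeNetCarryovers(rows.iterator(), schema);
    netChanges.forEachRemaining(System.out::println); // prints only the (2, "b") row
  }
}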