diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 4f0c27fdcc75..cd12131bd04d 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
       TARGET_FILE_SIZE_BYTES,
+      USE_STARTING_SEQUENCE_NUMBER,
       REWRITE_JOB_ORDER
   );
 
@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
+  private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
@@ -248,7 +250,7 @@ private ExecutorService rewriteService() {
 
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
   }
 
   private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -396,6 +398,10 @@ void validateAndInitOptions() {
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
+        USE_STARTING_SEQUENCE_NUMBER,
+        USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+
     rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
         REWRITE_JOB_ORDER,
         REWRITE_JOB_ORDER_DEFAULT));
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2e62fb0b2503..3f465fe72223 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public void testBinPackWithDeleteAllData() {
         (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
   }
 
+  @Test
+  public void testBinPackWithStartingSequenceNumber() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data files", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertTrue("Table sequence number should be incremented",
+        oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      if (row.getInt(0) == 1) {
+        Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
+      }
+    }
+  }
+
+  @Test
+  public void testBinPackWithStartingSequenceNumberV1Compatibility() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+    Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data files", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertEquals("Table sequence number should still be 0",
+        oldSequenceNumber, table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      Assert.assertEquals("Expect sequence number 0 for all entries",
+          oldSequenceNumber, row.getLong(2));
+    }
+  }
+
   @Test
   public void testRewriteLargeTableHasResiduals() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 1cbfa4cecf27..5c3c349cd835 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
       TARGET_FILE_SIZE_BYTES,
+      USE_STARTING_SEQUENCE_NUMBER,
       REWRITE_JOB_ORDER
   );
 
@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
+  private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
@@ -248,7 +250,7 @@ private ExecutorService rewriteService() {
 
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
   }
 
   private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -395,6 +397,10 @@ void validateAndInitOptions() {
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
+        USE_STARTING_SEQUENCE_NUMBER,
+        USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+
     rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
         REWRITE_JOB_ORDER,
         REWRITE_JOB_ORDER_DEFAULT));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2e62fb0b2503..3f465fe72223 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public void testBinPackWithDeleteAllData() {
         (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
   }
 
+  @Test
+  public void testBinPackWithStartingSequenceNumber() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data files", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertTrue("Table sequence number should be incremented",
+        oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      if (row.getInt(0) == 1) {
+        Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
+      }
+    }
+  }
+
+  @Test
+  public void testBinPackWithStartingSequenceNumberV1Compatibility() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+    Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data files", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertEquals("Table sequence number should still be 0",
+        oldSequenceNumber, table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      Assert.assertEquals("Expect sequence number 0 for all entries",
+          oldSequenceNumber, row.getLong(2));
+    }
+  }
+
   @Test
   public void testRewriteLargeTableHasResiduals() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
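
Not part of the patch itself: a minimal usage sketch of how a caller could enable the new option through the public action API once this change lands. The wrapper class and method are illustrative assumptions, not code from this PR; only RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER and the SparkActions entry point are taken from the Iceberg API.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

class RewriteWithStartingSequenceNumberExample {
  // Compacts small data files and, with the new option enabled, commits the
  // rewritten entries using the sequence number of the snapshot the rewrite
  // started from (the behavior asserted by testBinPackWithStartingSequenceNumber).
  static RewriteDataFiles.Result compact(SparkSession spark, Table table) {
    return SparkActions.get(spark)
        .rewriteDataFiles(table)
        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
        .execute();
  }
}

For format v1 tables the option is effectively a no-op, as testBinPackWithStartingSequenceNumberV1Compatibility asserts: all entries keep sequence number 0.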