@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER
);

@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;

@@ -248,7 +250,7 @@ private ExecutorService rewriteService() {

@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-   return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+   return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
}
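// NOTE (sketch, not part of this diff): the commit path that consumes the new
// constructor argument lives in RewriteDataFilesCommitManager and is assumed to
// look roughly like the following; the rewriteFiles(Set, Set, long) overload and
// Snapshot#sequenceNumber() are assumptions here, not confirmed by this change.
//
//   RewriteFiles rewrite = table.newRewrite();
//   if (useStartingSequenceNumber) {
//     long startingSequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
//     rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles, startingSequenceNumber);
//   } else {
//     rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
//   }
//   rewrite.commit();
//
// The visible effect, exercised in the tests below, is that rewritten entries keep the
// starting snapshot's sequence number while the table's sequence number still advances.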

private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -396,6 +398,10 @@ void validateAndInitOptions() {
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_ENABLED_DEFAULT);

useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
USE_STARTING_SEQUENCE_NUMBER,
USE_STARTING_SEQUENCE_NUMBER_DEFAULT);

rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));
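For context, a minimal caller-side sketch of the new option. The SparkActions entry point is assumed from the standard Iceberg Spark API; the option constant is the one registered above and exercised in the tests below, and its default (USE_STARTING_SEQUENCE_NUMBER_DEFAULT) is defined in RewriteDataFiles rather than in this diff.

// Sketch: enabling the new option on a rewrite action.
SparkActions.get(spark)
    .rewriteDataFiles(table)
    .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
    .execute();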
@@ -39,6 +39,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public void testBinPackWithDeleteAllData() {
(long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
}

@Test
public void testBinPackWithStartingSequenceNumber() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();

Result result = basicRewrite(table)
.option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
.execute();
Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());

shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);

table.refresh();
Assert.assertTrue("Table sequence number should be incremented",
oldSequenceNumber < table.currentSnapshot().sequenceNumber());

Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
for (Row row : rows.collectAsList()) {
if (row.getInt(0) == 1) {
Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
}
}
}
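// NOTE (sketch): an equivalent check could filter the entries metadata table by
// column name instead of position; the column names "status" and "sequence_number"
// are assumed to match the positional reads above (status 1 marks ADDED entries).
//
//   long mismatched = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES)
//       .filter("status = 1 AND sequence_number != " + oldSequenceNumber)
//       .count();
//   Assert.assertEquals("Added entries should keep the starting sequence number", 0, mismatched);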

@Test
public void testBinPackWithStartingSequenceNumberV1Compatibility() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);

Result result = basicRewrite(table)
.option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
.execute();
Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());

shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);

table.refresh();
Assert.assertEquals("Table sequence number should still be 0",
oldSequenceNumber, table.currentSnapshot().sequenceNumber());

Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
for (Row row : rows.collectAsList()) {
Assert.assertEquals("Expect sequence number 0 for all entries",
oldSequenceNumber, row.getLong(2));
}
}

@Test
public void testRewriteLargeTableHasResiduals() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER
);

@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;

@@ -248,7 +250,7 @@ private ExecutorService rewriteService() {

@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-   return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+   return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
}

private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -395,6 +397,10 @@ void validateAndInitOptions() {
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_ENABLED_DEFAULT);

useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
USE_STARTING_SEQUENCE_NUMBER,
USE_STARTING_SEQUENCE_NUMBER_DEFAULT);

rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));
@@ -39,6 +39,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public void testBinPackWithDeleteAllData() {
(long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
}

@Test
public void testBinPackWithStartingSequenceNumber() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();

Result result = basicRewrite(table)
.option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
.execute();
Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());

shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);

table.refresh();
Assert.assertTrue("Table sequence number should be incremented",
oldSequenceNumber < table.currentSnapshot().sequenceNumber());

Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
for (Row row : rows.collectAsList()) {
if (row.getInt(0) == 1) {
Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
}
}
}

@Test
public void testBinPackWithStartingSequenceNumberV1Compatibility() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);

Result result = basicRewrite(table)
.option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
.execute();
Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());

shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);

table.refresh();
Assert.assertEquals("Table sequence number should still be 0",
oldSequenceNumber, table.currentSnapshot().sequenceNumber());

Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
for (Row row : rows.collectAsList()) {
Assert.assertEquals("Expect sequence number 0 for all entries",
oldSequenceNumber, row.getLong(2));
}
}

@Test
public void testRewriteLargeTableHasResiduals() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();