diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index f250d2e12289..fc66a45154fc 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -354,7 +354,10 @@ private static RewriteResult writeDataFileEntry( DataFile newDataFile = DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build(); appendEntryWithFile(entry, writer, newDataFile); - result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location())); + // keep deleted data file entries but exclude them from copyPlan + if (entry.isLive()) { + result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location())); + } return result; } @@ -381,16 +384,22 @@ private static RewriteResult writeDeleteFileEntry( .withMetrics(metricsWithTargetPath) .build(); appendEntryWithFile(entry, writer, movedFile); - result - .copyPlan() - .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location())); + // keep deleted position delete entries but exclude them from copyPlan + if (entry.isLive()) { + result + .copyPlan() + .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location())); + } result.toRewrite().add(file); return result; case EQUALITY_DELETES: DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix); appendEntryWithFile(entry, writer, eqDeleteFile); - // No need to rewrite equality delete files as they do not contain absolute file paths. - result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location())); + // keep deleted equality delete entries but exclude them from copyPlan + if (entry.isLive()) { + // No need to rewrite equality delete files as they do not contain absolute file paths. + result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location())); + } return result; default: diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index 644ddcd27ef4..bf36a0e694e8 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -28,6 +28,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -40,6 +41,7 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.StaticTableOperations; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -47,6 +49,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.actions.ActionsProvider; +import org.apache.iceberg.actions.ExpireSnapshots; import org.apache.iceberg.actions.RewriteTablePath; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.FileHelpers; @@ -57,6 +60,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkCatalog; @@ -64,6 +68,7 @@ import org.apache.iceberg.spark.source.ThreeColumnRecord; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.SparkEnv; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.Dataset; @@ -105,14 +110,14 @@ protected ActionsProvider actions() { private final String backupNs = "backupns"; @BeforeEach - public void setupTableLocation() throws Exception { + public void setupTableLocation() { this.tableLocation = tableDir.toFile().toURI().toString(); this.table = createATableWith2Snapshots(tableLocation); createNameSpaces(); } @AfterEach - public void cleanupTableSetup() throws Exception { + public void cleanupTableSetup() { dropNameSpaces(); } @@ -126,6 +131,11 @@ private Table createTableWithSnapshots(String location, int snapshotNumber) { protected Table createTableWithSnapshots( String location, int snapshotNumber, Map properties) { + return createTableWithSnapshots(location, snapshotNumber, properties, "append"); + } + + private Table createTableWithSnapshots( + String location, int snapshotNumber, Map properties, String mode) { Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location); List records = @@ -134,7 +144,7 @@ protected Table createTableWithSnapshots( Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); for (int i = 0; i < snapshotNumber; i++) { - df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + df.select("c1", "c2", "c3").write().format("iceberg").mode(mode).save(location); } return newTable; @@ -478,10 +488,13 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { Table sourceTable = createTableWithSnapshots(location, 2); // expire the first snapshot Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io()); - actions() - .expireSnapshots(sourceTable) - .expireSnapshotId(staticTable.currentSnapshot().snapshotId()) - .execute(); + int expiredManifestListCount = 1; + ExpireSnapshots.Result expireResult = + actions() + .expireSnapshots(sourceTable) + .expireSnapshotId(staticTable.currentSnapshot().snapshotId()) + .execute(); + assertThat(expireResult.deletedManifestListsCount()).isEqualTo(expiredManifestListCount); // create 100 more snapshots List records = @@ -492,9 +505,12 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { } sourceTable.refresh(); + // each iteration generate 1 version file, 1 manifest list, 1 manifest and 1 data file + int totalIteration = 102; // v1/v2/v3.metadata.json has been deleted in v104.metadata.json, and there is no way to find // the first snapshot // from the version file history + int missingVersionFile = 1; RewriteTablePath.Result result = actions() .rewriteTablePath(sourceTable) @@ -502,7 +518,12 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { .rewriteLocationPrefix(location, targetTableLocation()) .execute(); - checkFileNum(101, 101, 101, 406, result); + checkFileNum( + totalIteration - missingVersionFile, + totalIteration - expiredManifestListCount, + totalIteration, + totalIteration * 4 - missingVersionFile - expiredManifestListCount, + result); } @Test @@ -533,14 +554,77 @@ public void testExpireSnapshotBeforeRewrite() throws Exception { checkFileNum(4, 1, 2, 9, result); } + @Test + public void testRewritePathWithNonLiveEntry() throws Exception { + String location = newTableLocation(); + // first overwrite generate 1 manifest and 1 data file + // each subsequent overwrite on unpartitioned table generate 2 manifests and 1 data file + Table tableWith3Snaps = createTableWithSnapshots(location, 3, Maps.newHashMap(), "overwrite"); + + Snapshot oldest = SnapshotUtil.oldestAncestor(tableWith3Snaps); + String oldestDataFilePath = + Iterables.getOnlyElement( + tableWith3Snaps.snapshot(oldest.snapshotId()).addedDataFiles(tableWith3Snaps.io())) + .location(); + String deletedDataFilePathInTargetLocation = + String.format("%sdata/%s", targetTableLocation(), fileName(oldestDataFilePath)); + + // expire the oldest snapshot and remove oldest DataFile + ExpireSnapshots.Result expireResult = + actions().expireSnapshots(tableWith3Snaps).expireSnapshotId(oldest.snapshotId()).execute(); + assertThat(expireResult) + .as("Should deleted 1 data files in root snapshot") + .extracting( + ExpireSnapshots.Result::deletedManifestListsCount, + ExpireSnapshots.Result::deletedManifestsCount, + ExpireSnapshots.Result::deletedDataFilesCount) + .contains(1L, 1L, 1L); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(tableWith3Snaps) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWith3Snaps.location(), targetTableLocation()) + .execute(); + + // 5 version files include 1 table creation 3 overwrite and 1 snapshot expiration + // 3 overwrites generate 3 manifest list and 5 manifests with 3 data files + // snapshot expiration removed 1 of each + checkFileNum(5, 2, 4, 13, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // expect deleted data file is excluded from rewrite and copy + List copiedDataFiles = + spark + .read() + .format("iceberg") + .load(targetTableLocation() + "#all_files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + assertThat(copiedDataFiles).hasSize(2).doesNotContain(deletedDataFilePathInTargetLocation); + + // expect manifest entries still contain deleted entry + List copiedEntries = + spark + .read() + .format("iceberg") + .load(targetTableLocation() + "#all_entries") + .filter("status == 2") + .select("data_file.file_path") + .as(Encoders.STRING()) + .collectAsList(); + assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation); + } + @Test public void testStartSnapshotWithoutValidSnapshot() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); - assertThat(((List) table.snapshots()).size()) - .withFailMessage("1 out 2 snapshot has been removed") - .isEqualTo(1); + assertThat(table.snapshots()).hasSize(1); RewriteTablePath.Result result = actions() @@ -573,7 +657,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception { } @Test - public void testMoveVersionWithInvalidSnapshots() throws Exception { + public void testMoveVersionWithInvalidSnapshots() { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -940,16 +1024,20 @@ protected void checkFileNum( .load(result.fileListLocation()) .as(Encoders.STRING()) .collectAsList(); - assertThat(filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count()) - .withFailMessage("Wrong rebuilt version file count") + Predicate isManifest = f -> f.endsWith("-m0.avro") || f.endsWith("-m1.avro"); + Predicate isManifestList = f -> f.contains("snap-") && f.endsWith(".avro"); + Predicate isMetadataJSON = f -> f.endsWith(".metadata.json"); + + assertThat(filesToMove.stream().filter(isMetadataJSON).count()) + .as("Wrong rebuilt version file count") .isEqualTo(versionFileCount); - assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count()) - .withFailMessage("Wrong rebuilt Manifest list file count") + assertThat(filesToMove.stream().filter(isManifestList).count()) + .as("Wrong rebuilt Manifest list file count") .isEqualTo(manifestListCount); - assertThat(filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count()) - .withFailMessage("Wrong rebuilt Manifest file file count") + assertThat(filesToMove.stream().filter(isManifest).count()) + .as("Wrong rebuilt Manifest file file count") .isEqualTo(manifestFileCount); - assertThat(filesToMove.size()).withFailMessage("Wrong total file count").isEqualTo(totalCount); + assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount); } protected String newTableLocation() throws IOException {