From f1846523e7bc13a50d4bf1dac4ef3892799c0169 Mon Sep 17 00:00:00 2001
From: Hongyue Zhang
Date: Sat, 18 Jan 2025 21:03:10 -0800
Subject: [PATCH 1/6] Core, Spark: Scan only live entries in RewriteTablePathUtil

---
 .../apache/iceberg/RewriteTablePathUtil.java  |   4 +-
 .../actions/TestRewriteTablePathsAction.java  | 108 ++++++++++++++----
 2 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index f250d2e12289..a30eb91a0756 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -291,7 +291,7 @@ public static RewriteResult<DataFile> rewriteDataManifest(
         ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
         ManifestReader<DataFile> reader =
             ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
-      return StreamSupport.stream(reader.entries().spliterator(), false)
+      return StreamSupport.stream(reader.liveEntries().spliterator(), false)
          .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer))
          .reduce(new RewriteResult<>(), RewriteResult::append);
     }
@@ -327,7 +327,7 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
         ManifestReader<DeleteFile> reader =
             ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
                 .select(Arrays.asList("*"))) {
-      return StreamSupport.stream(reader.entries().spliterator(), false)
+      return StreamSupport.stream(reader.liveEntries().spliterator(), false)
          .map(
              entry ->
                  writeDeleteFileEntry(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 644ddcd27ef4..683354fad92c 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -28,6 +28,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
@@ -66,6 +68,7 @@
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
@@ -105,14 +108,14 @@ protected ActionsProvider actions() {
   private final String backupNs = "backupns";

   @BeforeEach
-  public void setupTableLocation() throws Exception {
+  public void setupTableLocation() {
     this.tableLocation = tableDir.toFile().toURI().toString();
     this.table = createATableWith2Snapshots(tableLocation);
     createNameSpaces();
   }

   @AfterEach
-  public void cleanupTableSetup() throws Exception {
+  public void cleanupTableSetup() {
     dropNameSpaces();
   }
@@ -126,6 +129,11 @@ private Table createTableWithSnapshots(String location, int snapshotNumber) {

   protected Table createTableWithSnapshots(
      String location, int snapshotNumber, Map<String, String> properties) {
+    return createTableWithSnapshots(location, snapshotNumber, properties, "append");
+  }
+
+  private Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties, String mode) {
     Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location);

     List<ThreeColumnRecord> records =
@@ -134,7 +142,7 @@ protected Table createTableWithSnapshots(
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

     for (int i = 0; i < snapshotNumber; i++) {
-      df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+      df.select("c1", "c2", "c3").write().format("iceberg").mode(mode).save(location);
     }

     return newTable;
   }
@@ -478,10 +486,13 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
     Table sourceTable = createTableWithSnapshots(location, 2);
     // expire the first snapshot
     Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io());
-    actions()
-        .expireSnapshots(sourceTable)
-        .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
-        .execute();
+    int expiredManifestListCount = 1;
+    ExpireSnapshots.Result expireResult =
+        actions()
+            .expireSnapshots(sourceTable)
+            .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
+            .execute();
+    assertThat(expireResult.deletedManifestListsCount()).isEqualTo(expiredManifestListCount);

     // create 100 more snapshots
     List<ThreeColumnRecord> records =
@@ -492,9 +503,12 @@
     }
     sourceTable.refresh();

+    // each iteration generates 1 version file, 1 manifest list, 1 manifest and 1 data file
+    int totalIteration = 102;
     // v1/v2/v3.metadata.json has been deleted in v104.metadata.json, and there is no way to find
     // the first snapshot
     // from the version file history
+    int missingVersionFile = 1;
     RewriteTablePath.Result result =
         actions()
             .rewriteTablePath(sourceTable)
@@ -502,7 +516,12 @@
             .rewriteLocationPrefix(location, targetTableLocation())
             .execute();

-    checkFileNum(101, 101, 101, 406, result);
+    checkFileNum(
+        totalIteration - missingVersionFile,
+        totalIteration - expiredManifestListCount,
+        totalIteration,
+        totalIteration * 4 - missingVersionFile - expiredManifestListCount,
+        result);
   }

   @Test
@@ -533,14 +552,59 @@ public void testExpireSnapshotBeforeRewrite() throws Exception {
     checkFileNum(4, 1, 2, 9, result);
   }

+  @Test
+  public void testRewritePathWithNonLiveEntry() throws Exception {
+    String location = newTableLocation();
+    int overwriteCount = 3;
+    // the first overwrite generates 1 manifest and 1 data file
+    // each subsequent overwrite on an unpartitioned table generates 2 manifests and 1 data file
+    Table tableWith3Snaps = createTableWithSnapshots(location, 3, Maps.newHashMap(), "overwrite");
+
+    // check the data file location before the rebuild
+    List<String> validDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableWith3Snaps + "#all_files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFiles)
+        .as("Should have 3 data files in 3 snapshots")
+        .hasSize(overwriteCount);
+
+    // expire the first snapshot
+    ExpireSnapshots.Result expireResult =
+        actions()
+            .expireSnapshots(tableWith3Snaps)
+            .expireSnapshotId(SnapshotUtil.oldestAncestor(tableWith3Snaps).snapshotId())
+            .execute();
+
+    // expire the first of 3 snapshots
+    assertThat(expireResult)
snapshot") + .extracting( + ExpireSnapshots.Result::deletedManifestListsCount, + ExpireSnapshots.Result::deletedManifestsCount, + ExpireSnapshots.Result::deletedDataFilesCount) + .contains(1L, 1L, 1L); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(tableWith3Snaps) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWith3Snaps.location(), targetTableLocation()) + .execute(); + + checkFileNum(5, 2, 4, 13, result); + } + @Test public void testStartSnapshotWithoutValidSnapshot() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); - assertThat(((List) table.snapshots()).size()) - .withFailMessage("1 out 2 snapshot has been removed") - .isEqualTo(1); + assertThat(table.snapshots()).withFailMessage("1 out 2 snapshot has been removed").hasSize(1); RewriteTablePath.Result result = actions() @@ -573,7 +637,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception { } @Test - public void testMoveVersionWithInvalidSnapshots() throws Exception { + public void testMoveVersionWithInvalidSnapshots() { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -940,16 +1004,20 @@ protected void checkFileNum( .load(result.fileListLocation()) .as(Encoders.STRING()) .collectAsList(); - assertThat(filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count()) - .withFailMessage("Wrong rebuilt version file count") + Predicate isManifest = f -> (f.endsWith("-m0.avro")) || (f.endsWith("-m1.avro")); + Predicate isManifestList = f -> (f.contains("snap-")) && (f.endsWith(".avro")); + Predicate isMetadataJSON = f -> (f.endsWith(".metadata.json")); + + assertThat(filesToMove.stream().filter(isMetadataJSON).count()) + .as("Wrong rebuilt version file count") .isEqualTo(versionFileCount); - assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count()) - .withFailMessage("Wrong rebuilt Manifest list file count") + assertThat(filesToMove.stream().filter(isManifestList).count()) + .as("Wrong rebuilt Manifest list file count") .isEqualTo(manifestListCount); - assertThat(filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count()) - .withFailMessage("Wrong rebuilt Manifest file file count") + assertThat(filesToMove.stream().filter(isManifest).count()) + .as("Wrong rebuilt Manifest file file count") .isEqualTo(manifestFileCount); - assertThat(filesToMove.size()).withFailMessage("Wrong total file count").isEqualTo(totalCount); + assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount); } protected String newTableLocation() throws IOException { @@ -964,7 +1032,7 @@ protected String stagingLocation() throws IOException { return toAbsolute(staging); } - protected String toAbsolute(Path relative) throws IOException { + protected String toAbsolute(Path relative) { return relative.toFile().toURI().toString(); } From e7fc2ed752444750118bc6f3975db2cb836eb8c8 Mon Sep 17 00:00:00 2001 From: Hongyue Zhang Date: Thu, 23 Jan 2025 22:48:38 -0800 Subject: [PATCH 2/6] Exclude deleted data files but kept deleted entry for CDC --- .../apache/iceberg/RewriteTablePathUtil.java | 25 +++++--- .../actions/RewriteTablePathSparkAction.java | 2 +- .../actions/TestRewriteTablePathsAction.java | 58 ++++++++++++------- 3 files changed, 56 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java 
index a30eb91a0756..61ef8a85a31a 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -291,7 +291,7 @@ public static RewriteResult<DataFile> rewriteDataManifest(
         ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
         ManifestReader<DataFile> reader =
             ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
-      return StreamSupport.stream(reader.liveEntries().spliterator(), false)
+      return StreamSupport.stream(reader.entries().spliterator(), false)
          .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer))
          .reduce(new RewriteResult<>(), RewriteResult::append);
     }
@@ -327,7 +327,7 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
         ManifestReader<DeleteFile> reader =
             ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
                 .select(Arrays.asList("*"))) {
-      return StreamSupport.stream(reader.liveEntries().spliterator(), false)
+      return StreamSupport.stream(reader.entries().spliterator(), false)
          .map(
              entry ->
                  writeDeleteFileEntry(
@@ -354,7 +354,10 @@ private static RewriteResult<DataFile> writeDataFileEntry(
     DataFile newDataFile =
         DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
     appendEntryWithFile(entry, writer, newDataFile);
-    result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
+    // Keep non-live entry but exclude deleted data files as part of copyPlan
+    if (entry.isLive()) {
+      result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
+    }
     return result;
   }
@@ -381,16 +384,22 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
             .withMetrics(metricsWithTargetPath)
             .build();
         appendEntryWithFile(entry, writer, movedFile);
-        result
-            .copyPlan()
-            .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+        // Keep non-live entry but exclude deleted position delete files as part of copyPlan
+        if (entry.isLive()) {
+          result
+              .copyPlan()
+              .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+        }
         result.toRewrite().add(file);
         return result;
       case EQUALITY_DELETES:
         DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
         appendEntryWithFile(entry, writer, eqDeleteFile);
-        // No need to rewrite equality delete files as they do not contain absolute file paths.
-        result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
+        // Keep non-live entry but exclude deleted equality delete files as part of copyPlan
+        if (entry.isLive()) {
+          // No need to rewrite equality delete files as they do not contain absolute file paths.
+          result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
+        }
         return result;

       default:
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 55888f7f5e82..cf11797db72a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -684,7 +684,7 @@ private boolean fileExist(String path) {
     return table.io().newInputFile(path).exists();
   }

-  private static String newPath(String path, String sourcePrefix, String targetPrefix) {
+  static String newPath(String path, String sourcePrefix, String targetPrefix) {
     return RewriteTablePathUtil.combinePaths(
         targetPrefix, RewriteTablePathUtil.relativize(path, sourcePrefix));
   }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 683354fad92c..3888d09f29e7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -59,6 +60,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkCatalog;
@@ -555,32 +557,22 @@ public void testExpireSnapshotBeforeRewrite() throws Exception {
   @Test
   public void testRewritePathWithNonLiveEntry() throws Exception {
     String location = newTableLocation();
-    int overwriteCount = 3;
     // the first overwrite generates 1 manifest and 1 data file
     // each subsequent overwrite on an unpartitioned table generates 2 manifests and 1 data file
     Table tableWith3Snaps = createTableWithSnapshots(location, 3, Maps.newHashMap(), "overwrite");

-    // check the data file location before the rebuild
-    List<String> validDataFiles =
-        spark
-            .read()
-            .format("iceberg")
-            .load(tableWith3Snaps + "#all_files")
-            .select("file_path")
-            .as(Encoders.STRING())
-            .collectAsList();
-    assertThat(validDataFiles)
-        .as("Should have 3 data files in 3 snapshots")
-        .hasSize(overwriteCount);
+    Snapshot oldest = SnapshotUtil.oldestAncestor(tableWith3Snaps);
+    String oldestDataFilePath =
+        Iterables.getOnlyElement(
+                tableWith3Snaps.snapshot(oldest.snapshotId()).addedDataFiles(tableWith3Snaps.io()))
+            .location();
+    String deletedDataFilePath =
+        RewriteTablePathSparkAction.newPath(
+            oldestDataFilePath, tableWith3Snaps.location(), targetTableLocation());

-    // expire the first snapshot
+    // expire the oldest snapshot and remove the oldest DataFile
     ExpireSnapshots.Result expireResult =
-        actions()
-            .expireSnapshots(tableWith3Snaps)
-            .expireSnapshotId(SnapshotUtil.oldestAncestor(tableWith3Snaps).snapshotId())
-            .execute();
-
-    // expire the first of 3 snapshots
+        actions().expireSnapshots(tableWith3Snaps).expireSnapshotId(oldest.snapshotId()).execute();
     assertThat(expireResult)
         .as("Should delete 1 data file in the root snapshot")
         .extracting(
@@ -597,6 +589,32 @@ public void testRewritePathWithNonLiveEntry() throws Exception {
             .execute();

     checkFileNum(5, 2, 4, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // expect the deleted data file to be excluded from the rewrite and copy
+    List<String> copiedDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation() + "#all_files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(copiedDataFiles).hasSize(2).doesNotContain(deletedDataFilePath);
+
+    // expect manifest entries to still contain the deleted entry
+    List<String> copiedEntries =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation() + "#all_entries")
+            .filter("status == 2")
+            .select("data_file.file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(copiedEntries).contains(deletedDataFilePath);
   }

   @Test

From ebf5409237bbd6c0700f39d9a0201d06a0d8da5d Mon Sep 17 00:00:00 2001
From: Hongyue Zhang
Date: Fri, 24 Jan 2025 08:26:57 -0800
Subject: [PATCH 3/6] Address szehon feedback on unit tests

---
 .../spark/actions/RewriteTablePathSparkAction.java |  2 +-
 .../spark/actions/TestRewriteTablePathsAction.java | 14 ++++++------
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index cf11797db72a..55888f7f5e82 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -684,7 +684,7 @@ private boolean fileExist(String path) {
     return table.io().newInputFile(path).exists();
   }

-  static String newPath(String path, String sourcePrefix, String targetPrefix) {
+  private static String newPath(String path, String sourcePrefix, String targetPrefix) {
     return RewriteTablePathUtil.combinePaths(
         targetPrefix, RewriteTablePathUtil.relativize(path, sourcePrefix));
   }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 3888d09f29e7..af69c8925e20 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -566,9 +566,8 @@ public void testRewritePathWithNonLiveEntry() throws Exception {
         Iterables.getOnlyElement(
                 tableWith3Snaps.snapshot(oldest.snapshotId()).addedDataFiles(tableWith3Snaps.io()))
             .location();
-    String deletedDataFilePath =
-        RewriteTablePathSparkAction.newPath(
-            oldestDataFilePath, tableWith3Snaps.location(), targetTableLocation());
+    String deletedDataFilePathInTargetLocation =
+        String.format("%sdata/%s", targetTableLocation(), fileName(oldestDataFilePath));

     // expire the oldest snapshot and remove the oldest DataFile
     ExpireSnapshots.Result expireResult =
@@ -588,6 +587,9 @@ public void testRewritePathWithNonLiveEntry() throws Exception {
             .rewriteLocationPrefix(tableWith3Snaps.location(), targetTableLocation())
             .execute();

+    // 5 version files include 1 table creation, 3 overwrites, and 1 snapshot expiration
+    // 3 overwrites generate 3 manifest lists and 5 manifests with 3 data files
+    // snapshot expiration removed 1 of each
     checkFileNum(5, 2, 4, 13, result);

     // copy the metadata files and data files
@@ -602,7 +604,7 @@ public void testRewritePathWithNonLiveEntry() throws Exception {
             .select("file_path")
             .as(Encoders.STRING())
             .collectAsList();
-    assertThat(copiedDataFiles).hasSize(2).doesNotContain(deletedDataFilePath);
+    assertThat(copiedDataFiles).hasSize(2).doesNotContain(deletedDataFilePathInTargetLocation);

     // expect manifest entries to still contain the deleted entry
     List<String> copiedEntries =
@@ -614,7 +616,7 @@
             .select("data_file.file_path")
             .as(Encoders.STRING())
             .collectAsList();
-    assertThat(copiedEntries).contains(deletedDataFilePath);
+    assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
   }

   @Test
@@ -1050,7 +1052,7 @@ protected String stagingLocation() throws IOException {
     return toAbsolute(staging);
   }

-  protected String toAbsolute(Path relative) {
+  protected String toAbsolute(Path relative) throws IOException {
     return relative.toFile().toURI().toString();
   }

From 848d50aea7fcd473297af62c0eccd9526497d101 Mon Sep 17 00:00:00 2001
From: Hongyue Zhang
Date: Sun, 26 Jan 2025 08:31:29 -0800
Subject: [PATCH 4/6] SpotlessApply

---
 .../iceberg/spark/actions/TestRewriteTablePathsAction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index af69c8925e20..d8b7fc09b155 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -68,9 +68,9 @@
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;

From c7823ba85597999f1cc9b1ae800a5ef2721180fc Mon Sep 17 00:00:00 2001
From: Hongyue Zhang
Date: Thu, 6 Feb 2025 13:29:26 -0800
Subject: [PATCH 5/6] Address szehon feedback

---
 .../main/java/org/apache/iceberg/RewriteTablePathUtil.java |  6 +++---
 .../iceberg/spark/actions/TestRewriteTablePathsAction.java |  4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index 61ef8a85a31a..fc66a45154fc 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -354,7 +354,7 @@ private static RewriteResult<DataFile> writeDataFileEntry(
     DataFile newDataFile =
         DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
     appendEntryWithFile(entry, writer, newDataFile);
-    // Keep non-live entry but exclude deleted data files as part of copyPlan
+    // keep deleted data file entries but exclude them from copyPlan
     if (entry.isLive()) {
       result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
     }
@@ -384,7 +384,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
             .withMetrics(metricsWithTargetPath)
             .build();
         appendEntryWithFile(entry, writer, movedFile);
-        // Keep non-live entry but exclude deleted position delete files as part of copyPlan
+        // keep deleted position delete entries but exclude them from copyPlan
         if (entry.isLive()) {
           result
               .copyPlan()
@@ -395,7 +395,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
       case EQUALITY_DELETES:
         DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
         appendEntryWithFile(entry, writer, eqDeleteFile);
-        // Keep non-live entry but exclude deleted equality delete files as part of copyPlan
+        // keep deleted equality delete entries but exclude them from copyPlan
         if (entry.isLive()) {
           // No need to rewrite equality delete files as they do not contain absolute file paths.
           result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index d8b7fc09b155..34a73f3222d5 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -624,7 +624,7 @@ public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();

-    assertThat(table.snapshots()).withFailMessage("1 out of 2 snapshots has been removed").hasSize(1);
+    assertThat(table.snapshots()).hasSize(1);

     RewriteTablePath.Result result =
         actions()
@@ -1024,7 +1024,7 @@ protected void checkFileNum(
             .load(result.fileListLocation())
             .as(Encoders.STRING())
             .collectAsList();
-    Predicate<String> isManifest = f -> (f.endsWith("-m0.avro")) || (f.endsWith("-m1.avro"));
+    Predicate<String> isManifest = f -> (f.endsWith("-m0.avro") || f.endsWith("-m1.avro"));
     Predicate<String> isManifestList = f -> (f.contains("snap-")) && (f.endsWith(".avro"));
     Predicate<String> isMetadataJSON = f -> (f.endsWith(".metadata.json"));

From bbebcd92381bdf248f93cdce376bf0fb4da71b97 Mon Sep 17 00:00:00 2001
From: Hongyue Zhang
Date: Thu, 6 Feb 2025 14:18:43 -0800
Subject: [PATCH 6/6] Reduce brackets

---
 .../iceberg/spark/actions/TestRewriteTablePathsAction.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 34a73f3222d5..bf36a0e694e8 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1024,9 +1024,9 @@ protected void checkFileNum(
             .load(result.fileListLocation())
             .as(Encoders.STRING())
             .collectAsList();
-    Predicate<String> isManifest = f -> (f.endsWith("-m0.avro") || f.endsWith("-m1.avro"));
-    Predicate<String> isManifestList = f -> (f.contains("snap-")) && (f.endsWith(".avro"));
-    Predicate<String> isMetadataJSON = f -> (f.endsWith(".metadata.json"));
+    Predicate<String> isManifest = f -> f.endsWith("-m0.avro") || f.endsWith("-m1.avro");
+    Predicate<String> isManifestList = f -> f.contains("snap-") && f.endsWith(".avro");
+    Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");

     assertThat(filesToMove.stream().filter(isMetadataJSON).count())
         .as("Wrong rebuilt version file count")
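
Taken together, the series converges on one rule: rewriteDataManifest and rewriteDeleteManifest walk every manifest entry again (entries() rather than liveEntries()), rewrite each entry's path into the new manifest so that changelog/CDC reads still see DELETED entries, but only live entries contribute a source-to-target pair to the copy plan, since a file referenced only by a deleted entry has typically already been removed by snapshot expiration and copying it would fail. The sketch below is a minimal, self-contained model of that rule in plain Java, not the Iceberg API: Entry and Status are hypothetical stand-ins for ManifestEntry and its status (the test's filter("status == 2") selects DELETED entries), and path rewriting is reduced to a prefix swap standing in for RewriteTablePathUtil.combinePaths/relativize.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CopyPlanSketch {
  // hypothetical stand-in for ManifestEntry.Status; DELETED shows up as status == 2
  // in the #all_entries metadata table queried by the test
  enum Status { EXISTING, ADDED, DELETED }

  // hypothetical stand-in for a ManifestEntry<DataFile>
  record Entry(Status status, String path) {
    boolean isLive() {
      // mirrors the entry.isLive() guard added in patch 2: ADDED and EXISTING entries are live
      return status == Status.ADDED || status == Status.EXISTING;
    }
  }

  // simplified path rewrite: swap the source prefix for the target prefix
  static String rewrite(String path, String sourcePrefix, String targetPrefix) {
    return targetPrefix + path.substring(sourcePrefix.length());
  }

  public static void main(String[] args) {
    List<Entry> manifestEntries =
        List.of(
            new Entry(Status.ADDED, "s3://source/data/a.parquet"),
            new Entry(Status.EXISTING, "s3://source/data/b.parquet"),
            new Entry(Status.DELETED, "s3://source/data/c.parquet")); // kept for CDC, never copied

    List<Entry> rewrittenManifest = new ArrayList<>();             // entries written to the new manifest
    List<Map.Entry<String, String>> copyPlan = new ArrayList<>();  // source -> target file copies

    for (Entry entry : manifestEntries) {
      String target = rewrite(entry.path(), "s3://source/", "s3://target/");
      // every entry, live or deleted, is rewritten so downstream changelog reads stay correct
      rewrittenManifest.add(new Entry(entry.status(), target));
      // but only live entries are scheduled for copy; the deleted file no longer exists
      if (entry.isLive()) {
        copyPlan.add(new SimpleEntry<>(entry.path(), target));
      }
    }

    System.out.println("rewritten entries: " + rewrittenManifest.size()); // 3
    System.out.println("files to copy:     " + copyPlan.size());          // 2
  }
}

Run as a scratch file, this prints 3 rewritten entries but only 2 planned copies, the same shape testRewritePathWithNonLiveEntry asserts: hasSize(2) on the copied #all_files, while the status == 2 row in #all_entries still points at the target path of the never-copied deleted file.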