diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index 794a2625f9c5..178b89ad213a 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -25,7 +25,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
@@ -225,11 +224,7 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
     OutputFile outputFile = io.newOutputFile(outputPath);

     List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
-    List<ManifestFile> manifestFilesToRewrite =
-        manifestFiles.stream()
-            .filter(mf -> manifestsToRewrite.contains(mf.path()))
-            .collect(Collectors.toList());
-    manifestFilesToRewrite.forEach(
+    manifestFiles.forEach(
         mf ->
             Preconditions.checkArgument(
                 mf.path().startsWith(sourcePrefix),
@@ -245,13 +240,15 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
             snapshot.parentId(),
             snapshot.sequenceNumber())) {

-      for (ManifestFile file : manifestFilesToRewrite) {
+      for (ManifestFile file : manifestFiles) {
         ManifestFile newFile = file.copy();
         ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix));
         writer.add(newFile);
-        result.toRewrite().add(file);
-        result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
+        if (manifestsToRewrite.contains(file.path())) {
+          result.toRewrite().add(file);
+          result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
+        }
       }

       return result;
     } catch (IOException e) {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index bf36a0e694e8..c69a270d8f17 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -245,6 +245,52 @@ public void testStartVersion() throws Exception {
         .isEqualTo(0);
   }

+  @Test
+  public void testIncrementalRewrite() throws Exception {
+    String location = newTableLocation();
+    Table sourceTable =
+        TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);
+    List<ThreeColumnRecord> recordsA =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> dfA = spark.createDataFrame(recordsA, ThreeColumnRecord.class).coalesce(1);
+
+    // Write first increment to source table
+    dfA.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+    assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1);
+
+    // Replicate first increment to target table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
+            .execute();
+    copyTableFiles(result);
+    assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+
+    // Write second increment to source table
+    List<ThreeColumnRecord> recordsB =
+        Lists.newArrayList(new ThreeColumnRecord(2, "BBBBBBBBB", "BBB"));
+    Dataset<Row> dfB = spark.createDataFrame(recordsB, ThreeColumnRecord.class).coalesce(1);
+    dfB.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+    assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2);
+
+    // Replicate second increment to target table
+    sourceTable.refresh();
+    Table targetTable = TABLES.load(targetTableLocation());
+    String targetTableMetadata = currentMetadata(targetTable).metadataFileLocation();
+    String startVersion = fileName(targetTableMetadata);
+    RewriteTablePath.Result incrementalRewriteResult =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
+            .startVersion(startVersion)
+            .execute();
+    copyTableFiles(incrementalRewriteResult);
+    List<Object[]> actual = rowsSorted(targetTableLocation(), "c1");
+    List<Object[]> expected = rowsSorted(location, "c1");
+    assertEquals("Rows should match after copy", expected, actual);
+  }
+
   @Test
   public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2)
       throws Exception {
@@ -1144,6 +1190,10 @@ private List<Object[]> rows(String location) {
     return rowsToJava(spark.read().format("iceberg").load(location).collectAsList());
   }

+  private List<Object[]> rowsSorted(String location, String sortCol) {
+    return rowsToJava(spark.read().format("iceberg").load(location).sort(sortCol).collectAsList());
+  }
+
   private PositionDelete<GenericRecord> positionDelete(
       Schema tableSchema, CharSequence path, Long position, Object... values) {
     PositionDelete<GenericRecord> posDelete = PositionDelete.create();
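
A minimal sketch (not part of the patch) of the incremental replication flow that testIncrementalRewrite exercises, using Iceberg's public RewriteTablePath action API (rewriteLocationPrefix, startVersion, execute); the class name, method name, and the out-of-band copy step are illustrative assumptions:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.spark.actions.SparkActions;

public class IncrementalReplication {
  // Rewrites metadata for snapshots added since lastCopiedVersion so the table
  // can be served from targetLocation once the planned files are copied over.
  static String replicate(Table sourceTable, String targetLocation, String lastCopiedVersion) {
    RewriteTablePath.Result result =
        SparkActions.get()
            .rewriteTablePath(sourceTable)
            .rewriteLocationPrefix(sourceTable.location(), targetLocation)
            .startVersion(lastCopiedVersion) // metadata file name from the previous run
            .execute();

    // result.fileListLocation() names a file listing (staged path, target path)
    // pairs; copy those files with a bulk tool such as DistCp, then record the
    // latest version as the start version for the next incremental run.
    return result.latestVersion();
  }
}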