diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 1c871ba30c04..5f981040ea08 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1262,6 +1262,12 @@ acceptedBreaks: \ java.lang.Object[]) throws java.lang.Exception" justification: "Reduce visibilty of deprecated method" org.apache.iceberg:iceberg-core: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.RewriteTablePathUtil.RewriteResult" + new: "class org.apache.iceberg.RewriteTablePathUtil.RewriteResult" + justification: "Serialization across versions is not supported" - code: "java.class.removed" old: "class org.apache.iceberg.MetadataUpdate.EnableRowLineage" justification: "Removing deprecations for 1.10.0" diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 9cff49f601d8..4e5ddf17c4b1 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -64,6 +64,7 @@ private RewriteTablePathUtil() {} public static class RewriteResult implements Serializable { private final Set toRewrite = Sets.newHashSet(); private final Set> copyPlan = Sets.newHashSet(); + private Long length = null; public RewriteResult() {} @@ -85,6 +86,14 @@ public Set toRewrite() { public Set> copyPlan() { return copyPlan; } + + public Long length() { + return length; + } + + public void length(long newLength) { + this.length = newLength; + } } /** @@ -226,7 +235,9 @@ private static List updatePathInSnapshots( * @param outputPath location to write the manifest list * @return a copy plan for manifest files whose metadata were contained in the rewritten manifest * list + * @deprecated since 1.10.0, will be removed in 1.11.0 */ + @Deprecated public static RewriteResult rewriteManifestList( Snapshot snapshot, FileIO io, @@ -276,6 +287,62 @@ public static RewriteResult rewriteManifestList( } } + /** + * Rewrite a manifest list representing a snapshot, replacing path references. + * + * @param snapshot snapshot represented by the manifest list + * @param io file io + * @param tableMetadata metadata of table + * @param rewrittenManifestLengths rewritten manifest files and their sizes + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @param outputPath location to write the manifest list + */ + public static void rewriteManifestList( + Snapshot snapshot, + FileIO io, + TableMetadata tableMetadata, + Map rewrittenManifestLengths, + String sourcePrefix, + String targetPrefix, + String outputPath) { + OutputFile outputFile = io.newOutputFile(outputPath); + + List manifestFiles = manifestFilesInSnapshot(io, snapshot); + manifestFiles.forEach( + mf -> { + Preconditions.checkArgument( + mf.path().startsWith(sourcePrefix), + "Encountered manifest file %s not under the source prefix %s", + mf.path(), + sourcePrefix); + Preconditions.checkArgument( + rewrittenManifestLengths.get(mf.path()) != null, + "Encountered manifest file %s that was not rewritten or has null length", + mf.path()); + }); + + try (FileAppender writer = + ManifestLists.write( + tableMetadata.formatVersion(), + outputFile, + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.sequenceNumber(), + snapshot.firstRowId())) { + + for (ManifestFile file : manifestFiles) { + ManifestFile newFile = file.copy(); + ((StructLike) newFile).set(0, newPath(file.path(), sourcePrefix, targetPrefix)); + ((StructLike) newFile).set(1, rewrittenManifestLengths.get(file.path())); + writer.add(newFile); + } + } catch (IOException e) { + throw new UncheckedIOException( + "Failed to rewrite the manifest list file " + snapshot.manifestListLocation(), e); + } + } + private static List manifestFilesInSnapshot(FileIO io, Snapshot snapshot) { String path = snapshot.manifestListLocation(); List manifestFiles = Lists.newLinkedList(); @@ -347,16 +414,24 @@ public static RewriteResult rewriteDataManifest( String targetPrefix) throws IOException { PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); - try (ManifestWriter writer = - ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId()); + RewriteResult rewriteResult; + ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId()); + + try (writer; ManifestReader reader = ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) { - return StreamSupport.stream(reader.entries().spliterator(), false) - .map( - entry -> - writeDataFileEntry(entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer)) - .reduce(new RewriteResult<>(), RewriteResult::append); + rewriteResult = + StreamSupport.stream(reader.entries().spliterator(), false) + .map( + entry -> + writeDataFileEntry( + entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer)) + .reduce(new RewriteResult<>(), RewriteResult::append); } + + rewriteResult.length(writer.length()); + return rewriteResult; } /** @@ -427,24 +502,31 @@ public static RewriteResult rewriteDeleteManifest( String stagingLocation) throws IOException { PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); - try (ManifestWriter writer = - ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId()); + RewriteResult rewriteResult; + ManifestWriter writer = + ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId()); + + try (writer; ManifestReader reader = ManifestFiles.readDeleteManifest(manifestFile, io, specsById) .select(Arrays.asList("*"))) { - return StreamSupport.stream(reader.entries().spliterator(), false) - .map( - entry -> - writeDeleteFileEntry( - entry, - snapshotIds, - spec, - sourcePrefix, - targetPrefix, - stagingLocation, - writer)) - .reduce(new RewriteResult<>(), RewriteResult::append); + rewriteResult = + StreamSupport.stream(reader.entries().spliterator(), false) + .map( + entry -> + writeDeleteFileEntry( + entry, + snapshotIds, + spec, + sourcePrefix, + targetPrefix, + stagingLocation, + writer)) + .reduce(new RewriteResult<>(), RewriteResult::append); } + + rewriteResult.length(writer.length()); + return rewriteResult; } private static RewriteResult writeDataFileEntry( diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index a8e82d101fbf..456dcaf7d441 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -151,22 +151,8 @@ protected Dataset contentFileDS(Table table) { protected Dataset contentFileDS(Table table, Set snapshotIds) { Table serializableTable = SerializableTableWithSize.copyOf(table); Broadcast tableBroadcast = sparkContext.broadcast(serializableTable); - int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); - - Dataset manifestBeanDS = - manifestDF(table, snapshotIds) - .selectExpr( - "content", - "path", - "length", - "0 as sequenceNumber", - "partition_spec_id as partitionSpecId", - "added_snapshot_id as addedSnapshotId") - .dropDuplicates("path") - .repartition(numShufflePartitions) // avoid adaptive execution combining tasks - .as(ManifestFileBean.ENCODER); - - return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER); + return manifestBeanDS(table, snapshotIds) + .flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER); } protected Dataset manifestDS(Table table) { @@ -179,6 +165,22 @@ protected Dataset manifestDS(Table table, Set snapshotIds) { .as(FileInfo.ENCODER); } + protected Dataset manifestBeanDS(Table table, Set snapshotIds) { + int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); + + return manifestDF(table, snapshotIds) + .selectExpr( + "content", + "path", + "length", + "0 as sequenceNumber", + "partition_spec_id as partitionSpecId", + "added_snapshot_id as addedSnapshotId") + .dropDuplicates("path") + .repartition(numShufflePartitions) // avoid adaptive execution combining tasks + .as(ManifestFileBean.ENCODER); + } + private Dataset manifestDF(Table table, Set snapshotIds) { Dataset manifestDF = loadMetadataTable(table, ALL_MANIFESTS); if (snapshotIds != null) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index f7e90e679a02..f769d7ce75e0 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,22 +69,21 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.util.Pair; import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.functions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple2; public class RewriteTablePathSparkAction extends BaseSparkAction implements RewriteTablePath { @@ -281,20 +281,30 @@ private String rebuildMetadata() { // rebuild version files RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); - - Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata); Set validSnapshots = Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata)); + // rebuild manifest files + Set manifestsToRewrite = manifestsToRewrite(validSnapshots); + + Map rewriteManifestResultMap = + rewriteManifests(deltaSnapshots, endMetadata, manifestsToRewrite); + + // Extract manifest file sizes for manifest list rewriting + Map rewrittenManifestLengths = + rewriteManifestResultMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().length())); + // rebuild manifest-list files RewriteResult rewriteManifestListResult = validSnapshots.stream() - .map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite)) + .map(snapshot -> rewriteManifestList(snapshot, endMetadata, rewrittenManifestLengths)) .reduce(new RewriteResult<>(), RewriteResult::append); - // rebuild manifest files + // Aggregate all manifest rewrite results RewriteContentFileResult rewriteManifestResult = - rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite()); + rewriteManifestResultMap.values().stream() + .reduce(new RewriteContentFileResult(), RewriteContentFileResult::append); // rebuild position delete files Set deleteFiles = @@ -308,7 +318,6 @@ private String rebuildMetadata() { copyPlan.addAll(rewriteVersionResult.copyPlan()); copyPlan.addAll(rewriteManifestListResult.copyPlan()); copyPlan.addAll(rewriteManifestResult.copyPlan()); - return saveFileList(copyPlan); } @@ -417,28 +426,24 @@ private Set> statsFileCopyPlan( * * @param snapshot snapshot represented by the manifest list * @param tableMetadata metadata of table - * @param manifestsToRewrite filter of manifests to rewrite. - * @return a result including a copy plan for the manifests contained in the manifest list, as - * well as for the manifest list itself + * @param rewrittenManifestLengths lengths of rewritten manifests + * @return a result including a copy plan for the manifest list itself */ private RewriteResult rewriteManifestList( - Snapshot snapshot, TableMetadata tableMetadata, Set manifestsToRewrite) { + Snapshot snapshot, TableMetadata tableMetadata, Map rewrittenManifestLengths) { RewriteResult result = new RewriteResult<>(); String path = snapshot.manifestListLocation(); String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir); - RewriteResult rewriteResult = - RewriteTablePathUtil.rewriteManifestList( - snapshot, - table.io(), - tableMetadata, - manifestsToRewrite, - sourcePrefix, - targetPrefix, - stagingDir, - outputPath); - - result.append(rewriteResult); + RewriteTablePathUtil.rewriteManifestList( + snapshot, + table.io(), + tableMetadata, + rewrittenManifestLengths, + sourcePrefix, + targetPrefix, + outputPath); + // add the manifest list copy plan itself to the result result .copyPlan() @@ -446,26 +451,12 @@ private RewriteResult rewriteManifestList( return result; } - private Set manifestsToRewrite( - Set deltaSnapshots, TableMetadata startMetadata) { + private Set manifestsToRewrite(Set liveSnapshots) { try { Table endStaticTable = newStaticTable(endVersionName, table.io()); - Dataset lastVersionFiles = manifestDS(endStaticTable).select("path"); - if (startMetadata == null) { - return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList()); - } else { - Set deltaSnapshotIds = - deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); - return Sets.newHashSet( - lastVersionFiles - .distinct() - .filter( - functions - .column(ManifestFile.SNAPSHOT_ID.name()) - .isInCollection(deltaSnapshotIds)) - .as(Encoders.STRING()) - .collectAsList()); - } + Set liveSnapshotIds = + liveSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return Sets.newHashSet(manifestBeanDS(endStaticTable, liveSnapshotIds).collectAsList()); } catch (Exception e) { throw new UnsupportedOperationException( "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. " @@ -494,36 +485,64 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult r1) { } } - /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */ - private RewriteContentFileResult rewriteManifests( + /** + * Rewrite manifest files in a distributed manner and return the resulting manifests and content + * files selected for rewriting. + */ + private Map rewriteManifests( Set deltaSnapshots, TableMetadata tableMetadata, Set toRewrite) { if (toRewrite.isEmpty()) { - return new RewriteContentFileResult(); + return Maps.newHashMap(); } Encoder manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class); + Encoder manifestResultEncoder = + Encoders.javaSerialization(RewriteContentFileResult.class); + Encoder> tupleEncoder = + Encoders.tuple(Encoders.STRING(), manifestResultEncoder); + Dataset manifestDS = spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder); Set deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); - return manifestDS - .repartition(toRewrite.size()) - .map( - toManifests( - tableBroadcast(), - sparkContext().broadcast(deltaSnapshotIds), - stagingDir, - tableMetadata.formatVersion(), - sourcePrefix, - targetPrefix), - Encoders.bean(RewriteContentFileResult.class)) - // duplicates are expected here as the same data file can have different statuses - // (e.g. added and deleted) - .reduce((ReduceFunction) RewriteContentFileResult::append); - } - - private static MapFunction toManifests( + // TODO: This implementation uses toLocalIterator to collect manifest rewrite results on + // the driver, which can be a bottleneck for tables with many manifests. This should be + // refactored to use a reducible result class with a more scalable aggregation pattern. + // See issue: https://github.com/apache/iceberg/issues/13932 + Iterator> resultIterator = + manifestDS + .repartition(toRewrite.size()) + .map( + toManifests( + tableBroadcast(), + sparkContext().broadcast(deltaSnapshotIds), + stagingDir, + tableMetadata.formatVersion(), + sourcePrefix, + targetPrefix), + tupleEncoder) + .toLocalIterator(); + + Map rewrittenManifests = Maps.newHashMap(); + + while (resultIterator.hasNext()) { + Tuple2 resultTuple = resultIterator.next(); + String originalManifestPath = resultTuple._1(); + RewriteContentFileResult manifestRewriteResult = resultTuple._2(); + String stagingManifestPath = + RewriteTablePathUtil.stagingPath(originalManifestPath, sourcePrefix, stagingDir); + String targetManifestPath = + RewriteTablePathUtil.newPath(originalManifestPath, sourcePrefix, targetPrefix); + + manifestRewriteResult.copyPlan().add(Pair.of(stagingManifestPath, targetManifestPath)); + rewrittenManifests.put(originalManifestPath, manifestRewriteResult); + } + + return rewrittenManifests; + } + + private static MapFunction> toManifests( Broadcast
table, Broadcast> deltaSnapshotIds, String stagingLocation, @@ -532,10 +551,11 @@ private static MapFunction toManifests( String targetPrefix) { return manifestFile -> { - RewriteContentFileResult result = new RewriteContentFileResult(); + RewriteContentFileResult result; + switch (manifestFile.content()) { case DATA: - result.appendDataFile( + RewriteResult dataManifestResult = writeDataManifest( manifestFile, table, @@ -543,10 +563,13 @@ private static MapFunction toManifests( stagingLocation, format, sourcePrefix, - targetPrefix)); - break; + targetPrefix); + + result = new RewriteContentFileResult().appendDataFile(dataManifestResult); + result.length(dataManifestResult.length()); + return Tuple2.apply(manifestFile.path(), result); case DELETES: - result.appendDeleteFile( + RewriteResult deleteManifestResult = writeDeleteManifest( manifestFile, table, @@ -554,13 +577,15 @@ private static MapFunction toManifests( stagingLocation, format, sourcePrefix, - targetPrefix)); - break; + targetPrefix); + + result = new RewriteContentFileResult().appendDeleteFile(deleteManifestResult); + result.length(deleteManifestResult.length()); + return Tuple2.apply(manifestFile.path(), result); default: throw new UnsupportedOperationException( "Unsupported manifest type: " + manifestFile.content()); } - return result; }; } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index 5735d2b3359f..08d84dad8d1b 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -230,7 +230,8 @@ public void testStartVersion() throws Exception { .startVersion("v2.metadata.json") .execute(); - checkFileNum(1, 1, 1, 4, result); + // 1 metadata JSON file, 1 snapshot, 2 manifests, 1 data file + checkFileNum(1, 1, 2, 5, result); List> paths = readPathPairList(result.fileListLocation()); @@ -304,7 +305,8 @@ public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path locat .startVersion("v2.metadata.json") .execute(); - checkFileNum(2, 2, 2, 8, result); + // 2 metadata JSON files, 2 snapshots, 3 manifests, 2 data files + checkFileNum(2, 2, 3, 9, result); // start from the first version RewriteTablePath.Result result1 = @@ -314,6 +316,7 @@ public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path locat .startVersion("v1.metadata.json") .execute(); + // 3 metadata JSON files, 3 snapshots, 3 manifests, 3 data files checkFileNum(3, 3, 3, 12, result1); } @@ -343,7 +346,7 @@ public void testManifestRewriteAndIncrementalCopy() throws Exception { int addedManifest = Iterables.size(rewriteManifestResult.addedManifests()); // only move version v4, which is the version generated by rewrite manifest - RewriteTablePath.Result postReweiteResult = + RewriteTablePath.Result postRewriteResult = actions() .rewriteTablePath(table) .rewriteLocationPrefix(table.location(), targetTableLocation()) @@ -352,7 +355,7 @@ public void testManifestRewriteAndIncrementalCopy() throws Exception { .execute(); // no data files need to move - checkFileNum(1, 1, addedManifest, 3, postReweiteResult); + checkFileNum(1, 1, addedManifest, 3, postRewriteResult); } @Test @@ -714,8 +717,8 @@ public void testStartSnapshotWithoutValidSnapshot() throws Exception { .startVersion("v2.metadata.json") .execute(); - // 2 metadata.json, 1 manifest list file, 1 manifest files - checkFileNum(2, 1, 1, 5, result); + // 2 metadata.json, 1 manifest list file, 2 manifest files + checkFileNum(2, 1, 2, 6, result); } @Test @@ -1039,7 +1042,7 @@ public void testMetadataLocationChange() throws Exception { .startVersion(fileName(metadataFilePath)) .execute(); - checkFileNum(2, 1, 1, 5, result2); + checkFileNum(2, 1, 2, 6, result2); } @Test @@ -1209,6 +1212,174 @@ public void testNestedDirectoryStructurePreservation() throws Exception { assertThat(targetPath2).startsWith(targetTableLocation()); } + @Test + public void testFullRewriteUpdatesAllManifestLengthsInManifestList() throws Exception { + String location = newTableLocation(); + Table sourceTable = createTableWithSnapshots(location, 10); + + commitNewPositionDelete( + sourceTable, removePrefix(sourceTable.location() + "/data/deeply/nested/deletes.parquet")); + + Map manifestSizesBeforeRewrite = + sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream() + .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length())); + + // Rewrite table metadata to a location that's longer than the original in order + // to make manifests larger + String targetLocation = longTargetTableLocation(); + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(location, targetLocation) + .execute(); + + // 1 + 11 JSON metadata files, 11 snapshots, 11 manifests, 10 data files, 1 delete file + checkFileNum(12, 11, 11, 45, result); + copyTableFiles(result); + + Table targetTable = TABLES.load(targetLocation); + + // We have rewritten all 11 snapshots. Make sure all sizes were correctly updated + // across all manifest lists + assertThat(targetTable.snapshots()) + .allSatisfy( + snapshot -> + assertThat(snapshot.allManifests(targetTable.io())) + .allSatisfy( + manifest -> { + String manifestName = fileName(manifest.path()); + assertThat(manifest.length()) + .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName)); + }) + .allSatisfy( + manifest -> { + assertThat(targetTable.io().newInputFile(manifest.path()).getLength()) + .isEqualTo(manifest.length()); + })); + } + + @Test + public void testPartialRewriteUpdatesDataManifestLengthInManifestList() throws Exception { + String location = newTableLocation(); + Table sourceTable = createTableWithSnapshots(location, 10); + + Map manifestSizesBeforeRewrite = + sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream() + .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length())); + + // Rewrite just the latest table version to a location that's longer than + // the original in order to make manifests larger + String targetLocation = longTargetTableLocation(); + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(location, targetLocation) + .startVersion("v10.metadata.json") + .execute(); + + // 1 metadata JSON file, 1 snapshot, 10 manifests, 1 data file + checkFileNum(1, 1, 10, 13, result); + copyTableFiles(result); + + Table targetTable = TABLES.load(targetLocation); + Snapshot lastSnapshot = targetTable.currentSnapshot(); + + // We have rewritten all manifests in one snapshot. Make sure all sizes were correctly + // updated in the manifest list + assertThat(targetTable.currentSnapshot().allManifests(targetTable.io())) + .allSatisfy( + manifest -> { + String manifestName = fileName(manifest.path()); + assertThat(manifest.length()) + .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName)); + }) + .allSatisfy( + manifest -> { + assertThat(targetTable.io().newInputFile(manifest.path()).getLength()) + .isEqualTo(manifest.length()); + }); + } + + @Test + public void testPartialRewriteUpdatesDeleteManifestLengthInManifestList() throws Exception { + String location = newTableLocation(); + Table sourceTable = createTableWithSnapshots(location, 5); + + // Add two more snapshots with just position deletes + commitNewPositionDelete( + sourceTable, + removePrefix(sourceTable.location() + "/data/deeply/nested/deletes-1.parquet")); + commitNewPositionDelete( + sourceTable, + removePrefix(sourceTable.location() + "/data/deeply/nested/deletes-2.parquet")); + + Map manifestSizesBeforeRewrite = + sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream() + .collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length())); + + // Rewrite just the latest table version to a location that's longer than + // the original in order to make manifests larger + String targetLocation = longTargetTableLocation(); + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(location, targetLocation) + .startVersion("v7.metadata.json") + .execute(); + + // 1 metadata JSON file, 1 snapshot, 5 + 2 manifests, 1 delete file + checkFileNum(1, 1, 7, 10, result); + copyTableFiles(result); + + Table targetTable = TABLES.load(targetLocation); + Snapshot lastSnapshot = targetTable.currentSnapshot(); + + // We have rewritten all manifests in one snapshot. Make sure all sizes were correctly + // updated in the manifest list + assertThat(targetTable.currentSnapshot().allManifests(targetTable.io())) + .allSatisfy( + manifest -> { + String manifestName = fileName(manifest.path()); + assertThat(manifest.length()) + .isNotEqualTo(manifestSizesBeforeRewrite.get(manifestName)); + }) + .allSatisfy( + manifest -> { + assertThat(targetTable.io().newInputFile(manifest.path()).getLength()) + .isEqualTo(manifest.length()); + }); + } + + protected static String generateLongNestedPath(int depth) { + StringBuilder pathBuilder = new StringBuilder(); + for (int i = 1; i <= depth; i++) { + pathBuilder.append(String.format("/%03d", i)); + } + pathBuilder.append("/"); + return pathBuilder.toString(); + } + + protected void commitNewPositionDelete(Table targetTable, String path) throws Exception { + DataFile dataFile = + StreamSupport.stream(targetTable.snapshots().spliterator(), false) + .flatMap( + snapshot -> + StreamSupport.stream( + snapshot.addedDataFiles(targetTable.io()).spliterator(), false)) + .findAny() + .get(); + + List> deletes = Lists.newArrayList(Pair.of(dataFile.location(), 0L)); + File file = new File(path); + + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + targetTable, targetTable.io().newOutputFile(file.toURI().toString()), deletes) + .first(); + + targetTable.newRowDelta().addDeletes(positionDeletes).commit(); + } + protected void checkFileNum( int versionFileCount, int manifestListCount, @@ -1263,6 +1434,10 @@ protected String targetTableLocation() throws IOException { return toAbsolute(targetTableDir); } + protected String longTargetTableLocation() throws IOException { + return toAbsolute(targetTableDir) + generateLongNestedPath(5); + } + protected String stagingLocation() throws IOException { return toAbsolute(staging); }