Skip to content

Conversation

@vaultah
Copy link

@vaultah vaultah commented Aug 1, 2025

Fixes #13719

The idea is to rewrite manifest files first, in order to get their final sizes, then rewrite manifest lists updating sizes for the previously rewritten manifests.

@vaultah vaultah changed the title Update sizes of rewritten manifests in manifest lists Spark: RewriteTablePath: Update sizes of rewritten manifests in manifest lists Aug 1, 2025
Copy link
Contributor

@dramaticlly dramaticlly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change and I think this is on the right direction. Left some comments and normally I would suggest we focus the changeset to latest Spark version 4.0 and have a separate PR to backfill the changes for older spark version 3.4/3.5 after PR is approved.

@anuragmantri, last I heard you are interested in this as well

}
}

public static class RewrittenFileInfo implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is in iceberg core and this class is only used in static class for record and in SparkAction, I am wondering if we want to define it here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could return it from rewrite*ManifestWithResult and maybe use it in fixes for delete files later

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it altogether.

Map<String, RewrittenFileInfo> rewrittenManifests,
String sourcePrefix,
String targetPrefix,
String stagingDir,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this stagingDir is no longer used as now path come from rewrittenManifests.get(file.path()).getNewPath()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
}
return Pair.of(writer.toManifestFile(), rewriteResult);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are returning entire new manifestFile on top of list of data files in copy plan , but later in SparkAction we only use its new location and length, can we do better memory wise?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only made it return the entire ManifestFile in an attempt to make this function more general-purpose, since it's part of public API (however specialized and niche it might be in reality). We can return RewrittenFileInfo from it though

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It now returns the size of the rewritten manifest instead of the full instance.

Comment on lines 292 to 293
ManifestsRewriteResult rewriteManifestResult =
rewriteManifests(deltaSnapshots, endMetadata, manifestsToRewrite);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some notes about behavior change here and also potential performance implications

Before this change we are doing rewrite top down in one pass from version-file -> manifest-list -> parallelize rewrite of manifest-files as well as position deletes. So it's optimized to do top layer rewrite + discover the subset of bottom layer require to be rewritten and gradually expand.

However in order to get size correctly, we need to reverse some of the rewrite ordering by rewrite the manifest first and then later the manifest-list.

Comment on lines 531 to 549
public static class ManifestsRewriteResult {
private final RewriteContentFileResult contentFileResult;
private final Map<String, RewrittenFileInfo> rewrittenManifests;

ManifestsRewriteResult(
RewriteContentFileResult contentFileResult,
Map<String, RewrittenFileInfo> rewrittenManifests) {
this.contentFileResult = contentFileResult;
this.rewrittenManifests = rewrittenManifests;
}

public RewriteContentFileResult getContentFileResult() {
return contentFileResult;
}

public Map<String, RewrittenFileInfo> getRewrittenManifests() {
return rewrittenManifests;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we also need the similar for position deletes, where we need write delete files before write delete manifests, so I am wondering if we can generalize this result class to take into account both

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a way to do that because RewriteContentFileResult in its current form is not needed for position deletes. It should be okay to have a small wrapper class just for manifests rewrite result. I have now made it private though.

Comment on lines 464 to 502
expectedManifestPaths =
Sets.newHashSet(
allManifestsDF
.distinct()
.filter(
functions
.column(ManifestFile.SNAPSHOT_ID.name())
.isInCollection(deltaSnapshotIds))
.as(Encoders.STRING())
.collectAsList());
}

Set<ManifestFile> foundManifests =
deltaSnapshots.stream()
.flatMap(
s -> {
try {
return s.allManifests(table.io()).stream();
} catch (NotFoundException e) {
LOG.warn(
"Skipping snapshot {} as its manifest list is missing (likely expired).",
s.snapshotId(),
e);
return Stream.empty();
}
})
.collect(Collectors.toSet());

Set<String> foundManifestPaths =
foundManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
Set<String> missingPaths = Sets.difference(expectedManifestPaths, foundManifestPaths);
Preconditions.checkState(
missingPaths.isEmpty(),
"Could not find all expected manifests. Missing files: %s",
String.join(", ", missingPaths));

return foundManifests.stream()
.filter(m -> expectedManifestPaths.contains(m.path()))
.collect(Collectors.toSet());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are looking for set of manifests filtered by snapshot ids, I think this can probably be replaced with something similar to https://github.com/apache/iceberg/blob/main/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L182-L190,

Copy link
Author

@vaultah vaultah Aug 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get expectedManifestPaths using manifestDF, but in the end we still need ManifestFile objects.

This logic needs to be changed though, because we always have to rewrite all referenced manifest files for the selected snapshots to make sure all their sizes are correct after the rewrite (working on it, sorry)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think we might want the same thing?

From what I can tell, .all_manifests table include all the valid manifest file that are referenced from any snapshot currently tracked by the
table, and if we apply the filter on referenced_snapshot with deltaSnapshotIds, it shall provide us the same result as foundManifests. I am thinking if we use ManifestEncoders against returned Dataset<Row> and collect as a list (or a set) of manifestsToRewrite

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated this logic using the (newly added) manifestBeanDS method. Also note it now uses validSnapshots instead of deltaSnapshots

@dramaticlly
Copy link
Contributor

@szehon-ho or @amogh-jahagirdar can you help kick off the CI ?

@anuragmantri
Copy link
Contributor

anuragmantri commented Aug 4, 2025

Yes, this is great timing, I just started working on a fix for this. I had a very similar approach in mind. @vaultah do you mind fixing the position delete files rewrite as well? see #12554

We generally add changes to only the default spark version (4.0 now) and backport to older versions later. Makes reviews easier. Thanks.

@vaultah
Copy link
Author

vaultah commented Aug 4, 2025

I believe delete files should be fixed separately, otherwise there'd be too many changes to validate. There are also several issues related to delete files that can be addressed together (#12554 + #12586 + #13671)

@vaultah vaultah requested a review from dramaticlly August 6, 2025 21:10
@vaultah
Copy link
Author

vaultah commented Aug 7, 2025

Also it seems problematic that IO exceptions are skipped here. If it's to handle expired snapshots with missing manifest lists, that method is only supposed to be called with live snapshots, and also ManifestList.read appears to throw NotFoundException for missing files so it's not caught anyway

@manuzhang manuzhang changed the title Spark: RewriteTablePath: Update sizes of rewritten manifests in manifest lists Spark 4.0: RewriteTablePath: Update sizes of rewritten manifests in manifest lists Aug 13, 2025
Copy link
Contributor

@dramaticlly dramaticlly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, some nit comments.

ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
RewriteResult<DataFile> rewriteResult = null;

try (ManifestWriter<DataFile> dataManifestWriter = writer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataManifestWriter seems unused, I assume we want to ensure writer is closed with try resource

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used it as a dummy variable so we could close writer and have a reference to it later, after it's closed. Java 9 improves this syntax (JEP 213), and since Iceberg doesn't need to support earlier versions, I now changed it to just

try (writer;
    ManifestReader<DeleteFile> reader =
        ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
        ...

Comment on lines 521 to 554
RewriteContentFileResult finalContentResult = new RewriteContentFileResult();
Iterator<Tuple3<String, Long, RewriteContentFileResult>> resultIterator =
manifestDS
.repartition(toRewrite.size())
.map(
toManifests(
tableBroadcast(),
sparkContext().broadcast(deltaSnapshotIds),
stagingDir,
tableMetadata.formatVersion(),
sourcePrefix,
targetPrefix),
tupleEncoder)
.toLocalIterator();

Map<String, Long> rewrittenManifests = Maps.newHashMap();

while (resultIterator.hasNext()) {
Tuple3<String, Long, RewriteContentFileResult> resultTuple = resultIterator.next();
String originalManifestPath = resultTuple._1();
Long rewrittenManifestLength = resultTuple._2();
RewriteContentFileResult contentFileResult = resultTuple._3();
String stagingManifestPath =
RewriteTablePathUtil.stagingPath(originalManifestPath, sourcePrefix, stagingDir);
String targetManifestPath =
RewriteTablePathUtil.newPath(originalManifestPath, sourcePrefix, targetPrefix);

finalContentResult.append(contentFileResult);
finalContentResult.copyPlan().add(Pair.of(stagingManifestPath, targetManifestPath));
rewrittenManifests.put(originalManifestPath, rewrittenManifestLength);
}

return new ManifestsRewriteResult(finalContentResult, rewrittenManifests);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huaxingao can I ask for your expertise on this as well

Comment on lines +1256 to +1260
.allSatisfy(
manifest -> {
assertThat(targetTable.io().newInputFile(manifest.path()).getLength())
.isEqualTo(manifest.length());
}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

love this


// Rewrite table metadata to a location that's much longer than the original in order
// to make manifests larger
String targetLocation = toAbsolute(rootTargetLocation) + generateLongNestedPath(25);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can actually just move this into the function above and no need to keep @TempDir Path rootTargetLocation for each tests

  protected String targetTableLocation() {
    return toAbsolute(targetTableDir) + generateLongNestedPath(5);
  }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this extra nesting is potentially inefficient and only needed for these three tests, I wanted to avoid applying it globally, and having a new location unrelated to targetTableDir would arguably be safer (e.g., if targetTableDir ends up being reused in the same test).

I added

protected String longTargetTableLocation() throws IOException {
    return toAbsolute(targetTableDir) + generateLongNestedPath(5);
}

to highlight the difference and purpose. Let me know if you want me to merge it into targetTableLocation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried with longer targetTableLocation for all tests and they seem to be happy 😉 But I am fine with longTargetTableLocation


Map<String, Long> manifestSizesBeforeRewrite =
sourceTable.currentSnapshot().allManifests(sourceTable.io()).stream()
.collect(Collectors.toMap(m -> fileName(m.path()), m -> m.length()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not relate to your change, but I just realized for filename(m.path()), we can just replace with just Paths.get(path).getFileName().toString() and and m.length() can be replaced with Method reference

Comment on lines 1291 to 1295
ManifestFile rewrittenManifest =
Iterables.getOnlyElement(
allManifests.stream()
.filter(manifest -> manifest.snapshotId() == lastSnapshot.snapshotId())
.collect(Collectors.toList()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this rewrittenManifest is not used, do we want to apply some assertion to this or can be removed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it.

return pathBuilder.toString();
}

protected void addAnyPositionDelete(Table targetTable, String path) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about

protected void commitNewPositionDelete(Table table, String deleteFilePath)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

@dramaticlly
Copy link
Contributor

@stevenzwu can you help take a look as well? This help fix the rewritten manifest size in manifest-lists after table path rewrite.

* @return a copy plan of content files in the manifest that was rewritten
* @deprecated since 1.10.0, will be removed in 1.11.0
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is added in the upcoming 1.10. should we just remove this method directly? what do you think @dramaticlly ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vaultah normally we will need to go through the deprecation cycle for public method, but as Steven suggested, both of these methods (LINE 398 and LINE 521) are part of 62d9ff5 which never gets released. So we can actually remove these directly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That method is being used by RewriteTablePathSparkAction in Spark 3.4 and Spark 3.5, so we can't remove it unless we backport these changes to Spark 3.4 and Spark 3.5 in the same (this) PR. Same for rewriteDataManifest and rewriteDeleteManifest methods below. Let me know how you want to proceed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with doing the backport change in this same PR. it is not a super large PR anyway.

Copy link
Contributor

@dramaticlly dramaticlly Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's my fault as I initially asked to separate the 3.4 and 3.5 changes to facilitate reviews, but I think as we get closer with more in-depth reviews. Doing Spark 3.4 & 3.5 backport together in this PR helps

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initially, it is our recommended practice to separate out 3.4 and 3.5 changes to a separate backport PR.

Right now, it is more important to avoid carrying over debts of deprecated APIs when we can for unreleased APIs. Hence, it is good to bring in the backport changes in the same PR, although it will make the PR bigger.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this PR is going to take longer due to larger refactoring for the suggested bottom up approach and result classes and can't make the 1.10 release in time, we can go with the deprecation approach like here. Let's defer this deprecation decision /change as the last step.

* rewritten here)
* @return size of the resulting manifest file and a copy plan for the referenced content files
*/
public static Pair<Long, RewriteResult<DeleteFile>> rewriteDeleteManifestWithResult(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just add the size (long value) to the RewriteResult?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's tricky because this class is being used in RewriteTablePathSparkAction in Spark 3.4 and Spark 3.5 (same concern as in #13720 (comment)), it's public so we'll have to somehow keep backward compatibility, and also it's used to store the result of rewriting metadata JSON files, snapshots, etc., so we'll need to update those parts as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a new optional field. so it shouldn't break source compatibility.

regarding other rewrites (metadata JSON files, snapshots, etc.), is the rewrite output file size concept still applicable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's value in this, as in this PR we are doing bottom first rewrite in order to have size, this differs from previous top down approach and result might need to change accordingly. We return Pair<Long, RewriteResult<DeleteFile>> when rewrite a result of delete manifests for both long and discovered list of deletes to copy and to rewrite, all of which belong to a single delete manifests.

For other top down rewrite such as given metadata.json to discover list of delta snapshots, size is unknown and we can leave it null. It's can be up to caller to leverage the size in the rewrite result. Let's give it a try and see if there's compatibility problem

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on what @dramaticlly said. If we are changing to bottom-up approach. It is natural to add a size attribute to the rewrite output (data manifest, delete manifest, manifest list, metadata.json, etc.). We may only need the size information from the data and delete manifest rewrite result. it is still good to track the new size of rewritten files in the rewrite result.

This may require larger refactoring. but it seems like the right direction in order to carry over the rewritten file size.

Copy link
Author

@vaultah vaultah Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand and agree that it makes sense to add size attribute to rewrite result, but the way RewriteResult is currently implemented and used is counterintuitive to me. When I see RewriteResult<T>, I expect the result of rewriting T. Adding another general-purpose and nullable attribute that's not related to T makes it even more confusing. I did consider doing that initially (sans nullability), but it felt like shoehorning, attempting to attach metadata about the container (the manifest file's size) to an object that describes its contents. Same for RewriteContentFileResult, which is public too.

That said, if we're aligned that modifying RewriteResult is the preferred path forward, I'm happy to implement it that way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the way RewriteResult is currently implemented and used is counterintuitive to me.

Definitely agree. A larger refactoring can really help. but that probably can be tackled as a follow-up.

Copy link
Contributor

@dramaticlly dramaticlly Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your confusion and I think we can probably do better on RewriteResult class. It's counterintuitive in a way that the instance of this result is not just for rewrite of T, but a collection T require open/rewrite/close in next iteration and also a collection of file path to copy from source to destination.

Take a manifestList as an example, this avro file contain mapping to many manifests. So when we trying to path rewrite a manifestList, it contains list of manifests require open to rewrite, list of source to destination path to plan the copy. Conceptually I think the result is relate to the manifest-list, now we are adding the size of written manifest lists. Recursively this also applies to when we apply path rewrite on a data manifest level, results contain set of data files, list of copy plans and size of the manifest.

For existing rewriteManifestList method in utils, the return type is of RewriteResult<ManifestFile> instead of RewriteResult<ManifestListFile>. Ans similarly the rewriteVersionFiles are returning RewriteResult.

* @return a copy plan of content files in the manifest that was rewritten
* @deprecated since 1.10.0, will be removed in 1.11.0
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question for direct removal?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed!

break;
targetPrefix);
return Tuple3.apply(
manifestFile.path(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move the path and size into the RewriteContentFileResult?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can give this a try

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible but it'd be a poor fit, because RewriteContentFileResult may store the results for more than one manifest. That is, while everything in resultTuple does relate to one manifest, contentFileResult is later merged into finalContentResult that will contain the results of rewriting all manifests.

RewriteContentFileResult finalContentResult = new RewriteContentFileResult();

// while (...) {
String originalManifestPath = resultTuple._1();
Long rewrittenManifestLength = resultTuple._2();
RewriteContentFileResult contentFileResult = resultTuple._3();

finalContentResult.append(contentFileResult);
// }

The name of the class would no longer match its intended purpose, and in that case we'd also need to leave path and length of finalContentResult unset, which may be confusing.

Copy link
Contributor

@stevenzwu stevenzwu Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RewriteContentFileResult may store the results for more than one manifest.

Should we change that?

Can the new ManifestsRewriteResult be replaced by the Map<String, RewriteContentFileResult>? the string key is the source manifest file.

RewriteContentFileResult is the rewrite result for the manifest rewrite, which probably be named as ManifestRewriteResult? it probably shouldn't be public at the first place. It is only for one manifest file and will contains the size attribute.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some of the complexity of this can contribute to the fact that the size is treated outside the RewriteResult, so we are bundling tuple3 by ManifestFile in MappingFunction and unzip & reassembly in localIterator with necessary aggregation.

Can we explore on bundle the size in RewriteResult and see Map<String, RewriteContentFileResult> helps with such situation?

@vaultah
Copy link
Author

vaultah commented Aug 21, 2025

  • Added nullable size to RewriteResult
  • Removed ManifestRewriteResult
  • Kept RewriteContentFileResult unchanged (public and can't be made private)
  • Removed rewrite*ManifestWithResult methods
  • Removed deprecation notice from rewrite*Manifest methods that are now enough

PTAL

Copy link
Contributor

@dramaticlly dramaticlly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we dont need to bring in Spark 3.4/3.5 and also save on deprecation cycle. @stevenzwu can you help kick off the CI?

Comment on lines 293 to 309
RewriteContentFileResult allManifestsResult = new RewriteContentFileResult();
Map<String, Long> rewrittenManifestLengths = Maps.newHashMap();
rewriteManifestResult.forEach(
(path, rewriteResult) -> {
rewrittenManifestLengths.put(path, rewriteResult.size());
allManifestsResult.append(rewriteResult);
});

// rebuild manifest-list files
RewriteResult<ManifestFile> rewriteManifestListResult =
validSnapshots.stream()
.map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite))
.map(snapshot -> rewriteManifestList(snapshot, endMetadata, rewrittenManifestLengths))
.reduce(new RewriteResult<>(), RewriteResult::append);

// rebuild manifest files
RewriteContentFileResult rewriteManifestResult =
rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite());

// rebuild position delete files
Set<DeleteFile> deleteFiles =
rewriteManifestResult.toRewrite().stream()
allManifestsResult.toRewrite().stream()
Copy link
Contributor

@dramaticlly dramaticlly Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having one iteration on rewriteManifestResult to build both aggregated results and length mapping helps but it's a bit hard to follow the control flow. Here's what I have in mind

// Extract manifest file sizes for manifest list rewriting
Map<String, Long> rewrittenManifestLengths = rewriteManifestResult.entrySet().stream()
    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size()));

// rebuild manifest-list files
RewriteResult<ManifestFile> rewriteManifestListResult =
    validSnapshots.stream()
        .map(snapshot -> rewriteManifestList(snapshot, endMetadata, rewrittenManifestLengths))
        .reduce(new RewriteResult<>(), RewriteResult::append);

// Aggregate all manifest rewrite results
RewriteContentFileResult allManifestsResult = rewriteManifestResult.values().stream()
        .reduce(new RewriteContentFileResult(), RewriteContentFileResult::append);

  // rebuild position delete files
Set<DeleteFile> deleteFiles =
    allManifestsResult.toRewrite().stream()
        .filter(e -> e instanceof DeleteFile)
        .map(e -> (DeleteFile) e)
        .collect(Collectors.toSet());
rewritePositionDeletes(endMetadata, deleteFiles);

I moved allManifestsResult later where it's being used in rebuild the position delete files. Apart from what this PR to fix for manifest length in manifest-list, the length of position delete in delete manifests also needs to be fixed in a separate change where we might need to move this block before rebuild manifest.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think readability gains are substantial enough to justify double-iteration, but I implemented your version.

* rewritten here)
* @return size of the resulting manifest file and a copy plan for the referenced content files
*/
public static Pair<Long, RewriteResult<DeleteFile>> rewriteDeleteManifestWithResult(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the way RewriteResult is currently implemented and used is counterintuitive to me.

Definitely agree. A larger refactoring can really help. but that probably can be tackled as a follow-up.

sourcePrefix,
targetPrefix),
tupleEncoder)
.toLocalIterator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we have memory concern with toLocalIterator?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenzwu please elaborate, I don't see how it's worse memory-wise than the previous version

Copy link
Contributor

@stevenzwu stevenzwu Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous code of using reduce, I thought the executors perform the initial reduction within their partitions, then the driver aggregates the partial results from executors.

With toLocalIterator, everything is shipped back to the driver for one pass of aggregation. Hence I was asking about the memory footprint and scalability for large tables with a lot of manifest files (large or small).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to build a map of manifests and memory footprint should be acceptable because toLocalIterator fetches data to the driver one partition at a time. We can probably still use reduce to pre-aggregate maps on executors and merge them into one big map on the driver if you think that'd be preferable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think this may cause problem. With toLocalIterator, every record is shipped to the driver and aggregated there in one pass. It streams partition-by-partition, but the driver still processes O(N) records and the accumulator can grow to O(#manifests). For large tables that can increase driver memory and hurt scalability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seems ok to use local iterator for now and tackle the larger refactoring later. what do you think? @dramaticlly @huaxingao

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fair to me and I think it helps to have correctness fix first

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Let’s proceed with the minimal correctness fix now. Please open a tracking issue and add a TODO with a link in the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vaultah please fix the CI failure. @dramaticlly pointed out one revapi issue in another comment.

Copy link
Member

@szehon-ho szehon-ho Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just ramping up on this coming back from vacation. +1 to refactor (can be another pr) to get rid the awkwardness of RewriteResult, it was initially to preserve the internal logic we had that tried to do everything in one pass (rewrite the current metadata layer, and get the next layer of files to rewrite), but it does make the code ugly and caused this issue in the first place. Thanks everyone for looking at it

// rebuild position delete files
Set<DeleteFile> deleteFiles =
rewriteManifestResult.toRewrite().stream()
allManifestsResult.toRewrite().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is the variable renaming more accurate? trying to see if it is unnecessary change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, you're right — this change was unnecessary. After changing the return type of rewriteManifests, I had to add another variable to hold the aggregated RewriteContentFileResult. I have now changed its name back to rewriteManifestResult and added

Map<String, RewriteContentFileResult> rewriteManifestResultMap

to hold the result of rewriteManifests.

Comment on lines +89 to +96

public Long length() {
return length;
}

public void length(long newLength) {
this.length = newLength;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this alerts the revapi and we probably need to add a exception as this is neither source nor binary breaking. Copy from CI run

  java.class.defaultSerializationChanged: The default serialization ID for the class has changed. This means that the new version of the class is not deserializable from the byte stream of a serialized old class.
  
  old: class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends java.lang.Object>
  new: class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends java.lang.Object>
  
  SOURCE: EQUIVALENT, BINARY: EQUIVALENT, SEMANTIC: BREAKING
  
  From old archive: iceberg-core-1.9.0.jar
  From new archive: iceberg-core-d356455.jar

\ java.lang.Object>"
new: "class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends\
\ java.lang.Object>"
justification: "Adding an optional field"
Copy link
Contributor

@dramaticlly dramaticlly Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Serialization across versions is not supported is better and conform to result of java.class.defaultSerializationChanged in revapi exceptions

Copy link
Contributor

@dramaticlly dramaticlly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM and pending CI, thanks @vaultah for the contribution


checkFileNum(1, 1, 1, 4, result);
// 1 metadata JSON file, 1 snapshot, 2 manifests, 1 data file
checkFileNum(1, 1, 2, 5, result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenzwu raised a good point on this length fix shall not change the behavior for delta rewrite. I recall we had a earlier discussion on this in #13720 (comment) but on a second thought I believe we are trying to do many things together, to rewrite extra manifests which is not added by delta snapshot to fix the manifest size in new manifest-list.

graph TD
  subgraph "Version File Metadata"
    V2["v2.json"]
    V3["v3.json"]
  end

  subgraph "Manifest Lists"
    S1["snap1.avro"]
    S2["snap2.avro"]
  end

  subgraph "Manifests"
    M1["m1.avro"]
    M2["m2.avro"]
  end


  V2 --> S1
  V3 --> S2
  S1 --> M1
  S2 --new--> M2
  S2 --existing--> M1
Loading

Let's use this unit test as an example here, if we do incremental rewrite from v2.metadata.json

  • Before:
    we scan manifests table to filter manifests where added_snapshot_id is in deltaSnapshotIdSet, so we will only need to rewrite 1 manifest (m2.avro)

  • After:
    we scan all_manifests table to filter manifests where reference_snapshot_id is in deltaSnapshotIdSet, so we will have to rewrite 2 manifests (m1.avro & m2.avro)

I think this might help with fix the previous manifest size problem in the new manifest-list but we are no longer doing incremental path rewrite (where only added snapshot shall be able to determine the delta).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fixing null length in the target table, we should run the rewrite without starting version to fully restart all manifest files. But we should keep the incremental behavior the same if starting version is provided.

Copy link
Contributor

@dramaticlly dramaticlly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to keep the original delta rewrite behavior before we can merge this

@vaultah
Copy link
Author

vaultah commented Aug 28, 2025

@dramaticlly @stevenzwu

Let's say we have manifests A, B, and C, added by snapshot 1. We then create snapshot 2: its manifest list will contain references to manifests A, B, and C, and we also add the reference to the new manifest D.

Now we use rewrite_table_path in incremental mode, starting from snapshot 1.

Per your suggestion, it will rewrite just manifest D and the manifest list of snapshot 2. We assume that manifests added in snapshot 1 were already rewritten before, so we simply update their paths in the manifest list of snapshot 2.

For manifest D, the rewritten manifest list will have the new path and the new size. For manifests A, B, C, the rewritten manifest list will have new paths and their original lengths. In other words, the rewritten manifest list will be

manifest_path manifest_length ...
newPath(D) newLength(D)
newPath(C) oldLength(C)
newPath(B) oldLength(B)
newPath(A) oldLength(A)

As a result of rewriting, manifest length will almost certainly change, so in general oldLength(A) != newLength(A), which means the size of manifest A in the rewritten manifest list is incorrect, as in it doesn't match the length of the actual physical file at newPath(A) that it's referencing. This is the scenario from #13719 that I'm trying to solve.

Please help me understand how the correctness is maintained in your suggestion

@dramaticlly
Copy link
Contributor

@dramaticlly @stevenzwu

Let's say we have manifests A, B, and C, added by snapshot 1. We then create snapshot 2: its manifest list will contain references to manifests A, B, and C, and we also add the reference to the new manifest D.

Now we use rewrite_table_path in incremental mode, starting from snapshot 1.

Per your suggestion, it will rewrite just manifest D and the manifest list of snapshot 2. We assume that manifests added in snapshot 1 were already rewritten before, so we simply update their paths in the manifest list of snapshot 2.

For manifest D, the rewritten manifest list will have the new path and the new size. For manifests A, B, C, the rewritten manifest list will have new paths and their original lengths. In other words, the rewritten manifest list will be

manifest_path manifest_length ...
newPath(D) newLength(D)
newPath(C) oldLength(C)
newPath(B) oldLength(B)
newPath(A) oldLength(A)
As a result of rewriting, manifest length will almost certainly change, so in general oldLength(A) != newLength(A), which means the size of manifest A in the rewritten manifest list is incorrect, as in it doesn't match the length of the actual physical file at newPath(A) that it's referencing. This is the scenario from #13719 that I'm trying to solve.

Please help me understand how the correctness is maintained in your suggestion

I think we are trying to do 2 things at the same time

  1. incremental copy between existing and new version files, let's say snapshot 2 produced new manifest D and manifest A/B/C already exists in target table before this incremental copy. We want to rewrite the metadata files needed to copy all the incremental files over from source to target
  2. fix the size for existing manifest A/B/C in latest manifest-list or snapshot 2

As of now, we are trying to rewrite manifest A/B/C/D as part of this SparkAction, but only manifest D is strictly required if sizing is not a problem.

Personally, to properly fix on all historical manifest sizing in the target table, we might need more than just incremental copy, so the recommendation is do a complete rewrite (non-incremental) for one time fix. All future rewrite of table path afterward will be delta based

@vaultah
Copy link
Author

vaultah commented Aug 28, 2025

@dramaticlly sorry, I'm not sure I follow. Re. 1: My point is that even in incremental mode, even when we rewrite a single manifest list (snapshot 2), we need to update lengths for all manifests that it references (A, B, C, D). If we don't do that, per your suggestion, the rewritten manifest list will only have correct lengths for the manifests that were rewritten in the same run (D), leaving lengths of other manifests (A, B, C) as they were in the source table (i.e., as if they were never rewritten) and therefore incorrect. Then, after this manifest list is added to the target table, the target table will have the problem described in #13719.

Re. 2: fixing existing manifest references is not in scope of this PR. This PR fixes rewrite_table_path so it produces correct manifest lists going forward. In a correct manifest list, manifest_length matches the size of the file at manifest_path for all manifest references. To guarantee that, we need to rewrite all referenced manifests.

@dramaticlly
Copy link
Contributor

@dramaticlly sorry, I'm not sure I follow. Re. 1: My point is that even in incremental mode, even when we rewrite a single manifest list (snapshot 2), we need to update lengths for all manifests that it references (A, B, C, D). If we don't do that, per your suggestion, the rewritten manifest list will only have correct lengths for the manifests that were rewritten in the same run (D), leaving lengths of other manifests (A, B, C) as they were in the source table (i.e., as if they were never rewritten) and therefore incorrect. Then, after this manifest list is added to the target table, the target table will have the problem described in #13719.

Re. 2: fixing existing manifest references is not in scope of this PR. This PR fixes rewrite_table_path so it produces correct manifest lists going forward. In a correct manifest list, manifest_length matches the size of the file at manifest_path for all manifest references. To guarantee that, we need to rewrite all referenced manifests.

Re 1. I get what you mean now, even though manifest A, B, C was copied earlier with correct length in target table, the incremental rewrite for snapshot 2 cannot obtain such length from source table, so they unfortunately need rewrite to get the correct length. And this essentially means incremental rewrite is no longer limited to the changing snapshots. Let me think a bit more on this.

@vaultah
Copy link
Author

vaultah commented Sep 2, 2025

Hi @dramaticlly, just wanted to check in on this. Please let me know if you have any thoughts on the path forward when you get a chance. Thanks!

@dramaticlly
Copy link
Contributor

Hi @dramaticlly, just wanted to check in on this. Please let me know if you have any thoughts on the path forward when you get a chance. Thanks!

I am going to raise this in iceberg community sync tmr (Sept 3rd 9am PT) https://iceberg.apache.org/community/#apache-iceberg-community-calendar and collect some feedback. Feel free to join us if time works for you

@github-actions
Copy link

github-actions bot commented Oct 5, 2025

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Oct 5, 2025
@github-actions
Copy link

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Oct 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

rewrite_table_path does not update sizes of rewritten manifests in manifest lists

6 participants