Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IcebergManifestUtils;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
Expand Down Expand Up @@ -2260,11 +2261,18 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl
ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();

for (Snapshot snapshot : table.snapshots()) {
if (snapshot.manifestListLocation() != null) {
validMetadataFileNames.add(fileName(snapshot.manifestListLocation()));
String manifestListLocation = snapshot.manifestListLocation();
List<ManifestFile> allManifests;
if (manifestListLocation != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we have tests using v1 iceberg format for remove orphan files. We should add at least one as a sanity check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a special case of V1 tables where write.manifest-lists.enabled is set to false. However, the iceberg library removed the ability to write embedded manifest list a few years ago (apache/iceberg#5773). So we don't have a way to produce such data in the tests.
It might be possible to generate such data from an old version and test on that pre-generated dataset, but I think that is overkill at the moment. The fallback logic looks safe enough to me.

validMetadataFileNames.add(fileName(manifestListLocation));
allManifests = loadAllManifestsFromManifestList(table, manifestListLocation);
}
else {
// This is to maintain support for V1 tables which have embedded manifest lists
allManifests = loadAllManifestsFromSnapshot(table, snapshot);
}

for (ManifestFile manifest : loadAllManifestsFromSnapshot(table, snapshot)) {
for (ManifestFile manifest : allManifests) {
if (!processedManifestFilePaths.add(manifest.path())) {
// Already read this manifest
continue;
Expand Down Expand Up @@ -3469,6 +3477,21 @@ private static List<ManifestFile> loadAllManifestsFromSnapshot(Table icebergTabl
}
}

/**
* Use instead of loadAllManifestsFromSnapshot when loading manifests from multiple distinct snapshots
* Each BaseSnapshot object caches manifest files separately, so loading manifests from multiple distinct snapshots
* results in O(num_snapshots^2) copies of the same manifest file metadata in memory
*/
private static List<ManifestFile> loadAllManifestsFromManifestList(Table icebergTable, String manifestListLocation)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this move to ManifestUtils ?

Copy link
Contributor Author

@grantatspothero grantatspothero May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the org.apache.iceberg package.

The intention of this class was to utilize the package private method ManifestLists.read.

We want the behavior of BaseSnapshot.allManifests() without caching, but iceberg does not support that. My workaround works fine for v2 iceberg tables, but I'm concerned it might not work with v1 iceberg tables. See:
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseSnapshot.java#L174-L186

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to restrict the fix to V2 iceberg tables while we look for a solution for V1 tables in https://apache-iceberg.slack.com/archives/C03LG1D563F/p1747921709647279 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@grantatspothero i've pushed changes here to restrict the fix to v2 tables, PTAL

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something we have to do this PR, but can we start moving these types of utilities out into a separate utility class? We have lots of utilities throughout with similar/close functionality repeated, and I imagine most occasional contributors (like myself) don't even know they exist because they are placed in the calling class as a private method.

{
try {
return IcebergManifestUtils.read(icebergTable.io(), manifestListLocation);
}
catch (NotFoundException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_INVALID_METADATA, "Error accessing manifest file for table %s".formatted(icebergTable.name()), e);
}
}

private static Set<Integer> identityPartitionColumnsInAllSpecs(Table table)
{
// Extract identity partition column source ids common to ALL specs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.io.FileIO;

import java.util.List;

public class IcebergManifestUtils
{
private IcebergManifestUtils() {}

public static List<ManifestFile> read(FileIO fileIO, String manifestListLocation)
{
// Avoid using snapshot.allManifests() when processing multiple snapshots,
// as each Snapshot instance internally caches `org.apache.iceberg.BaseSnapshot.allManifests`
// and leads to high memory usage
return ManifestLists.read(fileIO.newInputFile(manifestListLocation));
}
}