|
149 | 149 | import org.apache.iceberg.ManifestFile; |
150 | 150 | import org.apache.iceberg.ManifestFiles; |
151 | 151 | import org.apache.iceberg.ManifestReader; |
| 152 | +import org.apache.iceberg.ManifestUtils; |
152 | 153 | import org.apache.iceberg.MetadataColumns; |
153 | 154 | import org.apache.iceberg.PartitionField; |
154 | 155 | import org.apache.iceberg.PartitionSpec; |
|
206 | 207 | import java.util.HashSet; |
207 | 208 | import java.util.Iterator; |
208 | 209 | import java.util.LinkedHashMap; |
| 210 | +import java.util.LinkedList; |
209 | 211 | import java.util.List; |
210 | 212 | import java.util.Map; |
211 | 213 | import java.util.Optional; |
212 | 214 | import java.util.OptionalLong; |
| 215 | +import java.util.Queue; |
213 | 216 | import java.util.Set; |
214 | 217 | import java.util.concurrent.Callable; |
215 | 218 | import java.util.concurrent.ConcurrentHashMap; |
|
238 | 241 | import static com.google.common.collect.ImmutableSet.toImmutableSet; |
239 | 242 | import static com.google.common.collect.Iterables.getLast; |
240 | 243 | import static com.google.common.collect.Iterables.getOnlyElement; |
| 244 | +import static com.google.common.collect.Lists.newArrayList; |
241 | 245 | import static com.google.common.collect.Maps.transformValues; |
242 | 246 | import static com.google.common.collect.Sets.difference; |
243 | 247 | import static io.trino.filesystem.Locations.isS3Tables; |
@@ -2251,12 +2255,13 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl |
2251 | 2255 | ImmutableSet.Builder<String> validMetadataFileNames = ImmutableSet.builder(); |
2252 | 2256 | ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder(); |
2253 | 2257 |
|
2254 | | - for (Snapshot snapshot : table.snapshots()) { |
2255 | | - if (snapshot.manifestListLocation() != null) { |
2256 | | - validMetadataFileNames.add(fileName(snapshot.manifestListLocation())); |
| 2258 | + ArrayList<String> manifestListLocations = newArrayList(Iterables.transform(table.snapshots(), Snapshot::manifestListLocation)); |
| 2259 | + for (String manifestListLocation : manifestListLocations) { |
| 2260 | + if (manifestListLocation != null) { |
| 2261 | + validMetadataFileNames.add(fileName(manifestListLocation)); |
2257 | 2262 | } |
2258 | 2263 |
|
2259 | | - for (ManifestFile manifest : loadAllManifestsFromSnapshot(table, snapshot)) { |
| 2264 | + for (ManifestFile manifest : loadAllManifestsFromManifestList(table, manifestListLocation)) { |
2260 | 2265 | if (!processedManifestFilePaths.add(manifest.path())) { |
2261 | 2266 | // Already read this manifest |
2262 | 2267 | continue; |
@@ -3460,6 +3465,21 @@ private static List<ManifestFile> loadAllManifestsFromSnapshot(Table icebergTabl |
3460 | 3465 | } |
3461 | 3466 | } |
3462 | 3467 |
|
| 3468 | + /** |
| 3469 | + * Use instead of loadAllManifestsFromSnapshot when loading manifests from multiple distinct snapshots |
| 3470 | + * Each BaseSnapshot object caches manifest files separately, so loading manifests from multiple distinct snapshots |
| 3471 | + * results in O(num_snapshots^2) copies of the same manifest file metadata in memory |
| 3472 | + */ |
| 3473 | + private static List<ManifestFile> loadAllManifestsFromManifestList(Table icebergTable, String manifestListLocation) |
| 3474 | + { |
| 3475 | + try { |
| 3476 | + return ManifestUtils.read(icebergTable.io(), manifestListLocation); |
| 3477 | + } |
| 3478 | + catch (NotFoundException | UncheckedIOException e) { |
| 3479 | + throw new TrinoException(ICEBERG_INVALID_METADATA, "Error accessing manifest file for table %s".formatted(icebergTable.name()), e); |
| 3480 | + } |
| 3481 | + } |
| 3482 | + |
3463 | 3483 | private static Set<Integer> identityPartitionColumnsInAllSpecs(Table table) |
3464 | 3484 | { |
3465 | 3485 | // Extract identity partition column source ids common to ALL specs |
|
0 commit comments