diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index fd2e8fcb6206..b26e0e263486 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -350,34 +350,51 @@ private Stream getV2CheckpointTransactionLogEntrie { // Sidecar files contain only ADD and REMOVE entry types. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec Set dataEntryTypes = Sets.intersection(entryTypes, Set.of(ADD, REMOVE)); - List>> logEntryStreamFutures = - getV2CheckpointEntries(session, entryTypes, metadataEntry, protocolEntry, checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile, partitionConstraint, addStatsMinMaxColumnFilter, fileSystem, fileSize) - .map(v2checkpointEntry -> { - if (v2checkpointEntry.getSidecar() == null || dataEntryTypes.isEmpty()) { - return CompletableFuture.completedFuture(Stream.of(v2checkpointEntry)); - } - // Sidecar files are retrieved in parallel using a bounded executor - return supplyAsync(() -> { - Location sidecar = checkpointFile.location().sibling("_sidecars").appendPath(v2checkpointEntry.getSidecar().path()); - CheckpointEntryIterator iterator = new CheckpointEntryIterator( - fileSystem.newInputFile(sidecar), - session, - v2checkpointEntry.getSidecar().sizeInBytes(), - checkpointSchemaManager, - typeManager, - dataEntryTypes, - metadataEntry, - protocolEntry, - stats, - parquetReaderOptions, - checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold, - partitionConstraint, - addStatsMinMaxColumnFilter); - return stream(iterator).onClose(iterator::close); - }, executor); - }) - .collect(toImmutableList()); + Stream v2CheckpointEntries = getV2CheckpointEntries( + session, + entryTypes, + metadataEntry, + protocolEntry, + checkpointSchemaManager, + typeManager, + stats, + checkpoint, + checkpointFile, + partitionConstraint, + addStatsMinMaxColumnFilter, + fileSystem, + fileSize); + if (dataEntryTypes.isEmpty()) { + return v2CheckpointEntries; + } + + List>> logEntryStreamFutures = v2CheckpointEntries + .map(v2checkpointEntry -> { + if (v2checkpointEntry.getSidecar() == null) { + return CompletableFuture.completedFuture(Stream.of(v2checkpointEntry)); + } + // Sidecar files are retrieved in parallel using a bounded executor + return supplyAsync(() -> { + Location sidecar = checkpointFile.location().sibling("_sidecars").appendPath(v2checkpointEntry.getSidecar().path()); + CheckpointEntryIterator iterator = new CheckpointEntryIterator( + fileSystem.newInputFile(sidecar), + session, + v2checkpointEntry.getSidecar().sizeInBytes(), + checkpointSchemaManager, + typeManager, + dataEntryTypes, + metadataEntry, + protocolEntry, + stats, + parquetReaderOptions, + checkpointRowStatisticsWritingEnabled, + domainCompactionThreshold, + partitionConstraint, + addStatsMinMaxColumnFilter); + return stream(iterator).onClose(iterator::close); + }, executor); + }) + .collect(toImmutableList()); // Return the stream to retrieve the values of the futures lazily and allow streamlined split generation return logEntryStreamFutures.stream() .mapMulti((logEntryStream, builder)