Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,34 +350,51 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
{
// Sidecar files contain only ADD and REMOVE entry types. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec
Set<CheckpointEntryIterator.EntryType> dataEntryTypes = Sets.intersection(entryTypes, Set.of(ADD, REMOVE));
List<CompletableFuture<Stream<DeltaLakeTransactionLogEntry>>> logEntryStreamFutures =
getV2CheckpointEntries(session, entryTypes, metadataEntry, protocolEntry, checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile, partitionConstraint, addStatsMinMaxColumnFilter, fileSystem, fileSize)
.map(v2checkpointEntry -> {
if (v2checkpointEntry.getSidecar() == null || dataEntryTypes.isEmpty()) {
return CompletableFuture.completedFuture(Stream.of(v2checkpointEntry));
}
// Sidecar files are retrieved in parallel using a bounded executor
return supplyAsync(() -> {
Location sidecar = checkpointFile.location().sibling("_sidecars").appendPath(v2checkpointEntry.getSidecar().path());
CheckpointEntryIterator iterator = new CheckpointEntryIterator(
fileSystem.newInputFile(sidecar),
session,
v2checkpointEntry.getSidecar().sizeInBytes(),
checkpointSchemaManager,
typeManager,
dataEntryTypes,
metadataEntry,
protocolEntry,
stats,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
addStatsMinMaxColumnFilter);
return stream(iterator).onClose(iterator::close);
}, executor);
})
.collect(toImmutableList());
Stream<DeltaLakeTransactionLogEntry> v2CheckpointEntries = getV2CheckpointEntries(
session,
entryTypes,
metadataEntry,
protocolEntry,
checkpointSchemaManager,
typeManager,
stats,
checkpoint,
checkpointFile,
partitionConstraint,
addStatsMinMaxColumnFilter,
fileSystem,
fileSize);
if (dataEntryTypes.isEmpty()) {
return v2CheckpointEntries;
}

List<CompletableFuture<Stream<DeltaLakeTransactionLogEntry>>> logEntryStreamFutures = v2CheckpointEntries
.map(v2checkpointEntry -> {
if (v2checkpointEntry.getSidecar() == null) {
return CompletableFuture.completedFuture(Stream.of(v2checkpointEntry));
}
// Sidecar files are retrieved in parallel using a bounded executor
return supplyAsync(() -> {
Location sidecar = checkpointFile.location().sibling("_sidecars").appendPath(v2checkpointEntry.getSidecar().path());
CheckpointEntryIterator iterator = new CheckpointEntryIterator(
fileSystem.newInputFile(sidecar),
session,
v2checkpointEntry.getSidecar().sizeInBytes(),
checkpointSchemaManager,
typeManager,
dataEntryTypes,
metadataEntry,
protocolEntry,
stats,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
addStatsMinMaxColumnFilter);
return stream(iterator).onClose(iterator::close);
}, executor);
})
.collect(toImmutableList());
// Return the stream to retrieve the values of the futures lazily and allow streamlined split generation
return logEntryStreamFutures.stream()
.mapMulti((logEntryStream, builder)
Expand Down
Loading