Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
Expand Down Expand Up @@ -49,7 +48,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.failedFuture;
import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
import static io.airlift.units.DataSize.succinctBytes;
Expand Down Expand Up @@ -349,9 +347,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
}

OptionalInt bucketNumber = toBucketNumber(partitionHandle);
ListenableFuture<List<ConnectorSplit>> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> {
ListenableFuture<ImmutableList<HiveSplit>> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> {
ImmutableList.Builder<InternalHiveSplit> splitsToInsertBuilder = ImmutableList.builder();
ImmutableList.Builder<ConnectorSplit> resultBuilder = ImmutableList.builder();
ImmutableList.Builder<HiveSplit> resultBuilder = ImmutableList.builder();
int removedEstimatedSizeInBytes = 0;
int removedSplitCount = 0;
for (InternalHiveSplit internalSplit : internalSplits) {
Expand Down Expand Up @@ -425,16 +423,20 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
estimatedSplitSizeInBytes.addAndGet(-removedEstimatedSizeInBytes);
bufferedInternalSplitCount.addAndGet(-removedSplitCount);

List<InternalHiveSplit> splitsToInsert = splitsToInsertBuilder.build();
List<ConnectorSplit> result = resultBuilder.build();
return new AsyncQueue.BorrowResult<>(splitsToInsert, result);
return new AsyncQueue.BorrowResult<>(splitsToInsertBuilder.build(), resultBuilder.build());
});

ListenableFuture<ConnectorSplitBatch> transform = Futures.transform(future, splits -> {
requireNonNull(splits, "splits is null");
return toCompletableFuture(future).thenApply(hiveSplits -> {
requireNonNull(hiveSplits, "hiveSplits is null");
if (recordScannedFiles) {
splits.forEach(split -> scannedFilePaths.add(((HiveSplit) split).getPath()));
hiveSplits.stream()
.filter(split -> split.getStart() == 0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a chance that the split starting at 0 could be filtered out altogether, while other splits for the same file are still present? Can this happen with DF for example? cc: @raunaqmorarka ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not be possible, no — the first split generated for a file will always have start == 0. Now, it is possible that a file will get eliminated by dynamic filters after the first split is produced, which could then eliminate the remainder of the file from being scanned, but that could happen even with the previous implementation. As far as I can tell, this logic is only necessary for "optimizing small files" and collecting the files scanned to delete them after the contents are rewritten — which should not have dynamic filters in the first place, because it would be unsafe for this reason with or without this improvement.

.map(HiveSplit::getPath)
.forEach(scannedFilePaths::add);
}
// This won't actually initiate a copy since hiveSplits is already an ImmutableList, but it will
// let us convert from List<HiveSplit> to List<ConnectorSplit> without casting
List<ConnectorSplit> splits = ImmutableList.copyOf(hiveSplits);
if (noMoreSplits) {
// Checking splits.isEmpty() here is required for thread safety.
// Let's say there are 10 splits left, and max number of splits per batch is 5.
Expand All @@ -451,9 +453,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
else {
return new ConnectorSplitBatch(splits, false);
}
}, directExecutor());

return toCompletableFuture(transform);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ public synchronized ListenableFuture<Void> offer(T element)
return immediateVoidFuture();
}

/**
 * Atomically inserts all of the given elements, signalling waiting consumers at most
 * once regardless of how many elements are added (unlike repeated calls to single-element
 * insertion, which could fire the not-empty signal per element).
 *
 * <p>If the queue is finishing and there are no active borrowers, the elements are
 * silently discarded — NOTE(review): this appears to mirror the guard in {@code offer};
 * confirm against that method's body.
 *
 * @param elementsToInsert the elements to add; must not be null
 */
private synchronized void offerAll(List<T> elementsToInsert)
{
    requireNonNull(elementsToInsert, "elementsToInsert is null");
    if (finishing && borrowerCount == 0) {
        // Queue is shutting down with no borrowers outstanding; drop the elements
        return;
    }
    boolean wasEmpty = elements.isEmpty();
    elements.addAll(elementsToInsert);
    // Fire the not-empty signal only on the empty -> non-empty transition so that
    // a whole batch wakes consumers exactly once; the isEmpty() re-check covers an
    // empty elementsToInsert list
    if (wasEmpty && !elements.isEmpty()) {
        completeAsync(executor, notEmptySignal);
        notEmptySignal = SettableFuture.create();
    }
}

public synchronized int size()
{
return elements.size();
Expand Down Expand Up @@ -205,8 +219,9 @@ else if (finishing && borrowerCount == 0) {
checkArgument(borrowResult.getElementsToInsert().isEmpty(), "Function must not insert anything when no element is borrowed");
return borrowResult.getResult();
}
for (T element : borrowResult.getElementsToInsert()) {
offer(element);
List<T> elementsToInsert = borrowResult.getElementsToInsert();
if (!elementsToInsert.isEmpty()) {
offerAll(elementsToInsert);
}
return borrowResult.getResult();
}
Expand Down