diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java index 5ab32345f6f7..fc627d5b123d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java @@ -15,7 +15,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.log.Logger; import io.airlift.stats.CounterStat; @@ -49,7 +48,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.failedFuture; import static io.airlift.concurrent.MoreFutures.toCompletableFuture; import static io.airlift.units.DataSize.succinctBytes; @@ -349,9 +347,9 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan } OptionalInt bucketNumber = toBucketNumber(partitionHandle); - ListenableFuture> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> { + ListenableFuture> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> { ImmutableList.Builder splitsToInsertBuilder = ImmutableList.builder(); - ImmutableList.Builder resultBuilder = ImmutableList.builder(); + ImmutableList.Builder resultBuilder = ImmutableList.builder(); int removedEstimatedSizeInBytes = 0; int removedSplitCount = 0; for (InternalHiveSplit internalSplit : internalSplits) { @@ -425,16 +423,20 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { estimatedSplitSizeInBytes.addAndGet(-removedEstimatedSizeInBytes); 
bufferedInternalSplitCount.addAndGet(-removedSplitCount); - List splitsToInsert = splitsToInsertBuilder.build(); - List result = resultBuilder.build(); - return new AsyncQueue.BorrowResult<>(splitsToInsert, result); + return new AsyncQueue.BorrowResult<>(splitsToInsertBuilder.build(), resultBuilder.build()); }); - ListenableFuture transform = Futures.transform(future, splits -> { - requireNonNull(splits, "splits is null"); + return toCompletableFuture(future).thenApply(hiveSplits -> { + requireNonNull(hiveSplits, "hiveSplits is null"); if (recordScannedFiles) { - splits.forEach(split -> scannedFilePaths.add(((HiveSplit) split).getPath())); + hiveSplits.stream() + .filter(split -> split.getStart() == 0) + .map(HiveSplit::getPath) + .forEach(scannedFilePaths::add); } + // This won't actually initiate a copy since hiveSplits is already an ImmutableList, but it will + // let us convert from List to List without casting + List splits = ImmutableList.copyOf(hiveSplits); if (noMoreSplits) { // Checking splits.isEmpty() here is required for thread safety. // Let's say there are 10 splits left, and max number of splits per batch is 5. 
@@ -451,9 +453,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { else { return new ConnectorSplitBatch(splits, false); } - }, directExecutor()); - - return toCompletableFuture(transform); + }); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java index 79f2323c3712..eb1b3d4e14b8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java @@ -116,6 +116,20 @@ public synchronized ListenableFuture offer(T element) return immediateVoidFuture(); } + /** Adds every element of {@code elementsToInsert} to the queue in a single synchronized call. Nothing is added when the queue is finishing and no borrower is outstanding. When the queue goes from empty to non-empty, the current {@code notEmptySignal} is completed on {@code executor} and replaced with a fresh one. Throws NullPointerException if {@code elementsToInsert} is null. */ private synchronized void offerAll(List elementsToInsert) + { + requireNonNull(elementsToInsert); + /* finishing with no outstanding borrowers: drop the elements instead of queueing them */ if (finishing && borrowerCount == 0) { + return; + } + boolean wasEmpty = elements.isEmpty(); + elements.addAll(elementsToInsert); + /* complete the signal only on an empty to non-empty transition, then install a new signal */ if (wasEmpty && !elements.isEmpty()) { + completeAsync(executor, notEmptySignal); + notEmptySignal = SettableFuture.create(); + } + } + public synchronized int size() { return elements.size(); @@ -205,8 +219,9 @@ else if (finishing && borrowerCount == 0) { checkArgument(borrowResult.getElementsToInsert().isEmpty(), "Function must not insert anything when no element is borrowed"); return borrowResult.getResult(); } - for (T element : borrowResult.getElementsToInsert()) { - offer(element); + List elementsToInsert = borrowResult.getElementsToInsert(); + /* re-insert the returned elements with one offerAll call; skipped entirely when there is nothing to insert */ if (!elementsToInsert.isEmpty()) { + offerAll(elementsToInsert); } return borrowResult.getResult(); }