From 0743c409053bd2fd7d86e89cc205d2c14989c596 Mon Sep 17 00:00:00 2001
From: James Petty <pettyjamesm@gmail.com>
Date: Thu, 9 Jun 2022 15:12:02 -0400
Subject: [PATCH 1/2] Add less lock hungry AsyncQueue#offerAll

Adds a method to handle batch insertion of all elements to insert
returned from AsyncQueue.BorrowResult. This simplifies the
implementation and avoids threads contending to synchronize on the
queue for each individual element inserted.
---
 .../io/trino/plugin/hive/util/AsyncQueue.java | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java
index 79f2323c3712..eb1b3d4e14b8 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AsyncQueue.java
@@ -116,6 +116,20 @@ public synchronized ListenableFuture<Void> offer(T element)
         return immediateVoidFuture();
     }
 
+    private synchronized void offerAll(List<T> elementsToInsert)
+    {
+        requireNonNull(elementsToInsert);
+        if (finishing && borrowerCount == 0) {
+            return;
+        }
+        boolean wasEmpty = elements.isEmpty();
+        elements.addAll(elementsToInsert);
+        if (wasEmpty && !elements.isEmpty()) {
+            completeAsync(executor, notEmptySignal);
+            notEmptySignal = SettableFuture.create();
+        }
+    }
+
     public synchronized int size()
     {
         return elements.size();
@@ -205,8 +219,9 @@ else if (finishing && borrowerCount == 0) {
             checkArgument(borrowResult.getElementsToInsert().isEmpty(), "Function must not insert anything when no element is borrowed");
             return borrowResult.getResult();
         }
-        for (T element : borrowResult.getElementsToInsert()) {
-            offer(element);
+        List<T> elementsToInsert = borrowResult.getElementsToInsert();
+        if (!elementsToInsert.isEmpty()) {
+            offerAll(elementsToInsert);
         }
         return borrowResult.getResult();
     }

From e9bfb3db2a0ad3095b1ca0c52cea4823b41d1ad5 Mon Sep 17 00:00:00 2001
From: James Petty <pettyjamesm@gmail.com>
Date: Thu, 9 Jun 2022 15:13:14 -0400
Subject: [PATCH 2/2] Avoid recording redundant scannedFiles in Hive

Only records the Path for scanned files in HiveSplitSource when the
split start point is at the beginning of the file. The only consumer of
the scanned files list is interested solely in unique paths scanned, so
enqueuing the same path repeatedly for the same InternalHiveSplit is
unnecessarily expensive.
---
 .../io/trino/plugin/hive/HiveSplitSource.java | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
index 5ab32345f6f7..fc627d5b123d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
@@ -15,7 +15,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.log.Logger;
 import io.airlift.stats.CounterStat;
@@ -49,7 +48,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.concurrent.MoreFutures.failedFuture;
 import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
 import static io.airlift.units.DataSize.succinctBytes;
@@ -349,9 +347,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
         }
 
         OptionalInt bucketNumber = toBucketNumber(partitionHandle);
-        ListenableFuture<List<ConnectorSplit>> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> {
+        ListenableFuture<List<HiveSplit>> future = queues.borrowBatchAsync(bucketNumber, maxSize, internalSplits -> {
             ImmutableList.Builder<InternalHiveSplit> splitsToInsertBuilder = ImmutableList.builder();
-            ImmutableList.Builder<ConnectorSplit> resultBuilder = ImmutableList.builder();
+            ImmutableList.Builder<HiveSplit> resultBuilder = ImmutableList.builder();
             int removedEstimatedSizeInBytes = 0;
             int removedSplitCount = 0;
             for (InternalHiveSplit internalSplit : internalSplits) {
@@ -425,16 +423,20 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
             estimatedSplitSizeInBytes.addAndGet(-removedEstimatedSizeInBytes);
             bufferedInternalSplitCount.addAndGet(-removedSplitCount);
-            List<ConnectorSplit> splitsToInsert = splitsToInsertBuilder.build();
-            List<ConnectorSplit> result = resultBuilder.build();
-            return new AsyncQueue.BorrowResult<>(splitsToInsert, result);
+            return new AsyncQueue.BorrowResult<>(splitsToInsertBuilder.build(), resultBuilder.build());
         });
 
-        ListenableFuture<ConnectorSplitBatch> transform = Futures.transform(future, splits -> {
-            requireNonNull(splits, "splits is null");
+        return toCompletableFuture(future).thenApply(hiveSplits -> {
+            requireNonNull(hiveSplits, "hiveSplits is null");
             if (recordScannedFiles) {
-                splits.forEach(split -> scannedFilePaths.add(((HiveSplit) split).getPath()));
+                hiveSplits.stream()
+                        .filter(split -> split.getStart() == 0)
+                        .map(HiveSplit::getPath)
+                        .forEach(scannedFilePaths::add);
             }
+            // This won't actually initiate a copy since hiveSplits is already an ImmutableList, but it will
+            // let us convert from List<HiveSplit> to List<ConnectorSplit> without casting
+            List<ConnectorSplit> splits = ImmutableList.copyOf(hiveSplits);
             if (noMoreSplits) {
                 // Checking splits.isEmpty() here is required for thread safety.
                 // Let's say there are 10 splits left, and max number of splits per batch is 5.
                 // The futures constructed in two getNextBatch calls could each fetch 5, resulting in zero splits left.
@@ -451,9 +453,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
             else {
                 return new ConnectorSplitBatch(splits, false);
             }
-        }, directExecutor());
-
-        return toCompletableFuture(transform);
+        });
     }
 
     @Override