diff --git a/core/trino-main/src/main/java/io/trino/exchange/LazyExchangeDataSource.java b/core/trino-main/src/main/java/io/trino/exchange/LazyExchangeDataSource.java index 58e697d7bf71..4e3aabf6b864 100644 --- a/core/trino-main/src/main/java/io/trino/exchange/LazyExchangeDataSource.java +++ b/core/trino-main/src/main/java/io/trino/exchange/LazyExchangeDataSource.java @@ -25,10 +25,7 @@ import io.trino.spi.QueryId; import io.trino.spi.exchange.ExchangeId; import io.trino.spi.exchange.ExchangeManager; -import io.trino.spi.exchange.ExchangeSource; -import io.trino.spi.exchange.ExchangeSourceHandle; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -116,19 +113,14 @@ public void addInput(ExchangeInput input) return; } ExchangeDataSource dataSource = delegate.get(); - boolean inputAdded = false; if (dataSource == null) { if (input instanceof DirectExchangeInput) { DirectExchangeClient client = directExchangeClientSupplier.get(queryId, exchangeId, systemMemoryContext, taskFailureListener, retryPolicy); dataSource = new DirectExchangeDataSource(client); } else if (input instanceof SpoolingExchangeInput) { - SpoolingExchangeInput spoolingExchangeInput = (SpoolingExchangeInput) input; ExchangeManager exchangeManager = exchangeManagerRegistry.getExchangeManager(); - List sourceHandles = spoolingExchangeInput.getExchangeSourceHandles(); - ExchangeSource exchangeSource = exchangeManager.createSource(sourceHandles); - dataSource = new SpoolingExchangeDataSource(exchangeSource, systemMemoryContext); - inputAdded = true; + dataSource = new SpoolingExchangeDataSource(exchangeManager.createSource(), systemMemoryContext); } else { throw new IllegalArgumentException("Unexpected input: " + input); @@ -136,9 +128,7 @@ else if (input instanceof SpoolingExchangeInput) { delegate.set(dataSource); initialized = true; } - if (!inputAdded) { - dataSource.addInput(input); - } + dataSource.addInput(input); } if (initialized) { diff --git a/core/trino-main/src/main/java/io/trino/exchange/SpoolingExchangeDataSource.java b/core/trino-main/src/main/java/io/trino/exchange/SpoolingExchangeDataSource.java index 62a44ea83c4f..a088aa21429e 100644 --- a/core/trino-main/src/main/java/io/trino/exchange/SpoolingExchangeDataSource.java +++ b/core/trino-main/src/main/java/io/trino/exchange/SpoolingExchangeDataSource.java @@ -86,14 +86,22 @@ public ListenableFuture isBlocked() @Override public void addInput(ExchangeInput input) { - throw new UnsupportedOperationException("only a single input is expected"); + SpoolingExchangeInput spoolingExchangeInput = (SpoolingExchangeInput) input; + ExchangeSource exchangeSource = this.exchangeSource; + if (exchangeSource == null) { + return; + } + exchangeSource.addSourceHandles(spoolingExchangeInput.getExchangeSourceHandles()); } @Override public void noMoreInputs() { - // Only a single input is expected when the spooling exchange is used. - // Thus the assumption of "noMoreSplit" is made on construction. + ExchangeSource exchangeSource = this.exchangeSource; + if (exchangeSource == null) { + return; + } + exchangeSource.noMoreSourceHandles(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTask.java b/core/trino-main/src/main/java/io/trino/execution/SqlTask.java index c8e290eecc7e..10cf4e1ba666 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTask.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTask.java @@ -204,11 +204,6 @@ private void initialize(Consumer onDone, CounterStat failedTasks) }); } - public boolean isOutputBufferOverutilized() - { - return outputBuffer.isOverutilized(); - } - public SqlTaskIoStats getIoStats() { return taskHolderReference.get().getIoStats(); @@ -322,7 +317,8 @@ else if (taskHolder.getTaskExecution() != null) { dynamicFiltersVersion = taskContext.getDynamicFiltersVersion(); } - return new TaskStatus(taskStateMachine.getTaskId(), + return new TaskStatus( + taskStateMachine.getTaskId(), taskInstanceId, versionNumber, state, @@ -331,7 +327,7 @@ else if (taskHolder.getTaskExecution() != null) { failures, queuedPartitionedDrivers, runningPartitionedDrivers, - isOutputBufferOverutilized(), + outputBuffer.getStatus(), physicalWrittenDataSize, userMemoryReservation, peakUserMemoryReservation, diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskStatus.java b/core/trino-main/src/main/java/io/trino/execution/TaskStatus.java index b7e035c1d102..16d1334567c7 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskStatus.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskStatus.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.execution.buffer.OutputBufferStatus; import java.net.URI; import java.util.List; @@ -55,7 +56,7 @@ public class TaskStatus private final long queuedPartitionedSplitsWeight; private final int runningPartitionedDrivers; private final long runningPartitionedSplitsWeight; - private final boolean outputBufferOverutilized; + private final OutputBufferStatus outputBufferStatus; private final DataSize physicalWrittenDataSize; private final DataSize memoryReservation; private final DataSize peakMemoryReservation; @@ -79,7 +80,7 @@ public TaskStatus( @JsonProperty("failures") List failures, @JsonProperty("queuedPartitionedDrivers") int queuedPartitionedDrivers, @JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers, - @JsonProperty("outputBufferOverutilized") boolean outputBufferOverutilized, + @JsonProperty("outputBufferStatus") OutputBufferStatus outputBufferStatus, @JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize, @JsonProperty("memoryReservation") DataSize memoryReservation, @JsonProperty("peakMemoryReservation") DataSize peakMemoryReservation, @@ -109,7 +110,7 @@ public TaskStatus( checkArgument(runningPartitionedSplitsWeight >= 0, "runningPartitionedSplitsWeight must be positive"); this.runningPartitionedSplitsWeight = runningPartitionedSplitsWeight; - this.outputBufferOverutilized = outputBufferOverutilized; + this.outputBufferStatus = requireNonNull(outputBufferStatus, "outputBufferStatus is null"); this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null"); @@ -186,9 +187,9 @@ public DataSize getPhysicalWrittenDataSize() } @JsonProperty - public boolean isOutputBufferOverutilized() + public OutputBufferStatus getOutputBufferStatus() { - return outputBufferOverutilized; + return outputBufferStatus; } @JsonProperty @@ -260,7 +261,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n ImmutableList.of(), 0, 0, - false, + OutputBufferStatus.initial(), DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), @@ -284,7 +285,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List= 0.5) || !stateMachine.getState().canAddPages(); + // do not grab lock to acquire outputBuffers to avoid delaying TaskStatus response + return OutputBufferStatus.builder(outputBuffers.getVersion()) + .setOverutilized(memoryManager.getUtilization() >= 0.5 || !stateMachine.getState().canAddPages()) + .build(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java index 5b40b9ac447b..c05410bc2626 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java @@ -63,7 +63,7 @@ public class BroadcastOutputBuffer private final PagesReleasedListener onPagesReleased; @GuardedBy("this") - private OutputBuffers outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(BROADCAST); + private volatile OutputBuffers outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(BROADCAST); @GuardedBy("this") private final Map buffers = new ConcurrentHashMap<>(); @@ -112,9 +112,12 @@ public double getUtilization() } @Override - public boolean isOverutilized() + public OutputBufferStatus getStatus() { - return (getUtilization() > 0.5) && stateMachine.getState().canAddPages(); + // do not grab lock to acquire outputBuffers to avoid delaying TaskStatus response + return OutputBufferStatus.builder(outputBuffers.getVersion()) + .setOverutilized(getUtilization() > 0.5 && stateMachine.getState().canAddPages()) + .build(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java index 9f257b50499a..7d7ddd384197 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java @@ -109,12 +109,13 @@ public double getUtilization() } @Override - public boolean isOverutilized() + public OutputBufferStatus getStatus() { OutputBuffer outputBuffer = getDelegateOutputBuffer(); - - // until output buffer is initialized, readers cannot enqueue and thus cannot be blocked - return (outputBuffer != null) && outputBuffer.isOverutilized(); + if (outputBuffer == null) { + return OutputBufferStatus.initial(); + } + return outputBuffer.getStatus(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java index 37ce169516cb..06fa1d024a32 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java @@ -41,9 +41,9 @@ public interface OutputBuffer double getUtilization(); /** - * Check if the buffer is blocking producers. + * Get buffer status */ - boolean isOverutilized(); + OutputBufferStatus getStatus(); /** * Add a listener which fires anytime the buffer state changes. diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferStatus.java b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferStatus.java new file mode 100644 index 000000000000..4266d2bcdbc0 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferStatus.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.buffer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.OptionalLong; + +import static java.util.Objects.requireNonNull; + +public class OutputBufferStatus +{ + private static final OutputBufferStatus INITIAL = new OutputBufferStatus(OptionalLong.empty(), false, false); + + private final OptionalLong outputBuffersVersion; + private final boolean overutilized; + private final boolean exchangeSinkInstanceHandleUpdateRequired; + + @JsonCreator + public OutputBufferStatus( + @JsonProperty("outputBuffersVersion") OptionalLong outputBuffersVersion, + @JsonProperty("overutilized") boolean overutilized, + @JsonProperty("exchangeSinkInstanceHandleUpdateRequired") boolean exchangeSinkInstanceHandleUpdateRequired) + { + this.outputBuffersVersion = requireNonNull(outputBuffersVersion, "outputBuffersVersion is null"); + this.overutilized = overutilized; + this.exchangeSinkInstanceHandleUpdateRequired = exchangeSinkInstanceHandleUpdateRequired; + } + + @JsonProperty + public OptionalLong getOutputBuffersVersion() + { + return outputBuffersVersion; + } + + @JsonProperty + public boolean isOverutilized() + { + return overutilized; + } + + @JsonProperty + public boolean isExchangeSinkInstanceHandleUpdateRequired() + { + return exchangeSinkInstanceHandleUpdateRequired; + } + + public static OutputBufferStatus initial() + { + return INITIAL; + } + + public static Builder builder(long outputBuffersVersion) + { + return new Builder(outputBuffersVersion); + } + + public static class Builder + { + private final OptionalLong outputBuffersVersion; + private boolean overutilized; + private boolean exchangeSinkInstanceHandleUpdateRequired; + + public Builder(long outputBuffersVersion) + { + this.outputBuffersVersion = OptionalLong.of(outputBuffersVersion); + } + + public Builder setOverutilized(boolean overutilized) + { + this.overutilized = overutilized; + return this; + } + + public Builder setExchangeSinkInstanceHandleUpdateRequired(boolean exchangeSinkInstanceHandleUpdateRequired) + { + this.exchangeSinkInstanceHandleUpdateRequired = exchangeSinkInstanceHandleUpdateRequired; + return this; + } + + public OutputBufferStatus build() + { + return new OutputBufferStatus(outputBuffersVersion, overutilized, exchangeSinkInstanceHandleUpdateRequired); + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java index 0a7dfcac1d09..3e35ab55d094 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java @@ -96,9 +96,11 @@ public double getUtilization() } @Override - public boolean isOverutilized() + public OutputBufferStatus getStatus() { - return memoryManager.isOverutilized(); + return OutputBufferStatus.builder(outputBuffers.getVersion()) + .setOverutilized(memoryManager.isOverutilized()) + .build(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java index e52860fcc7dc..bc4aaf221241 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java @@ -21,6 +21,7 @@ import io.trino.execution.StateMachine; import io.trino.memory.context.LocalMemoryContext; import io.trino.spi.exchange.ExchangeSink; +import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import javax.annotation.concurrent.ThreadSafe; @@ -44,7 +45,7 @@ public class SpoolingExchangeOutputBuffer private static final Logger log = Logger.get(SpoolingExchangeOutputBuffer.class); private final OutputBufferStateMachine stateMachine; - private final OutputBuffers outputBuffers; + private volatile OutputBuffers outputBuffers; // This field is not final to allow releasing the memory retained by the ExchangeSink instance. // It is modified (assigned to null) when the OutputBuffer is destroyed (either finished or aborted). // It doesn't have to be declared as volatile as the nullification of this variable doesn't have to be immediately visible to other threads. @@ -103,9 +104,15 @@ public double getUtilization() } @Override - public boolean isOverutilized() + public OutputBufferStatus getStatus() { - return false; + // do not grab lock to acquire outputBuffers to avoid delaying TaskStatus response + OutputBufferStatus.Builder result = OutputBufferStatus.builder(outputBuffers.getVersion()); + ExchangeSink sink = exchangeSink; + if (sink != null) { + result.setExchangeSinkInstanceHandleUpdateRequired(sink.isHandleUpdateRequired()); + } + return result.build(); } @Override @@ -115,7 +122,7 @@ public void addStateChangeListener(StateMachine.StateChangeListener } @Override - public void setOutputBuffers(OutputBuffers newOutputBuffers) + public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers) { requireNonNull(newOutputBuffers, "newOutputBuffers is null"); @@ -127,6 +134,16 @@ public void setOutputBuffers(OutputBuffers newOutputBuffers) // no more buffers can be added but verify this is valid state change outputBuffers.checkValidTransition(newOutputBuffers); + + ExchangeSink sink = exchangeSink; + if (sink != null) { + ExchangeSinkInstanceHandle exchangeSinkInstanceHandle = newOutputBuffers.getExchangeSinkInstanceHandle() + .orElseThrow(() -> new IllegalArgumentException("exchange sink handle is expected to be present")); + sink.updateHandle(exchangeSinkInstanceHandle); + } + + // assign output buffers only after updating the sink to avoid triggering an extra update + outputBuffers = newOutputBuffers; } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/Exchanges.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/Exchanges.java new file mode 100644 index 000000000000..a38afa32adf1 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/Exchanges.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import io.trino.spi.exchange.ExchangeSourceHandle; +import io.trino.spi.exchange.ExchangeSourceHandleSource; +import io.trino.spi.exchange.ExchangeSourceHandleSource.ExchangeSourceHandleBatch; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static com.google.common.base.Preconditions.checkState; + +public final class Exchanges +{ + private Exchanges() {} + + public static ListenableFuture> getAllSourceHandles(ExchangeSourceHandleSource handleSource) + { + return new AbstractFuture>() + { + private final ImmutableList.Builder handles = ImmutableList.builder(); + private CompletableFuture nextBatchFuture; + + private synchronized AbstractFuture> process() + { + if (isDone()) { + return this; + } + try { + checkState(nextBatchFuture == null || nextBatchFuture.isDone(), "nextBatchFuture is expected to be done"); + nextBatchFuture = handleSource.getNextBatch(); + nextBatchFuture.whenComplete((result, failure) -> { + if (failure != null) { + setException(failure); + handleSource.close(); + return; + } + handles.addAll(result.handles()); + if (result.lastBatch()) { + set(handles.build()); + handleSource.close(); + return; + } + process(); + }); + } + catch (Throwable t) { + setException(t); + handleSource.close(); + } + return this; + } + + @Override + protected synchronized void interruptTask() + { + if (nextBatchFuture != null) { + nextBatchFuture.cancel(true); + nextBatchFuture = null; + } + handleSource.close(); + } + }.process(); + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java index 1721016c82c7..035128936460 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.MoreFutures; import io.airlift.concurrent.SetThreadName; import io.airlift.log.Logger; import io.airlift.stats.TimeStat; @@ -285,7 +284,7 @@ private Scheduler createScheduler() verify(!outputStages.isEmpty(), "coordinatorConsumedExchanges is empty"); List>> futures = outputStages.stream() .map(Exchange::getSourceHandles) - .map(MoreFutures::toListenableFuture) + .map(Exchanges::getAllSourceHandles) .collect(toImmutableList()); addSuccessCallback(Futures.allAsList(futures), result -> { List handles = result.stream() diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index 1b87b22fb2f3..610060619dad 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -81,12 +81,12 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.getFutureValue; -import static io.airlift.concurrent.MoreFutures.toListenableFuture; import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor; import static io.trino.SystemSessionProperties.getRetryInitialDelay; import static io.trino.SystemSessionProperties.getRetryMaxDelay; import static io.trino.execution.buffer.OutputBuffers.createSpoolingExchangeOutputBuffers; import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError; +import static io.trino.execution.scheduler.Exchanges.getAllSourceHandles; import static io.trino.failuredetector.FailureDetector.State.GONE; import static io.trino.operator.ExchangeOperator.REMOTE_CATALOG_HANDLE; import static io.trino.spi.ErrorType.EXTERNAL; @@ -258,7 +258,7 @@ public synchronized void schedule() if (taskSource == null) { Map>> sourceHandles = sourceExchanges.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> toListenableFuture(entry.getValue().getSourceHandles()))); + .collect(toImmutableMap(Map.Entry::getKey, entry -> getAllSourceHandles(entry.getValue().getSourceHandles()))); List>> blockedFutures = sourceHandles.values().stream() .filter(future -> !future.isDone()) @@ -408,7 +408,7 @@ private void startTask(int partition, NodeAllocator.NodeLease nodeLease, MemoryR taskFinishedFuture = SettableFuture.create(); } - task.addStateChangeListener(taskStatus -> updateTaskStatus(taskStatus, exchangeSinkInstanceHandle)); + task.addStateChangeListener(taskStatus -> updateTaskStatus(taskStatus, sinkHandle)); task.addFinalTaskInfoListener(taskExecutionStats::update); task.start(); } @@ -544,7 +544,7 @@ private static Multimap createRemoteSplits(Multimap getTaskStatuses() @Override public boolean isAnyTaskBlocked() { - return getTaskStatuses().stream().anyMatch(TaskStatus::isOutputBufferOverutilized); + return getTaskStatuses().stream() + .map(TaskStatus::getOutputBufferStatus) + .anyMatch(OutputBufferStatus::isOverutilized); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java index 5843a4d2a103..03637e975e0f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java @@ -19,6 +19,7 @@ import io.airlift.units.DataSize; import io.trino.execution.RemoteTask; import io.trino.execution.TaskStatus; +import io.trino.execution.buffer.OutputBufferStatus; import io.trino.metadata.InternalNode; import java.util.Collection; @@ -94,7 +95,8 @@ private int getNewTaskCount() double fullTasks = sourceTasksProvider.get().stream() .filter(task -> !task.getState().isDone()) - .map(TaskStatus::isOutputBufferOverutilized) + .map(TaskStatus::getOutputBufferStatus) + .map(OutputBufferStatus::isOverutilized) .mapToDouble(full -> full ? 1.0 : 0.0) .average().orElse(0.0); diff --git a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java index 20820a2c0667..88464e984422 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java @@ -39,7 +39,6 @@ import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeSink; import io.trino.spi.exchange.ExchangeSinkHandle; -import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSource; import javax.annotation.concurrent.GuardedBy; @@ -56,7 +55,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -66,19 +67,20 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Multimaps.asMap; import static com.google.common.util.concurrent.Futures.addCallback; -import static com.google.common.util.concurrent.Futures.getUnchecked; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; +import static io.trino.execution.scheduler.Exchanges.getAllSourceHandles; import static io.trino.operator.RetryPolicy.NONE; import static io.trino.operator.RetryPolicy.QUERY; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_FAILED; import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; public class DeduplicatingDirectExchangeBuffer implements DirectExchangeBuffer @@ -480,7 +482,7 @@ private static class PageBuffer @GuardedBy("this") private Exchange exchange; @GuardedBy("this") - private ExchangeSinkInstanceHandle sinkInstanceHandle; + private ExchangeSinkHandle sinkHandle; @GuardedBy("this") private ExchangeSink exchangeSink; @GuardedBy("this") @@ -534,16 +536,15 @@ public synchronized void addPages(TaskId taskId, List pages) if (exchangeSink == null) { verify(exchangeManager == null, "exchangeManager is not expected to be initialized"); verify(exchange == null, "exchange is not expected to be initialized"); - verify(sinkInstanceHandle == null, "sinkInstanceHandle is not expected to be initialized"); + verify(sinkHandle == null, "sinkHandle is not expected to be initialized"); verify(writeBuffer == null, "writeBuffer is not expected to be initialized"); exchangeManager = exchangeManagerRegistry.getExchangeManager(); exchange = exchangeManager.createExchange(new ExchangeContext(queryId, exchangeId), 1, true); - ExchangeSinkHandle sinkHandle = exchange.addSink(0); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle, 0); + sinkHandle = exchange.addSink(0); exchange.noMoreSinks(); - exchangeSink = exchangeManager.createSink(sinkInstanceHandle); + exchangeSink = exchangeManager.createSink(exchange.instantiateSink(sinkHandle, 0)); writeBuffer = new DynamicSliceOutput(DEFAULT_MAX_PAGE_SIZE_IN_BYTES); } @@ -575,7 +576,22 @@ private void writeToSink(TaskId taskId, List pages) verify(writeBuffer != null, "writeBuffer is expected to be initialized"); for (Slice page : pages) { // wait for the sink to unblock - getUnchecked(exchangeSink.isBlocked()); + while (true) { + try { + exchangeSink.isBlocked().get(1, SECONDS); + break; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + catch (TimeoutException e) { + updateSinkInstanceHandleIfNecessary(); + } + } writeBuffer.writeInt(taskId.getStageId().getId()); writeBuffer.writeInt(taskId.getPartitionId()); writeBuffer.writeInt(taskId.getAttemptId()); @@ -587,6 +603,18 @@ private void writeToSink(TaskId taskId, List pages) } } + private void updateSinkInstanceHandleIfNecessary() + { + verify(Thread.holdsLock(this), "this method is expected to be called under a lock"); + verify(exchange != null, "exchange is null"); + verify(exchangeSink != null, "exchangeSink is null"); + verify(sinkHandle != null, "sinkHandle is null"); + + if (exchangeSink.isHandleUpdateRequired()) { + exchangeSink.updateHandle(exchange.updateSinkInstanceHandle(sinkHandle, 0)); + } + } + public synchronized void removePagesForPreviousAttempts(int currentAttemptId) { checkState(!inputFinished, "input is finished"); @@ -628,7 +656,7 @@ public synchronized OutputSource createOutputSource(Set selectedTasks) verify(exchangeManager != null, "exchangeManager is expected to be initialized"); verify(exchange != null, "exchange is expected to be initialized"); - verify(sinkInstanceHandle != null, "sinkInstanceHandle is expected to be initialized"); + verify(sinkHandle != null, "sinkHandle is expected to be initialized"); // no more data will be added, the buffer can be safely discarded writeBuffer = null; @@ -636,14 +664,32 @@ public synchronized OutputSource createOutputSource(Set selectedTasks) // Finish ExchangeSink and create ExchangeSource asynchronously to avoid blocking an ExchangeClient thread for potentially substantial amount of time ListenableFuture exchangeSourceFuture = FluentFuture.from(toListenableFuture(exchangeSink.finish())) .transformAsync(ignored -> { - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle, 0); synchronized (this) { exchangeSink = null; - sinkInstanceHandle = null; + sinkHandle = null; } - return toListenableFuture(exchange.getSourceHandles()); + return getAllSourceHandles(exchange.getSourceHandles()); }, executor) - .transform(exchangeManager::createSource, executor); + .transform(handles -> { + ExchangeSource source = exchangeManager.createSource(); + try { + source.addSourceHandles(handles); + source.noMoreSourceHandles(); + return source; + } + catch (Throwable t) { + try { + source.close(); + } + catch (Throwable closeFailure) { + if (closeFailure != t) { + t.addSuppressed(closeFailure); + } + } + throw t; + } + }, executor); return new ExchangeOutputSource(selectedTasks, queryId, exchangeSourceFuture); } diff --git a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java index 0e5eacc2ecc8..958f7fdeacdd 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java @@ -179,8 +179,6 @@ public static final class MockRemoteTask private final PartitionedSplitCountTracker partitionedSplitCountTracker; - private boolean isOutputBufferOverUtilized; - public MockRemoteTask( TaskId taskId, PlanFragment fragment, @@ -256,7 +254,7 @@ public TaskInfo getTaskInfo() failures, 0, 0, - isOutputBufferOverUtilized, + outputBuffer.getStatus(), DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), @@ -289,7 +287,7 @@ public TaskStatus getTaskStatus() ImmutableList.of(), queuedSplitsInfo.getCount(), combinedSplitsInfo.getCount() - queuedSplitsInfo.getCount(), - isOutputBufferOverUtilized, + outputBuffer.getStatus(), stats.getPhysicalWrittenDataSize(), stats.getUserMemoryReservation(), stats.getPeakUserMemoryReservation(), @@ -358,11 +356,6 @@ public synchronized void startSplits(int maxRunning) updateSplitQueueSpace(); } - public synchronized void setOutputBufferOverUtilized(boolean isOutputBufferOverUtilized) - { - this.isOutputBufferOverUtilized = isOutputBufferOverUtilized; - } - @Override public void start() { diff --git a/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java index 6c6324ecea5c..a62bb60c309b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java @@ -28,6 +28,7 @@ import io.trino.execution.StateMachine.StateChangeListener; import io.trino.execution.buffer.BufferState; import io.trino.execution.buffer.OutputBufferInfo; +import io.trino.execution.buffer.OutputBufferStatus; import io.trino.execution.buffer.OutputBuffers; import io.trino.metadata.InternalNode; import io.trino.metadata.Split; @@ -174,7 +175,7 @@ public TaskStatus getTaskStatus() failures, 0, 0, - false, + OutputBufferStatus.initial(), DataSize.of(0, BYTE), DataSize.of(0, BYTE), DataSize.of(0, BYTE), diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java index 60bdee93aa14..9f22a0ad61bc 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java @@ -311,6 +311,18 @@ private static class TestingExchangeSink private boolean finishCalled; private boolean abortCalled; + @Override + public boolean isHandleUpdateRequired() + { + return false; + } + + @Override + public void updateHandle(ExchangeSinkInstanceHandle handle) + { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture isBlocked() { diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java index 32ca3e4242ae..c0606d490a13 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java @@ -19,6 +19,7 @@ import io.trino.spi.exchange.ExchangeSinkHandle; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSourceHandle; +import io.trino.spi.exchange.ExchangeSourceHandleSource; import org.openjdk.jol.info.ClassLayout; import java.util.List; @@ -65,9 +66,15 @@ public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, } @Override - public void sinkFinished(ExchangeSinkInstanceHandle handle) + public ExchangeSinkInstanceHandle updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) { - finishedSinks.add(((TestingExchangeSinkInstanceHandle) handle).getSinkHandle()); + throw new UnsupportedOperationException(); + } + + @Override + public void sinkFinished(ExchangeSinkHandle sinkHandle, int taskAttemptId) + { + finishedSinks.add((TestingExchangeSinkHandle) sinkHandle); } public Set getFinishedSinkHandles() @@ -76,9 +83,19 @@ public Set getFinishedSinkHandles() } @Override - public CompletableFuture> getSourceHandles() + public ExchangeSourceHandleSource getSourceHandles() { - return sourceHandles; + return new ExchangeSourceHandleSource() + { + @Override + public CompletableFuture getNextBatch() + { + return sourceHandles.thenApply(handles -> new ExchangeSourceHandleBatch(handles, true)); + } + + @Override + public void close() {} + }; } public void setSourceHandles(List handles) diff --git a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java index 60b602dcb87b..ff343692165e 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java @@ -26,6 +26,7 @@ import io.trino.execution.TaskStatus; import io.trino.execution.buffer.BufferState; import io.trino.execution.buffer.OutputBufferInfo; +import io.trino.execution.buffer.OutputBufferStatus; import io.trino.operator.TaskStats; import io.trino.plugin.base.metrics.TDigestHistogram; import org.joda.time.DateTime; @@ -229,7 +230,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, TaskState state, Duration s ImmutableList.of(), 0, 0, - false, + OutputBufferStatus.initial(), DataSize.of(1, DataSize.Unit.MEGABYTE), DataSize.of(1, DataSize.Unit.MEGABYTE), DataSize.of(1, DataSize.Unit.MEGABYTE), diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java index 25fded9678b4..2a42e4ab3767 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java @@ -24,6 +24,7 @@ import io.trino.execution.buffer.BufferState; import io.trino.execution.buffer.OutputBuffer; import io.trino.execution.buffer.OutputBufferInfo; +import io.trino.execution.buffer.OutputBufferStatus; import io.trino.execution.buffer.OutputBuffers; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeFactory; @@ -683,9 +684,9 @@ public double getUtilization() } @Override - public boolean isOverutilized() + public OutputBufferStatus getStatus() { - return false; + return OutputBufferStatus.initial(); } @Override diff --git a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java index c5a58bea526c..b7a1ba556dce 100644 --- a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java +++ b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java @@ -769,7 +769,7 @@ private TaskStatus buildTaskStatus() initialTaskStatus.getFailures(), initialTaskStatus.getQueuedPartitionedDrivers(), initialTaskStatus.getRunningPartitionedDrivers(), - initialTaskStatus.isOutputBufferOverutilized(), + initialTaskStatus.getOutputBufferStatus(), initialTaskStatus.getPhysicalWrittenDataSize(), initialTaskStatus.getMemoryReservation(), initialTaskStatus.getPeakMemoryReservation(), diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java index 6c0fcd647e74..dd1b9183af29 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java @@ -18,8 +18,6 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.Closeable; -import java.util.List; -import java.util.concurrent.CompletableFuture; @ThreadSafe @Experimental(eta = "2023-01-01") @@ -60,22 +58,30 @@ public interface Exchange */ ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId); + /** + * Update {@link ExchangeSinkInstanceHandle}. Update is requested by {@link ExchangeSink}. + * The updated {@link ExchangeSinkInstanceHandle} is expected to be set by {@link ExchangeSink#updateHandle(ExchangeSinkInstanceHandle)}. + * + * @param sinkHandle - handle returned by addSink + * @param taskAttemptId - attempt id + * @return updated handle + */ + ExchangeSinkInstanceHandle updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId); + /** * Called by the engine when an attempt finishes successfully. *

* This method is expected to be lightweight. An implementation shouldn't perform any long running blocking operations within this method. */ - void sinkFinished(ExchangeSinkInstanceHandle handle); + void sinkFinished(ExchangeSinkHandle sinkHandle, int taskAttemptId); /** - * Returns a future containing handles to be used to read data from an exchange. - *

- * Future must be resolved when the data is available to be read. + * Returns an {@link ExchangeSourceHandleSource} instance to be used to enumerate {@link ExchangeSourceHandle}s. * * @return Future containing a list of {@link ExchangeSourceHandle} to be sent to a - * worker that is needed to create an {@link ExchangeSource} using {@link ExchangeManager#createSource(List)} + * worker that is needed to create an {@link ExchangeSource} using {@link ExchangeManager#createSource()} */ - CompletableFuture> getSourceHandles(); + ExchangeSourceHandleSource getSourceHandles(); @Override void close(); diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java index 7f4a5e2172ca..2399bcdb136f 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java @@ -18,8 +18,6 @@ import javax.annotation.concurrent.ThreadSafe; -import java.util.List; - /** * Service provider interface for an external exchange *

@@ -70,23 +68,9 @@ public interface ExchangeManager ExchangeSink createSink(ExchangeSinkInstanceHandle handle); /** - * Called by a worker to create an {@link ExchangeSource} to read data corresponding to - * a given list of exchange source handles. - *

- * A single {@link ExchangeSourceHandle} corresponds to a single output partition - * (see {@link ExchangeSink#add(int, Slice)}). - *

- * Based on the partition statistic (such as partition size) coordinator may also decide - * to process several partitions by the same task. In such scenarios the handles - * list may contain more than a single element. + * Called by a worker to create an {@link ExchangeSource} to read exchange data. * - * @param handles list of {@link ExchangeSourceHandle}'s describing what exchange data to - * read. The full list of handles is returned by {@link Exchange#getSourceHandles}. - * The coordinator decides what items from that list should be handled by what task and creates - * sub-lists that are further getting sent to a worker to be read. - * The handles list may contain {@link ExchangeSourceHandle}'s created by more than - * a single {@link Exchange}. * @return {@link ExchangeSource} used by the engine to read data from an exchange */ - ExchangeSource createSource(List handles); + ExchangeSource createSource(); } diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSink.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSink.java index cc69c73cceb1..d4198241382e 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSink.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSink.java @@ -26,6 +26,19 @@ public interface ExchangeSink { CompletableFuture NOT_BLOCKED = CompletableFuture.completedFuture(null); + /** + * Returns {@code true} when {@link ExchangeSinkInstanceHandle} needs to be updated + * through {@link #updateHandle(ExchangeSinkInstanceHandle)} to make further progress + */ + boolean isHandleUpdateRequired(); + + /** + * Update {@link ExchangeSinkInstanceHandle}. Done by the engine upon request initiated by the {@link ExchangeSink} + * + * @param handle updated handle + */ + void updateHandle(ExchangeSinkInstanceHandle handle); + /** * Returns a future that will be completed when the exchange sink becomes * unblocked. If the exchange sink is not blocked, this method should return diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSource.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSource.java index 615d1d2f6d9a..bdbe06089c8c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSource.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSource.java @@ -20,6 +20,7 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.Closeable; +import java.util.List; import java.util.concurrent.CompletableFuture; @ThreadSafe @@ -29,6 +30,24 @@ public interface ExchangeSource { CompletableFuture NOT_BLOCKED = CompletableFuture.completedFuture(null); + /** + * Add more {@link ExchangeSourceHandle}'s to be read by the {@link ExchangeSource}. + * + * @param handles list of {@link ExchangeSourceHandle}'s describing what exchange data to + * read. The full list of handles is returned by {@link ExchangeSourceHandleSource}. + * The coordinator decides what items from that list should be handled by what task and creates + * sub-lists that are further getting sent to a worker to be read. + * The handles list may contain {@link ExchangeSourceHandle}'s created by more than + * a single {@link Exchange}. + */ + void addSourceHandles(List handles); + + /** + * Notify {@link ExchangeSource} that no more {@link ExchangeSourceHandle}'s will be added with the + * {@link #addSourceHandles(List)} method. + */ + void noMoreSourceHandles(); + /** * Returns a future that will be completed when the exchange source becomes * unblocked. If the exchange source is not blocked, this method should return diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceHandleSource.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceHandleSource.java new file mode 100644 index 000000000000..9366b99934b9 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceHandleSource.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.exchange; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +public interface ExchangeSourceHandleSource + extends Closeable +{ + /** + * Returns a next batch of {@link ExchangeSourceHandle}s. + *

+ * Cannot be called when a future returned by a previous invocation is not yet finished. + *

+ * {@link ExchangeSourceHandleBatch#lastBatch()} returns true when finished. + * + * @return a future containing a batch of {@link ExchangeSourceHandle}s. + */ + CompletableFuture getNextBatch(); + + @Override + void close(); + + record ExchangeSourceHandleBatch(List handles, boolean lastBatch) + { + public ExchangeSourceHandleBatch + { + handles = List.copyOf(requireNonNull(handles, "handles is null")); + } + } +} diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java index 4d2a777870ea..fbc1ca40f28c 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java @@ -25,7 +25,8 @@ public interface ExchangeStorageReader extends Closeable { - Slice read() throws IOException; + Slice read() + throws IOException; ListenableFuture isBlocked(); diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index 0676b5138b08..41688234ca27 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -25,6 +25,7 @@ import io.trino.spi.exchange.ExchangeSinkHandle; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSourceHandle; +import io.trino.spi.exchange.ExchangeSourceHandleSource; import javax.annotation.concurrent.GuardedBy; import javax.crypto.SecretKey; @@ -153,11 +154,18 @@ public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, } @Override - public void sinkFinished(ExchangeSinkInstanceHandle handle) + public ExchangeSinkInstanceHandle updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) + { + // this implementation never requests an update + throw new UnsupportedOperationException(); + } + + @Override + public void sinkFinished(ExchangeSinkHandle handle, int taskAttemptId) { synchronized (this) { - FileSystemExchangeSinkInstanceHandle instanceHandle = (FileSystemExchangeSinkInstanceHandle) handle; - finishedSinks.add(instanceHandle.getSinkHandle().getPartitionId()); + FileSystemExchangeSinkHandle sinkHandle = (FileSystemExchangeSinkHandle) handle; + finishedSinks.add(sinkHandle.getPartitionId()); } checkInputReady(); } @@ -277,9 +285,19 @@ private URI getTaskOutputDirectory(int taskPartitionId) } @Override - public CompletableFuture> getSourceHandles() + public ExchangeSourceHandleSource getSourceHandles() { - return exchangeSourceHandlesFuture; + return new ExchangeSourceHandleSource() + { + @Override + public CompletableFuture getNextBatch() + { + return exchangeSourceHandlesFuture.thenApply(handles -> new ExchangeSourceHandleBatch(handles, true)); + } + + @Override + public void close() {} + }; } @Override diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java index e35ddb0c2d04..8591541c01ca 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java @@ -21,7 +21,6 @@ import io.trino.spi.exchange.ExchangeSink; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSource; -import io.trino.spi.exchange.ExchangeSourceHandle; import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; @@ -30,12 +29,10 @@ import java.net.URI; import java.security.NoSuchAlgorithmException; -import java.util.AbstractMap; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.plugin.exchange.filesystem.FileSystemExchangeErrorCode.MAX_OUTPUT_PARTITION_COUNT_EXCEEDED; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -139,24 +136,11 @@ public ExchangeSink createSink(ExchangeSinkInstanceHandle handle) } @Override - public ExchangeSource createSource(List handles) + public ExchangeSource createSource() { - List sourceFiles = handles.stream() - .map(FileSystemExchangeSourceHandle.class::cast) - .map(handle -> { - Optional secretKey = handle.getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES")); - return new AbstractMap.SimpleEntry<>(handle, secretKey); - }) - .flatMap(entry -> entry.getKey().getFiles().stream().map(fileStatus -> - new ExchangeSourceFile( - URI.create(fileStatus.getFilePath()), - entry.getValue(), - fileStatus.getFileSize()))) - .collect(toImmutableList()); return new FileSystemExchangeSource( exchangeStorage, stats, - sourceFiles, maxPageStorageSizeInBytes, exchangeSourceConcurrentReaders); } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java index 2432d05df848..771500204845 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java @@ -23,6 +23,7 @@ import io.airlift.slice.Slices; import io.trino.spi.TrinoException; import io.trino.spi.exchange.ExchangeSink; +import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import org.openjdk.jol.info.ClassLayout; import javax.annotation.concurrent.GuardedBy; @@ -110,6 +111,19 @@ public FileSystemExchangeSink( this.bufferPool = new BufferPool(stats, max(outputPartitionCount * exchangeSinkBuffersPerPartition, exchangeSinkBufferPoolMinSize), exchangeStorage.getWriteBufferSize()); } + @Override + public boolean isHandleUpdateRequired() + { + return false; + } + + @Override + public void updateHandle(ExchangeSinkInstanceHandle handle) + { + // this implementation never requests an update + throw new UnsupportedOperationException(); + } + // The future returned by {@link #isBlocked()} should only be considered as a best-effort hint. @Override public CompletableFuture isBlocked() diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java index dd32ea4f7220..6baf64df4279 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java @@ -14,105 +14,147 @@ package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableList; +import com.google.common.io.Closer; +import com.google.common.util.concurrent.SettableFuture; import io.airlift.slice.Slice; import io.trino.spi.exchange.ExchangeSource; +import io.trino.spi.exchange.ExchangeSourceHandle; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.io.UncheckedIOException; +import java.net.URI; +import java.util.AbstractMap; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static io.airlift.concurrent.MoreFutures.toCompletableFuture; import static io.airlift.concurrent.MoreFutures.whenAnyComplete; -import static java.lang.Math.min; import static java.util.Objects.requireNonNull; public class FileSystemExchangeSource implements ExchangeSource { + private final FileSystemExchangeStorage exchangeStorage; private final FileSystemExchangeStats stats; - private final List readers; - private volatile CompletableFuture blocked; - private volatile boolean closed; + private final int maxPageStorageSize; + private final int exchangeSourceConcurrentReaders; + + private final Queue files = new ConcurrentLinkedQueue<>(); + @GuardedBy("this") + private boolean noMoreFiles; + @GuardedBy("this") + private SettableFuture blockedOnSourceHandles = SettableFuture.create(); + + private final AtomicReference> readers = new AtomicReference<>(ImmutableList.of()); + private final AtomicReference> blocked = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(); public FileSystemExchangeSource( FileSystemExchangeStorage exchangeStorage, FileSystemExchangeStats stats, - List sourceFiles, int maxPageStorageSize, int exchangeSourceConcurrentReaders) { - requireNonNull(exchangeStorage, "exchangeStorage is null"); + this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); this.stats = requireNonNull(stats, "stats is null"); - Queue sourceFileQueue = new ArrayBlockingQueue<>(sourceFiles.size()); - sourceFileQueue.addAll(sourceFiles); - - int numReaders = min(sourceFiles.size(), exchangeSourceConcurrentReaders); + this.maxPageStorageSize = maxPageStorageSize; + this.exchangeSourceConcurrentReaders = exchangeSourceConcurrentReaders; + } - ImmutableList.Builder readers = ImmutableList.builder(); - for (int i = 0; i < numReaders; ++i) { - readers.add(exchangeStorage.createExchangeStorageReader(sourceFileQueue, maxPageStorageSize)); + @Override + public synchronized void addSourceHandles(List handles) + { + if (closed.get()) { + return; } - this.readers = readers.build(); + files.addAll(getFiles(handles)); + closeAndCreateReadersIfNecessary(); + } + + @Override + public synchronized void noMoreSourceHandles() + { + noMoreFiles = true; + closeAndCreateReadersIfNecessary(); } @Override public CompletableFuture isBlocked() { - CompletableFuture blocked = this.blocked; + if (closed.get()) { + return NOT_BLOCKED; + } + + CompletableFuture blocked = this.blocked.get(); if (blocked != null && !blocked.isDone()) { return blocked; } - for (ExchangeStorageReader reader : readers) { + + List readers = this.readers.get(); + // regular loop for efficiency + for (int i = 0; i < readers.size(); i++) { + ExchangeStorageReader reader = readers.get(i); if (reader.isBlocked().isDone()) { return NOT_BLOCKED; } } + synchronized (this) { - if (this.blocked == null || this.blocked.isDone()) { - this.blocked = stats.getExchangeSourceBlocked().record(toCompletableFuture( + if (!blockedOnSourceHandles.isDone()) { + blocked = toCompletableFuture(nonCancellationPropagating(blockedOnSourceHandles)); + } + else if (readers.isEmpty()) { + blocked = NOT_BLOCKED; + } + else { + blocked = toCompletableFuture( nonCancellationPropagating( whenAnyComplete(readers.stream() .map(ExchangeStorageReader::isBlocked) - .collect(toImmutableList()))))); + .collect(toImmutableList())))); } - return this.blocked; + blocked = stats.getExchangeSourceBlocked().record(blocked); + this.blocked.set(blocked); + return blocked; } } @Override public boolean isFinished() { - if (closed) { - return true; - } - - for (ExchangeStorageReader reader : readers) { - if (!reader.isFinished()) { - return false; - } - } - return true; + return closed.get(); } @Nullable @Override public Slice read() { - if (closed) { + if (closed.get()) { return null; } - for (ExchangeStorageReader reader : readers) { + Slice data = null; + List readers = this.readers.get(); + // regular loop for efficiency + for (int i = 0; i < readers.size(); i++) { + ExchangeStorageReader reader = readers.get(i); if (reader.isBlocked().isDone() && !reader.isFinished()) { try { - return reader.read(); + data = reader.read(); + break; } catch (IOException e) { throw new UncheckedIOException(e); @@ -120,30 +162,145 @@ public Slice read() } } - return null; + closeAndCreateReadersIfNecessary(); + + return data; } @Override public long getMemoryUsage() { long memoryUsage = 0; - for (ExchangeStorageReader reader : readers) { - memoryUsage += reader.getRetainedSize(); + List readers = this.readers.get(); + // regular loop for efficiency + for (int i = 0; i < readers.size(); i++) { + memoryUsage += readers.get(i).getRetainedSize(); } return memoryUsage; } @Override - public void close() + public synchronized void close() { // Make sure we will only close once + if (!closed.compareAndSet(false, true)) { + return; + } + files.clear(); + Closer closer = Closer.create(); + for (ExchangeStorageReader reader : readers.getAndSet(ImmutableList.of())) { + closer.register(reader); + } + try { + closer.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void closeAndCreateReadersIfNecessary() + { + int numberOfActiveReaders = getNumberOfActiveReaders(); + if (numberOfActiveReaders == exchangeSourceConcurrentReaders) { + return; + } + if (numberOfActiveReaders > 0 && files.isEmpty()) { + return; + } + + SettableFuture blockedOnSourceHandlesToBeUnblocked = null; synchronized (this) { - if (closed) { + if (closed.get()) { return; } - closed = true; + + List activeReaders = new ArrayList<>(); + for (ExchangeStorageReader reader : readers.get()) { + if (reader.isFinished()) { + reader.close(); + } + else { + activeReaders.add(reader); + } + } + try { + while (activeReaders.size() < exchangeSourceConcurrentReaders && !files.isEmpty()) { + ImmutableList.Builder readerFiles = ImmutableList.builder(); + long readerFileSize = 0; + while (!files.isEmpty()) { + ExchangeSourceFile file = files.peek(); + if (readerFileSize == 0 || readerFileSize + file.getFileSize() <= maxPageStorageSize + exchangeStorage.getWriteBufferSize()) { + readerFiles.add(file); + readerFileSize += file.getFileSize(); + files.poll(); + } + else { + break; + } + } + activeReaders.add(exchangeStorage.createExchangeStorageReader(readerFiles.build(), maxPageStorageSize)); + } + if (activeReaders.isEmpty()) { + if (noMoreFiles) { + blockedOnSourceHandlesToBeUnblocked = blockedOnSourceHandles; + close(); + } + else if (blockedOnSourceHandles.isDone()) { + blockedOnSourceHandles = SettableFuture.create(); + } + } + else if (!blockedOnSourceHandles.isDone()) { + blockedOnSourceHandlesToBeUnblocked = blockedOnSourceHandles; + } + this.readers.set(ImmutableList.copyOf(activeReaders)); + } + catch (Throwable t) { + for (ExchangeStorageReader reader : activeReaders) { + try { + reader.close(); + } + catch (Throwable closeFailure) { + if (closeFailure != t) { + t.addSuppressed(closeFailure); + } + } + } + throw t; + } } + if (blockedOnSourceHandlesToBeUnblocked != null) { + blockedOnSourceHandlesToBeUnblocked.set(null); + } + } - readers.forEach(ExchangeStorageReader::close); + private int getNumberOfActiveReaders() + { + List readers = this.readers.get(); + int result = 0; + // regular loop for efficiency + for (int i = 0; i < readers.size(); i++) { + ExchangeStorageReader reader = readers.get(i); + if (!reader.isFinished()) { + result++; + } + } + return result; + } + + private static List getFiles(List handles) + { + return handles.stream() + .map(FileSystemExchangeSourceHandle.class::cast) + .map(handle -> { + Optional secretKey = handle.getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES")); + return new AbstractMap.SimpleEntry<>(handle, secretKey); + }) + .flatMap(entry -> entry.getKey().getFiles().stream().map(fileStatus -> + new ExchangeSourceFile( + URI.create(fileStatus.getFilePath()), + entry.getValue(), + fileStatus.getFileSize()))) + .collect(toImmutableList()); } } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java index d1aa22e86bea..7fb1d8b73dfc 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java @@ -21,14 +21,13 @@ import java.net.URI; import java.util.List; import java.util.Optional; -import java.util.Queue; public interface FileSystemExchangeStorage extends AutoCloseable { void createDirectories(URI dir) throws IOException; - ExchangeStorageReader createExchangeStorageReader(Queue sourceFiles, int maxPageStorageSize); + ExchangeStorageReader createExchangeStorageReader(List sourceFiles, int maxPageStorageSize); ExchangeStorageWriter createExchangeStorageWriter(URI file, Optional secretKey); diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java index d6952eb3acf2..5041d3db2705 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java @@ -61,6 +61,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Base64; import java.util.List; @@ -119,7 +120,7 @@ public void createDirectories(URI dir) } @Override - public ExchangeStorageReader createExchangeStorageReader(Queue sourceFiles, int maxPageStorageSize) + public ExchangeStorageReader createExchangeStorageReader(List sourceFiles, int maxPageStorageSize) { return new AzureExchangeStorageReader(blobServiceAsyncClient, sourceFiles, blockSize, maxPageStorageSize); } @@ -273,6 +274,7 @@ private static class AzureExchangeStorageReader private static final int INSTANCE_SIZE = ClassLayout.parseClass(AzureExchangeStorageReader.class).instanceSize(); private final BlobServiceAsyncClient blobServiceAsyncClient; + @GuardedBy("this") private final Queue sourceFiles; private final int blockSize; private final int bufferSize; @@ -291,12 +293,12 @@ private static class AzureExchangeStorageReader public AzureExchangeStorageReader( BlobServiceAsyncClient blobServiceAsyncClient, - Queue sourceFiles, + List sourceFiles, int blockSize, int maxPageStorageSize) { this.blobServiceAsyncClient = requireNonNull(blobServiceAsyncClient, "blobServiceAsyncClient is null"); - this.sourceFiles = requireNonNull(sourceFiles, "sourceFiles is null"); + this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null")); this.blockSize = blockSize; // Make sure buffer can accommodate at least one complete Slice, and keep reads aligned to block boundaries this.bufferSize = maxPageStorageSize + blockSize; @@ -373,6 +375,7 @@ public synchronized void close() inProgressReadFuture = immediateVoidFuture(); // such that we don't retain reference to the buffer } + @GuardedBy("this") private void fillBuffer() { if (currentFile == null || fileOffset == currentFile.getFileSize()) { diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java index 3a01955bd2b5..2f83718defff 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java @@ -49,6 +49,7 @@ import java.nio.file.Paths; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.util.ArrayDeque; import java.util.List; import java.util.Optional; import java.util.Queue; @@ -78,7 +79,7 @@ public void createDirectories(URI dir) } @Override - public ExchangeStorageReader createExchangeStorageReader(Queue sourceFiles, int maxPageStorageSize) + public ExchangeStorageReader createExchangeStorageReader(List sourceFiles, int maxPageStorageSize) { return new LocalExchangeStorageReader(sourceFiles); } @@ -149,6 +150,7 @@ private static class LocalExchangeStorageReader { private static final int INSTANCE_SIZE = ClassLayout.parseClass(LocalExchangeStorageReader.class).instanceSize(); + @GuardedBy("this") private final Queue sourceFiles; @GuardedBy("this") @@ -156,9 +158,9 @@ private static class LocalExchangeStorageReader @GuardedBy("this") private boolean closed; - public LocalExchangeStorageReader(Queue sourceFiles) + public LocalExchangeStorageReader(List sourceFiles) { - this.sourceFiles = requireNonNull(sourceFiles, "sourceFiles is null"); + this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null")); } @Override diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java index a77067da9027..77b6abe4510c 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java @@ -90,6 +90,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -209,9 +210,9 @@ public void createDirectories(URI dir) } @Override - public ExchangeStorageReader createExchangeStorageReader(Queue sourceFiles, int maxPageStorageSize) + public ExchangeStorageReader createExchangeStorageReader(List sourceFiles, int maxPageStorageSize) { - return new S3ExchangeStorageReader(stats, s3AsyncClient, sourceFiles, multiUploadPartSize, maxPageStorageSize); + return new S3ExchangeStorageReader(stats, s3AsyncClient, multiUploadPartSize, sourceFiles, maxPageStorageSize); } @Override @@ -478,10 +479,11 @@ private static class S3ExchangeStorageReader private final S3FileSystemExchangeStorageStats stats; private final S3AsyncClient s3AsyncClient; - private final Queue sourceFiles; private final int partSize; private final int bufferSize; + @GuardedBy("this") + private final Queue sourceFiles; @GuardedBy("this") private ExchangeSourceFile currentFile; @GuardedBy("this") @@ -497,14 +499,14 @@ private static class S3ExchangeStorageReader public S3ExchangeStorageReader( S3FileSystemExchangeStorageStats stats, S3AsyncClient s3AsyncClient, - Queue sourceFiles, int partSize, + List sourceFiles, int maxPageStorageSize) { this.stats = requireNonNull(stats, "stats is null"); this.s3AsyncClient = requireNonNull(s3AsyncClient, "s3AsyncClient is null"); - this.sourceFiles = requireNonNull(sourceFiles, "sourceFiles is null"); this.partSize = partSize; + this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null")); // Make sure buffer can accommodate at least one complete Slice, and keep reads aligned to part boundaries this.bufferSize = maxPageStorageSize + partSize; diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java index 8025439292b0..d7f1963fa8e0 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java @@ -29,12 +29,15 @@ import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSource; import io.trino.spi.exchange.ExchangeSourceHandle; +import io.trino.spi.exchange.ExchangeSourceHandleSource.ExchangeSourceHandleBatch; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.ArrayDeque; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.function.Function; import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; @@ -48,6 +51,7 @@ import static java.lang.Math.toIntExact; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertTrue; public abstract class AbstractTestExchangeManager { @@ -90,7 +94,7 @@ public void testHappyPath() 0, "0-0-1", 1, "0-1-1"), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle0, 0); sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 1); writeData( sinkInstanceHandle, @@ -100,7 +104,7 @@ public void testHappyPath() 0, "0-0-1", 1, "0-1-1"), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle0, 1); sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 2); writeData( sinkInstanceHandle, @@ -108,7 +112,7 @@ public void testHappyPath() 0, "failed", 1, "another failed"), false); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle0, 2); sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 0); writeData( @@ -119,7 +123,7 @@ public void testHappyPath() 0, "1-0-1", 1, "1-1-1"), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle1, 0); sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 1); writeData( sinkInstanceHandle, @@ -129,7 +133,7 @@ public void testHappyPath() 0, "1-0-1", 1, "1-1-1"), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle1, 1); sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 2); writeData( sinkInstanceHandle, @@ -137,7 +141,7 @@ public void testHappyPath() 0, "more failed", 1, "another failed"), false); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle1, 2); sinkInstanceHandle = exchange.instantiateSink(sinkHandle2, 2); writeData( @@ -146,9 +150,11 @@ public void testHappyPath() 0, "2-0-0", 1, "2-1-0"), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle2, 2); - List partitionHandles = exchange.getSourceHandles().get(); + ExchangeSourceHandleBatch sourceHandleBatch = exchange.getSourceHandles().getNextBatch().get(); + assertTrue(sourceHandleBatch.lastBatch()); + List partitionHandles = sourceHandleBatch.handles(); assertThat(partitionHandles).hasSize(2); Map partitions = partitionHandles.stream() @@ -187,7 +193,7 @@ public void testLargePages() .putAll(2, ImmutableList.of()) .build(), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle0, 0); sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 0); writeData( @@ -198,7 +204,7 @@ public void testLargePages() .putAll(2, ImmutableList.of(smallPage)) .build(), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle1, 0); sinkInstanceHandle = exchange.instantiateSink(sinkHandle2, 0); writeData( @@ -209,9 +215,11 @@ public void testLargePages() .putAll(2, ImmutableList.of(maxPage, largePage, mediumPage)) .build(), true); - exchange.sinkFinished(sinkInstanceHandle); + exchange.sinkFinished(sinkHandle2, 0); - List partitionHandles = exchange.getSourceHandles().get(); + ExchangeSourceHandleBatch sourceHandleBatch = exchange.getSourceHandles().getNextBatch().get(); + assertTrue(sourceHandleBatch.lastBatch()); + List partitionHandles = sourceHandleBatch.handles(); assertThat(partitionHandles).hasSize(10); ListMultimap partitions = partitionHandles.stream() @@ -259,12 +267,20 @@ private List readData(ExchangeSourceHandle handle) private List readData(List handles) { ImmutableList.Builder result = ImmutableList.builder(); - try (ExchangeSource source = exchangeManager.createSource(handles)) { + try (ExchangeSource source = exchangeManager.createSource()) { + Queue remainingHandles = new ArrayDeque<>(handles); while (!source.isFinished()) { Slice data = source.read(); if (data != null) { result.add(data.toStringUtf8()); } + ExchangeSourceHandle handle = remainingHandles.poll(); + if (handle != null) { + source.addSourceHandles(ImmutableList.of(handle)); + } + else { + source.noMoreSourceHandles(); + } } } return result.build();