diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 617d83f4ef81..9913528031b2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -105,6 +105,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -484,6 +485,7 @@ private static class Scheduler private int nextSchedulingPriority; private final Map nodeAcquisitions = new HashMap<>(); + private final Set tasksWaitingForSinkInstanceHandle = new HashSet<>(); private final SchedulingDelayer schedulingDelayer; @@ -577,6 +579,7 @@ public void run() failure = closeAndAddSuppressed(failure, nodeLease::release); } nodeAcquisitions.clear(); + tasksWaitingForSinkInstanceHandle.clear(); failure = closeAndAddSuppressed(failure, nodeAllocator); failure.ifPresent(queryStateMachine::transitionToFailed); @@ -884,33 +887,68 @@ private void processNodeAcquisitions() } else if (nodeLease.getNode().isDone()) { nodeAcquisitionIterator.remove(); - try { - InternalNode node = getDone(nodeLease.getNode()); - Optional remoteTask = stageExecution.schedule(scheduledTask.partitionId(), node); - remoteTask.ifPresent(task -> { - task.addStateChangeListener(createExchangeSinkInstanceHandleUpdateRequiredListener()); - task.addStateChangeListener(taskStatus -> { - if (taskStatus.getState().isDone()) { - nodeLease.release(); - } - }); - task.addFinalTaskInfoListener(taskExecutionStats::update); - task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.getTaskStatus()))); - nodeLease.attachTaskId(task.getTaskId()); - task.start(); - if (queryStateMachine.getQueryState() == QueryState.STARTING) { - queryStateMachine.transitionToRunning(); + tasksWaitingForSinkInstanceHandle.add(scheduledTask); + Optional getExchangeSinkInstanceHandleResult = stageExecution.getExchangeSinkInstanceHandle(scheduledTask.partitionId()); + if (getExchangeSinkInstanceHandleResult.isPresent()) { + CompletableFuture sinkInstanceHandleFuture = getExchangeSinkInstanceHandleResult.get().exchangeSinkInstanceHandleFuture(); + sinkInstanceHandleFuture.whenComplete((sinkInstanceHandle, throwable) -> { + if (throwable != null) { + eventQueue.add(new StageFailureEvent(scheduledTask.stageId, throwable)); + } + else { + eventQueue.add(new SinkInstanceHandleAcquiredEvent( + scheduledTask.stageId(), + scheduledTask.partitionId(), + nodeLease, + getExchangeSinkInstanceHandleResult.get().attempt(), + sinkInstanceHandle)); } }); - if (remoteTask.isEmpty()) { + } + else { + nodeLease.release(); + } + } + } + } + + @Override + public void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) + { + ScheduledTask scheduledTask = new ScheduledTask(sinkInstanceHandleAcquiredEvent.getStageId(), sinkInstanceHandleAcquiredEvent.getPartitionId()); + verify(tasksWaitingForSinkInstanceHandle.remove(scheduledTask), "expected %s in tasksWaitingForSinkInstanceHandle", scheduledTask); + NodeLease nodeLease = sinkInstanceHandleAcquiredEvent.getNodeLease(); + int partitionId = sinkInstanceHandleAcquiredEvent.getPartitionId(); + StageId stageId = sinkInstanceHandleAcquiredEvent.getStageId(); + int attempt = sinkInstanceHandleAcquiredEvent.getAttempt(); + ExchangeSinkInstanceHandle sinkInstanceHandle = sinkInstanceHandleAcquiredEvent.getSinkInstanceHandle(); + StageExecution stageExecution = getStageExecution(stageId); + + try { + InternalNode node = getDone(nodeLease.getNode()); + Optional remoteTask = stageExecution.schedule(partitionId, sinkInstanceHandle, attempt, node); + remoteTask.ifPresent(task -> { + task.addStateChangeListener(createExchangeSinkInstanceHandleUpdateRequiredListener()); + task.addStateChangeListener(taskStatus -> { + if (taskStatus.getState().isDone()) { nodeLease.release(); } + }); + task.addFinalTaskInfoListener(taskExecutionStats::update); + task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.getTaskStatus()))); + nodeLease.attachTaskId(task.getTaskId()); + task.start(); + if (queryStateMachine.getQueryState() == QueryState.STARTING) { + queryStateMachine.transitionToRunning(); } - catch (ExecutionException e) { - throw new UncheckedExecutionException(e); - } + }); + if (remoteTask.isEmpty()) { + nodeLease.release(); } } + catch (ExecutionException e) { + throw new UncheckedExecutionException(e); + } } private StateChangeListener createExchangeSinkInstanceHandleUpdateRequiredListener() @@ -954,7 +992,7 @@ public void onSuccess(AssignmentResult result) @Override public void onFailure(Throwable t) { - eventQueue.add(new TaskSourceFailureEvent(stageExecution.getStageId(), t)); + eventQueue.add(new StageFailureEvent(stageExecution.getStageId(), t)); } }, queryExecutor)); } @@ -1004,7 +1042,15 @@ public void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateR { TaskId taskId = event.getTaskStatus().getTaskId(); StageExecution stageExecution = getStageExecution(taskId.getStageId()); - stageExecution.updateExchangeSinkInstanceHandle(taskId); + stageExecution.initializeUpdateOfExchangeSinkInstanceHandle(taskId, eventQueue); + } + + @Override + public void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + { + TaskId taskId = event.getTaskId(); + StageExecution stageExecution = getStageExecution(taskId.getStageId()); + stageExecution.finalizeUpdateOfExchangeSinkInstanceHandle(taskId, event.getExchangeSinkInstanceHandle()); } @Override @@ -1026,8 +1072,8 @@ public void onSplitAssignment(SplitAssignmentEvent event) assignment.sealedPartitions().forEach(partitionId -> { Optional scheduledTask = stageExecution.sealPartition(partitionId); scheduledTask.ifPresent(prioritizedTask -> { - if (nodeAcquisitions.containsKey(prioritizedTask.task())) { - // task is already waiting for node + if (nodeAcquisitions.containsKey(prioritizedTask.task()) || tasksWaitingForSinkInstanceHandle.contains(prioritizedTask.task)) { + // task is already waiting for node or for sink instance handle return; } schedulingQueue.addOrUpdate(prioritizedTask); @@ -1040,11 +1086,10 @@ public void onSplitAssignment(SplitAssignmentEvent event) } @Override - public void onTaskSourceFailure(TaskSourceFailureEvent event) + public void onStageFailure(StageFailureEvent event) { StageExecution stageExecution = getStageExecution(event.getStageId()); stageExecution.fail(event.getFailure()); - stageExecution.taskDescriptorLoadingComplete(); } private StageExecution getStageExecution(StageId stageId) @@ -1242,7 +1287,26 @@ public void noMorePartitions() } } - public Optional schedule(int partitionId, InternalNode node) + public Optional getExchangeSinkInstanceHandle(int partitionId) + { + if (getState().isDone()) { + return Optional.empty(); + } + + StagePartition partition = getStagePartition(partitionId); + verify(partition.getRemainingAttempts() >= 0, "remaining attempts is expected to be greater than or equal to zero: %s", partition.getRemainingAttempts()); + + if (partition.isFinished()) { + return Optional.empty(); + } + + int attempt = maxTaskExecutionAttempts - partition.getRemainingAttempts(); + return Optional.of(new EventDrivenFaultTolerantQueryScheduler.GetExchangeSinkInstanceHandleResult( + exchange.instantiateSink(partition.getExchangeSinkHandle(), attempt), + attempt)); + } + + public Optional schedule(int partitionId, ExchangeSinkInstanceHandle exchangeSinkInstanceHandle, int attempt, InternalNode node) { if (getState().isDone()) { return Optional.empty(); @@ -1274,8 +1338,6 @@ public Optional schedule(int partitionId, InternalNode node) } } - int attempt = maxTaskExecutionAttempts - partition.getRemainingAttempts(); - ExchangeSinkInstanceHandle exchangeSinkInstanceHandle = exchange.instantiateSink(partition.getExchangeSinkHandle(), attempt); SpoolingOutputBuffers outputBuffers = SpoolingOutputBuffers.createInitial(exchangeSinkInstanceHandle, sinkPartitioningScheme.getPartitionCount()); Optional task = stage.createTask( node, @@ -1328,14 +1390,31 @@ private Map getSourceOutputSelectors() return result.buildOrThrow(); } - public void updateExchangeSinkInstanceHandle(TaskId taskId) + public void initializeUpdateOfExchangeSinkInstanceHandle(TaskId taskId, BlockingQueue eventQueue) { if (getState().isDone()) { return; } StagePartition partition = getStagePartition(taskId.getPartitionId()); - ExchangeSinkInstanceHandle exchangeSinkInstanceHandle = exchange.updateSinkInstanceHandle(partition.getExchangeSinkHandle(), taskId.getAttemptId()); - partition.updateExchangeSinkInstanceHandle(taskId, exchangeSinkInstanceHandle); + CompletableFuture exchangeSinkInstanceHandleFuture = exchange.updateSinkInstanceHandle(partition.getExchangeSinkHandle(), taskId.getAttemptId()); + + exchangeSinkInstanceHandleFuture.whenComplete((sinkInstanceHandle, throwable) -> { + if (throwable != null) { + eventQueue.add(new StageFailureEvent(taskId.getStageId(), throwable)); + } + else { + eventQueue.add(new RemoteTaskExchangeUpdatedSinkAcquired(taskId, sinkInstanceHandle)); + } + }); + } + + public void finalizeUpdateOfExchangeSinkInstanceHandle(TaskId taskId, ExchangeSinkInstanceHandle updatedExchangeSinkInstanceHandle) + { + if (getState().isDone()) { + return; + } + StagePartition partition = getStagePartition(taskId.getPartitionId()); + partition.updateExchangeSinkInstanceHandle(taskId, updatedExchangeSinkInstanceHandle); } public void taskFinished(TaskId taskId, TaskStatus taskStatus) @@ -1519,6 +1598,7 @@ public void fail(Throwable t) catch (IOException e) { throw new UncheckedIOException(e); } + taskDescriptorLoadingComplete(); } private Closer createStageExecutionCloser() @@ -1950,9 +2030,63 @@ private interface EventListener void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event); + void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event); + void onSplitAssignment(SplitAssignmentEvent event); - void onTaskSourceFailure(TaskSourceFailureEvent event); + void onStageFailure(StageFailureEvent event); + + void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent); + } + + private static class SinkInstanceHandleAcquiredEvent + implements Event + { + private final StageId stageId; + private final int partitionId; + private final NodeLease nodeLease; + private final int attempt; + private final ExchangeSinkInstanceHandle sinkInstanceHandle; + + public SinkInstanceHandleAcquiredEvent(StageId stageId, int partitionId, NodeLease nodeLease, int attempt, ExchangeSinkInstanceHandle sinkInstanceHandle) + { + this.stageId = requireNonNull(stageId, "stageId is null"); + this.partitionId = partitionId; + this.nodeLease = requireNonNull(nodeLease, "nodeLease is null"); + this.attempt = attempt; + this.sinkInstanceHandle = requireNonNull(sinkInstanceHandle, "sinkInstanceHandle is null"); + } + + public StageId getStageId() + { + return stageId; + } + + public int getPartitionId() + { + return partitionId; + } + + public NodeLease getNodeLease() + { + return nodeLease; + } + + public int getAttempt() + { + return attempt; + } + + public ExchangeSinkInstanceHandle getSinkInstanceHandle() + { + return sinkInstanceHandle; + } + + @Override + public void accept(EventListener listener) + { + listener.onSinkInstanceHandleAcquired(this); + } } private static class RemoteTaskCompletedEvent @@ -1985,6 +2119,35 @@ public void accept(EventListener listener) } } + private static class RemoteTaskExchangeUpdatedSinkAcquired + implements Event + { + private final TaskId taskId; + private final ExchangeSinkInstanceHandle exchangeSinkInstanceHandle; + + private RemoteTaskExchangeUpdatedSinkAcquired(TaskId taskId, ExchangeSinkInstanceHandle exchangeSinkInstanceHandle) + { + this.taskId = requireNonNull(taskId, "taskId is null"); + this.exchangeSinkInstanceHandle = requireNonNull(exchangeSinkInstanceHandle, "exchangeSinkInstanceHandle is null"); + } + + @Override + public void accept(EventListener listener) + { + listener.onRemoteTaskExchangeUpdatedSinkAcquired(this); + } + + public TaskId getTaskId() + { + return taskId; + } + + public ExchangeSinkInstanceHandle getExchangeSinkInstanceHandle() + { + return exchangeSinkInstanceHandle; + } + } + private abstract static class RemoteTaskEvent implements Event { @@ -2002,7 +2165,7 @@ public TaskStatus getTaskStatus() } private static class SplitAssignmentEvent - extends TaskSourceEvent + extends StageEvent { private final AssignmentResult assignmentResult; @@ -2024,12 +2187,12 @@ public void accept(EventListener listener) } } - private static class TaskSourceFailureEvent - extends TaskSourceEvent + private static class StageFailureEvent + extends StageEvent { private final Throwable failure; - public TaskSourceFailureEvent(StageId stageId, Throwable failure) + public StageFailureEvent(StageId stageId, Throwable failure) { super(stageId); this.failure = requireNonNull(failure, "failure is null"); @@ -2043,16 +2206,16 @@ public Throwable getFailure() @Override public void accept(EventListener listener) { - listener.onTaskSourceFailure(this); + listener.onStageFailure(this); } } - private abstract static class TaskSourceEvent + private abstract static class StageEvent implements Event { private final StageId stageId; - protected TaskSourceEvent(StageId stageId) + protected StageEvent(StageId stageId) { this.stageId = requireNonNull(stageId, "stageId is null"); } @@ -2062,4 +2225,12 @@ public StageId getStageId() return stageId; } } + + private record GetExchangeSinkInstanceHandleResult(CompletableFuture exchangeSinkInstanceHandleFuture, int attempt) + { + public GetExchangeSinkInstanceHandleResult + { + requireNonNull(exchangeSinkInstanceHandleFuture, "exchangeSinkInstanceHandleFuture is null"); + } + } } diff --git a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java index 3b4624619dea..f0168bd4ee41 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java @@ -28,6 +28,7 @@ import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.StageId; import io.trino.execution.TaskId; @@ -39,6 +40,7 @@ import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeSink; import io.trino.spi.exchange.ExchangeSinkHandle; +import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSource; import io.trino.spi.exchange.ExchangeSourceOutputSelector; @@ -81,6 +83,7 @@ import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; public class DeduplicatingDirectExchangeBuffer @@ -88,6 +91,8 @@ public class DeduplicatingDirectExchangeBuffer { private static final Logger log = Logger.get(DeduplicatingDirectExchangeBuffer.class); + private static final Duration SINK_INSTANCE_HANDLE_GET_TIMEOUT = Duration.succinctDuration(60, SECONDS); + private final Executor executor; private final RetryPolicy retryPolicy; @@ -265,6 +270,7 @@ public synchronized void noMoreTasks() checkInputFinished(); } + @GuardedBy("this") private void checkInputFinished() { if (failure != null) { @@ -281,7 +287,7 @@ private void checkInputFinished() Map failures; switch (retryPolicy) { - case TASK: { + case TASK -> { Set allPartitions = allTasks.stream() .map(TaskId::getPartitionId) .collect(toImmutableSet()); @@ -315,9 +321,8 @@ private void checkInputFinished() .filter(entry -> !successfulPartitions.contains(entry.getKey().getPartitionId())) .filter(entry -> !runningPartitions.contains(entry.getKey().getPartitionId())) .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - break; } - case QUERY: { + case QUERY -> { Set latestAttemptTasks = allTasks.stream() .filter(taskId -> taskId.getAttemptId() == maxAttemptId) .collect(toImmutableSet()); @@ -331,10 +336,8 @@ private void checkInputFinished() failures = failedTasks.entrySet().stream() .filter(entry -> entry.getKey().getAttemptId() == maxAttemptId) .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - break; } - default: - throw new UnsupportedOperationException("unexpected retry policy: " + retryPolicy); + default -> throw new UnsupportedOperationException("unexpected retry policy: " + retryPolicy); } Throwable failure = null; @@ -398,18 +401,21 @@ public synchronized long getMaxRetainedSizeInBytes() } @Override + @GuardedBy("this") public int getBufferedPageCount() { return pageBuffer.getBufferedPageCount(); } @Override + @GuardedBy("this") public long getSpilledBytes() { return pageBuffer.getSpilledBytes(); } @Override + @GuardedBy("this") public int getSpilledPageCount() { return pageBuffer.getSpilledPageCount(); @@ -425,12 +431,14 @@ public synchronized void close() closeAndUnblock(); } + @GuardedBy("this") private void fail(Throwable failure) { this.failure = failure; closeAndUnblock(); } + @GuardedBy("this") private void throwIfFailed() { if (failure != null) { @@ -439,6 +447,7 @@ private void throwIfFailed() } } + @GuardedBy("this") private void closeAndUnblock() { try (Closer closer = Closer.create()) { @@ -453,6 +462,7 @@ private void closeAndUnblock() } } + @GuardedBy("this") private void updateMaxRetainedSize() { maxRetainedSizeInBytes = max(maxRetainedSizeInBytes, getRetainedSizeInBytes()); @@ -545,7 +555,18 @@ public synchronized void addPages(TaskId taskId, List pages) sinkHandle = exchange.addSink(0); exchange.noMoreSinks(); - exchangeSink = exchangeManager.createSink(exchange.instantiateSink(sinkHandle, 0)); + ExchangeSinkInstanceHandle sinkInstanceHandle; + try { + sinkInstanceHandle = exchange.instantiateSink(this.sinkHandle, 0).get(SINK_INSTANCE_HANDLE_GET_TIMEOUT.toMillis(), MILLISECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (Exception e) { + throw new RuntimeException(e); + } + exchangeSink = exchangeManager.createSink(sinkInstanceHandle); writeBuffer = new DynamicSliceOutput(DEFAULT_MAX_PAGE_SIZE_IN_BYTES); } @@ -571,6 +592,7 @@ private static long getRetainedSizeInBytes(List pages) return result; } + @GuardedBy("this") private void writeToSink(TaskId taskId, List pages) { verify(exchangeSink != null, "exchangeSink is expected to be initialized"); @@ -604,6 +626,7 @@ private void writeToSink(TaskId taskId, List pages) } } + @GuardedBy("this") private void updateSinkInstanceHandleIfNecessary() { verify(Thread.holdsLock(this), "this method is expected to be called under a lock"); @@ -612,7 +635,19 @@ private void updateSinkInstanceHandleIfNecessary() verify(sinkHandle != null, "sinkHandle is null"); if (exchangeSink.isHandleUpdateRequired()) { - exchangeSink.updateHandle(exchange.updateSinkInstanceHandle(sinkHandle, 0)); + CompletableFuture sinkInstanceHandleFuture = exchange.updateSinkInstanceHandle(sinkHandle, 0); + ExchangeSinkInstanceHandle sinkInstanceHandle; + try { + sinkInstanceHandle = sinkInstanceHandleFuture.get(SINK_INSTANCE_HANDLE_GET_TIMEOUT.toMillis(), MILLISECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (Exception e) { + throw new RuntimeException(e); + } + exchangeSink.updateHandle(sinkInstanceHandle); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java index 4a9d6b8643ae..8d1c4c1f8c62 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java @@ -601,13 +601,13 @@ public void noMoreSinks() } @Override - public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) + public CompletableFuture instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) { throw new UnsupportedOperationException(); } @Override - public ExchangeSinkInstanceHandle updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) + public CompletableFuture updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) { throw new UnsupportedOperationException(); } diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java index 070990b34f78..650b729560ea 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java @@ -18,6 +18,7 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; @ThreadSafe @Experimental(eta = "2023-01-01") @@ -61,7 +62,7 @@ public interface Exchange * @return ExchangeSinkInstanceHandle to be sent to a worker that is needed to create an {@link ExchangeSink} instance using * {@link ExchangeManager#createSink(ExchangeSinkInstanceHandle)} */ - ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId); + CompletableFuture instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId); /** * Update {@link ExchangeSinkInstanceHandle}. Update is requested by {@link ExchangeSink}. @@ -71,7 +72,7 @@ public interface Exchange * @param taskAttemptId - attempt id * @return updated handle */ - ExchangeSinkInstanceHandle updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId); + CompletableFuture updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId); /** * Called by the engine when an attempt finishes successfully. diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index 7ec5eab81d0b..639c0f0883d9 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -62,6 +62,7 @@ import static java.lang.Integer.parseInt; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; public class FileSystemExchange implements Exchange @@ -142,7 +143,7 @@ public void noMoreSinks() } @Override - public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) + public CompletableFuture instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) { FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = (FileSystemExchangeSinkHandle) sinkHandle; int taskPartitionId = fileSystemExchangeSinkHandle.getPartitionId(); @@ -154,11 +155,11 @@ public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, throw new UncheckedIOException(e); } - return new FileSystemExchangeSinkInstanceHandle(fileSystemExchangeSinkHandle, outputDirectory, outputPartitionCount, preserveOrderWithinPartition); + return completedFuture(new FileSystemExchangeSinkInstanceHandle(fileSystemExchangeSinkHandle, outputDirectory, outputPartitionCount, preserveOrderWithinPartition)); } @Override - public ExchangeSinkInstanceHandle updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) + public CompletableFuture updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) { // this implementation never requests an update throw new UnsupportedOperationException(); diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java index f7246e9d1373..18d6d0d8d559 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java @@ -89,7 +89,7 @@ public void testHappyPath() ExchangeSinkHandle sinkHandle2 = exchange.addSink(2); exchange.noMoreSinks(); - ExchangeSinkInstanceHandle sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 0); + ExchangeSinkInstanceHandle sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 0).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -99,7 +99,7 @@ public void testHappyPath() 1, "0-1-1"), true); exchange.sinkFinished(sinkHandle0, 0); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 1); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 1).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -109,7 +109,7 @@ public void testHappyPath() 1, "0-1-1"), true); exchange.sinkFinished(sinkHandle0, 1); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 2); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 2).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -118,7 +118,7 @@ public void testHappyPath() false); exchange.sinkFinished(sinkHandle0, 2); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 0); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 0).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -128,7 +128,7 @@ public void testHappyPath() 1, "1-1-1"), true); exchange.sinkFinished(sinkHandle1, 0); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 1); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 1).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -138,7 +138,7 @@ public void testHappyPath() 1, "1-1-1"), true); exchange.sinkFinished(sinkHandle1, 1); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 2); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 2).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -147,7 +147,7 @@ public void testHappyPath() false); exchange.sinkFinished(sinkHandle1, 2); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle2, 2); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle2, 2).get(); writeData( sinkInstanceHandle, ImmutableListMultimap.of( @@ -197,7 +197,7 @@ public void testLargePages() ExchangeSinkHandle sinkHandle2 = exchange.addSink(2); exchange.noMoreSinks(); - ExchangeSinkInstanceHandle sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 0); + ExchangeSinkInstanceHandle sinkInstanceHandle = exchange.instantiateSink(sinkHandle0, 0).get(); writeData( sinkInstanceHandle, new ImmutableListMultimap.Builder() @@ -208,7 +208,7 @@ public void testLargePages() true); exchange.sinkFinished(sinkHandle0, 0); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 0); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle1, 0).get(); writeData( sinkInstanceHandle, new ImmutableListMultimap.Builder() @@ -219,7 +219,7 @@ public void testLargePages() true); exchange.sinkFinished(sinkHandle1, 0); - sinkInstanceHandle = exchange.instantiateSink(sinkHandle2, 0); + sinkInstanceHandle = exchange.instantiateSink(sinkHandle2, 0).get(); writeData( sinkInstanceHandle, new ImmutableListMultimap.Builder()