From 94383ae1de411e18552cf08f5d8594a3ac2f4a3b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 21 Nov 2023 16:34:58 +0100 Subject: [PATCH 1/2] Unimplement DirectExchangeBuffer for TASK retries The class is not used for TASK-level retries. --- .../DeduplicatingDirectExchangeBuffer.java | 77 ++------ .../operator/DirectExchangeClientFactory.java | 1 + ...TestDeduplicatingDirectExchangeBuffer.java | 170 ------------------ 3 files changed, 16 insertions(+), 232 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java index becd8242c239..b6bb2033649d 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java @@ -75,7 +75,6 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; import static io.trino.execution.scheduler.Exchanges.getAllSourceHandles; -import static io.trino.operator.RetryPolicy.NONE; import static io.trino.operator.RetryPolicy.QUERY; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_FAILED; import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; @@ -92,7 +91,6 @@ public class DeduplicatingDirectExchangeBuffer private static final Duration SINK_INSTANCE_HANDLE_GET_TIMEOUT = Duration.succinctDuration(60, SECONDS); private final Executor executor; - private final RetryPolicy retryPolicy; @GuardedBy("this") private final Set allTasks = new HashSet<>(); @@ -130,9 +128,7 @@ public DeduplicatingDirectExchangeBuffer( ExchangeId exchangeId) { this.executor = requireNonNull(executor, "executor is null"); - requireNonNull(retryPolicy, "retryPolicy is null"); - checkArgument(retryPolicy != NONE, "retries should be enabled"); - this.retryPolicy = retryPolicy; + checkArgument(retryPolicy == QUERY, "the class is used for query level retries only, got: %s", retryPolicy); this.pageBuffer = new PageBuffer( exchangeManagerRegistry, queryId, @@ -195,10 +191,8 @@ public synchronized void addTask(TaskId taskId) if (taskId.getAttemptId() > maxAttemptId) { maxAttemptId = taskId.getAttemptId(); - if (retryPolicy == QUERY) { - pageBuffer.removePagesForPreviousAttempts(maxAttemptId); - updateMaxRetainedSize(); - } + pageBuffer.removePagesForPreviousAttempts(maxAttemptId); + updateMaxRetainedSize(); } } @@ -217,7 +211,7 @@ public synchronized void addPages(TaskId taskId, List pages) checkState(!successfulTasks.contains(taskId), "task is finished: %s", taskId); checkState(!failedTasks.containsKey(taskId), "task is failed: %s", taskId); - if (retryPolicy == QUERY && taskId.getAttemptId() < maxAttemptId) { + if (taskId.getAttemptId() < maxAttemptId) { return; } @@ -283,61 +277,20 @@ private void checkInputFinished() return; } - Map failures; - switch (retryPolicy) { - case TASK -> { - Set allPartitions = allTasks.stream() - .map(TaskId::getPartitionId) - .collect(toImmutableSet()); - - Set successfulPartitions = successfulTasks.stream() - .map(TaskId::getPartitionId) - .collect(toImmutableSet()); - - if (successfulPartitions.containsAll(allPartitions)) { - Map partitionToTaskMap = new HashMap<>(); - for (TaskId successfulTaskId : successfulTasks) { - Integer partitionId = successfulTaskId.getPartitionId(); - TaskId existing = partitionToTaskMap.get(partitionId); - if (existing == null || existing.getAttemptId() > successfulTaskId.getAttemptId()) { - partitionToTaskMap.put(partitionId, successfulTaskId); - } - } - - outputSource = pageBuffer.createOutputSource(ImmutableSet.copyOf(partitionToTaskMap.values())); - unblock(outputReady); - return; - } - - Set runningPartitions = allTasks.stream() - .filter(taskId -> !successfulTasks.contains(taskId)) - .filter(taskId -> !failedTasks.containsKey(taskId)) - .map(TaskId::getPartitionId) - .collect(toImmutableSet()); + Set latestAttemptTasks = allTasks.stream() + .filter(taskId -> taskId.getAttemptId() == maxAttemptId) + .collect(toImmutableSet()); - failures = failedTasks.entrySet().stream() - .filter(entry -> !successfulPartitions.contains(entry.getKey().getPartitionId())) - .filter(entry -> !runningPartitions.contains(entry.getKey().getPartitionId())) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - } - case QUERY -> { - Set latestAttemptTasks = allTasks.stream() - .filter(taskId -> taskId.getAttemptId() == maxAttemptId) - .collect(toImmutableSet()); - - if (successfulTasks.containsAll(latestAttemptTasks)) { - outputSource = pageBuffer.createOutputSource(latestAttemptTasks); - unblock(outputReady); - return; - } - - failures = failedTasks.entrySet().stream() - .filter(entry -> entry.getKey().getAttemptId() == maxAttemptId) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - } - default -> throw new UnsupportedOperationException("unexpected retry policy: " + retryPolicy); + if (successfulTasks.containsAll(latestAttemptTasks)) { + outputSource = pageBuffer.createOutputSource(latestAttemptTasks); + unblock(outputReady); + return; } + Map failures = failedTasks.entrySet().stream() + .filter(entry -> entry.getKey().getAttemptId() == maxAttemptId) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + Throwable failure = null; for (Map.Entry entry : failures.entrySet()) { TaskId taskId = entry.getKey(); diff --git a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java index ec629a531dcb..9ad7022bd6c3 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java @@ -144,6 +144,7 @@ public DirectExchangeClient get( DirectExchangeBuffer buffer; switch (retryPolicy) { case TASK: + throw new UnsupportedOperationException(); case QUERY: buffer = new DeduplicatingDirectExchangeBuffer(scheduler, deduplicationBufferSize, retryPolicy, exchangeManagerRegistry, queryId, exchangeId); break; diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java index 2ec80f350157..28df3edb36d5 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java @@ -342,175 +342,6 @@ public void testPollPagesQueryLevelRetry() error); } - @Test - public void testPollPagesTaskLevelRetry() - { - // 0 pages - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.of(), - ImmutableMap.of(), - DEFAULT_BUFFER_CAPACITY, - 0, - ImmutableList.of()); - - // single page, no spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.of(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(10, BYTE))), - ImmutableMap.of(), - DataSize.of(1, KILOBYTE), - 0, - ImmutableList.of(createPage("p0a0v0", DataSize.of(10, BYTE)))); - - // single page, with spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.of(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(2, KILOBYTE))), - ImmutableMap.of(), - DataSize.of(1, KILOBYTE), - 1, - ImmutableList.of(createPage("p0a0v0", DataSize.of(2, KILOBYTE)))); - - // discard single page, with no spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(6, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(3, KILOBYTE))) - .build(), - ImmutableMap.of(), - DataSize.of(10, KILOBYTE), - 0, - ImmutableList.of( - createPage("p0a0v0", DataSize.of(6, KILOBYTE)))); - - // discard single page, with spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(6, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(3, KILOBYTE))) - .build(), - ImmutableMap.of(), - DataSize.of(5, KILOBYTE), - 2, - ImmutableList.of( - createPage("p0a0v0", DataSize.of(6, KILOBYTE)))); - - // multiple pages, no spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(), - DataSize.of(5, KILOBYTE), - 0, - ImmutableList.of( - createPage("p0a0v0", DataSize.of(1, KILOBYTE)), - createPage("p1a0v0", DataSize.of(1, KILOBYTE)))); - - // multiple pages, with spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE))) - .put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(), - DataSize.of(2, KILOBYTE), - 3, - ImmutableList.of( - createPage("p0a0v0", DataSize.of(1, KILOBYTE)), - createPage("p1a0v0", DataSize.of(1, KILOBYTE)))); - - // failure in a task that produced no pages, no spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE))) - .put(createTaskId(1, 1), createPage("p1a1v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(createTaskId(1, 0), new RuntimeException("error")), - DataSize.of(10, KILOBYTE), - 0, - ImmutableList.of( - createPage("p0a0v0", DataSize.of(1, KILOBYTE)), - createPage("p1a1v0", DataSize.of(1, KILOBYTE)))); - - // failure in a task that produced no pages, with spilling - testPollPages( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE))) - .put(createTaskId(1, 1), createPage("p1a1v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(createTaskId(1, 0), new RuntimeException("error")), - DataSize.of(2, KILOBYTE), - 3, - ImmutableList.of( - createPage("p0a0v0", DataSize.of(1, KILOBYTE)), - createPage("p1a1v0", DataSize.of(1, KILOBYTE)))); - - RuntimeException error = new RuntimeException("error"); - - // buffer failure in a task that produced no pages, no spilling - testPollPagesFailure( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(createTaskId(2, 2), error), - DataSize.of(5, KILOBYTE), - 0, - error); - - // buffer failure in a task that produced some pages, no spilling - testPollPagesFailure( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(createTaskId(0, 1), error), - DataSize.of(5, KILOBYTE), - 0, - error); - - // buffer failure in a task that produced no pages, with spilling - testPollPagesFailure( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE))) - .put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(createTaskId(2, 2), error), - DataSize.of(2, KILOBYTE), - 3, - error); - - // buffer failure in a task that produced some pages, with spilling - testPollPagesFailure( - RetryPolicy.TASK, - ImmutableListMultimap.builder() - .put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE))) - .put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE))) - .build(), - ImmutableMap.of(createTaskId(0, 1), error), - DataSize.of(1, KILOBYTE), - 2, - error); - } - private void testPollPages( RetryPolicy retryPolicy, Multimap pages, @@ -820,7 +651,6 @@ public void testRemainingBufferCapacity() public void testRemoteTaskFailedError() { testRemoteTaskFailedError(RetryPolicy.QUERY); - testRemoteTaskFailedError(RetryPolicy.TASK); } private void testRemoteTaskFailedError(RetryPolicy retryPolicy) From bcacc3f348de5761ccd1c308d6c02aba4125e829 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 21 Nov 2023 17:21:41 +0100 Subject: [PATCH 2/2] Use switch expression --- .../operator/DirectExchangeClientFactory.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java index 9ad7022bd6c3..737029857ee8 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientFactory.java @@ -141,19 +141,12 @@ public DirectExchangeClient get( TaskFailureListener taskFailureListener, RetryPolicy retryPolicy) { - DirectExchangeBuffer buffer; - switch (retryPolicy) { - case TASK: - throw new UnsupportedOperationException(); - case QUERY: - buffer = new DeduplicatingDirectExchangeBuffer(scheduler, deduplicationBufferSize, retryPolicy, exchangeManagerRegistry, queryId, exchangeId); - break; - case NONE: - buffer = new StreamingDirectExchangeBuffer(scheduler, maxBufferedBytes); - break; - default: - throw new IllegalArgumentException("unexpected retry policy: " + retryPolicy); - } + @SuppressWarnings("resource") + DirectExchangeBuffer buffer = switch (retryPolicy) { + case TASK -> throw new UnsupportedOperationException(); + case QUERY -> new DeduplicatingDirectExchangeBuffer(scheduler, deduplicationBufferSize, retryPolicy, exchangeManagerRegistry, queryId, exchangeId); + case NONE -> new StreamingDirectExchangeBuffer(scheduler, maxBufferedBytes); + }; return new DirectExchangeClient( nodeInfo.getExternalAddress(),