Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.trino.execution.scheduler.Exchanges.getAllSourceHandles;
import static io.trino.operator.RetryPolicy.NONE;
import static io.trino.operator.RetryPolicy.QUERY;
import static io.trino.spi.StandardErrorCode.REMOTE_TASK_FAILED;
import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
Expand All @@ -92,7 +91,6 @@ public class DeduplicatingDirectExchangeBuffer
private static final Duration SINK_INSTANCE_HANDLE_GET_TIMEOUT = Duration.succinctDuration(60, SECONDS);

private final Executor executor;
private final RetryPolicy retryPolicy;

@GuardedBy("this")
private final Set<TaskId> allTasks = new HashSet<>();
Expand Down Expand Up @@ -130,9 +128,7 @@ public DeduplicatingDirectExchangeBuffer(
ExchangeId exchangeId)
{
this.executor = requireNonNull(executor, "executor is null");
requireNonNull(retryPolicy, "retryPolicy is null");
checkArgument(retryPolicy != NONE, "retries should be enabled");
this.retryPolicy = retryPolicy;
checkArgument(retryPolicy == QUERY, "the class is used for query level retries only, got: %s", retryPolicy);
this.pageBuffer = new PageBuffer(
exchangeManagerRegistry,
queryId,
Expand Down Expand Up @@ -195,10 +191,8 @@ public synchronized void addTask(TaskId taskId)
if (taskId.getAttemptId() > maxAttemptId) {
maxAttemptId = taskId.getAttemptId();

if (retryPolicy == QUERY) {
pageBuffer.removePagesForPreviousAttempts(maxAttemptId);
updateMaxRetainedSize();
}
pageBuffer.removePagesForPreviousAttempts(maxAttemptId);
updateMaxRetainedSize();
}
}

Expand All @@ -217,7 +211,7 @@ public synchronized void addPages(TaskId taskId, List<Slice> pages)
checkState(!successfulTasks.contains(taskId), "task is finished: %s", taskId);
checkState(!failedTasks.containsKey(taskId), "task is failed: %s", taskId);

if (retryPolicy == QUERY && taskId.getAttemptId() < maxAttemptId) {
if (taskId.getAttemptId() < maxAttemptId) {
return;
}

Expand Down Expand Up @@ -283,61 +277,20 @@ private void checkInputFinished()
return;
}

Map<TaskId, Throwable> failures;
switch (retryPolicy) {
case TASK -> {
Set<Integer> allPartitions = allTasks.stream()
.map(TaskId::getPartitionId)
.collect(toImmutableSet());

Set<Integer> successfulPartitions = successfulTasks.stream()
.map(TaskId::getPartitionId)
.collect(toImmutableSet());

if (successfulPartitions.containsAll(allPartitions)) {
Map<Integer, TaskId> partitionToTaskMap = new HashMap<>();
for (TaskId successfulTaskId : successfulTasks) {
Integer partitionId = successfulTaskId.getPartitionId();
TaskId existing = partitionToTaskMap.get(partitionId);
if (existing == null || existing.getAttemptId() > successfulTaskId.getAttemptId()) {
partitionToTaskMap.put(partitionId, successfulTaskId);
}
}

outputSource = pageBuffer.createOutputSource(ImmutableSet.copyOf(partitionToTaskMap.values()));
unblock(outputReady);
return;
}

Set<Integer> runningPartitions = allTasks.stream()
.filter(taskId -> !successfulTasks.contains(taskId))
.filter(taskId -> !failedTasks.containsKey(taskId))
.map(TaskId::getPartitionId)
.collect(toImmutableSet());
Set<TaskId> latestAttemptTasks = allTasks.stream()
.filter(taskId -> taskId.getAttemptId() == maxAttemptId)
.collect(toImmutableSet());

failures = failedTasks.entrySet().stream()
.filter(entry -> !successfulPartitions.contains(entry.getKey().getPartitionId()))
.filter(entry -> !runningPartitions.contains(entry.getKey().getPartitionId()))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}
case QUERY -> {
Set<TaskId> latestAttemptTasks = allTasks.stream()
.filter(taskId -> taskId.getAttemptId() == maxAttemptId)
.collect(toImmutableSet());

if (successfulTasks.containsAll(latestAttemptTasks)) {
outputSource = pageBuffer.createOutputSource(latestAttemptTasks);
unblock(outputReady);
return;
}

failures = failedTasks.entrySet().stream()
.filter(entry -> entry.getKey().getAttemptId() == maxAttemptId)
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}
default -> throw new UnsupportedOperationException("unexpected retry policy: " + retryPolicy);
if (successfulTasks.containsAll(latestAttemptTasks)) {
outputSource = pageBuffer.createOutputSource(latestAttemptTasks);
unblock(outputReady);
return;
}

Map<TaskId, Throwable> failures = failedTasks.entrySet().stream()
.filter(entry -> entry.getKey().getAttemptId() == maxAttemptId)
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

Throwable failure = null;
for (Map.Entry<TaskId, Throwable> entry : failures.entrySet()) {
TaskId taskId = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,12 @@ public DirectExchangeClient get(
TaskFailureListener taskFailureListener,
RetryPolicy retryPolicy)
{
DirectExchangeBuffer buffer;
switch (retryPolicy) {
case TASK:
case QUERY:
buffer = new DeduplicatingDirectExchangeBuffer(scheduler, deduplicationBufferSize, retryPolicy, exchangeManagerRegistry, queryId, exchangeId);
break;
case NONE:
buffer = new StreamingDirectExchangeBuffer(scheduler, maxBufferedBytes);
break;
default:
throw new IllegalArgumentException("unexpected retry policy: " + retryPolicy);
}
@SuppressWarnings("resource")
DirectExchangeBuffer buffer = switch (retryPolicy) {
case TASK -> throw new UnsupportedOperationException();
case QUERY -> new DeduplicatingDirectExchangeBuffer(scheduler, deduplicationBufferSize, retryPolicy, exchangeManagerRegistry, queryId, exchangeId);
case NONE -> new StreamingDirectExchangeBuffer(scheduler, maxBufferedBytes);
};

return new DirectExchangeClient(
nodeInfo.getExternalAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,175 +342,6 @@ public void testPollPagesQueryLevelRetry()
error);
}

@Test
public void testPollPagesTaskLevelRetry()
{
// 0 pages
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.of(),
ImmutableMap.of(),
DEFAULT_BUFFER_CAPACITY,
0,
ImmutableList.of());

// single page, no spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.of(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(10, BYTE))),
ImmutableMap.of(),
DataSize.of(1, KILOBYTE),
0,
ImmutableList.of(createPage("p0a0v0", DataSize.of(10, BYTE))));

// single page, with spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.of(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(2, KILOBYTE))),
ImmutableMap.of(),
DataSize.of(1, KILOBYTE),
1,
ImmutableList.of(createPage("p0a0v0", DataSize.of(2, KILOBYTE))));

// discard single page, with no spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(6, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(3, KILOBYTE)))
.build(),
ImmutableMap.of(),
DataSize.of(10, KILOBYTE),
0,
ImmutableList.of(
createPage("p0a0v0", DataSize.of(6, KILOBYTE))));

// discard single page, with spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(6, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(3, KILOBYTE)))
.build(),
ImmutableMap.of(),
DataSize.of(5, KILOBYTE),
2,
ImmutableList.of(
createPage("p0a0v0", DataSize.of(6, KILOBYTE))));

// multiple pages, no spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(),
DataSize.of(5, KILOBYTE),
0,
ImmutableList.of(
createPage("p0a0v0", DataSize.of(1, KILOBYTE)),
createPage("p1a0v0", DataSize.of(1, KILOBYTE))));

// multiple pages, with spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE)))
.put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(),
DataSize.of(2, KILOBYTE),
3,
ImmutableList.of(
createPage("p0a0v0", DataSize.of(1, KILOBYTE)),
createPage("p1a0v0", DataSize.of(1, KILOBYTE))));

// failure in a task that produced no pages, no spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE)))
.put(createTaskId(1, 1), createPage("p1a1v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(createTaskId(1, 0), new RuntimeException("error")),
DataSize.of(10, KILOBYTE),
0,
ImmutableList.of(
createPage("p0a0v0", DataSize.of(1, KILOBYTE)),
createPage("p1a1v0", DataSize.of(1, KILOBYTE))));

// failure in a task that produced no pages, with spilling
testPollPages(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE)))
.put(createTaskId(1, 1), createPage("p1a1v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(createTaskId(1, 0), new RuntimeException("error")),
DataSize.of(2, KILOBYTE),
3,
ImmutableList.of(
createPage("p0a0v0", DataSize.of(1, KILOBYTE)),
createPage("p1a1v0", DataSize.of(1, KILOBYTE))));

RuntimeException error = new RuntimeException("error");

// buffer failure in a task that produced no pages, no spilling
testPollPagesFailure(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(createTaskId(2, 2), error),
DataSize.of(5, KILOBYTE),
0,
error);

// buffer failure in a task that produced some pages, no spilling
testPollPagesFailure(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(createTaskId(0, 1), error),
DataSize.of(5, KILOBYTE),
0,
error);

// buffer failure in a task that produced no pages, with spilling
testPollPagesFailure(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 0), createPage("p0a0v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(2, KILOBYTE)))
.put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(createTaskId(2, 2), error),
DataSize.of(2, KILOBYTE),
3,
error);

// buffer failure in a task that produced some pages, with spilling
testPollPagesFailure(
RetryPolicy.TASK,
ImmutableListMultimap.<TaskId, Slice>builder()
.put(createTaskId(0, 1), createPage("p0a1v0", DataSize.of(1, KILOBYTE)))
.put(createTaskId(1, 0), createPage("p1a0v0", DataSize.of(1, KILOBYTE)))
.build(),
ImmutableMap.of(createTaskId(0, 1), error),
DataSize.of(1, KILOBYTE),
2,
error);
}

private void testPollPages(
RetryPolicy retryPolicy,
Multimap<TaskId, Slice> pages,
Expand Down Expand Up @@ -820,7 +651,6 @@ public void testRemainingBufferCapacity()
public void testRemoteTaskFailedError()
{
testRemoteTaskFailedError(RetryPolicy.QUERY);
testRemoteTaskFailedError(RetryPolicy.TASK);
}

private void testRemoteTaskFailedError(RetryPolicy retryPolicy)
Expand Down