Implement task level retries#9818
There was a problem hiding this comment.
I feel this state is not necessary and adds confusion, even for UI display. If the stage has finished its tasks, then it's finished; in the case of a retry, we should either show it's RUNNING again, or we make it explicit saying that it's a retry.
Suggested change:
- * Stage is finished running existing tasks but more tasks could be scheduled in the future.
+ * Stage has finished running existing tasks but more tasks could be scheduled in the future.
There was a problem hiding this comment.
If the stage has finished its tasks, then it's finished; in the case of a retry, we should either show it's RUNNING again, or we make it explicit saying that it's a retry.
That's a good point. I also don't really like the "PENDING" state. The problem is that we need a terminal state: one that indicates that no more tasks can be scheduled in a given stage. This is needed for the final stage info creation, which produces the final summary of runtime statistics for the stage. I was thinking about something like COMPLETED / FINISHED, but the semantic difference between these two words is too subtle, and I thought it might introduce even more confusion.
core/trino-main/src/main/java/io/trino/execution/TaskManager.java
There was a problem hiding this comment.
I was trying to be consistent with the other two methods, cancelTask and abortTask
There was a problem hiding this comment.
Why do we need ImmutableSet.copyOf here? I don't see a potential race condition.
There was a problem hiding this comment.
This is to find the number of unique nodes. The immutability doesn't have any special meaning here; ImmutableSet.copyOf is just a convenient way to construct a set. Ideally this should be refactored to provide the number of unique nodes explicitly, as a constructor parameter, but that refactor wasn't trivial. I decided to delay it since the impact of this de-duplication is not that high (it is only called once per stage, and fixed mappings are a rare corner case).
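For illustration, a minimal sketch of counting unique nodes by copying a list into a set, next to a stream-based count of the same thing. The class name, the `String` node type, and the sample values are hypothetical, and a plain `HashSet` stands in for Guava's `ImmutableSet.copyOf` so that the snippet needs only the JDK.

```java
import java.util.HashSet;
import java.util.List;

final class UniqueNodeCount
{
    private UniqueNodeCount() {}

    // Copying into a set drops duplicates, so size() is the number of distinct nodes.
    // HashSet is a stand-in for ImmutableSet.copyOf; immutability plays no role in the count.
    static int countViaSet(List<String> bucketToNode)
    {
        return new HashSet<>(bucketToNode).size();
    }

    // Stream-based count of the same quantity.
    static long countViaStream(List<String> bucketToNode)
    {
        return bucketToNode.stream().distinct().count();
    }
}
```

Both methods return the same number for any input; the difference is only in how directly the intent ("count distinct elements") is expressed.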
There was a problem hiding this comment.
This would be more idiomatic for that purpose:
bucketToNode.stream()
        .distinct()
        .count();
There was a problem hiding this comment.
Given that we call scheduleTask with the last two parameters as ImmutableMultimap.of() very often, does it make sense to create an overloaded version of scheduleTask?
There was a problem hiding this comment.
I was thinking about that. There's a subtle trade-off though. Overloads tend to add complexity, as now you need to think if there's any semantic difference between two overloads and what that semantic difference might be. Another trade-off is that overloads add extra overhead when trying to search through the code where the method is used. You need to check each overload separately.
core/trino-main/src/main/java/io/trino/server/TaskResource.java
.../trino-main/src/main/java/io/trino/server/testing/shuffle/LocalFileSystemShuffleService.java
core/trino-spi/src/main/java/io/trino/spi/shuffle/ShuffleService.java
core/trino-spi/src/main/java/io/trino/spi/shuffle/ShuffleInput.java
.../trino-main/src/main/java/io/trino/server/testing/shuffle/LocalFileSystemShuffleService.java
There was a problem hiding this comment.
Why does this need to be synchronized now?
There was a problem hiding this comment.
This method has been made public, and now it can be called by any thread, thus synchronization must be enforced (similarly to cancel and abort)
core/trino-main/src/test/java/io/trino/operator/TestingExchangeClientBuffer.java
    }

    @Override
    public synchronized void noMoreTasks()
There was a problem hiding this comment.
Can retry tasks still be added after noMoreTasks gets called? Or will it only be called after all tasks finish/fail?
If it's the former, then I don't see why we need this method; if it's the latter, why call checkInputFinished in taskFailed/taskFinished?
There was a problem hiding this comment.
Can retry tasks still be added after noMoreTasks gets called?
No. After this method is called no new tasks can be added.
if it's the latter, why call checkInputFinished in taskFailed/taskFinished?
noMoreTasks might be called as soon as no future retries are anticipated (when retries are disabled or when the number of attempts is exhausted), yet some tasks might still be running at that point.
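A minimal sketch of that interaction, assuming a simplified tracker that only keeps a set of running task ids. The class and its fields are hypothetical; only the method names noMoreTasks, taskFinished, taskFailed and checkInputFinished come from this thread. The input counts as finished only when both conditions hold, which is why the check runs from all three callbacks.

```java
import java.util.HashSet;
import java.util.Set;

final class TaskInputTracker
{
    private final Set<String> runningTasks = new HashSet<>();
    private boolean noMoreTasks;
    private boolean inputFinished;

    synchronized void addTask(String taskId)
    {
        // After noMoreTasks() no new (retry) tasks may be registered
        if (noMoreTasks) {
            throw new IllegalStateException("noMoreTasks has already been called");
        }
        runningTasks.add(taskId);
    }

    synchronized void taskFinished(String taskId)
    {
        runningTasks.remove(taskId);
        checkInputFinished();
    }

    synchronized void taskFailed(String taskId)
    {
        runningTasks.remove(taskId);
        checkInputFinished();
    }

    synchronized void noMoreTasks()
    {
        noMoreTasks = true;
        checkInputFinished();
    }

    synchronized boolean isInputFinished()
    {
        return inputFinished;
    }

    // Only called while holding the lock: finished means no new tasks AND nothing still running
    private void checkInputFinished()
    {
        if (noMoreTasks && runningTasks.isEmpty()) {
            inputFinished = true;
        }
    }
}
```

If noMoreTasks() arrives while a task is still running, the last taskFinished or taskFailed call is what flips the state; if all tasks are already done, noMoreTasks() itself does.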
Multimap<PlanNodeId, Split> tableScanSplits = batchTask.getSplits();
Multimap<PlanNodeId, Split> remoteSplits = createRemoteSplits(batchTask.getShuffleInputs());

Multimap<PlanNodeId, Split> taskSplits = ImmutableListMultimap.<PlanNodeId, Split>builder()
There was a problem hiding this comment.
As I understand it, one of tableScanSplits and remoteSplits will always be empty, because a batch task reads either from table scans or from shuffle files.
There was a problem hiding this comment.
In the case of a bucketed or colocated join it can read both: a table (thus the splits) and a remote exchange (if one of the tables is not bucketed and must be repartitioned).
core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java
StageId stageId = stageExecution.getStageId();
allStages.put(stageId, stageExecution);
if (fragment.getPartitioning().isCoordinatorOnly()) {
    coordinatorStagesInTopologicalOrder.add(stageExecution);
There was a problem hiding this comment.
Actually this is reverse topological order, not sure if we can have better naming here to avoid confusion
There was a problem hiding this comment.
Yeah, it's a little unintuitive. But the plan is structured top down: the root node has references to its children. Since we are following the reference direction, it is still a topological order (even though the plan leaves come last).
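To make the ordering concrete, here is a small sketch assuming a plan modeled as a tree whose nodes reference their children. The `Fragment` class and the fragment names are hypothetical stand-ins, not the actual plan classes. A pre-order walk from the root visits every node before the nodes it references, which is a topological order with respect to the parent-to-child edges.

```java
import java.util.ArrayList;
import java.util.List;

final class PlanOrder
{
    // Hypothetical stand-in for a plan fragment that references its children
    static final class Fragment
    {
        final String name;
        final List<Fragment> children;

        Fragment(String name, List<Fragment> children)
        {
            this.name = name;
            this.children = children;
        }
    }

    private PlanOrder() {}

    // Pre-order traversal: a fragment is emitted before anything it references
    static List<String> topDownOrder(Fragment root)
    {
        List<String> order = new ArrayList<>();
        visit(root, order);
        return order;
    }

    private static void visit(Fragment fragment, List<String> order)
    {
        order.add(fragment.name);
        for (Fragment child : fragment.children) {
            visit(child, order);
        }
    }
}
```

Reversing the resulting list gives the leaves-first order, which is a topological order of the same graph with the edges flipped (data flow direction), hence the naming ambiguity.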
core/trino-main/src/main/java/io/trino/execution/scheduler/BatchTaskScheduler.java
log.warn(failureInfo.toException(), "Task failed: %s", taskId);
ErrorCode errorCode = failureInfo.getErrorCode();
if (remainingRetryAttempts > 0 && (errorCode == null || errorCode.getType() != USER_ERROR)) {
    remainingRetryAttempts--;
There was a problem hiding this comment.
So remainingRetryAttempts is on a stage-basis (instead of task-basis)
There was a problem hiding this comment.
Currently it is stage-based, but I'm not sure that's the right strategy. We should discuss what the right strategy is.
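A minimal sketch of what a stage-level budget means in practice, assuming a simplified error model. `StageRetryBudget` and its nested `ErrorType` enum are hypothetical stand-ins; only the condition itself mirrors the snippet under review. All tasks of the stage draw from the same counter, and user errors are never retried.

```java
final class StageRetryBudget
{
    // Simplified stand-in for the engine's error classification
    enum ErrorType { USER_ERROR, INTERNAL_ERROR, INSUFFICIENT_RESOURCES, EXTERNAL }

    // One counter shared by every task in the stage
    private int remainingRetryAttempts;

    StageRetryBudget(int maxRetryAttempts)
    {
        this.remainingRetryAttempts = maxRetryAttempts;
    }

    // errorType may be null when the failure carries no error code
    synchronized boolean shouldRetry(ErrorType errorType)
    {
        if (remainingRetryAttempts > 0 && errorType != ErrorType.USER_ERROR) {
            remainingRetryAttempts--;
            return true;
        }
        return false;
    }
}
```

With a stage-level counter, one flaky task can exhaust the retries for every other task in the stage. A per-task budget would instead keep one counter per task (for example in a map keyed by partition), which is the trade-off left open above.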
core/trino-main/src/main/java/io/trino/execution/scheduler/StreamingStageExecution.java
core/trino-main/src/main/java/io/trino/execution/scheduler/ResultsConsumer.java
core/trino-main/src/main/java/io/trino/execution/scheduler/StreamingStageExecution.java
core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java
core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java
core/trino-main/src/main/java/io/trino/execution/scheduler/BatchTask.java
core/trino-main/src/main/java/io/trino/server/testing/exchange/LocalFileSystemExchangeSink.java
...trino-main/src/main/java/io/trino/server/testing/exchange/LocalFileSystemExchangeSource.java
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceSplitter.java
core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java
core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java
core/trino-main/src/main/java/io/trino/metadata/HandleJsonModule.java
core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java
core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java
There was a problem hiding this comment.
Currently it is inconsistent across the code base. But there are 260 matches when I search for Logger log and only 33 when I search for Logger LOG, so it looks like Logger log is preferred.
There was a problem hiding this comment.
Yes, I'm aware of the inconsistency. But it should be LOG per convention since it's a static final variable. We should not contribute to the unconventional usage.
There was a problem hiding this comment.
Yeah, the code style convention says "The names of variables declared class constants and of ANSI constants should be all uppercase with words separated by underscores ("_")": https://www.oracle.com/java/technologies/javase/codeconventions-namingconventions.html
However, whether a Logger can be considered a constant is controversial. Generally, constants are expected to be inherently immutable objects. Declaring something as private static final does not make it a constant; the same declaration may also be used for static fields that are assigned once and accessible only from within a class.
For example, the Google Java Style Guide says:
Constant names use CONSTANT_CASE: all uppercase letters, with each word separated from the next by a single underscore. But what is a constant, exactly?
Constants are static final fields whose contents are deeply immutable and whose methods have no detectable side effects. This includes primitives, Strings, immutable types, and immutable collections of immutable types. If any of the instance's observable state can change, it is not a constant. Merely intending to never mutate the object is not enough.
They explicitly list examples that shouldn't be considered constants, and a logger declaration is one of them:
static final Logger logger = Logger.getLogger(MyClass.getName());
https://google.github.io/styleguide/javaguide.html#s5.2.4-constant-names
I don't have a particularly strong opinion here. It feels like either would do as long as it is used consistently. In the current state, Logger log is more consistent with the rest of the code base.
If you feel strongly about it, I can do the rename. Regardless, we should probably follow up with a PR that renames all other places to make it consistent and adds a style check rule.
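To illustrate the distinction drawn in the quoted guide, a short sketch with hypothetical field names, using java.util.logging so it needs only the JDK. Under that definition only deeply immutable static final fields get CONSTANT_CASE; a mutable collection or a logger keeps a camelCase name even though it is static final.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

final class NamingExamples
{
    private NamingExamples() {}

    // Constants under the quoted definition: deeply immutable, no observable state changes
    static final int MAX_ATTEMPTS = 4;
    static final List<String> STATE_NAMES = List.of("PLANNED", "RUNNING", "FINISHED");

    // Not constants under that definition: static final, but their observable state can change
    static final Set<String> seenTasks = new HashSet<>();
    static final Logger logger = Logger.getLogger(NamingExamples.class.getName());
}
```

The Oracle convention quoted earlier would arguably uppercase all four, which is exactly where the two guides diverge.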
There was a problem hiding this comment.
How does "the implementation" know if an attempt succeeded or failed?
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSink.java
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSource.java
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSource.java
There was a problem hiding this comment.
What's the purpose of these? Every object implements these methods, so they don't impose any requirement or constraint on implementations.
There was a problem hiding this comment.
Unfortunately it is impossible to enforce the presence of these methods at compile time. The idea behind leaving them explicitly declared in this interface is to make them harder to miss for somebody implementing the interface. Though yeah, it doesn't provide any strong guarantees.
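A small sketch of why the declaration is documentation rather than enforcement, assuming the methods in question are equals and hashCode (the thread does not name them). The interface and both classes are hypothetical. An implementation that overrides nothing still compiles, because Object already supplies both methods.

```java
import java.util.Objects;

// Redeclaring Object's methods is legal, but adds no compile-time obligation
interface HandleLike
{
    boolean equals(Object other);

    int hashCode();
}

// Compiles without overriding anything: Object's identity-based versions satisfy the interface
final class ForgetfulHandle implements HandleLike
{
}

// What the interface author actually wants: value-based equality
final class ValueHandle implements HandleLike
{
    private final String path;

    ValueHandle(String path)
    {
        this.path = Objects.requireNonNull(path, "path is null");
    }

    @Override
    public boolean equals(Object other)
    {
        return other instanceof ValueHandle && path.equals(((ValueHandle) other).path);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(path);
    }
}
```

Two distinct ForgetfulHandle instances are never equal, so anything that relies on value equality would silently misbehave; catching that needs a test or a convention check, not the compiler.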
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceSplitter.java
core/trino-main/src/main/java/io/trino/server/testing/exchange/LocalFileSystemExchange.java
There was a problem hiding this comment.
This could be called just exchangeClientSupplier.
There was a problem hiding this comment.
While we don't have anything named "client" for external exchange, IMO the longer name still improves readability, as it emphasizes that this supplies a client specifically for direct exchange.
core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java
core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java
core/trino-main/src/main/java/io/trino/execution/buffer/ExternalExchangeOutputBuffer.java
core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java
core/trino-main/src/main/java/io/trino/execution/buffer/ExternalExchangeOutputBuffer.java
core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java
core/trino-main/src/main/java/io/trino/execution/ExecutionFailureInfo.java
core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java
core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java
cancelRunningTasks(abort);
cancelBlockedFuture();
releaseAcquiredNode();
closeTaskSource();
closeSinkExchange();
There was a problem hiding this comment.
Only if a failure in closing would affect the query results (e.g., incomplete results); otherwise, we should just log an error and ignore it.
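A minimal sketch of the "log and ignore" branch, assuming the cleanup steps can be modeled as Runnables. The helper class is hypothetical and uses java.util.logging to stay JDK-only. Each step runs even if an earlier one throws; a failure that could affect query results would need to be propagated instead, which this sketch does not cover.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

final class BestEffortCleanup
{
    private static final Logger log = Logger.getLogger(BestEffortCleanup.class.getName());

    private BestEffortCleanup() {}

    // Runs every step; a failing step is logged and does not prevent the remaining ones.
    // Returns the number of steps that failed.
    static int runAll(List<Runnable> cleanupSteps)
    {
        int failures = 0;
        for (Runnable step : cleanupSteps) {
            try {
                step.run();
            }
            catch (RuntimeException e) {
                log.log(Level.WARNING, "Cleanup step failed, continuing", e);
                failures++;
            }
        }
        return failures;
    }
}
```

Returning the failure count leaves it to the caller to decide whether the stage should still be reported as cleanly closed.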
core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java
core/trino-main/src/main/java/io/trino/execution/scheduler/FixedCountNodeAllocator.java
core/trino-main/src/main/java/io/trino/execution/scheduler/FixedCountNodeAllocator.java
core/trino-main/src/main/java/io/trino/execution/scheduler/NodeRequirements.java
core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java
Streaming upload to S3 allocates a 16MB buffer (by default) for each output stream. Failure recovery tests create a table partitioned into ~60 partitions. Since at least one file must be created for each partition, the engine has to allocate ~1GB of buffer space (60 × 16MB ≈ 960MB). These buffer allocations push the memory reservation beyond the maximum heap size.
To avoid a clash when both testTargetMaxFileSizePartitioned and testTargetMaxFileSize are executed concurrently