Conversation
Can you briefly summarize in the PR description what you're working on here? :)
What's different between this and assertUpdate?
The failure messages include the queryId so that the failing execution can be correlated to log messages that may be related.
arhimondr
left a comment
Looks good to me. But I would prefer if somebody else could take a look as well. I've broken this code so many times that it feels like a second look wouldn't be unnecessary 😄
Generally we try to avoid setting a future while holding a lock, as it is deadlock-prone (the callbacks are invoked inline). Would it be difficult to do this outside the lock?
So... I guess it's possible to move this outside of the locked region, but that would mean doing something like this at 3 other call sites:

```java
finally {
    exclusiveLock.unlock();
    if (state.get() == DESTROYED) {
        destroyedFuture.set(null);
    }
}
```
Which seems more awkward to me, since it spatially separates the act of destruction from setting the future. Right now that future is only used to detect termination at the task level, so I think this is safe: for a deadlock to occur, the callback would need to acquire some lock held by a thread that is itself attempting to acquire the driver's exclusive lock, which shouldn't happen. However, it would be riskier if the usage of that callback listener evolves over time...
Maybe instead of a destroyedFuture we accept a Runnable in the Driver constructor directly? It's effectively the same thing, since the cleanup action would still run while holding the lock, but it seems less risky long term because there's no opportunity to attach more listeners to the future later. Let me know what you think and I'll either keep what's there, move the future setting out of the locked region, or pass a cleanup runnable to the driver constructor instead.
Update on the above: you actually can't move it out of the exclusive lock region without potentially missing the final DriverContext finished() call, since a thread that fails to acquire the lock could still see the state flip to DESTROYED before the lock holder completes destruction.
This could be addressed by adding a new DESTROYING state, but that’s extra complexity too.
Yeah, I can totally see how that would make the code harder to reason about. I also agree that the chances of introducing a deadlock here are rather low. However, to me it still feels worth the decrease in readability, though I don't feel particularly strongly about it.
I have the same concerns. This code has a long history of deadlocks. Maybe add a utility method like checkDestroyedState.
BTW, a runnable won't work either, since executing it while holding the lock is still an out-call under the lock.
After looking over the options, the best compromise between avoiding potential deadlocks and not duplicating checks across code paths to decide when to fire the destroyedFuture seemed to be this: add Driver.State.DESTROYING to separate "destruction in progress by some thread" from "destruction completed", and have DriverLock#unlock() complete the future immediately after the lock is released, avoiding duplicated checks in multiple finally blocks. That's what I've published in the latest revision.
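The shape of that compromise can be sketched roughly as follows. This is a simplified illustration, not Trino's actual DriverLock: the nested State enum, field names, and the CompletableFuture are stand-ins. The key point it shows is that the future is completed only after the exclusive lock has been released, so listener callbacks can never run while the lock is held.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

final class DriverLockSketch {
    // DESTROYING separates "destruction in progress" from "destruction
    // completed", per the discussion above
    enum State { ALIVE, DESTROYING, DESTROYED }

    private final ReentrantLock lock = new ReentrantLock();
    private final CompletableFuture<Void> destroyedFuture = new CompletableFuture<>();
    private volatile State state = State.ALIVE;

    // Runs an action under the exclusive lock; if the action completed
    // destruction, the future is fired only AFTER the lock is released,
    // so callbacks attached to it cannot deadlock against this lock.
    void runWhileLocked(Runnable action) {
        lock.lock();
        try {
            action.run();
        }
        finally {
            lock.unlock();
            if (state == State.DESTROYED) {
                destroyedFuture.complete(null);
            }
        }
    }

    void markDestroyed() {
        state = State.DESTROYED;
    }

    CompletableFuture<Void> destroyed() {
        return destroyedFuture;
    }
}
```

Because the check lives in the unlock path, every code path that releases the lock gets the same behavior without repeating the check in multiple finally blocks.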
What do you think about renaming isFinished* to isTerminatingOrDone (in this and other classes)?
I'm ok with that, but the notion of "terminatingOr(Done,Finished)" doesn't apply cleanly across all the usage sites. Driver has one specific definition (pretty complex in terms of what it entails), SplitRunner has another that mostly overlaps but also includes whether DriverSplitRunner#close() was called, and PrioritizedSplitRunner has yet another that includes whether PrioritizedSplitRunner#destroy() was called. So ultimately I think it's fine to keep the notion of terminatingOrDone() (a task-level concern) separate from isFinished() (a Driver, SplitRunner, and PrioritizedSplitRunner concern).
Yeah, I can see that. What do you think about changing it for the Driver only?
I think the compromise is to rename isFinishedInternal() but not Driver#isFinished(): the internal method is about checking whether the driver should be destroyed (which implies an association with "terminating"), while Driver#isFinished() is a separate semantic that only needs to describe the Driver itself having finished (for whatever reason).
Is there a reason this was reordered? I'm just curious if the order matters here. (If it does, add a comment.)
Is there a reason this was reordered? I'm just curious if the order matters here.
No correctness reason.
I originally reordered these checks while trying to disentangle race conditions, back when the isTerminatingOrDone() check could eventually synchronize on TaskHandle.
At this point that check is just a volatile read (that landed in another PR; it works similarly to how DriverFactory does in this PR), but I kept the new ordering because it still holds that "normal" finishing is more likely than task termination, so the cheaper checks of driver-local state should come first, before checking shared state (or acquiring shared locks).
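That ordering argument can be illustrated with a minimal sketch (all names here are hypothetical, not Trino's actual fields): the driver-local flag is a cheap volatile read that short-circuits the shared-state check in the common case of a normal finish.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: the cheap driver-local check runs first, so in the
// common case (normal finish) the shared task state is never touched.
final class FinishCheckSketch {
    private volatile boolean driverDone;                       // driver-local, cheap volatile read
    private final AtomicBoolean taskTerminating = new AtomicBoolean();  // shared task state

    boolean isFinished() {
        // short-circuits on the likely case before reading shared state
        return driverDone || taskTerminating.get();
    }

    void finishDriver() {
        driverDone = true;
    }

    void terminateTask() {
        taskTerminating.set(true);
    }
}
```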
Because canceling a task will call outputBuffer.destroy(), which means that tasks consuming from that buffer can see BufferResult(completed: true) on the immediately subsequent request. As a result, a task that receives a cancel before the tasks consuming from it have themselves been canceled, aborted, or failed can cause "finished" to propagate forward through the stages along worker-to-worker communication links, unless the cancellation proceeds in topological order.
Essentially, when DistributedStagesScheduler#cancel() is called, we want to traverse the stageExecutions in topological order to reduce the possibility of that happening. That said, this issue shouldn't arise now because DistributedStagesScheduler#cancel() is only called once the query state machine is finished anyway; but during development of these changes I did continue to see incorrect query results that seemed to stem from cancellations, potentially due to tasks being sent both a cancel and an abort command concurrently and those requests racing with one another to reach the tasks (now fixed by HttpRemoteTask allowing only a single termination command to proceed).
Honestly, I'm still scratching my head a bit about why tasks call outputBuffer.destroy() on cancel instead of outputBuffer.abort(). That seems like a fragile behavior that can easily cause incorrect query results through subtle concurrency or state machine bugs.
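For illustration, cancelling in topological order from the output stage downward might look like the sketch below. The representation (string stage ids, a map from each stage to the stages that produce into it) is a hypothetical stand-in for the real stage-execution graph; the point is that each consumer is visited before its producers, so no output buffer is destroyed while a consumer could still misread the destruction as "finished".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

final class TopologicalCancelSketch {
    // Returns the order in which to cancel stages: a depth-first walk from
    // the root (output) stage, so every consumer appears before the stages
    // it consumes from. `producers` maps a stage to its producer stages.
    static List<String> cancelOrder(String root, Map<String, List<String>> producers) {
        List<String> order = new ArrayList<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            String stage = stack.pop();
            order.add(stage); // consumer recorded before its producers
            for (String producer : producers.getOrDefault(stage, List.of())) {
                stack.push(producer);
            }
        }
        return order;
    }
}
```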
Sorry, I meant "explain why this matters in the comment"... so future readers know why the sorting is there. You don't need this level of detail (but you could). Generally, I like to see enough to get the reader thinking about the implications.
This seems like a lot of duplication just to get the query id into the exceptions. Is there any way to always have the query id available and modify the original to do this work instead of copying it?
I don't see a good way to get a QueryId out of LocalQueryRunner that would allow these code paths to be shared unfortunately, but if you see a way to do that let me know and I'll make that change. Agreed on the duplication here being less ideal.
Here is what I was thinking. I don't know if this actually happens, but I think the code allows for it:
thread 1: tryCreateNewDriver calls incrementAndGet
thread 1: the "if isTerminatingOrDone" branch is not entered (not yet terminating)
thread x: terminates the task
thread 2: liveCreatedDrivers.get() equals 1 and the state is terminating, so terminationComplete is not called
For safety, I typically check <= 0. It can mask bugs (you could log when it happens), but it prevents those bugs from locking up resources. Also, consider re-running the termination check periodically after termination until the task is fully cleaned up. Again, this can mask bugs, but it's better than locking up a server.
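One way to close the window in the interleaving described above, combined with the defensive <= 0 check, can be sketched like this (all names are hypothetical, not the actual SqlTaskExecution fields): after incrementing, the creating thread re-checks termination and, if the task is already terminating, undoes its increment through the same path a finishing driver takes, so the termination-complete check is always re-run.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DriverCountSketch {
    private final AtomicInteger liveDrivers = new AtomicInteger();
    private volatile boolean terminating;
    private volatile boolean terminationComplete;

    // Re-checks termination AFTER the increment; if a terminate raced in
    // between, the increment is undone via driverFinished(), which re-runs
    // the completion check and closes the race window.
    boolean tryCreateDriver() {
        liveDrivers.incrementAndGet();
        if (terminating) {
            driverFinished();
            return false;
        }
        return true;
    }

    void terminate() {
        terminating = true;
        checkTerminationComplete();
    }

    void driverFinished() {
        liveDrivers.decrementAndGet();
        checkTerminationComplete();
    }

    private void checkTerminationComplete() {
        // defensive <= 0 rather than == 0, per the review suggestion:
        // a count that underflows due to a bug still releases resources
        if (terminating && liveDrivers.get() <= 0) {
            terminationComplete = true;
        }
    }

    boolean isTerminationComplete() {
        return terminationComplete;
    }
}
```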
This is a very large commit, but unfortunately it's an all-or-nothing change that fundamentally changes how tasks report their state throughout the engine, on both workers and the coordinator. Before this change, tasks would immediately transition to some final state when cancelled, failed, or told to abort, even while drivers were still running. Additionally, the coordinator would forcibly set the task state locally during an abort or some kinds of failure without the remote worker acknowledging that command. The result was that the coordinator's view of worker state was inconsistent for scheduling purposes (since drivers for tasks that were told to terminate could still be running for some time), and the final task stats could be incomplete. This change introduces CANCELING, ABORTING, and FAILING states that are considered "terminating" but not yet "done"; tasks await the final driver to stop executing before transitioning to the corresponding "done" state.
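The state split this commit describes can be sketched as a simple enum. This mirrors the described design only; the method names are illustrative, not Trino's actual TaskState API.

```java
// Each "terminating" state pairs with the "done" state it transitions into
// once all drivers have observed the termination signal and been destroyed.
enum TaskStateSketch {
    RUNNING(false, false),
    FINISHED(false, true),
    CANCELING(true, false), CANCELED(false, true),
    ABORTING(true, false), ABORTED(false, true),
    FAILING(true, false), FAILED(false, true);

    private final boolean terminating;
    private final boolean done;

    TaskStateSketch(boolean terminating, boolean done) {
        this.terminating = terminating;
        this.done = done;
    }

    // termination has begun but drivers may still be running
    boolean isTerminating() {
        return terminating;
    }

    // all drivers have exited; stats are final
    boolean isDone() {
        return done;
    }

    // the unifying "stopping or stopped" concept used throughout the PR
    boolean isTerminatingOrDone() {
        return terminating || done;
    }
}
```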
Avoids redundant TaskExecutor scheduling work when TaskExecutor#removeTask is called more than once as a result of terminating and done task state transition races.
A little late to the party, but it would be better to pick a term other than the one used here. Also, does this need docs?
I'm open to renaming the state, but whatever new name we choose should equally be applied to the "final" state name to maintain the symmetry between the terminating and final names.
I can't find any docs describing the existing task states (let me know if I missed something there), so I don't think there's an immediate need to document the new ones, although documenting task, stage, and query state semantics might be worthwhile at some point.
One other request... ideally, could we fix the grammar and change CANCELING to CANCELLING?
And with regards to ABORTING... could we change it to TERMINATING? Although I do see the problem of the existing ABORTED status, so maybe we have to leave it at ABORTING.
The problem I see with that is that failing, canceling, and aborting could all be considered "terminating", which is a useful concept by itself that unifies the three variations of "stopping the task early", and one I've already used fairly heavily in this PR (e.g. isTerminatingOrDone).
Fair... I think the current setup works. I just wish we could fix the spelling ;-)
Description
At a high level, this PR introduces a new concept of "terminating states" to TaskState, which represent that a task has begun terminating but not all drivers that were created have fully exited yet. The new states (CANCELING, ABORTING, and FAILING) are the "terminating" equivalents of CANCELED, ABORTED, and FAILED, the final states that tasks transition into after all drivers have observed the termination signal and been destroyed.

The motivation for this change is two-fold:
- The coordinator's view of worker state was inconsistent for scheduling purposes, since drivers for tasks that were told to terminate could still be running for some time.
- Final task stats could be incomplete in the TaskInfo response that coordinators would use to produce stage and query stats.

Additional context and related issues
Assumptions to note in the implementation here:
- Termination is carried out by the SqlTaskExecution if one has been created. Otherwise, SqlTask will transition the state machine from terminating to termination complete (since no execution means no drivers can possibly be running).
- Anything that checks TaskState#isDone() or checks for state == TaskState.FAILED will now only fire when termination is fully complete. In general this behavior is still correct, but potentially slower to react than before, and there may be opportunities to react more eagerly to terminating states like TaskState.FAILING in fault tolerant execution.

Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: