Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ public interface RemoteTask

PartitionedSplitsInfo getPartitionedSplitsInfo();

void fail(Throwable cause);
/**
* Fails the task from the coordinator's perspective immediately, without waiting for acknowledgement from the remote task.
*
* @param cause the {@link Throwable} supplied as the reason the task is being failed
*/
void failLocallyImmediately(Throwable cause);

/**
* Fails task remotely; only transitions to failed state when we receive confirmation that remote operation is completed
Expand Down
110 changes: 64 additions & 46 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,15 @@
import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTERS_VERSION;
import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTER_DOMAINS;
import static io.trino.execution.TaskState.ABORTED;
import static io.trino.execution.TaskState.ABORTING;
import static io.trino.execution.TaskState.CANCELED;
import static io.trino.execution.TaskState.CANCELING;
import static io.trino.execution.TaskState.FAILED;
import static io.trino.execution.TaskState.FAILING;
import static io.trino.execution.TaskState.FINISHED;
import static io.trino.execution.TaskState.RUNNING;
import static io.trino.util.Failures.toFailures;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -147,7 +153,7 @@ private SqlTask(
// Pass a memory context supplier instead of a memory context to the output buffer,
// because we haven't created the task context that holds the memory context yet.
() -> queryContext.getTaskContextByTaskId(taskId).localMemoryContext(),
() -> notifyStatusChanged(),
this::notifyStatusChanged,
Comment thread
pettyjamesm marked this conversation as resolved.
Outdated
exchangeManagerRegistry);
taskStateMachine = new TaskStateMachine(taskId, taskNotificationExecutor);
}
Expand All @@ -157,54 +163,66 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks)
{
requireNonNull(onDone, "onDone is null");
requireNonNull(failedTasks, "failedTasks is null");

AtomicBoolean outputBufferCleanedUp = new AtomicBoolean();
taskStateMachine.addStateChangeListener(newState -> {
if (!newState.isDone()) {
if (newState != RUNNING) {
// notify that task state changed (apart from initial RUNNING state notification)
notifyStatusChanged();
if (newState.isTerminatingOrDone()) {
if (newState.isTerminating()) {
// This section must be synchronized to lock out any threads that might be attempting to create a SqlTaskExecution
synchronized (taskHolderLock) {
// If a SqlTaskExecution exists, it decides when termination is complete. Otherwise, we can mark termination completed immediately
if (taskHolderReference.get().getTaskExecution() == null) {
taskStateMachine.terminationComplete();
}
}
}
return;
}

// Update failed tasks counter
if (newState == FAILED) {
failedTasks.update(1);
}

// store final task info
synchronized (taskHolderLock) {
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
// another concurrent worker already set the final state
return;
else if (newState.isDone()) {
// Update failed tasks counter
if (newState == FAILED) {
failedTasks.update(1);
}
// store final task info
boolean finished = false;
synchronized (taskHolderLock) {
TaskHolder taskHolder = taskHolderReference.get();
if (!taskHolder.isFinished()) {
TaskHolder newHolder = new TaskHolder(
createTaskInfo(taskHolder),
taskHolder.getIoStats(),
taskHolder.getDynamicFilterDomains());
checkState(taskHolderReference.compareAndSet(taskHolder, newHolder), "unsynchronized concurrent task holder update");
finished = true;
}
}
// Successfully set the final task info, clean up the output buffer and call the completion handler
if (finished) {
try {
onDone.accept(this);
}
catch (Exception e) {
log.warn(e, "Error running task cleanup callback %s", SqlTask.this.taskId);
}
}
}
// make sure buffers are cleaned up
if (outputBufferCleanedUp.compareAndSet(false, true)) {
switch (newState) {
// don't close buffers for a failed query
// closed buffers signal to upstream tasks that everything finished cleanly
case FAILED, FAILING, ABORTED, ABORTING ->
outputBuffer.abort();
case FINISHED, CANCELED, CANCELING ->
outputBuffer.destroy();
default ->
throw new IllegalStateException(format("Invalid state for output buffer destruction: %s", newState));
}
}

TaskHolder newHolder = new TaskHolder(
createTaskInfo(taskHolder),
taskHolder.getIoStats(),
taskHolder.getDynamicFilterDomains());
checkState(taskHolderReference.compareAndSet(taskHolder, newHolder), "unsynchronized concurrent task holder update");
}

// make sure buffers are cleaned up
if (newState == FAILED || newState == ABORTED) {
// don't close buffers for a failed query
// closed buffers signal to upstream tasks that everything finished cleanly
outputBuffer.abort();
}
else {
outputBuffer.destroy();
}

try {
onDone.accept(this);
}
catch (Exception e) {
log.warn(e, "Error running task cleanup callback %s", SqlTask.this.taskId);
// notify that task state changed (apart from initial RUNNING state notification)
if (newState != RUNNING) {
notifyStatusChanged();
}

// notify that task is finished
notifyStatusChanged();
});
}

Expand Down Expand Up @@ -283,7 +301,7 @@ private TaskStatus createTaskStatus(TaskHolder taskHolder)

TaskState state = taskStateMachine.getState();
List<ExecutionFailureInfo> failures = ImmutableList.of();
if (state == FAILED) {
if (state == FAILED || state == FAILING) {
failures = toFailures(taskStateMachine.getFailureCauses());
}

Expand Down Expand Up @@ -490,8 +508,8 @@ private SqlTaskExecution tryCreateSqlTaskExecution(Session session, PlanFragment
return execution;
}

// Don't create a new execution if the task is already done
if (taskStateMachine.getState().isDone()) {
// Don't create SqlTaskExecution once termination has started
if (taskStateMachine.getState().isTerminatingOrDone()) {
return null;
}

Expand Down
Loading