Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,15 @@ private void updateTaskStatus(TaskStatus taskStatus)
.map(this::rewriteTransportFailure)
.map(ExecutionFailureInfo::toException)
// task is failed or failing, so we need to create a synthetic exception to fail the stage now
.orElseGet(() -> new TrinoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"));
.orElseGet(() -> new TrinoException(GENERIC_INTERNAL_ERROR, format("Task %s failed for an unknown reason", taskStatus.getTaskId())));
fail(failure);
break;
case CANCELING:
case CANCELED:
case ABORTING:
case ABORTED:
// A task should only be in the aborting, aborted, canceling, or canceled state if the STAGE is done (ABORTED or FAILED)
fail(new TrinoException(GENERIC_INTERNAL_ERROR, format("A task is in the %s state but stage is %s", taskState, stateMachine.getState())));
fail(new TrinoException(GENERIC_INTERNAL_ERROR, format("Task %s is in the %s state but stage %s is %s", taskStatus.getTaskId(), taskState, stateMachine.getStageId(), stateMachine.getState())));
break;
case FLUSHING:
newFlushingOrFinishedTaskObserved = addFlushingTask(taskStatus.getTaskId());
Expand Down Expand Up @@ -601,6 +601,11 @@ private PipelinedStageStateMachine(StageId stageId, Executor executor)
state.addStateChangeListener(state -> log.debug("Pipelined stage execution %s is %s", stageId, state));
}

public StageId getStageId()
{
return stageId;
}

public State getState()
{
return state.get();
Expand Down