diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTask.java b/core/trino-main/src/main/java/io/trino/execution/SqlTask.java index 975c6d310d8f..ffb4a1832472 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTask.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTask.java @@ -43,6 +43,7 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.net.URI; import java.util.List; @@ -90,7 +91,9 @@ public class SqlTask private final AtomicReference lastHeartbeat = new AtomicReference<>(DateTime.now()); private final AtomicLong taskStatusVersion = new AtomicLong(TaskStatus.STARTING_VERSION); private final FutureStateChange taskStatusVersionChange = new FutureStateChange<>(); - + // Must be synchronized when updating the current task holder reference, but not when only reading the current reference value + private final Object taskHolderLock = new Object(); + @GuardedBy("taskHolderLock") private final AtomicReference taskHolderReference = new AtomicReference<>(new TaskHolder()); private final AtomicBoolean needsPlan = new AtomicBoolean(true); private final AtomicReference traceToken = new AtomicReference<>(); @@ -167,19 +170,18 @@ private void initialize(Consumer onDone, CounterStat failedTasks) } // store final task info - while (true) { + synchronized (taskHolderLock) { TaskHolder taskHolder = taskHolderReference.get(); if (taskHolder.isFinished()) { // another concurrent worker already set the final state return; } - if (taskHolderReference.compareAndSet(taskHolder, new TaskHolder( + TaskHolder newHolder = new TaskHolder( createTaskInfo(taskHolder), taskHolder.getIoStats(), - taskHolder.getDynamicFilterDomains()))) { - break; - } + taskHolder.getDynamicFilterDomains()); + checkState(taskHolderReference.compareAndSet(taskHolder, newHolder), "unsynchronized concurrent task holder update"); } // make sure buffers are cleaned up @@ -433,44 +435,69 @@ public 
TaskInfo updateTask( // a VALUES query). outputBuffer.setOutputBuffers(outputBuffers); - // assure the task execution is only created once - SqlTaskExecution taskExecution; - synchronized (this) { - // is task already complete? - TaskHolder taskHolder = taskHolderReference.get(); - if (taskHolder.isFinished()) { - return taskHolder.getFinalTaskInfo(); - } - taskExecution = taskHolder.getTaskExecution(); - if (taskExecution == null) { - checkState(fragment.isPresent(), "fragment must be present"); - taskExecution = sqlTaskExecutionFactory.create( - session, - queryContext, - taskStateMachine, - outputBuffer, - fragment.get(), - this::notifyStatusChanged); - taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution)); - needsPlan.set(false); - taskExecution.start(); - } + // is task already complete? + TaskHolder taskHolder = taskHolderReference.get(); + if (taskHolder.isFinished()) { + return taskHolder.getFinalTaskInfo(); } - taskExecution.addSplitAssignments(splitAssignments); - taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains); + SqlTaskExecution taskExecution = taskHolder.getTaskExecution(); + if (taskExecution == null) { + checkState(fragment.isPresent(), "fragment must be present"); + taskExecution = tryCreateSqlTaskExecution(session, fragment.get()); + } + // taskExecution can still be null if the creation was skipped + if (taskExecution != null) { + taskExecution.addSplitAssignments(splitAssignments); + taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains); + } } catch (Error e) { failed(e); throw e; } catch (RuntimeException e) { - failed(e); + return failed(e); } return getTaskInfo(); } + @Nullable + private SqlTaskExecution tryCreateSqlTaskExecution(Session session, PlanFragment fragment) + { + synchronized (taskHolderLock) { + // Recheck holder for task execution after acquiring the lock + TaskHolder taskHolder = taskHolderReference.get(); + if (taskHolder.isFinished()) { + return null; + } + 
SqlTaskExecution execution = taskHolder.getTaskExecution(); + if (execution != null) { + return execution; + } + + // Don't create a new execution if the task is already done + if (taskStateMachine.getState().isDone()) { + return null; + } + + execution = sqlTaskExecutionFactory.create( + session, + queryContext, + taskStateMachine, + outputBuffer, + fragment, + this::notifyStatusChanged); + needsPlan.set(false); + execution.start(); + // this must happen after taskExecution.start(), otherwise it could become visible to a + // concurrent update without being fully initialized + checkState(taskHolderReference.compareAndSet(taskHolder, new TaskHolder(execution)), "unsynchronized concurrent task holder update"); + return execution; + } + } + public ListenableFuture getTaskResults(PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) { requireNonNull(bufferId, "bufferId is null"); diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java index b3a021c2d5ae..11992e742b0a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java @@ -163,17 +163,22 @@ public SqlTaskExecution( else { taskHandle = null; } - - outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this)); } } public void start() { try (SetThreadName ignored = new SetThreadName("Task-%s", getTaskId())) { + // Task handle was not created because the task is already done, nothing to do + if (taskHandle == null) { + return; + } // The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object. - // The call back is accessed from another thread, so this code cannot be placed in the constructor. 
+ // The call back is accessed from another thread, so this code cannot be placed in the constructor. This must also happen before outputBuffer + // callbacks are registered to prevent a task completion check before task lifecycle splits are created scheduleDriversForTaskLifeCycle(); + // Output buffer state change listener callback must not run in the constructor to avoid leaking a reference to "this" across to another thread + outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this)); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java index 1d84e55b1dcd..b16c161549d0 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java @@ -401,17 +401,6 @@ public ListenableFuture getTaskInfo(TaskId taskId, long currentVersion return sqlTask.getTaskInfo(currentVersion); } - /** - * Gets the unique instance id of a task. This can be used to detect a task - * that was destroyed and recreated. - */ - public String getTaskInstanceId(TaskId taskId) - { - SqlTask sqlTask = tasks.getUnchecked(taskId); - sqlTask.recordHeartbeat(); - return sqlTask.getTaskInstanceId(); - } - /** * Gets future status for the task after the state changes from * {@code current state}. If the task has not been created yet, an @@ -508,14 +497,15 @@ private TaskInfo doUpdateTask( * NOTE: this design assumes that only tasks and buffers that will * eventually exist are queried. 
*/ - public ListenableFuture getTaskResults(TaskId taskId, PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) + public SqlTaskWithResults getTaskResults(TaskId taskId, PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) { requireNonNull(taskId, "taskId is null"); requireNonNull(bufferId, "bufferId is null"); checkArgument(startingSequenceId >= 0, "startingSequenceId is negative"); requireNonNull(maxSize, "maxSize is null"); - return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize); + SqlTask task = tasks.getUnchecked(taskId); + return new SqlTaskWithResults(task, task.getTaskResults(bufferId, startingSequenceId, maxSize)); } /** @@ -778,4 +768,39 @@ private void failStuckSplitTasks() } } } + + public static final class SqlTaskWithResults + { + private final SqlTask task; + private final ListenableFuture resultsFuture; + + public SqlTaskWithResults(SqlTask task, ListenableFuture resultsFuture) + { + this.task = requireNonNull(task, "task is null"); + this.resultsFuture = requireNonNull(resultsFuture, "resultsFuture is null"); + } + + public void recordHeartbeat() + { + task.recordHeartbeat(); + } + + public String getTaskInstanceId() + { + return task.getTaskInstanceId(); + } + + public boolean isTaskFailed() + { + return switch (task.getTaskState()) { + case ABORTED, FAILED -> true; + default -> false; + }; + } + + public ListenableFuture getResultsFuture() + { + return resultsFuture; + } + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java index 9dd4bd9fc532..173e76756dd3 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java @@ -209,8 +209,10 @@ public ListenableFuture get(OutputBufferId bufferId, long token, D if 
(outputBuffer == null) { synchronized (this) { if (delegate == null) { - if (stateMachine.getState() == FINISHED) { - return immediateFuture(emptyResults(taskInstanceId, 0, true)); + if (stateMachine.getState().isTerminal()) { + // only set complete when finished, otherwise the buffer was aborted or failed + boolean complete = stateMachine.getState() == FINISHED; + return immediateFuture(emptyResults(taskInstanceId, 0, complete)); } PendingRead pendingRead = new PendingRead(bufferId, token, maxSize); @@ -310,19 +312,31 @@ public void destroy() @Override public void abort() { + List pendingReads = ImmutableList.of(); OutputBuffer outputBuffer = delegate; if (outputBuffer == null) { synchronized (this) { if (delegate == null) { // ignore abort if the buffer already in a terminal state. - stateMachine.abort(); + if (!stateMachine.abort()) { + return; + } - // Do not free readers on fail - return; + pendingReads = ImmutableList.copyOf(this.pendingReads); + this.pendingReads.clear(); } outputBuffer = delegate; } } + + // if there is no output buffer, send an empty result without buffer completed signaled + if (outputBuffer == null) { + for (PendingRead pendingRead : pendingReads) { + pendingRead.getFutureResult().set(emptyResults(taskInstanceId, 0, false)); + } + return; + } + outputBuffer.abort(); } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java index 203b5dcab5bf..927efb0aa7e2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java @@ -248,8 +248,10 @@ public synchronized void schedulingComplete(PlanNodeId partitionedSource) @Override public synchronized void cancel() { - stateMachine.transitionToCanceled(); - getAllTasks().forEach(RemoteTask::cancel); + // Only send tasks a cancel command if the stage is successfully 
cancelled and not already failed + if (stateMachine.transitionToCanceled()) { + getAllTasks().forEach(RemoteTask::cancel); + } } @Override diff --git a/core/trino-main/src/main/java/io/trino/server/TaskResource.java b/core/trino-main/src/main/java/io/trino/server/TaskResource.java index 146253243196..f1c60c641a28 100644 --- a/core/trino-main/src/main/java/io/trino/server/TaskResource.java +++ b/core/trino-main/src/main/java/io/trino/server/TaskResource.java @@ -27,9 +27,9 @@ import io.trino.execution.FailureInjector; import io.trino.execution.FailureInjector.InjectedFailure; import io.trino.execution.SqlTaskManager; +import io.trino.execution.SqlTaskManager.SqlTaskWithResults; import io.trino.execution.TaskId; import io.trino.execution.TaskInfo; -import io.trino.execution.TaskState; import io.trino.execution.TaskStatus; import io.trino.execution.buffer.BufferResult; import io.trino.execution.buffer.PipelinedOutputBuffers; @@ -70,6 +70,7 @@ import static io.airlift.concurrent.MoreFutures.addTimeout; import static io.airlift.jaxrs.AsyncResponseHandler.bindAsyncResponse; import static io.trino.TrinoMediaTypes.TRINO_PAGES; +import static io.trino.execution.buffer.BufferResult.emptyResults; import static io.trino.server.InternalHeaders.TRINO_BUFFER_COMPLETE; import static io.trino.server.InternalHeaders.TRINO_CURRENT_VERSION; import static io.trino.server.InternalHeaders.TRINO_MAX_SIZE; @@ -189,11 +190,14 @@ public void getTaskInfo( } Duration waitTime = randomizeWaitTime(maxWait); - ListenableFuture futureTaskInfo = addTimeout( - taskManager.getTaskInfo(taskId, currentVersion), - () -> taskManager.getTaskInfo(taskId), - waitTime, - timeoutExecutor); + ListenableFuture futureTaskInfo = taskManager.getTaskInfo(taskId, currentVersion); + if (!futureTaskInfo.isDone()) { + futureTaskInfo = addTimeout( + futureTaskInfo, + () -> taskManager.getTaskInfo(taskId), + waitTime, + timeoutExecutor); + } if (shouldSummarize(uriInfo)) { futureTaskInfo = 
Futures.transform(futureTaskInfo, TaskInfo::summarize, directExecutor()); @@ -232,11 +236,14 @@ public void getTaskStatus( // TODO: With current implementation, a newly completed driver group won't trigger immediate HTTP response, // leading to a slight delay of approx 1 second, which is not a major issue for any query that are heavy weight enough // to justify group-by-group execution. In order to fix this, REST endpoint /v1/{task}/status will need change. - ListenableFuture futureTaskStatus = addTimeout( - taskManager.getTaskStatus(taskId, currentVersion), - () -> taskManager.getTaskStatus(taskId), - waitTime, - timeoutExecutor); + ListenableFuture futureTaskStatus = taskManager.getTaskStatus(taskId, currentVersion); + if (!futureTaskStatus.isDone()) { + futureTaskStatus = addTimeout( + futureTaskStatus, + () -> taskManager.getTaskStatus(taskId), + waitTime, + timeoutExecutor); + } // For hard timeout, add an additional time to max wait for thread scheduling contention and GC Duration timeout = new Duration(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS); @@ -322,52 +329,26 @@ public void getResults( return; } - TaskState state = taskManager.getTaskStatus(taskId).getState(); - boolean taskFailed = state == TaskState.ABORTED || state == TaskState.FAILED; - long start = System.nanoTime(); - ListenableFuture bufferResultFuture = taskManager.getTaskResults(taskId, bufferId, token, maxSize); + SqlTaskWithResults taskWithResults = taskManager.getTaskResults(taskId, bufferId, token, maxSize); + ListenableFuture bufferResultFuture = taskWithResults.getResultsFuture(); + BufferResult emptyBufferResults = emptyResults(taskWithResults.getTaskInstanceId(), token, false); + Duration waitTime = randomizeWaitTime(DEFAULT_MAX_WAIT_TIME); - bufferResultFuture = addTimeout( - bufferResultFuture, - () -> BufferResult.emptyResults(taskManager.getTaskInstanceId(taskId), token, false), - waitTime, - timeoutExecutor); - - ListenableFuture responseFuture = 
Futures.transform(bufferResultFuture, result -> { - List serializedPages = result.getSerializedPages(); - - GenericEntity entity = null; - Status status; - if (serializedPages.isEmpty()) { - status = Status.NO_CONTENT; - } - else { - entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); - status = Status.OK; - } + if (!bufferResultFuture.isDone()) { + bufferResultFuture = addTimeout( + bufferResultFuture, + () -> emptyBufferResults, + waitTime, + timeoutExecutor); + } - return Response.status(status) - .entity(entity) - .header(TRINO_TASK_INSTANCE_ID, result.getTaskInstanceId()) - .header(TRINO_PAGE_TOKEN, result.getToken()) - .header(TRINO_PAGE_NEXT_TOKEN, result.getNextToken()) - .header(TRINO_BUFFER_COMPLETE, result.isBufferComplete()) - .header(TRINO_TASK_FAILED, taskFailed) - .build(); - }, directExecutor()); + ListenableFuture responseFuture = Futures.transform(bufferResultFuture, results -> createBufferResultResponse(taskWithResults, results), directExecutor()); // For hard timeout, add an additional time to max wait for thread scheduling contention and GC Duration timeout = new Duration(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS); bindAsyncResponse(asyncResponse, responseFuture, responseExecutor) - .withTimeout(timeout, - Response.status(Status.NO_CONTENT) - .header(TRINO_TASK_INSTANCE_ID, taskManager.getTaskInstanceId(taskId)) - .header(TRINO_PAGE_TOKEN, token) - .header(TRINO_PAGE_NEXT_TOKEN, token) - .header(TRINO_BUFFER_COMPLETE, false) - .header(TRINO_TASK_FAILED, taskFailed) - .build()); + .withTimeout(timeout, () -> createBufferResultResponse(taskWithResults, emptyBufferResults)); responseFuture.addListener(() -> readFromOutputBufferTime.add(Duration.nanosSince(start)), directExecutor()); asyncResponse.register((CompletionCallback) throwable -> resultsRequestTime.add(Duration.nanosSince(start))); @@ -516,4 +497,32 @@ private static Duration randomizeWaitTime(Duration waitTime) long halfWaitMillis = 
waitTime.toMillis() / 2; return new Duration(halfWaitMillis + ThreadLocalRandom.current().nextLong(halfWaitMillis), MILLISECONDS); } + + private static Response createBufferResultResponse(SqlTaskWithResults taskWithResults, BufferResult result) + { + // This response may have been created as the result of a timeout, so refresh the task heartbeat + taskWithResults.recordHeartbeat(); + + List serializedPages = result.getSerializedPages(); + + GenericEntity entity = null; + Status status; + if (serializedPages.isEmpty()) { + status = Status.NO_CONTENT; + } + else { + entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); + status = Status.OK; + } + + return Response.status(status) + .entity(entity) + .header(TRINO_TASK_INSTANCE_ID, result.getTaskInstanceId()) + .header(TRINO_PAGE_TOKEN, result.getToken()) + .header(TRINO_PAGE_NEXT_TOKEN, result.getNextToken()) + .header(TRINO_BUFFER_COMPLETE, result.isBufferComplete()) + // check for task failure after getting the result to ensure it's consistent with isBufferComplete() + .header(TRINO_TASK_FAILED, taskWithResults.isTaskFailed()) + .build(); + } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java index ad1f09186f5b..61aa5671e9ce 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java @@ -145,13 +145,13 @@ public void testSimpleQuery() TaskInfo taskInfo = sqlTaskManager.getTaskInfo(taskId, TaskStatus.STARTING_VERSION).get(); assertEquals(taskInfo.getTaskStatus().getState(), TaskState.FLUSHING); - BufferResult results = sqlTaskManager.getTaskResults(taskId, OUT, 0, DataSize.of(1, Unit.MEGABYTE)).get(); + BufferResult results = sqlTaskManager.getTaskResults(taskId, OUT, 0, DataSize.of(1, Unit.MEGABYTE)).getResultsFuture().get(); assertFalse(results.isBufferComplete()); 
assertEquals(results.getSerializedPages().size(), 1); assertEquals(getSerializedPagePositionCount(results.getSerializedPages().get(0)), 1); for (boolean moreResults = true; moreResults; moreResults = !results.isBufferComplete()) { - results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).get(); + results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).getResultsFuture().get(); } assertTrue(results.isBufferComplete()); assertEquals(results.getSerializedPages().size(), 0); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java index 8f94a52d80d5..258183489d7a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java @@ -26,6 +26,7 @@ import io.trino.execution.SqlTaskManager; import io.trino.execution.TaskId; import io.trino.execution.TaskInfo; +import io.trino.execution.TaskState; import io.trino.execution.warnings.WarningCollector; import io.trino.memory.LocalMemoryManager; import io.trino.memory.MemoryPool; @@ -66,12 +67,14 @@ import java.util.OptionalLong; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; +import static io.trino.execution.StageInfo.getAllStages; import static io.trino.sql.ParsingUtil.createParsingOptions; import static 
io.trino.sql.SqlFormatter.formatSql; import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy; @@ -192,11 +195,11 @@ private void checkQueryInfosFinal() for (BasicQueryInfo basicQueryInfo : queryManager.getQueries()) { QueryId queryId = basicQueryInfo.getQueryId(); if (!basicQueryInfo.getState().isDone()) { - fail("query is expected to be in done state: " + basicQueryInfo.getQuery()); + fail("query is expected to be in a done state\n\n" + createQueryDebuggingSummary(basicQueryInfo, queryManager.getFullQueryInfo(queryId))); } QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId); if (!queryInfo.isFinalQueryInfo()) { - fail("QueryInfo is expected to be final: " + basicQueryInfo.getQuery()); + fail("QueryInfo for is expected to be final\n\n" + createQueryDebuggingSummary(basicQueryInfo, queryInfo)); } } })); @@ -216,20 +219,53 @@ private void checkTasksDone() for (TaskInfo taskInfo : taskInfos) { TaskId taskId = taskInfo.getTaskStatus().getTaskId(); QueryId queryId = taskId.getQueryId(); - String query = "unknown"; - try { - query = queryManager.getQueryInfo(queryId).getQuery(); - } - catch (NoSuchElementException ignored) { - } - if (!taskInfo.getTaskStatus().getState().isDone()) { - fail("Task is expected to be in done state. 
TaskId: %s, QueryId: %s, Query: %s ".formatted(taskId, queryId, query)); + TaskState taskState = taskInfo.getTaskStatus().getState(); + if (!taskState.isDone()) { + try { + BasicQueryInfo basicQueryInfo = queryManager.getQueryInfo(queryId); + QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId); + String querySummary = createQueryDebuggingSummary(basicQueryInfo, queryInfo); + fail("Task is expected to be in done state, found: %s - TaskId: %s, QueryId: %s".formatted(taskState, taskId, queryId) + "\n\n" + querySummary); + } + catch (NoSuchElementException ignored) { + } + fail("Task is expected to be in done state, found: %s - TaskId: %s, QueryId: %s, Query: unknown".formatted(taskState, taskId, queryId)); } } } })); } + private static String createQueryDebuggingSummary(BasicQueryInfo basicQueryInfo, QueryInfo queryInfo) + { + String queryDetails = format("Query %s [%s]: %s", basicQueryInfo.getQueryId(), basicQueryInfo.getState(), basicQueryInfo.getQuery()); + if (queryInfo.getOutputStage().isEmpty()) { + return queryDetails + " -- "; + } + else { + return queryDetails + getAllStages(queryInfo.getOutputStage()).stream() + .map(stageInfo -> { + String stageDetail = format("Stage %s [%s]", stageInfo.getStageId(), stageInfo.getState()); + if (stageInfo.getTasks().isEmpty()) { + return stageDetail; + } + return stageDetail + stageInfo.getTasks().stream() + .map(TaskInfo::getTaskStatus) + .map(task -> { + String taskDetail = format("Task %s [%s]", task.getTaskId(), task.getState()); + if (task.getFailures().isEmpty()) { + return taskDetail; + } + return " -- Failures: " + task.getFailures().stream() + .map(failure -> format("%s %s: %s", failure.getErrorCode(), failure.getType(), failure.getMessage())) + .collect(Collectors.joining(", ", "[", "]")); + }) + .collect(Collectors.joining("\n\t\t", ":\n\t\t", "")); + }) + .collect(Collectors.joining("\n\n\t", "\nStages:\n\t", "")); + } + } + @Test public void ensureTestNamingConvention() {