From 04a51b0bf54a3de92e053c6290e506f3a51e8948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 13 Aug 2024 11:19:06 +0200 Subject: [PATCH 1/3] Various cleanups --- ...ventDrivenFaultTolerantQueryScheduler.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 105186718137..1ce569a2f2f7 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -524,9 +524,7 @@ public void logDebugInfo() return; } log.debug("SqlStages:"); - stages.forEach((stageId, stage) -> { - log.debug("SqlStage %s: %s", stageId, stage); - }); + stages.forEach((stageId, stage) -> log.debug("SqlStage %s: %s", stageId, stage)); } } @@ -1003,9 +1001,7 @@ private void logDebugInfo(String reason) stageRegistry.logDebugInfo(); log.debug("StageExecutions:"); - stageExecutions.forEach((stageId, stageExecution) -> { - stageExecution.logDebugInfo(); - }); + stageExecutions.forEach((_, stageExecution) -> stageExecution.logDebugInfo()); eventDebugInfos.ifPresent(EventDebugInfos::log); @@ -1043,9 +1039,7 @@ private boolean checkComplete() new TrinoException(GENERIC_INTERNAL_ERROR, "stage failed due to unknown error: %s".formatted(execution.getStageId())) : failureCause.toException(); - taskFailures.forEach(taskFailure -> { - failure.addSuppressed(new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue())); - }); + taskFailures.forEach(taskFailure -> failure.addSuppressed(new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue()))); 
queryStateMachine.transitionToFailed(failure); return true; @@ -1143,7 +1137,7 @@ private RuntimeInfoProvider createRuntimeInfoProvider() ImmutableMap.Builder stageRuntimeOutputStats = ImmutableMap.builder(); ImmutableMap.Builder planFragments = ImmutableMap.builder(); planInTopologicalOrder.forEach(subPlan -> planFragments.put(subPlan.getFragment().getId(), subPlan.getFragment())); - stageExecutions.forEach((stageId, stageExecution) -> { + stageExecutions.forEach((_, stageExecution) -> { if (isStageRuntimeStatsReady(stageExecution)) { OutputStatsEstimateResult runtimeOutputStats = stageExecution.getOutputStats(stageExecutions::get, false).get(); stageRuntimeOutputStats.put( @@ -1323,7 +1317,7 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan) return IsReadyForExecutionResult.notReady(); } - estimateCountByKind.compute(result.orElseThrow().kind(), (k, v) -> v == null ? 1 : v + 1); + estimateCountByKind.compute(result.orElseThrow().kind(), (_, v) -> v == null ? 1 : v + 1); sourceOutputStatsEstimates.put(sourceStageExecution.getStageId(), result.orElseThrow().outputDataSizeEstimate()); someSourcesMadeProgress = someSourcesMadeProgress || sourceStageExecution.isSomeProgressMade(); @@ -1429,7 +1423,7 @@ private void createStageExecution( schedulerStats); closer.register(stage::abort); stageRegistry.add(stage); - stage.addFinalStageInfoListener(status -> queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo()))); + stage.addFinalStageInfoListener(_ -> queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo()))); ImmutableMap.Builder sourceExchangesBuilder = ImmutableMap.builder(); Map sourceOutputEstimatesByFragmentId = new HashMap<>(); @@ -2528,7 +2522,7 @@ public Optional getOutputStats(Function 0 && remainingPartitions.size() < partitions.size(); + return !partitions.isEmpty() && remainingPartitions.size() < partitions.size(); } public long getOutputRowCount() @@ -2655,9 +2649,7 @@ public void 
logDebugInfo() .add("initialMemoryRequirements", initialMemoryRequirements) .toString()); - partitions.forEach((partitionId, stagePartition) -> { - log.debug(" StagePartition %s.%s: %s", stage.getStageId(), partitionId, stagePartition.getDebugInfo()); - }); + partitions.forEach((partitionId, stagePartition) -> log.debug(" StagePartition %s.%s: %s", stage.getStageId(), partitionId, stagePartition.getDebugInfo())); } } From 4c72af261b34884f763cf2a0410ddd768661599b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 13 Aug 2024 11:21:12 +0200 Subject: [PATCH 2/3] Remove unused field --- .../EventDrivenFaultTolerantQueryScheduler.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 1ce569a2f2f7..552e353d8d91 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -718,7 +718,6 @@ private static class Scheduler private final Queue> taskFailures = new ArrayDeque<>(TASK_FAILURES_LOG_SIZE); private boolean started; - private boolean runtimeAdaptivePartitioningApplied; private SubPlan plan; private List planInTopologicalOrder; @@ -991,7 +990,6 @@ private void logDebugInfo(String reason) .add("runtimeAdaptivePartitioningMaxTaskSizeInBytes", runtimeAdaptivePartitioningMaxTaskSizeInBytes) .add("stageEstimationForEagerParentEnabled", stageEstimationForEagerParentEnabled) .add("started", started) - .add("runtimeAdaptivePartitioningApplied", runtimeAdaptivePartitioningApplied) .add("nextSchedulingPriority", nextSchedulingPriority) .add("preSchedulingTaskContexts", preSchedulingTaskContexts) 
.add("schedulingDelayer", schedulingDelayer) @@ -1278,13 +1276,6 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan) // speculative execution not supported by Exchange implementation return IsReadyForExecutionResult.notReady(); } - if (runtimeAdaptivePartitioningApplied) { - // Do not start a speculative stage after partition count has been changed at runtime, as when we estimate - // by progress, repartition tasks will produce very uneven output for different output partitions, which - // will result in very bad task bin-packing results; also the fact that runtime adaptive partitioning - // happened already suggests that there is plenty work ahead. - return IsReadyForExecutionResult.notReady(); - } if ((standardTasksInQueue || standardTasksWaitingForNode) && !eager) { // Do not start a non-eager speculative stage if there is non-speculative work still to be done. From 619d7ef9d600d5ad6669c91905021d67b0d8bd46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 13 Aug 2024 11:37:11 +0200 Subject: [PATCH 3/3] Increase timeout for diagnostics logging of stalled scheduler --- .../faulttolerant/EventDrivenFaultTolerantQueryScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 552e353d8d91..1da173b3e1cd 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -675,7 +675,7 @@ private static class Scheduler private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = MINUTES.toMillis(10); private static final long SCHEDULER_MAX_DEBUG_INFO_FREQUENCY_MILLIS =
MINUTES.toMillis(10); private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = SECONDS.toMillis(30); - private static final long SCHEDULER_STALLED_DURATION_ON_USER_CANCELED_THRESHOLD_MILLIS = SECONDS.toMillis(60); + private static final long SCHEDULER_STALLED_DURATION_ON_USER_CANCELED_THRESHOLD_MILLIS = MINUTES.toMillis(10); private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10; private static final int TASK_FAILURES_LOG_SIZE = 5;