Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,7 @@ public void logDebugInfo()
return;
}
log.debug("SqlStages:");
stages.forEach((stageId, stage) -> {
log.debug("SqlStage %s: %s", stageId, stage);
});
stages.forEach((stageId, stage) -> log.debug("SqlStage %s: %s", stageId, stage));
}
}

Expand Down Expand Up @@ -677,7 +675,7 @@ private static class Scheduler
private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = MINUTES.toMillis(10);
private static final long SCHEDULER_MAX_DEBUG_INFO_FREQUENCY_MILLIS = MINUTES.toMillis(10);
private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = SECONDS.toMillis(30);
private static final long SCHEDULER_STALLED_DURATION_ON_USER_CANCELED_THRESHOLD_MILLIS = SECONDS.toMillis(60);
private static final long SCHEDULER_STALLED_DURATION_ON_USER_CANCELED_THRESHOLD_MILLIS = MINUTES.toMillis(10);
private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10;
private static final int TASK_FAILURES_LOG_SIZE = 5;

Expand Down Expand Up @@ -720,7 +718,6 @@ private static class Scheduler
private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures = new ArrayDeque<>(TASK_FAILURES_LOG_SIZE);
Comment thread
losipiuk marked this conversation as resolved.
Outdated

private boolean started;
private boolean runtimeAdaptivePartitioningApplied;

private SubPlan plan;
private List<SubPlan> planInTopologicalOrder;
Expand Down Expand Up @@ -993,7 +990,6 @@ private void logDebugInfo(String reason)
.add("runtimeAdaptivePartitioningMaxTaskSizeInBytes", runtimeAdaptivePartitioningMaxTaskSizeInBytes)
.add("stageEstimationForEagerParentEnabled", stageEstimationForEagerParentEnabled)
.add("started", started)
.add("runtimeAdaptivePartitioningApplied", runtimeAdaptivePartitioningApplied)
.add("nextSchedulingPriority", nextSchedulingPriority)
.add("preSchedulingTaskContexts", preSchedulingTaskContexts)
.add("schedulingDelayer", schedulingDelayer)
Expand All @@ -1003,9 +999,7 @@ private void logDebugInfo(String reason)
stageRegistry.logDebugInfo();

log.debug("StageExecutions:");
stageExecutions.forEach((stageId, stageExecution) -> {
stageExecution.logDebugInfo();
});
stageExecutions.forEach((_, stageExecution) -> stageExecution.logDebugInfo());

eventDebugInfos.ifPresent(EventDebugInfos::log);

Expand Down Expand Up @@ -1043,9 +1037,7 @@ private boolean checkComplete()
new TrinoException(GENERIC_INTERNAL_ERROR, "stage failed due to unknown error: %s".formatted(execution.getStageId())) :
failureCause.toException();

taskFailures.forEach(taskFailure -> {
failure.addSuppressed(new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue()));
});
taskFailures.forEach(taskFailure -> failure.addSuppressed(new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue())));

queryStateMachine.transitionToFailed(failure);
return true;
Expand Down Expand Up @@ -1143,7 +1135,7 @@ private RuntimeInfoProvider createRuntimeInfoProvider()
ImmutableMap.Builder<PlanFragmentId, OutputStatsEstimateResult> stageRuntimeOutputStats = ImmutableMap.builder();
ImmutableMap.Builder<PlanFragmentId, PlanFragment> planFragments = ImmutableMap.builder();
planInTopologicalOrder.forEach(subPlan -> planFragments.put(subPlan.getFragment().getId(), subPlan.getFragment()));
stageExecutions.forEach((stageId, stageExecution) -> {
stageExecutions.forEach((_, stageExecution) -> {
if (isStageRuntimeStatsReady(stageExecution)) {
OutputStatsEstimateResult runtimeOutputStats = stageExecution.getOutputStats(stageExecutions::get, false).get();
stageRuntimeOutputStats.put(
Expand Down Expand Up @@ -1284,13 +1276,6 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan)
// speculative execution not supported by Exchange implementation
return IsReadyForExecutionResult.notReady();
}
if (runtimeAdaptivePartitioningApplied) {
// Do not start a speculative stage after partition count has been changed at runtime, as when we estimate
// by progress, repartition tasks will produce very uneven output for different output partitions, which
// will result in very bad task bin-packing results; also the fact that runtime adaptive partitioning
// happened already suggests that there is plenty work ahead.
return IsReadyForExecutionResult.notReady();
}

if ((standardTasksInQueue || standardTasksWaitingForNode) && !eager) {
// Do not start a non-eager speculative stage if there is non-speculative work still to be done.
Expand Down Expand Up @@ -1323,7 +1308,7 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan)
return IsReadyForExecutionResult.notReady();
}

estimateCountByKind.compute(result.orElseThrow().kind(), (k, v) -> v == null ? 1 : v + 1);
estimateCountByKind.compute(result.orElseThrow().kind(), (_, v) -> v == null ? 1 : v + 1);

sourceOutputStatsEstimates.put(sourceStageExecution.getStageId(), result.orElseThrow().outputDataSizeEstimate());
someSourcesMadeProgress = someSourcesMadeProgress || sourceStageExecution.isSomeProgressMade();
Expand Down Expand Up @@ -1429,7 +1414,7 @@ private void createStageExecution(
schedulerStats);
closer.register(stage::abort);
stageRegistry.add(stage);
stage.addFinalStageInfoListener(status -> queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo())));
stage.addFinalStageInfoListener(_ -> queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo())));

ImmutableMap.Builder<PlanFragmentId, Exchange> sourceExchangesBuilder = ImmutableMap.builder();
Map<PlanFragmentId, OutputDataSizeEstimate> sourceOutputEstimatesByFragmentId = new HashMap<>();
Expand Down Expand Up @@ -2528,7 +2513,7 @@ public Optional<OutputStatsEstimateResult> getOutputStats(Function<StageId, Stag

public boolean isSomeProgressMade()
{
return partitions.size() > 0 && remainingPartitions.size() < partitions.size();
return !partitions.isEmpty() && remainingPartitions.size() < partitions.size();
}

public long getOutputRowCount()
Expand Down Expand Up @@ -2655,9 +2640,7 @@ public void logDebugInfo()
.add("initialMemoryRequirements", initialMemoryRequirements)
.toString());

partitions.forEach((partitionId, stagePartition) -> {
log.debug(" StagePartition %s.%s: %s", stage.getStageId(), partitionId, stagePartition.getDebugInfo());
});
partitions.forEach((partitionId, stagePartition) -> log.debug(" StagePartition %s.%s: %s", stage.getStageId(), partitionId, stagePartition.getDebugInfo()));
}
}

Expand Down