diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
index 3e5541adc500..62c38abd1f80 100644
--- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
+++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
@@ -42,6 +42,7 @@
 import io.trino.execution.scheduler.faulttolerant.NodeAllocatorService;
 import io.trino.execution.scheduler.faulttolerant.OutputStatsEstimatorFactory;
 import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimatorFactory;
+import io.trino.execution.scheduler.faulttolerant.StageExecutionStats;
 import io.trino.execution.scheduler.faulttolerant.TaskDescriptorStorage;
 import io.trino.execution.scheduler.policy.ExecutionPolicy;
 import io.trino.execution.warnings.WarningCollector;
@@ -120,6 +121,7 @@ public class SqlQueryExecution
     private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory;
     private final OutputStatsEstimatorFactory outputStatsEstimatorFactory;
     private final TaskExecutionStats taskExecutionStats;
+    private final StageExecutionStats stageExecutionStats;
     private final List planOptimizers;
     private final List adaptivePlanOptimizers;
     private final PlanFragmenter planFragmenter;
@@ -159,6 +161,7 @@ private SqlQueryExecution(
             PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
             OutputStatsEstimatorFactory outputStatsEstimatorFactory,
             TaskExecutionStats taskExecutionStats,
+            StageExecutionStats stageExecutionStats,
             List planOptimizers,
             List adaptivePlanOptimizers,
             PlanFragmenter planFragmenter,
@@ -192,6 +195,7 @@ private SqlQueryExecution(
         this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null");
         this.outputStatsEstimatorFactory = requireNonNull(outputStatsEstimatorFactory, "outputDataSizeEstimatorFactory is null");
         this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
+        this.stageExecutionStats = requireNonNull(stageExecutionStats, "stageExecutionStats is null");
         this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
         this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
         this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
@@ -580,6 +584,7 @@ private void planDistribution(PlanRoot plan, CachingTableStatsProvider tableStat
                                 stateMachine.getWarningCollector(),
                                 planOptimizersStatsCollector,
                                 tableStatsProvider),
+                        stageExecutionStats,
                         plan.getRoot());
                 break;
             default:
@@ -782,6 +787,7 @@ public static class SqlQueryExecutionFactory
         private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory;
         private final OutputStatsEstimatorFactory outputStatsEstimatorFactory;
         private final TaskExecutionStats taskExecutionStats;
+        private final StageExecutionStats stageExecutionStats;
         private final List planOptimizers;
         private final List adaptivePlanOptimizers;
         private final PlanFragmenter planFragmenter;
@@ -813,6 +819,7 @@ public static class SqlQueryExecutionFactory
                 PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
                 OutputStatsEstimatorFactory outputStatsEstimatorFactory,
                 TaskExecutionStats taskExecutionStats,
+                StageExecutionStats stageExecutionStats,
                 PlanOptimizersFactory planOptimizersFactory,
                 PlanFragmenter planFragmenter,
                 RemoteTaskFactory remoteTaskFactory,
@@ -843,6 +850,7 @@ public static class SqlQueryExecutionFactory
             this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null");
             this.outputStatsEstimatorFactory = requireNonNull(outputStatsEstimatorFactory, "outputDataSizeEstimatorFactory is null");
             this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
+            this.stageExecutionStats = requireNonNull(stageExecutionStats, "stageExecutionStats is null");
             this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
             this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
             this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
@@ -889,6 +897,7 @@ public QueryExecution createQueryExecution(
                     partitionMemoryEstimatorFactory,
                     outputStatsEstimatorFactory,
                     taskExecutionStats,
+                    stageExecutionStats,
                     planOptimizers,
                     adaptivePlanOptimizers,
                     planFragmenter,
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
index c93e9e4f41ed..e662febc3bf5 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
@@ -125,6 +125,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -220,6 +221,7 @@ public class EventDrivenFaultTolerantQueryScheduler
     private final DynamicFilterService dynamicFilterService;
     private final TaskExecutionStats taskExecutionStats;
     private final Optional<AdaptivePlanner> adaptivePlanner;
+    private final StageExecutionStats stageExecutionStats;
     private final SubPlan originalPlan;
 
     private final boolean stageEstimationForEagerParentEnabled;
@@ -251,6 +253,7 @@ public EventDrivenFaultTolerantQueryScheduler(
             DynamicFilterService dynamicFilterService,
             TaskExecutionStats taskExecutionStats,
             AdaptivePlanner adaptivePlanner,
+            StageExecutionStats stageExecutionStats,
             SubPlan originalPlan)
     {
         this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null");
@@ -278,6 +281,7 @@ public EventDrivenFaultTolerantQueryScheduler(
                 Optional.of(requireNonNull(adaptivePlanner, "adaptivePlanner is null")) :
                 Optional.empty();
         this.originalPlan = requireNonNull(originalPlan, "originalPlan is null");
+        this.stageExecutionStats = requireNonNull(stageExecutionStats, "stageExecutionStats is null");
 
         this.stageEstimationForEagerParentEnabled =
                 isFaultTolerantExecutionStageEstimationForEagerParentEnabled(queryStateMachine.getSession());
@@ -349,6 +353,7 @@ public synchronized void start()
                 failureDetector,
                 stageRegistry,
                 taskExecutionStats,
+                stageExecutionStats,
                 dynamicFilterService,
                 new SchedulingDelayer(
                         getRetryInitialDelay(session),
@@ -696,6 +701,7 @@ private static class Scheduler
         private final FailureDetector failureDetector;
         private final StageRegistry stageRegistry;
         private final TaskExecutionStats taskExecutionStats;
+        private final StageExecutionStats stageExecutionStats;
         private final DynamicFilterService dynamicFilterService;
         private final int maxPartitionCount;
         private final boolean runtimeAdaptivePartitioningEnabled;
@@ -754,6 +760,7 @@ public Scheduler(
                 FailureDetector failureDetector,
                 StageRegistry stageRegistry,
                 TaskExecutionStats taskExecutionStats,
+                StageExecutionStats stageExecutionStats,
                 DynamicFilterService dynamicFilterService,
                 SchedulingDelayer schedulingDelayer,
                 SubPlan plan,
@@ -787,6 +794,7 @@ public Scheduler(
             this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
             this.stageRegistry = requireNonNull(stageRegistry, "stageRegistry is null");
             this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
+            this.stageExecutionStats = requireNonNull(stageExecutionStats, "stageExecutionStats is null");
             this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
             this.schedulingDelayer = requireNonNull(schedulingDelayer, "schedulingDelayer is null");
             this.plan = requireNonNull(plan, "plan is null");
@@ -1328,6 +1336,10 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan)
                         subPlan.getFragment().getId(),
                         finishedSourcesCount,
                         estimateCountByKind);
+                estimateCountByKind.forEach(stageExecutionStats::recordSourceOutputEstimationOnStageStart);
+            }
+            else {
+                stageExecutionStats.recordSourcesFinishedOnStageStart(subPlan.getChildren().size());
             }
             return IsReadyForExecutionResult.ready(sourceOutputStatsEstimates.buildOrThrow(), eager, speculative);
         }
@@ -1483,6 +1495,7 @@ private void createStageExecution(
                     taskSource,
                     sinkPartitioningScheme,
                     exchange,
+                    stageExecutionStats,
                     memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment, planFragmentLookup),
                     outputStatsEstimator,
                     // do not retry coordinator only tasks
@@ -1881,6 +1894,7 @@ public static class StageExecution
         private final EventDrivenTaskSource taskSource;
         private final FaultTolerantPartitioningScheme sinkPartitioningScheme;
         private final Exchange exchange;
+        private final StageExecutionStats stageExecutionStats;
         private final PartitionMemoryEstimator partitionMemoryEstimator;
         private final OutputStatsEstimator outputStatsEstimator;
         private final int maxTaskExecutionAttempts;
@@ -1908,6 +1922,9 @@ public static class StageExecution
         private boolean taskDescriptorLoadingActive;
         private boolean exchangeClosed;
 
+        private final long startTime = System.currentTimeMillis();
+        private OptionalLong nonSpeculativeSwitchTime = OptionalLong.empty();
+
         private MemoryRequirements initialMemoryRequirements;
 
         private StageExecution(
@@ -1917,6 +1934,7 @@ private StageExecution(
                 EventDrivenTaskSource taskSource,
                 FaultTolerantPartitioningScheme sinkPartitioningScheme,
                 Exchange exchange,
+                StageExecutionStats stageExecutionStats,
                 PartitionMemoryEstimator partitionMemoryEstimator,
                 OutputStatsEstimator outputStatsEstimator,
                 int maxTaskExecutionAttempts,
@@ -1930,6 +1948,7 @@ private StageExecution(
             this.taskSource = requireNonNull(taskSource, "taskSource is null");
             this.sinkPartitioningScheme = requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");
             this.exchange = requireNonNull(exchange, "exchange is null");
+            this.stageExecutionStats = requireNonNull(stageExecutionStats, "stageExecutionStats is null");
             this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
             this.outputStatsEstimator = requireNonNull(outputStatsEstimator, "outputStatsEstimator is null");
             this.maxTaskExecutionAttempts = maxTaskExecutionAttempts;
@@ -2009,6 +2028,10 @@ public boolean isExchangeClosed()
 
         public void setSpeculative(boolean speculative)
         {
+            checkArgument(!speculative || this.speculative, "cannot mark non-speculative stage as speculative");
+            if (this.speculative && !speculative) {
+                nonSpeculativeSwitchTime = OptionalLong.of(System.currentTimeMillis());
+            }
             this.speculative = speculative;
         }
 
@@ -2330,6 +2353,22 @@ private void finish()
             sinkOutputSelectorBuilder = null;
             stage.finish();
             taskSource.close();
+
+            recordFinishStats();
+        }
+
+        private void recordFinishStats()
+        {
+            long finishTime = System.currentTimeMillis();
+            long nonSpeculativeSwitchTime = this.nonSpeculativeSwitchTime.orElse(finishTime);
+
+            double speculativeExecutionFraction = ((double) nonSpeculativeSwitchTime - (double) startTime) / ((double) finishTime - (double) startTime);
+            if (Double.isFinite(speculativeExecutionFraction)) {
+                stageExecutionStats.recordStageSpeculativeExecutionFraction(speculativeExecutionFraction);
+            }
+            else {
+                stageExecutionStats.recordStageSpeculativeExecutionFraction(1.0);
+            }
         }
 
         private void updateOutputSize(SpoolingOutputStats.Snapshot taskOutputStats)
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java
new file mode 100644
index 000000000000..3e8477ea332e
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.execution.scheduler.faulttolerant;
+
+import com.google.inject.Inject;
+import io.airlift.stats.CounterStat;
+import io.airlift.stats.DistributionStat;
+import org.weakref.jmx.MBeanExporter;
+import org.weakref.jmx.Managed;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class StageExecutionStats
+{
+    private static final int EXECUTION_FRACTION_RESCALE_FACTOR = 1_000_000;
+    private final Map<String, CounterStat> outputEstimationKindCounters = new ConcurrentHashMap<>();
+    private final DistributionStat speculativeExecutionFractionDistribution = new DistributionStat();
+
+    private final MBeanExporter mbeanExporter;
+
+    @Inject
+    public StageExecutionStats(MBeanExporter mbeanExporter)
+    {
+        this.mbeanExporter = requireNonNull(mbeanExporter, "mbeanExporter is null");
+    }
+
+    public void recordSourceOutputEstimationOnStageStart(String sourceOutputEstimationKind, int sourcesCount)
+    {
+        updateSourceOutputEstimationKindCounter(sourceOutputEstimationKind, sourcesCount);
+    }
+
+    public void recordSourcesFinishedOnStageStart(int sourcesCount)
+    {
+        updateSourceOutputEstimationKindCounter("finished", sourcesCount);
+    }
+
+    @Managed
+    public void recordStageSpeculativeExecutionFraction(double fractionSpentSpeculative)
+    {
+        speculativeExecutionFractionDistribution.add((long) (fractionSpentSpeculative * EXECUTION_FRACTION_RESCALE_FACTOR));
+    }
+
+    private void updateSourceOutputEstimationKindCounter(String outputEstimationKind, int sourcesCount)
+    {
+        getCounterStat(outputEstimationKind).update(sourcesCount);
+    }
+
+    private CounterStat getCounterStat(String outputEstimationKind)
+    {
+        return outputEstimationKindCounters.computeIfAbsent(outputEstimationKind, ignored -> {
+            CounterStat counter = new CounterStat();
+            mbeanExporter.exportWithGeneratedName(counter, StageExecutionStats.class, "output_size_estimation" + "_" + outputEstimationKind);
+            return counter;
+        });
+    }
+}
diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
index 001cc523b784..1bd340603ac1 100644
--- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
+++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
@@ -80,6 +80,7 @@
 import io.trino.execution.scheduler.faulttolerant.NodeAllocatorService;
 import io.trino.execution.scheduler.faulttolerant.OutputStatsEstimatorFactory;
 import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimatorFactory;
+import io.trino.execution.scheduler.faulttolerant.StageExecutionStats;
 import io.trino.execution.scheduler.faulttolerant.TaskDescriptorStorage;
 import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy;
 import io.trino.execution.scheduler.policy.ExecutionPolicy;
@@ -353,6 +354,8 @@ List getCompositeOutputDataSizeEstimatorDelegateFac
         binder.bind(TaskExecutionStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(TaskExecutionStats.class).withGeneratedName();
+        binder.bind(StageExecutionStats.class).in(Scopes.SINGLETON);
+        newExporter(binder).export(StageExecutionStats.class).withGeneratedName();
 
         MapBinder<String, ExecutionPolicy> executionPolicyBinder = newMapBinder(binder, String.class, ExecutionPolicy.class);
         executionPolicyBinder.addBinding("all-at-once").to(AllAtOnceExecutionPolicy.class);