diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java index 29327a968b02..27280d3f7179 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java @@ -263,9 +263,9 @@ public synchronized Optional createTask( return Optional.of(task); } - public void recordGetSplitTime(long start) + public void recordGetSplitTime(PlanNodeId planNodeId, long start) { - stateMachine.recordGetSplitTime(start); + stateMachine.recordGetSplitTime(planNodeId, start); } private synchronized void updateTaskStatus(TaskStatus status) diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 6dc8479fce2f..d76524fb5d17 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.OptionalDouble; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -51,6 +52,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.units.DataSize.succinctBytes; import static io.airlift.units.Duration.succinctDuration; import static io.trino.execution.StageState.ABORTED; @@ -84,6 +86,7 @@ public class StageStateMachine private final AtomicReference schedulingComplete = new AtomicReference<>(); private final Distribution getSplitDistribution = new Distribution(); + private final Map 
tableGetSplitDistribution = new ConcurrentHashMap<>(); private final AtomicLong peakUserMemory = new AtomicLong(); private final AtomicLong peakRevocableMemory = new AtomicLong(); @@ -546,10 +549,15 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) } } + List tableGetSplitDistributions = tableGetSplitDistribution.entrySet().stream() + .map(entry -> new TableGetSplitDistribution(entry.getKey(), entry.getValue().snapshot())) + .collect(toImmutableList()); + StageStats stageStats = new StageStats( schedulingComplete.get(), getSplitDistribution.snapshot(), + tableGetSplitDistributions, totalTasks, runningTasks, completedTasks, @@ -638,10 +646,13 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) failureInfo); } - public void recordGetSplitTime(long startNanos) + public void recordGetSplitTime(PlanNodeId planNodeId, long startNanos) { + requireNonNull(planNodeId, "planNodeId is null"); long elapsedNanos = System.nanoTime() - startNanos; getSplitDistribution.add(elapsedNanos); + tableGetSplitDistribution.computeIfAbsent(planNodeId, (key) -> new Distribution()) + .add(elapsedNanos); scheduledStats.getGetSplitTime().add(elapsedNanos, NANOSECONDS); } diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index 8358f27efc92..46df43a27dec 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -47,6 +47,7 @@ public class StageStats private final DateTime schedulingComplete; private final DistributionSnapshot getSplitDistribution; + private final List tableGetSplitDistribution; private final int totalTasks; private final int runningTasks; @@ -122,6 +123,7 @@ public StageStats( @JsonProperty("schedulingComplete") DateTime schedulingComplete, @JsonProperty("getSplitDistribution") DistributionSnapshot getSplitDistribution, + @JsonProperty("tableGetSplitDistribution") List 
tableGetSplitDistribution, @JsonProperty("totalTasks") int totalTasks, @JsonProperty("runningTasks") int runningTasks, @@ -282,6 +284,7 @@ public StageStats( this.gcInfo = requireNonNull(gcInfo, "gcInfo is null"); this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null")); + this.tableGetSplitDistribution = ImmutableList.copyOf(requireNonNull(tableGetSplitDistribution, "tableGetSplitDistribution is null")); } @JsonProperty @@ -296,6 +299,12 @@ public DistributionSnapshot getGetSplitDistribution() return getSplitDistribution; } + @JsonProperty + public List getTableGetSplitDistribution() + { + return tableGetSplitDistribution; + } + @JsonProperty public int getTotalTasks() { @@ -669,6 +678,7 @@ public static StageStats createInitial() return new StageStats( null, new Distribution().snapshot(), + ImmutableList.of(), 0, 0, 0, diff --git a/core/trino-main/src/main/java/io/trino/execution/TableGetSplitDistribution.java b/core/trino-main/src/main/java/io/trino/execution/TableGetSplitDistribution.java new file mode 100644 index 000000000000..f38335ef0cc2 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/TableGetSplitDistribution.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.execution; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.stats.Distribution.DistributionSnapshot; +import io.trino.sql.planner.plan.PlanNodeId; + +import javax.annotation.concurrent.Immutable; + +import static java.util.Objects.requireNonNull; + +@Immutable +public class TableGetSplitDistribution +{ + private final PlanNodeId planNodeId; + private final DistributionSnapshot splitDistribution; + + @JsonCreator + public TableGetSplitDistribution( + @JsonProperty("planNodeId") PlanNodeId planNodeId, + @JsonProperty("splitDistribution") DistributionSnapshot splitDistribution) + { + requireNonNull(planNodeId, "planNodeId is null"); + requireNonNull(splitDistribution, "splitDistribution is null"); + this.planNodeId = planNodeId; + this.splitDistribution = splitDistribution; + } + + @JsonProperty + public PlanNodeId getPlanNodeId() + { + return planNodeId; + } + + @JsonProperty + public DistributionSnapshot getSplitDistribution() + { + return splitDistribution; + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java index 366865a940c4..1b54272e0e1a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.Supplier; @@ -83,7 +84,7 @@ class EventDrivenTaskSource private final int splitBatchSize; private final long targetExchangeSplitSizeInBytes; private final FaultTolerantPartitioningScheme sourcePartitioningScheme; - private final 
LongConsumer getSplitTimeRecorder; + private final BiConsumer getSplitTimeRecorder; private final SetMultimap remoteSourceFragments; @GuardedBy("this") @@ -114,7 +115,7 @@ class EventDrivenTaskSource int splitBatchSize, long targetExchangeSplitSizeInBytes, FaultTolerantPartitioningScheme sourcePartitioningScheme, - LongConsumer getSplitTimeRecorder) + BiConsumer getSplitTimeRecorder) { this.queryId = requireNonNull(queryId, "queryId is null"); this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null"); @@ -228,7 +229,7 @@ public void failed(Throwable t) } }, splitBatchSize, - getSplitTimeRecorder); + (time) -> getSplitTimeRecorder.accept(remoteSourceNodeId, time)); } private SplitLoader createTableScanSplitLoader(PlanNodeId planNodeId, SplitSource splitSource) @@ -272,7 +273,7 @@ public void failed(Throwable t) } }, splitBatchSize, - getSplitTimeRecorder); + (time) -> getSplitTimeRecorder.accept(planNodeId, time)); } private PlanNodeId getRemoteSourceNode(PlanFragmentId fragmentId) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java index 0fdb967d274e..ca9eda5fc010 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java @@ -41,7 +41,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.function.LongConsumer; +import java.util.function.BiConsumer; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -102,7 +102,7 @@ public EventDrivenTaskSource create( PlanFragment fragment, Map sourceExchanges, FaultTolerantPartitioningScheme sourcePartitioningScheme, - LongConsumer 
getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, Map outputDataSizeEstimates) { ImmutableMap.Builder remoteSources = ImmutableMap.builder(); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java index ac2b3d94f126..8dfc2224e4f0 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java @@ -531,9 +531,9 @@ public boolean isAnyTaskBlocked() } @Override - public void recordGetSplitTime(long start) + public void recordGetSplitTime(PlanNodeId planNodeId, long start) { - stage.recordGetSplitTime(start); + stage.recordGetSplitTime(planNodeId, start); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java index 17233b702dc5..04bf502f2440 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java @@ -244,7 +244,7 @@ else if (pendingSplits.isEmpty()) { nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize); long start = System.nanoTime(); - addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start)); + addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(partitionedNode, start)); } if (nextSplitBatchFuture.isDone()) { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java index 89fc0bc73e2e..e23064125899 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java +++ 
b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java @@ -58,7 +58,7 @@ public interface StageExecution void abort(); - void recordGetSplitTime(long start); + void recordGetSplitTime(PlanNodeId planNodeId, long start); Optional scheduleTask( InternalNode node, diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java index c80e12f77d0f..bfbae4cf8a95 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java @@ -72,7 +72,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.function.LongConsumer; +import java.util.function.BiConsumer; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -152,7 +152,7 @@ public TaskSource create( Session session, PlanFragment fragment, Multimap exchangeSourceHandles, - LongConsumer getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, FaultTolerantPartitioningScheme sourcePartitioningScheme) { PartitioningHandle partitioning = fragment.getPartitioning(); @@ -359,7 +359,7 @@ public static class HashDistributionTaskSource private final ListMultimap replicatedExchangeSourceHandles; private final int splitBatchSize; - private final LongConsumer getSplitTimeRecorder; + private final BiConsumer getSplitTimeRecorder; private final FaultTolerantPartitioningScheme sourcePartitioningScheme; private final Optional catalogRequirement; private final long targetPartitionSourceSizeInBytes; // compared data read from ExchangeSources @@ -379,7 +379,7 @@ public static HashDistributionTaskSource create( SplitSourceFactory splitSourceFactory, Multimap exchangeSourceHandles, int splitBatchSize, - LongConsumer
getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, FaultTolerantPartitioningScheme sourcePartitioningScheme, long targetPartitionSplitWeight, DataSize targetPartitionSourceSize, @@ -431,7 +431,7 @@ public Boolean visitTableWriter(TableWriterNode node, Void context) ListMultimap partitionedExchangeSourceHandles, ListMultimap replicatedExchangeSourceHandles, int splitBatchSize, - LongConsumer getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, FaultTolerantPartitioningScheme sourcePartitioningScheme, Optional catalogRequirement, long targetPartitionSplitWeight, @@ -642,7 +642,7 @@ public static class SourceDistributionTaskSource private final SplitSource splitSource; private final ListMultimap replicatedSplits; private final int splitBatchSize; - private final LongConsumer getSplitTimeRecorder; + private final BiConsumer getSplitTimeRecorder; private final Optional catalogRequirement; private final int minPartitionSplitCount; private final long targetPartitionSplitWeight; @@ -670,7 +670,7 @@ public static SourceDistributionTaskSource create( Multimap exchangeSourceHandles, TableExecuteContextManager tableExecuteContextManager, int splitBatchSize, - LongConsumer getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, int minPartitionSplitCount, long targetPartitionSplitWeight, int maxPartitionSplitCount, @@ -711,7 +711,7 @@ public static SourceDistributionTaskSource create( SplitSource splitSource, ListMultimap replicatedSplits, int splitBatchSize, - LongConsumer getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, Optional catalogRequirement, int minPartitionSplitCount, long targetPartitionSplitWeight, @@ -750,7 +750,7 @@ public synchronized ListenableFuture> getMoreTasks() currentSplitBatchFuture = splitSource.getNextBatch(splitBatchSize); long start = System.nanoTime(); - addSuccessCallback(currentSplitBatchFuture, () -> getSplitTimeRecorder.accept(start)); + addSuccessCallback(currentSplitBatchFuture, () -> 
getSplitTimeRecorder.accept(partitionedSourceNodeId, start)); return Futures.transform( currentSplitBatchFuture, @@ -956,14 +956,14 @@ private static class SplitLoadingFuture private final PlanNodeId planNodeId; private final SplitSource splitSource; private final int splitBatchSize; - private final LongConsumer getSplitTimeRecorder; + private final BiConsumer getSplitTimeRecorder; private final Executor executor; @GuardedBy("this") private final List loadedSplits = new ArrayList<>(); @GuardedBy("this") private ListenableFuture currentSplitBatch = immediateFuture(null); - SplitLoadingFuture(PlanNodeId planNodeId, SplitSource splitSource, int splitBatchSize, LongConsumer getSplitTimeRecorder, Executor executor) + SplitLoadingFuture(PlanNodeId planNodeId, SplitSource splitSource, int splitBatchSize, BiConsumer getSplitTimeRecorder, Executor executor) { this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); this.splitSource = requireNonNull(splitSource, "splitSource is null"); @@ -990,7 +990,7 @@ public synchronized void load() @Override public void onSuccess(SplitBatch splitBatch) { - getSplitTimeRecorder.accept(start); + getSplitTimeRecorder.accept(planNodeId, start); synchronized (SplitLoadingFuture.this) { loadedSplits.addAll(splitBatch.getSplits()); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java index 34661234f083..bf9d28807eb7 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java @@ -18,8 +18,9 @@ import io.trino.spi.exchange.ExchangeSourceHandle; import io.trino.sql.planner.PlanFragment; import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.PlanNodeId; -import java.util.function.LongConsumer; +import java.util.function.BiConsumer; /** * Deprecated in favor of 
{@link EventDrivenTaskSourceFactory} @@ -31,6 +32,6 @@ TaskSource create( Session session, PlanFragment fragment, Multimap exchangeSourceHandles, - LongConsumer getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, FaultTolerantPartitioningScheme sourcePartitioningScheme); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java index c2ecc8ca631a..41ef60492e77 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java @@ -37,7 +37,7 @@ public class TestStageStats new DateTime(0), getTestDistribution(1), - + ImmutableList.of(), 4, 5, 6, @@ -130,7 +130,7 @@ private static void assertExpectedStageStats(StageStats actual) assertEquals(actual.getSchedulingComplete().getMillis(), 0); assertEquals(actual.getGetSplitDistribution().getCount(), 1.0); - + assertEquals(actual.getTableGetSplitDistribution(), ImmutableList.of()); assertEquals(actual.getTotalTasks(), 4); assertEquals(actual.getRunningTasks(), 5); assertEquals(actual.getCompletedTasks(), 6); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java index b0f8ee90c1df..8d73c5c9bfbc 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java @@ -418,7 +418,7 @@ private void testStageTaskSourceSuccess( 1, 1, partitioningScheme, - (getSplitDuration) -> getSplitInvocations.incrementAndGet())) { + (planNodeId, getSplitDuration) -> getSplitInvocations.incrementAndGet())) { taskSource.start(); try { if (taskSourceCallback instanceof FailingTaskSourceCallback callback) { diff --git 
a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java index 413922938be9..915b30a60f89 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java @@ -530,7 +530,7 @@ private static HashDistributionTaskSource createHashDistributionTaskSource( partitionedExchangeSources, replicatedExchangeSources, splitBatchSize, - getSplitsTime -> {}, + (planNodeId, getSplitsTime) -> {}, sourcePartitioningScheme, Optional.of(TEST_CATALOG_HANDLE), targetPartitionSplitWeight, @@ -851,7 +851,7 @@ private static SourceDistributionTaskSource createSourceDistributionTaskSource( splitSource, createRemoteSplits(replicatedSources), splitBatchSize, - getSplitsTime -> {}, + (planNodeId, time) -> {}, Optional.of(TEST_CATALOG_HANDLE), minSplitsPerTask, splitWeightPerTask, diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java index 10ba0e2b82e3..2aff361c7635 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.LongConsumer; +import java.util.function.BiConsumer; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -69,7 +69,7 @@ public TaskSource create( Session session, PlanFragment fragment, Multimap exchangeSourceHandles, - LongConsumer getSplitTimeRecorder, + BiConsumer getSplitTimeRecorder, FaultTolerantPartitioningScheme 
sourcePartitioningScheme) { List partitionedSources = fragment.getPartitionedSources(); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java index cb0eac19ed2e..748429f243b1 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java @@ -334,7 +334,7 @@ public void abort() } @Override - public void recordGetSplitTime(long start) + public void recordGetSplitTime(PlanNodeId planNodeId, long start) { throw new UnsupportedOperationException(); }