Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ public synchronized Optional<RemoteTask> createTask(
return Optional.of(task);
}

public void recordGetSplitTime(long start)
public void recordGetSplitTime(PlanNodeId planNodeId, long start)
{
stateMachine.recordGetSplitTime(start);
stateMachine.recordGetSplitTime(planNodeId, start);
}

private synchronized void updateTaskStatus(TaskStatus status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -51,6 +52,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.succinctBytes;
import static io.airlift.units.Duration.succinctDuration;
import static io.trino.execution.StageState.ABORTED;
Expand Down Expand Up @@ -84,6 +86,7 @@ public class StageStateMachine

private final AtomicReference<DateTime> schedulingComplete = new AtomicReference<>();
private final Distribution getSplitDistribution = new Distribution();
private final Map<PlanNodeId, Distribution> tableGetSplitDistribution = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering what we gain by knowing the distribution of get-splits time per table scan instead of the simpler cumulative time taken for get-splits per table scan?
One file listing operation might feed multiple batches of splits, so it's possible that the distribution will just capture a few high values and many low values.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a distribution, we have information about the total time needed for getting splits, the count, and whether there are any outlier splits.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that some get-split batches having an outlier time taken could be normal and then it wouldn't be clear what insight we might get from having a distribution.
Could you check what kind of results we get on an unpartitioned table with large number of files and a partitioned table with lots of partitions ?


private final AtomicLong peakUserMemory = new AtomicLong();
private final AtomicLong peakRevocableMemory = new AtomicLong();
Expand Down Expand Up @@ -546,10 +549,15 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
}
}

List<TableGetSplitDistribution> tableGetSplitDistributions = tableGetSplitDistribution.entrySet().stream()
.map(entry -> new TableGetSplitDistribution(entry.getKey(), entry.getValue().snapshot()))
.collect(toImmutableList());

StageStats stageStats = new StageStats(
schedulingComplete.get(),
getSplitDistribution.snapshot(),

tableGetSplitDistributions,
totalTasks,
runningTasks,
completedTasks,
Expand Down Expand Up @@ -638,10 +646,13 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
failureInfo);
}

public void recordGetSplitTime(long startNanos)
public void recordGetSplitTime(PlanNodeId planNodeId, long startNanos)
{
requireNonNull(planNodeId, "planNodeId is null");
long elapsedNanos = System.nanoTime() - startNanos;
getSplitDistribution.add(elapsedNanos);
tableGetSplitDistribution.computeIfAbsent(planNodeId, (key) -> new Distribution())
.add(elapsedNanos);
scheduledStats.getGetSplitTime().add(elapsedNanos, NANOSECONDS);
}

Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class StageStats
private final DateTime schedulingComplete;

private final DistributionSnapshot getSplitDistribution;
private final List<TableGetSplitDistribution> tableGetSplitDistribution;

private final int totalTasks;
private final int runningTasks;
Expand Down Expand Up @@ -122,6 +123,7 @@ public StageStats(
@JsonProperty("schedulingComplete") DateTime schedulingComplete,

@JsonProperty("getSplitDistribution") DistributionSnapshot getSplitDistribution,
@JsonProperty("tableGetSplitDistribution") List<TableGetSplitDistribution> tableGetSplitDistribution,

@JsonProperty("totalTasks") int totalTasks,
@JsonProperty("runningTasks") int runningTasks,
Expand Down Expand Up @@ -282,6 +284,7 @@ public StageStats(
this.gcInfo = requireNonNull(gcInfo, "gcInfo is null");

this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null"));
this.tableGetSplitDistribution = ImmutableList.copyOf(requireNonNull(tableGetSplitDistribution, "tableGetSplitDistribution is null"));
}

@JsonProperty
Expand All @@ -296,6 +299,12 @@ public DistributionSnapshot getGetSplitDistribution()
return getSplitDistribution;
}

/**
 * Per-plan-node snapshots of the get-split time distribution, one entry for each plan
 * node that had split-loading time recorded for this stage.
 */
@JsonProperty
public List<TableGetSplitDistribution> getTableGetSplitDistribution()
{
    return tableGetSplitDistribution;
}

@JsonProperty
public int getTotalTasks()
{
Expand Down Expand Up @@ -669,6 +678,7 @@ public static StageStats createInitial()
return new StageStats(
null,
new Distribution().snapshot(),
ImmutableList.of(),
0,
0,
0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.stats.Distribution.DistributionSnapshot;
import io.trino.sql.planner.plan.PlanNodeId;

import javax.annotation.concurrent.Immutable;

import static java.util.Objects.requireNonNull;

/**
 * Immutable pairing of a plan node id with a snapshot of the get-split time
 * distribution recorded for that node, for inclusion in {@code StageStats}.
 */
@Immutable
public class TableGetSplitDistribution
{
    private final PlanNodeId planNodeId;
    private final DistributionSnapshot splitDistribution;

    @JsonCreator
    public TableGetSplitDistribution(
            @JsonProperty("planNodeId") PlanNodeId planNodeId,
            @JsonProperty("splitDistribution") DistributionSnapshot splitDistribution)
    {
        // validate and assign in one step; both components are required
        this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
        this.splitDistribution = requireNonNull(splitDistribution, "splitDistribution is null");
    }

    @JsonProperty
    public PlanNodeId getPlanNodeId()
    {
        return planNodeId;
    }

    @JsonProperty
    public DistributionSnapshot getSplitDistribution()
    {
        return splitDistribution;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -83,7 +84,7 @@ class EventDrivenTaskSource
private final int splitBatchSize;
private final long targetExchangeSplitSizeInBytes;
private final FaultTolerantPartitioningScheme sourcePartitioningScheme;
private final LongConsumer getSplitTimeRecorder;
private final BiConsumer<PlanNodeId, Long> getSplitTimeRecorder;
private final SetMultimap<PlanNodeId, PlanFragmentId> remoteSourceFragments;

@GuardedBy("this")
Expand Down Expand Up @@ -114,7 +115,7 @@ class EventDrivenTaskSource
int splitBatchSize,
long targetExchangeSplitSizeInBytes,
FaultTolerantPartitioningScheme sourcePartitioningScheme,
LongConsumer getSplitTimeRecorder)
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
Expand Down Expand Up @@ -228,7 +229,7 @@ public void failed(Throwable t)
}
},
splitBatchSize,
getSplitTimeRecorder);
(time) -> getSplitTimeRecorder.accept(remoteSourceNodeId, time));
}

private SplitLoader createTableScanSplitLoader(PlanNodeId planNodeId, SplitSource splitSource)
Expand Down Expand Up @@ -272,7 +273,7 @@ public void failed(Throwable t)
}
},
splitBatchSize,
getSplitTimeRecorder);
(time) -> getSplitTimeRecorder.accept(planNodeId, time));
}

private PlanNodeId getRemoteSourceNode(PlanFragmentId fragmentId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
import java.util.function.BiConsumer;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
Expand Down Expand Up @@ -102,7 +102,7 @@ public EventDrivenTaskSource create(
PlanFragment fragment,
Map<PlanFragmentId, Exchange> sourceExchanges,
FaultTolerantPartitioningScheme sourcePartitioningScheme,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
Map<PlanNodeId, OutputDataSizeEstimate> outputDataSizeEstimates)
{
ImmutableMap.Builder<PlanFragmentId, PlanNodeId> remoteSources = ImmutableMap.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,9 @@ public boolean isAnyTaskBlocked()
}

@Override
public void recordGetSplitTime(long start)
public void recordGetSplitTime(PlanNodeId planNodeId, long start)
{
stage.recordGetSplitTime(start);
stage.recordGetSplitTime(planNodeId, start);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ else if (pendingSplits.isEmpty()) {
nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);

long start = System.nanoTime();
addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(partitionedNode, start));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In hive connector, splits are loaded by the connector in a background thread (See BackgroundHiveSplitLoader), so recording time taken here will probably miss the actual work done in listing files by the hive connector.
@sopel39 maybe we need to extend ConnectorSplitSource to allow connector to provide metrics about split generation.

}

if (nextSplitBatchFuture.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public interface StageExecution

void abort();

/**
 * Records the time taken to load a batch of splits for the given plan node.
 *
 * @param planNodeId plan node the splits were loaded for
 * @param start start timestamp obtained from {@link System#nanoTime()}
 */
void recordGetSplitTime(PlanNodeId planNodeId, long start);

Optional<RemoteTask> scheduleTask(
InternalNode node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
import java.util.function.BiConsumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -152,7 +152,7 @@ public TaskSource create(
Session session,
PlanFragment fragment,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
FaultTolerantPartitioningScheme sourcePartitioningScheme)
{
PartitioningHandle partitioning = fragment.getPartitioning();
Expand Down Expand Up @@ -359,7 +359,7 @@ public static class HashDistributionTaskSource
private final ListMultimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;

private final int splitBatchSize;
private final LongConsumer getSplitTimeRecorder;
private final BiConsumer<PlanNodeId, Long> getSplitTimeRecorder;
private final FaultTolerantPartitioningScheme sourcePartitioningScheme;
private final Optional<CatalogHandle> catalogRequirement;
private final long targetPartitionSourceSizeInBytes; // compared data read from ExchangeSources
Expand All @@ -379,7 +379,7 @@ public static HashDistributionTaskSource create(
SplitSourceFactory splitSourceFactory,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
int splitBatchSize,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
FaultTolerantPartitioningScheme sourcePartitioningScheme,
long targetPartitionSplitWeight,
DataSize targetPartitionSourceSize,
Expand Down Expand Up @@ -431,7 +431,7 @@ public Boolean visitTableWriter(TableWriterNode node, Void context)
ListMultimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles,
ListMultimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles,
int splitBatchSize,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
FaultTolerantPartitioningScheme sourcePartitioningScheme,
Optional<CatalogHandle> catalogRequirement,
long targetPartitionSplitWeight,
Expand Down Expand Up @@ -642,7 +642,7 @@ public static class SourceDistributionTaskSource
private final SplitSource splitSource;
private final ListMultimap<PlanNodeId, Split> replicatedSplits;
private final int splitBatchSize;
private final LongConsumer getSplitTimeRecorder;
private final BiConsumer<PlanNodeId, Long> getSplitTimeRecorder;
private final Optional<CatalogHandle> catalogRequirement;
private final int minPartitionSplitCount;
private final long targetPartitionSplitWeight;
Expand Down Expand Up @@ -670,7 +670,7 @@ public static SourceDistributionTaskSource create(
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
TableExecuteContextManager tableExecuteContextManager,
int splitBatchSize,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
int minPartitionSplitCount,
long targetPartitionSplitWeight,
int maxPartitionSplitCount,
Expand Down Expand Up @@ -711,7 +711,7 @@ public static SourceDistributionTaskSource create(
SplitSource splitSource,
ListMultimap<PlanNodeId, Split> replicatedSplits,
int splitBatchSize,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
Optional<CatalogHandle> catalogRequirement,
int minPartitionSplitCount,
long targetPartitionSplitWeight,
Expand Down Expand Up @@ -750,7 +750,7 @@ public synchronized ListenableFuture<List<TaskDescriptor>> getMoreTasks()
currentSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);

long start = System.nanoTime();
addSuccessCallback(currentSplitBatchFuture, () -> getSplitTimeRecorder.accept(start));
addSuccessCallback(currentSplitBatchFuture, () -> getSplitTimeRecorder.accept(partitionedSourceNodeId, start));

return Futures.transform(
currentSplitBatchFuture,
Expand Down Expand Up @@ -956,14 +956,14 @@ private static class SplitLoadingFuture
private final PlanNodeId planNodeId;
private final SplitSource splitSource;
private final int splitBatchSize;
private final LongConsumer getSplitTimeRecorder;
private final BiConsumer<PlanNodeId, Long> getSplitTimeRecorder;
private final Executor executor;
@GuardedBy("this")
private final List<Split> loadedSplits = new ArrayList<>();
@GuardedBy("this")
private ListenableFuture<SplitBatch> currentSplitBatch = immediateFuture(null);

SplitLoadingFuture(PlanNodeId planNodeId, SplitSource splitSource, int splitBatchSize, LongConsumer getSplitTimeRecorder, Executor executor)
SplitLoadingFuture(PlanNodeId planNodeId, SplitSource splitSource, int splitBatchSize, BiConsumer<PlanNodeId, Long> getSplitTimeRecorder, Executor executor)
{
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.splitSource = requireNonNull(splitSource, "splitSource is null");
Expand All @@ -990,7 +990,7 @@ public synchronized void load()
@Override
public void onSuccess(SplitBatch splitBatch)
{
getSplitTimeRecorder.accept(start);
getSplitTimeRecorder.accept(planNodeId, start);
synchronized (SplitLoadingFuture.this) {
loadedSplits.addAll(splitBatch.getSplits());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.function.LongConsumer;
import java.util.function.BiConsumer;

/**
* Deprecated in favor of {@link EventDrivenTaskSourceFactory}
Expand All @@ -31,6 +32,6 @@ TaskSource create(
Session session,
PlanFragment fragment,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
LongConsumer getSplitTimeRecorder,
BiConsumer<PlanNodeId, Long> getSplitTimeRecorder,
FaultTolerantPartitioningScheme sourcePartitioningScheme);
}
Loading