Skip to content
Merged
6 changes: 5 additions & 1 deletion core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class FeaturesConfig
private boolean spillEnabled;
private DataSize aggregationOperatorUnspillMemoryLimit = DataSize.of(4, DataSize.Unit.MEGABYTE);
private List<Path> spillerSpillPaths = ImmutableList.of();
private int spillerThreads = 4;
private Integer spillerThreads;
private double spillMaxUsedSpaceThreshold = 0.9;
private double memoryRevokingTarget = 0.5;
private double memoryRevokingThreshold = 0.9;
Expand Down Expand Up @@ -287,6 +287,10 @@ public FeaturesConfig setSpillerSpillPaths(List<String> spillPaths)
@Min(1)
public int getSpillerThreads()
{
if (spillerThreads == null) {
// Higher default concurrency allows to saturate spill disks better in case of multiple spill locations.
return Math.max(spillerSpillPaths.size() * 2, 4);
}
return spillerThreads;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ private static QueryStats immediateFailureQueryStats()
DataSize.ofBytes(0),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
false,
OptionalDouble.empty(),
OptionalDouble.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
long revocableMemoryReservation = 0;
long totalMemoryReservation = 0;

long spilledDataSize = 0;

long totalScheduledTime = 0;
long failedScheduledTime = 0;
long totalCpuTime = 0;
Expand Down Expand Up @@ -730,6 +732,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
userMemoryReservation += stageStats.getUserMemoryReservation().toBytes();
revocableMemoryReservation += stageStats.getRevocableMemoryReservation().toBytes();
totalMemoryReservation += stageStats.getTotalMemoryReservation().toBytes();
spilledDataSize += stageStats.getSpilledDataSize().toBytes();
totalScheduledTime += stageStats.getTotalScheduledTime().roundTo(MILLISECONDS);
failedScheduledTime += stageStats.getFailedScheduledTime().roundTo(MILLISECONDS);
totalCpuTime += stageStats.getTotalCpuTime().roundTo(MILLISECONDS);
Expand Down Expand Up @@ -874,6 +877,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
succinctBytes(getPeakTaskRevocableMemory()),
succinctBytes(getPeakTaskTotalMemory()),

succinctBytes(spilledDataSize),

scheduled,
progressPercentage,
runningPercentage,
Expand Down Expand Up @@ -1487,6 +1492,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getPeakTaskUserMemory(),
queryStats.getPeakTaskRevocableMemory(),
queryStats.getPeakTaskTotalMemory(),
queryStats.getSpilledDataSize(),
queryStats.isScheduled(),
queryStats.getProgressPercentage(),
queryStats.getRunningPercentage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class QueryStats
private final DataSize peakTaskRevocableMemory;
private final DataSize peakTaskTotalMemory;

private final DataSize spilledDataSize;

private final boolean scheduled;
private final OptionalDouble progressPercentage;
private final OptionalDouble runningPercentage;
Expand Down Expand Up @@ -174,6 +176,8 @@ public QueryStats(
@JsonProperty("peakTaskRevocableMemory") DataSize peakTaskRevocableMemory,
@JsonProperty("peakTaskTotalMemory") DataSize peakTaskTotalMemory,

@JsonProperty("spilledDataSize") DataSize spilledDataSize,

@JsonProperty("scheduled") boolean scheduled,
@JsonProperty("progressPercentage") OptionalDouble progressPercentage,
@JsonProperty("runningPercentage") OptionalDouble runningPercentage,
Expand Down Expand Up @@ -275,6 +279,7 @@ public QueryStats(
this.peakTaskUserMemory = requireNonNull(peakTaskUserMemory, "peakTaskUserMemory is null");
this.peakTaskRevocableMemory = requireNonNull(peakTaskRevocableMemory, "peakTaskRevocableMemory is null");
this.peakTaskTotalMemory = requireNonNull(peakTaskTotalMemory, "peakTaskTotalMemory is null");
this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null");
this.scheduled = scheduled;
this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null");
this.runningPercentage = requireNonNull(runningPercentage, "runningPercentage is null");
Expand Down Expand Up @@ -821,8 +826,6 @@ public List<QueryPlanOptimizerStatistics> getOptimizerRulesSummaries()
@JsonProperty
public DataSize getSpilledDataSize()
{
return succinctBytes(operatorSummaries.stream()
.mapToLong(stats -> stats.getSpilledDataSize().toBytes())
.sum());
return spilledDataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,7 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos
rawInputPositions += taskStats.getRawInputPositions();
}

spilledDataSize += taskStats.getPipelines().stream()
.flatMap(pipeline -> pipeline.getOperatorSummaries().stream())
.mapToLong(summary -> summary.getSpilledDataSize().toBytes())
.sum();
spilledDataSize += taskStats.getSpilledDataSize().toBytes();
}

OptionalDouble progressPercentage = OptionalDouble.empty();
Expand Down Expand Up @@ -437,6 +434,8 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
long peakUserMemoryReservation = peakUserMemory.get();
long peakRevocableMemoryReservation = peakRevocableMemory.get();

long spilledDataSize = 0;

long totalScheduledTime = 0;
long failedScheduledTime = 0;
long totalCpuTime = 0;
Expand Down Expand Up @@ -519,6 +518,8 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
failedCumulativeUserMemory += taskStats.getCumulativeUserMemory();
}

spilledDataSize += taskStats.getSpilledDataSize().toBytes();

totalScheduledTime += taskStats.getTotalScheduledTime().roundTo(NANOSECONDS);
totalCpuTime += taskStats.getTotalCpuTime().roundTo(NANOSECONDS);
totalBlockedTime += taskStats.getTotalBlockedTime().roundTo(NANOSECONDS);
Expand Down Expand Up @@ -624,6 +625,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
succinctBytes(totalMemoryReservation),
succinctBytes(peakUserMemoryReservation),
succinctBytes(peakRevocableMemoryReservation),
succinctBytes(spilledDataSize),
succinctDuration(totalScheduledTime, NANOSECONDS),
succinctDuration(failedScheduledTime, NANOSECONDS),
succinctDuration(totalCpuTime, NANOSECONDS),
Expand Down
16 changes: 14 additions & 2 deletions core/trino-main/src/main/java/io/trino/execution/StageStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.execution.DistributionSnapshot.pruneMetrics;
import static io.trino.execution.StageState.RUNNING;
import static java.lang.Math.min;
Expand Down Expand Up @@ -69,6 +68,8 @@ public class StageStats
private final DataSize peakUserMemoryReservation;
private final DataSize peakRevocableMemoryReservation;

private final DataSize spilledDataSize;

private final Duration totalScheduledTime;
private final Duration failedScheduledTime;
private final Duration totalCpuTime;
Expand Down Expand Up @@ -145,6 +146,8 @@ public StageStats(
@JsonProperty("peakUserMemoryReservation") DataSize peakUserMemoryReservation,
@JsonProperty("peakRevocableMemoryReservation") DataSize peakRevocableMemoryReservation,

@JsonProperty("spilledDataSize") DataSize spilledDataSize,

@JsonProperty("totalScheduledTime") Duration totalScheduledTime,
@JsonProperty("failedScheduledTime") Duration failedScheduledTime,
@JsonProperty("totalCpuTime") Duration totalCpuTime,
Expand Down Expand Up @@ -226,6 +229,7 @@ public StageStats(
this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null");
this.peakUserMemoryReservation = requireNonNull(peakUserMemoryReservation, "peakUserMemoryReservation is null");
this.peakRevocableMemoryReservation = requireNonNull(peakRevocableMemoryReservation, "peakRevocableMemoryReservation is null");
this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null");

this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null");
this.failedScheduledTime = requireNonNull(failedScheduledTime, "failedScheduledTime is null");
Expand Down Expand Up @@ -398,6 +402,12 @@ public DataSize getPeakRevocableMemoryReservation()
return peakRevocableMemoryReservation;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}

@JsonProperty
public Duration getTotalScheduledTime()
{
Expand Down Expand Up @@ -667,7 +677,7 @@ public BasicStageStats toBasicStageStats(StageState stageState)
internalNetworkInputPositions,
rawInputDataSize,
rawInputPositions,
succinctBytes(operatorSummaries.stream().mapToLong(operatorSummary -> operatorSummary.getSpilledDataSize().toBytes()).sum()),
spilledDataSize,
(long) cumulativeUserMemory,
(long) failedCumulativeUserMemory,
userMemoryReservation,
Expand Down Expand Up @@ -703,6 +713,7 @@ public StageStats pruneDigests()
totalMemoryReservation,
peakUserMemoryReservation,
peakRevocableMemoryReservation,
spilledDataSize,
totalScheduledTime,
failedScheduledTime,
totalCpuTime,
Expand Down Expand Up @@ -770,6 +781,7 @@ public static StageStats createInitial()
zeroBytes,
zeroBytes,
zeroBytes,
zeroBytes,
zeroSeconds,
zeroSeconds,
zeroSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airlift.units.Duration;
import io.trino.plugin.base.metrics.DurationTiming;
import io.trino.plugin.base.metrics.LongCount;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand All @@ -29,6 +30,8 @@ public class AggregationMetrics
private static final String ACCUMULATOR_TIME_METRIC_NAME = "Accumulator update CPU time";
private static final String GROUP_BY_HASH_TIME_METRIC_NAME = "Group by hash update CPU time";

private final SpillMetrics spillMetrics = new SpillMetrics();

private long accumulatorTimeNanos;
private long groupByHashTimeNanos;
private long inputRowsProcessedWithPartialAggregationDisabled;
Expand All @@ -48,11 +51,23 @@ public void recordInputRowsProcessedWithPartialAggregationDisabled(long rows)
inputRowsProcessedWithPartialAggregationDisabled += rows;
}

public void recordSpillSince(long startNanos, long spillBytes)
{
spillMetrics.recordSpillSince(startNanos, spillBytes);
}

public void recordUnspillSince(long startNanos, long unspillBytes)
{
spillMetrics.recordUnspillSince(startNanos, unspillBytes);
}

public Metrics getMetrics()
{
return new Metrics(ImmutableMap.of(
INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME, new LongCount(inputRowsProcessedWithPartialAggregationDisabled),
ACCUMULATOR_TIME_METRIC_NAME, new DurationTiming(new Duration(accumulatorTimeNanos, NANOSECONDS)),
GROUP_BY_HASH_TIME_METRIC_NAME, new DurationTiming(new Duration(groupByHashTimeNanos, NANOSECONDS))));
return new Metrics(ImmutableMap.<String, Metric<?>>builder()
.put(INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME, new LongCount(inputRowsProcessedWithPartialAggregationDisabled))
.put(ACCUMULATOR_TIME_METRIC_NAME, new DurationTiming(new Duration(accumulatorTimeNanos, NANOSECONDS)))
.put(GROUP_BY_HASH_TIME_METRIC_NAME, new DurationTiming(new Duration(groupByHashTimeNanos, NANOSECONDS)))
.putAll(spillMetrics.getMetrics().getMetrics())
.buildOrThrow());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,10 @@ public DriverStats getDriverStats()
}

ImmutableSet.Builder<BlockedReason> builder = ImmutableSet.builder();
long spilledDataSize = 0;
long physicalWrittenDataSize = 0;
for (OperatorStats operator : operators) {
spilledDataSize += operator.getSpilledDataSize().toBytes();
physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes();
if (operator.getBlockedReason().isPresent()) {
builder.add(operator.getBlockedReason().get());
Expand All @@ -410,6 +412,7 @@ public DriverStats getDriverStats()
elapsedTime.convertToMostSuccinctTimeUnit(),
succinctBytes(driverMemoryContext.getUserMemory()),
succinctBytes(driverMemoryContext.getRevocableMemory()),
succinctBytes(spilledDataSize),
new Duration(totalScheduledTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(totalCpuTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(totalBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/DriverStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class DriverStats
private final DataSize userMemoryReservation;
private final DataSize revocableMemoryReservation;

private final DataSize spilledDataSize;

private final Duration totalScheduledTime;
private final Duration totalCpuTime;
private final Duration totalBlockedTime;
Expand Down Expand Up @@ -85,6 +87,8 @@ public DriverStats()
this.userMemoryReservation = DataSize.ofBytes(0);
this.revocableMemoryReservation = DataSize.ofBytes(0);

this.spilledDataSize = DataSize.ofBytes(0);

this.totalScheduledTime = new Duration(0, MILLISECONDS);
this.totalCpuTime = new Duration(0, MILLISECONDS);
this.totalBlockedTime = new Duration(0, MILLISECONDS);
Expand Down Expand Up @@ -128,6 +132,8 @@ public DriverStats(
@JsonProperty("userMemoryReservation") DataSize userMemoryReservation,
@JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation,

@JsonProperty("spilledDataSize") DataSize spilledDataSize,

@JsonProperty("totalScheduledTime") Duration totalScheduledTime,
@JsonProperty("totalCpuTime") Duration totalCpuTime,
@JsonProperty("totalBlockedTime") Duration totalBlockedTime,
Expand Down Expand Up @@ -168,6 +174,8 @@ public DriverStats(
this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null");
this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null");

this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null");

this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null");
this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null");
this.totalBlockedTime = requireNonNull(totalBlockedTime, "totalBlockedTime is null");
Expand Down Expand Up @@ -249,6 +257,12 @@ public DataSize getRevocableMemoryReservation()
return revocableMemoryReservation;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}

@JsonProperty
public Duration getTotalScheduledTime()
{
Expand Down
Loading