diff --git a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java index 4e32436a250e..41a3dfd5f9fe 100644 --- a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java +++ b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java @@ -104,7 +104,7 @@ public class FeaturesConfig private boolean spillEnabled; private DataSize aggregationOperatorUnspillMemoryLimit = DataSize.of(4, DataSize.Unit.MEGABYTE); private List spillerSpillPaths = ImmutableList.of(); - private int spillerThreads = 4; + private Integer spillerThreads; private double spillMaxUsedSpaceThreshold = 0.9; private double memoryRevokingTarget = 0.5; private double memoryRevokingThreshold = 0.9; @@ -287,6 +287,10 @@ public FeaturesConfig setSpillerSpillPaths(List spillPaths) @Min(1) public int getSpillerThreads() { + if (spillerThreads == null) { + // Higher default concurrency allows to saturate spill disks better in case of multiple spill locations. + return Math.max(spillerSpillPaths.size() * 2, 4); + } return spillerThreads; } diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index 255d7771cdc6..a3d08eda694d 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -299,6 +299,7 @@ private static QueryStats immediateFailureQueryStats() DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), + DataSize.ofBytes(0), false, OptionalDouble.empty(), OptionalDouble.empty(), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 2bea048b787f..e641d1ac9994 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ 
b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -664,6 +664,8 @@ private QueryStats getQueryStats(Optional rootStage, List long revocableMemoryReservation = 0; long totalMemoryReservation = 0; + long spilledDataSize = 0; + long totalScheduledTime = 0; long failedScheduledTime = 0; long totalCpuTime = 0; @@ -730,6 +732,7 @@ private QueryStats getQueryStats(Optional rootStage, List userMemoryReservation += stageStats.getUserMemoryReservation().toBytes(); revocableMemoryReservation += stageStats.getRevocableMemoryReservation().toBytes(); totalMemoryReservation += stageStats.getTotalMemoryReservation().toBytes(); + spilledDataSize += stageStats.getSpilledDataSize().toBytes(); totalScheduledTime += stageStats.getTotalScheduledTime().roundTo(MILLISECONDS); failedScheduledTime += stageStats.getFailedScheduledTime().roundTo(MILLISECONDS); totalCpuTime += stageStats.getTotalCpuTime().roundTo(MILLISECONDS); @@ -874,6 +877,8 @@ private QueryStats getQueryStats(Optional rootStage, List succinctBytes(getPeakTaskRevocableMemory()), succinctBytes(getPeakTaskTotalMemory()), + succinctBytes(spilledDataSize), + scheduled, progressPercentage, runningPercentage, @@ -1487,6 +1492,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats) queryStats.getPeakTaskUserMemory(), queryStats.getPeakTaskRevocableMemory(), queryStats.getPeakTaskTotalMemory(), + queryStats.getSpilledDataSize(), queryStats.isScheduled(), queryStats.getProgressPercentage(), queryStats.getRunningPercentage(), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java index fad10e7cc427..f343bc493f63 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java @@ -79,6 +79,8 @@ public class QueryStats private final DataSize peakTaskRevocableMemory; private final DataSize peakTaskTotalMemory; + private final 
DataSize spilledDataSize; + private final boolean scheduled; private final OptionalDouble progressPercentage; private final OptionalDouble runningPercentage; @@ -174,6 +176,8 @@ public QueryStats( @JsonProperty("peakTaskRevocableMemory") DataSize peakTaskRevocableMemory, @JsonProperty("peakTaskTotalMemory") DataSize peakTaskTotalMemory, + @JsonProperty("spilledDataSize") DataSize spilledDataSize, + @JsonProperty("scheduled") boolean scheduled, @JsonProperty("progressPercentage") OptionalDouble progressPercentage, @JsonProperty("runningPercentage") OptionalDouble runningPercentage, @@ -275,6 +279,7 @@ public QueryStats( this.peakTaskUserMemory = requireNonNull(peakTaskUserMemory, "peakTaskUserMemory is null"); this.peakTaskRevocableMemory = requireNonNull(peakTaskRevocableMemory, "peakTaskRevocableMemory is null"); this.peakTaskTotalMemory = requireNonNull(peakTaskTotalMemory, "peakTaskTotalMemory is null"); + this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null"); this.scheduled = scheduled; this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null"); this.runningPercentage = requireNonNull(runningPercentage, "runningPercentage is null"); @@ -821,8 +826,6 @@ public List getOptimizerRulesSummaries() @JsonProperty public DataSize getSpilledDataSize() { - return succinctBytes(operatorSummaries.stream() - .mapToLong(stats -> stats.getSpilledDataSize().toBytes()) - .sum()); + return spilledDataSize; } } diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 63a6ca7f8020..cc50238ef1b0 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -348,10 +348,7 @@ public BasicStageStats getBasicStageStats(Supplier> taskInfos rawInputPositions += taskStats.getRawInputPositions(); } - spilledDataSize += 
taskStats.getPipelines().stream() - .flatMap(pipeline -> pipeline.getOperatorSummaries().stream()) - .mapToLong(summary -> summary.getSpilledDataSize().toBytes()) - .sum(); + spilledDataSize += taskStats.getSpilledDataSize().toBytes(); } OptionalDouble progressPercentage = OptionalDouble.empty(); @@ -437,6 +434,8 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) long peakUserMemoryReservation = peakUserMemory.get(); long peakRevocableMemoryReservation = peakRevocableMemory.get(); + long spilledDataSize = 0; + long totalScheduledTime = 0; long failedScheduledTime = 0; long totalCpuTime = 0; @@ -519,6 +518,8 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) failedCumulativeUserMemory += taskStats.getCumulativeUserMemory(); } + spilledDataSize += taskStats.getSpilledDataSize().toBytes(); + totalScheduledTime += taskStats.getTotalScheduledTime().roundTo(NANOSECONDS); totalCpuTime += taskStats.getTotalCpuTime().roundTo(NANOSECONDS); totalBlockedTime += taskStats.getTotalBlockedTime().roundTo(NANOSECONDS); @@ -624,6 +625,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) succinctBytes(totalMemoryReservation), succinctBytes(peakUserMemoryReservation), succinctBytes(peakRevocableMemoryReservation), + succinctBytes(spilledDataSize), succinctDuration(totalScheduledTime, NANOSECONDS), succinctDuration(failedScheduledTime, NANOSECONDS), succinctDuration(totalCpuTime, NANOSECONDS), diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index d4f1dbba9b54..153d9b64f00e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -36,7 +36,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.units.DataSize.Unit.BYTE; -import static 
io.airlift.units.DataSize.succinctBytes; import static io.trino.execution.DistributionSnapshot.pruneMetrics; import static io.trino.execution.StageState.RUNNING; import static java.lang.Math.min; @@ -69,6 +68,8 @@ public class StageStats private final DataSize peakUserMemoryReservation; private final DataSize peakRevocableMemoryReservation; + private final DataSize spilledDataSize; + private final Duration totalScheduledTime; private final Duration failedScheduledTime; private final Duration totalCpuTime; @@ -145,6 +146,8 @@ public StageStats( @JsonProperty("peakUserMemoryReservation") DataSize peakUserMemoryReservation, @JsonProperty("peakRevocableMemoryReservation") DataSize peakRevocableMemoryReservation, + @JsonProperty("spilledDataSize") DataSize spilledDataSize, + @JsonProperty("totalScheduledTime") Duration totalScheduledTime, @JsonProperty("failedScheduledTime") Duration failedScheduledTime, @JsonProperty("totalCpuTime") Duration totalCpuTime, @@ -226,6 +229,7 @@ public StageStats( this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null"); this.peakUserMemoryReservation = requireNonNull(peakUserMemoryReservation, "peakUserMemoryReservation is null"); this.peakRevocableMemoryReservation = requireNonNull(peakRevocableMemoryReservation, "peakRevocableMemoryReservation is null"); + this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null"); this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null"); this.failedScheduledTime = requireNonNull(failedScheduledTime, "failedScheduledTime is null"); @@ -398,6 +402,12 @@ public DataSize getPeakRevocableMemoryReservation() return peakRevocableMemoryReservation; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @JsonProperty public Duration getTotalScheduledTime() { @@ -667,7 +677,7 @@ public BasicStageStats toBasicStageStats(StageState stageState) internalNetworkInputPositions, 
rawInputDataSize, rawInputPositions, - succinctBytes(operatorSummaries.stream().mapToLong(operatorSummary -> operatorSummary.getSpilledDataSize().toBytes()).sum()), + spilledDataSize, (long) cumulativeUserMemory, (long) failedCumulativeUserMemory, userMemoryReservation, @@ -703,6 +713,7 @@ public StageStats pruneDigests() totalMemoryReservation, peakUserMemoryReservation, peakRevocableMemoryReservation, + spilledDataSize, totalScheduledTime, failedScheduledTime, totalCpuTime, @@ -770,6 +781,7 @@ public static StageStats createInitial() zeroBytes, zeroBytes, zeroBytes, + zeroBytes, zeroSeconds, zeroSeconds, zeroSeconds, diff --git a/core/trino-main/src/main/java/io/trino/operator/AggregationMetrics.java b/core/trino-main/src/main/java/io/trino/operator/AggregationMetrics.java index e6c8901e581d..cf44f31c1dc8 100644 --- a/core/trino-main/src/main/java/io/trino/operator/AggregationMetrics.java +++ b/core/trino-main/src/main/java/io/trino/operator/AggregationMetrics.java @@ -18,6 +18,7 @@ import io.airlift.units.Duration; import io.trino.plugin.base.metrics.DurationTiming; import io.trino.plugin.base.metrics.LongCount; +import io.trino.spi.metrics.Metric; import io.trino.spi.metrics.Metrics; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -29,6 +30,8 @@ public class AggregationMetrics private static final String ACCUMULATOR_TIME_METRIC_NAME = "Accumulator update CPU time"; private static final String GROUP_BY_HASH_TIME_METRIC_NAME = "Group by hash update CPU time"; + private final SpillMetrics spillMetrics = new SpillMetrics(); + private long accumulatorTimeNanos; private long groupByHashTimeNanos; private long inputRowsProcessedWithPartialAggregationDisabled; @@ -48,11 +51,23 @@ public void recordInputRowsProcessedWithPartialAggregationDisabled(long rows) inputRowsProcessedWithPartialAggregationDisabled += rows; } + public void recordSpillSince(long startNanos, long spillBytes) + { + spillMetrics.recordSpillSince(startNanos, spillBytes); + } + + public 
void recordUnspillSince(long startNanos, long unspillBytes) + { + spillMetrics.recordUnspillSince(startNanos, unspillBytes); + } + public Metrics getMetrics() { - return new Metrics(ImmutableMap.of( - INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME, new LongCount(inputRowsProcessedWithPartialAggregationDisabled), - ACCUMULATOR_TIME_METRIC_NAME, new DurationTiming(new Duration(accumulatorTimeNanos, NANOSECONDS)), - GROUP_BY_HASH_TIME_METRIC_NAME, new DurationTiming(new Duration(groupByHashTimeNanos, NANOSECONDS)))); + return new Metrics(ImmutableMap.>builder() + .put(INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME, new LongCount(inputRowsProcessedWithPartialAggregationDisabled)) + .put(ACCUMULATOR_TIME_METRIC_NAME, new DurationTiming(new Duration(accumulatorTimeNanos, NANOSECONDS))) + .put(GROUP_BY_HASH_TIME_METRIC_NAME, new DurationTiming(new Duration(groupByHashTimeNanos, NANOSECONDS))) + .putAll(spillMetrics.getMetrics().getMetrics()) + .buildOrThrow()); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java index e087c609bb39..b8cc47fa474a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java @@ -394,8 +394,10 @@ public DriverStats getDriverStats() } ImmutableSet.Builder builder = ImmutableSet.builder(); + long spilledDataSize = 0; long physicalWrittenDataSize = 0; for (OperatorStats operator : operators) { + spilledDataSize += operator.getSpilledDataSize().toBytes(); physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes(); if (operator.getBlockedReason().isPresent()) { builder.add(operator.getBlockedReason().get()); @@ -410,6 +412,7 @@ public DriverStats getDriverStats() elapsedTime.convertToMostSuccinctTimeUnit(), succinctBytes(driverMemoryContext.getUserMemory()), succinctBytes(driverMemoryContext.getRevocableMemory()), + 
succinctBytes(spilledDataSize), new Duration(totalScheduledTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(totalCpuTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(totalBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java index 3c985a00a355..01a67db69d35 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java @@ -43,6 +43,8 @@ public class DriverStats private final DataSize userMemoryReservation; private final DataSize revocableMemoryReservation; + private final DataSize spilledDataSize; + private final Duration totalScheduledTime; private final Duration totalCpuTime; private final Duration totalBlockedTime; @@ -85,6 +87,8 @@ public DriverStats() this.userMemoryReservation = DataSize.ofBytes(0); this.revocableMemoryReservation = DataSize.ofBytes(0); + this.spilledDataSize = DataSize.ofBytes(0); + this.totalScheduledTime = new Duration(0, MILLISECONDS); this.totalCpuTime = new Duration(0, MILLISECONDS); this.totalBlockedTime = new Duration(0, MILLISECONDS); @@ -128,6 +132,8 @@ public DriverStats( @JsonProperty("userMemoryReservation") DataSize userMemoryReservation, @JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation, + @JsonProperty("spilledDataSize") DataSize spilledDataSize, + @JsonProperty("totalScheduledTime") Duration totalScheduledTime, @JsonProperty("totalCpuTime") Duration totalCpuTime, @JsonProperty("totalBlockedTime") Duration totalBlockedTime, @@ -168,6 +174,8 @@ public DriverStats( this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null"); this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null"); + this.spilledDataSize = requireNonNull(spilledDataSize, 
"spilledDataSize is null"); + this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null"); this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null"); this.totalBlockedTime = requireNonNull(totalBlockedTime, "totalBlockedTime is null"); @@ -249,6 +257,12 @@ public DataSize getRevocableMemoryReservation() return revocableMemoryReservation; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @JsonProperty public Duration getTotalScheduledTime() { diff --git a/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java b/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java index 40ffb2101eb3..f5918084eddc 100644 --- a/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java @@ -29,7 +29,6 @@ import io.trino.spi.PageBuilder; import io.trino.spi.type.BigintType; import io.trino.spi.type.Type; -import io.trino.spi.type.TypeOperators; import io.trino.spiller.SpillerFactory; import io.trino.sql.planner.plan.AggregationNode.Step; import io.trino.sql.planner.plan.PlanNodeId; @@ -73,7 +72,6 @@ public static class HashAggregationOperatorFactory private final DataSize memoryLimitForMergeWithMemory; private final SpillerFactory spillerFactory; private final FlatHashStrategyCompiler hashStrategyCompiler; - private final TypeOperators typeOperators; private final Optional partialAggregationController; private boolean closed; @@ -92,7 +90,6 @@ public HashAggregationOperatorFactory( int expectedGroups, Optional maxPartialMemory, FlatHashStrategyCompiler hashStrategyCompiler, - TypeOperators typeOperators, Optional partialAggregationController) { this(operatorId, @@ -114,7 +111,6 @@ public HashAggregationOperatorFactory( throw new UnsupportedOperationException(); }, hashStrategyCompiler, - typeOperators, partialAggregationController); } @@ -135,7 +131,6 @@ 
public HashAggregationOperatorFactory( DataSize unspillMemoryLimit, SpillerFactory spillerFactory, FlatHashStrategyCompiler hashStrategyCompiler, - TypeOperators typeOperators, Optional partialAggregationController) { this(operatorId, @@ -155,7 +150,6 @@ public HashAggregationOperatorFactory( DataSize.succinctBytes((long) (unspillMemoryLimit.toBytes() * MERGE_WITH_MEMORY_RATIO)), spillerFactory, hashStrategyCompiler, - typeOperators, partialAggregationController); } @@ -178,7 +172,6 @@ public HashAggregationOperatorFactory( DataSize memoryLimitForMergeWithMemory, SpillerFactory spillerFactory, FlatHashStrategyCompiler hashStrategyCompiler, - TypeOperators typeOperators, Optional partialAggregationController) { this.operatorId = operatorId; @@ -198,7 +191,6 @@ public HashAggregationOperatorFactory( this.memoryLimitForMergeWithMemory = requireNonNull(memoryLimitForMergeWithMemory, "memoryLimitForMergeWithMemory is null"); this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null"); this.hashStrategyCompiler = requireNonNull(hashStrategyCompiler, "hashStrategyCompiler is null"); - this.typeOperators = requireNonNull(typeOperators, "typeOperators is null"); this.partialAggregationController = requireNonNull(partialAggregationController, "partialAggregationController is null"); } @@ -208,7 +200,7 @@ public Operator createOperator(DriverContext driverContext) checkState(!closed, "Factory is already closed"); OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, HashAggregationOperator.class.getSimpleName()); - HashAggregationOperator hashAggregationOperator = new HashAggregationOperator( + return new HashAggregationOperator( operatorContext, groupByTypes, groupByChannels, @@ -225,9 +217,7 @@ public Operator createOperator(DriverContext driverContext) memoryLimitForMergeWithMemory, spillerFactory, hashStrategyCompiler, - typeOperators, partialAggregationController); - return hashAggregationOperator; } @Override @@ 
-257,7 +247,6 @@ public OperatorFactory duplicate() memoryLimitForMergeWithMemory, spillerFactory, hashStrategyCompiler, - typeOperators, partialAggregationController.map(PartialAggregationController::duplicate)); } } @@ -279,7 +268,6 @@ public OperatorFactory duplicate() private final DataSize memoryLimitForMergeWithMemory; private final SpillerFactory spillerFactory; private final FlatHashStrategyCompiler flatHashStrategyCompiler; - private final TypeOperators typeOperators; private final AggregationMetrics aggregationMetrics = new AggregationMetrics(); private final List types; @@ -314,7 +302,6 @@ private HashAggregationOperator( DataSize memoryLimitForMergeWithMemory, SpillerFactory spillerFactory, FlatHashStrategyCompiler flatHashStrategyCompiler, - TypeOperators typeOperators, Optional partialAggregationController) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); @@ -340,7 +327,6 @@ private HashAggregationOperator( this.memoryLimitForMergeWithMemory = requireNonNull(memoryLimitForMergeWithMemory, "memoryLimitForMergeWithMemory is null"); this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null"); this.flatHashStrategyCompiler = requireNonNull(flatHashStrategyCompiler, "hashStrategyCompiler is null"); - this.typeOperators = requireNonNull(typeOperators, "typeOperators is null"); this.memoryContext = operatorContext.localUserMemoryContext(); } @@ -351,6 +337,17 @@ public OperatorContext getOperatorContext() return operatorContext; } + @Override + public ListenableFuture isBlocked() + { + if (outputPages == null || !outputPages.isBlocked()) { + return NOT_BLOCKED; + } + + // We can block e.g. 
because of self-triggered spill + return outputPages.getBlockedFuture(); + } + @Override public void finish() { @@ -426,7 +423,6 @@ else if (step.isOutputPartial() || !spillEnabled || !isSpillable()) { memoryLimitForMergeWithMemory, spillerFactory, flatHashStrategyCompiler, - typeOperators, aggregationMetrics); } diff --git a/core/trino-main/src/main/java/io/trino/operator/MergeHashSort.java b/core/trino-main/src/main/java/io/trino/operator/MergeHashSort.java index 716074db14f5..f6679f8797b7 100644 --- a/core/trino-main/src/main/java/io/trino/operator/MergeHashSort.java +++ b/core/trino-main/src/main/java/io/trino/operator/MergeHashSort.java @@ -13,11 +13,11 @@ */ package io.trino.operator; +import com.google.common.annotations.VisibleForTesting; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.type.Type; -import io.trino.spi.type.TypeOperators; import io.trino.util.MergeSortedPages.PageWithPosition; import java.io.Closeable; @@ -26,7 +26,6 @@ import java.util.stream.IntStream; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.operator.InterpretedHashGenerator.createPagePrefixHashGenerator; import static io.trino.util.MergeSortedPages.mergeSortedPages; /** @@ -40,20 +39,32 @@ public class MergeHashSort implements Closeable { private final AggregatedMemoryContext memoryContext; - private final TypeOperators typeOperators; - public MergeHashSort(AggregatedMemoryContext memoryContext, TypeOperators typeOperators) + public MergeHashSort(AggregatedMemoryContext memoryContext) { this.memoryContext = memoryContext; - this.typeOperators = typeOperators; } /** * Rows with same hash value are guaranteed to be in the same result page. 
*/ - public WorkProcessor merge(List keyTypes, List allTypes, List> channels, DriverYieldSignal driverYieldSignal) + public WorkProcessor merge(List allTypes, List> channels, DriverYieldSignal driverYieldSignal, int hashChannel) + { + HashGenerator hashGenerator = new PrecomputedHashGenerator(hashChannel); + return mergeSortedPages( + channels, + createHashPageWithPositionComparator(hashGenerator), + IntStream.range(0, allTypes.size()).boxed().collect(toImmutableList()), + allTypes, + keepSameHashValuesWithinSinglePage(hashGenerator), + true, + memoryContext, + driverYieldSignal); + } + + @VisibleForTesting + public WorkProcessor merge(List allTypes, List> channels, DriverYieldSignal driverYieldSignal, HashGenerator hashGenerator) { - InterpretedHashGenerator hashGenerator = createPagePrefixHashGenerator(keyTypes, typeOperators); return mergeSortedPages( channels, createHashPageWithPositionComparator(hashGenerator), @@ -71,7 +82,7 @@ public void close() memoryContext.close(); } - private static BiPredicate keepSameHashValuesWithinSinglePage(InterpretedHashGenerator hashGenerator) + private static BiPredicate keepSameHashValuesWithinSinglePage(HashGenerator hashGenerator) { return new BiPredicate<>() { diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index 6b268c5849a0..de20a1f184a9 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -506,15 +506,6 @@ public String toString() return format("%s-%s", operatorType, planNodeId); } - public static Metrics getOperatorMetrics(Metrics operatorMetrics, long inputPositions, double cpuTimeSeconds, double wallTimeSeconds, double blockedWallSeconds) - { - return operatorMetrics.mergeWith(new Metrics(ImmutableMap.of( - "Input rows distribution", TDigestHistogram.fromValue(inputPositions), - "CPU time distribution (s)", 
TDigestHistogram.fromValue(cpuTimeSeconds), - "Scheduled time distribution (s)", TDigestHistogram.fromValue(wallTimeSeconds), - "Blocked time distribution (s)", TDigestHistogram.fromValue(blockedWallSeconds)))); - } - public R accept(QueryContextVisitor visitor, C context) { return visitor.visitOperatorContext(this, context); @@ -557,7 +548,6 @@ public OperatorStats getOperatorStats() dynamicFilterSplitsProcessed.get(), getOperatorMetrics( - metrics.get(), inputPositionsCount, new Duration(addInputTiming.getCpuNanos() + getOutputTiming.getCpuNanos() + finishTiming.getCpuNanos(), NANOSECONDS).convertTo(SECONDS).getValue(), new Duration(addInputTiming.getWallNanos() + getOutputTiming.getWallNanos() + finishTiming.getWallNanos(), NANOSECONDS).convertTo(SECONDS).getValue(), @@ -586,6 +576,15 @@ public OperatorStats getOperatorStats() info); } + private Metrics getOperatorMetrics(long inputPositions, double cpuTimeSeconds, double wallTimeSeconds, double blockedWallSeconds) + { + return metrics.get().mergeWith(new Metrics(ImmutableMap.of( + "Input rows distribution", TDigestHistogram.fromValue(inputPositions), + "CPU time distribution (s)", TDigestHistogram.fromValue(cpuTimeSeconds), + "Scheduled time distribution (s)", TDigestHistogram.fromValue(wallTimeSeconds), + "Blocked time distribution (s)", TDigestHistogram.fromValue(blockedWallSeconds)))); + } + private static long nanosBetween(long start, long end) { return max(0, end - start); @@ -619,7 +618,6 @@ private static class OperatorSpillContext implements SpillContext { private final DriverContext driverContext; - private final AtomicLong reservedBytes = new AtomicLong(); private final AtomicLong spilledBytes = new AtomicLong(); public OperatorSpillContext(DriverContext driverContext) @@ -631,12 +629,10 @@ public OperatorSpillContext(DriverContext driverContext) public void updateBytes(long bytes) { if (bytes >= 0) { - reservedBytes.addAndGet(bytes); driverContext.reserveSpill(bytes); spilledBytes.addAndGet(bytes); } 
else { - reservedBytes.accumulateAndGet(-bytes, this::decrementSpilledReservation); driverContext.freeSpill(-bytes); } } @@ -646,13 +642,6 @@ public long getSpilledBytes() return spilledBytes.longValue(); } - private long decrementSpilledReservation(long reservedBytes, long bytesBeingFreed) - { - checkArgument(bytesBeingFreed >= 0); - checkArgument(bytesBeingFreed <= reservedBytes, "tried to free %s spilled bytes from %s bytes reserved", bytesBeingFreed, reservedBytes); - return reservedBytes - bytesBeingFreed; - } - @Override public void close() { @@ -664,7 +653,7 @@ public void close() public String toString() { return toStringHelper(this) - .add("usedBytes", reservedBytes.get()) + .add("spilledBytes", spilledBytes.get()) .toString(); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/OrderByOperator.java b/core/trino-main/src/main/java/io/trino/operator/OrderByOperator.java index 182f50ca372e..85a90f35f9ac 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OrderByOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/OrderByOperator.java @@ -36,6 +36,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterators.transform; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.checkSuccess; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.util.MergeSortedPages.mergeSortedPages; @@ -315,7 +316,7 @@ private ListenableFuture spillToDisk() } pageIndex.sort(sortChannels, sortOrder); - spillInProgress = spiller.get().spill(pageIndex.getSortedPages()); + spillInProgress = asVoid(spiller.get().spill(pageIndex.getSortedPages())); finishMemoryRevoke = Optional.of(() -> { pageIndex.clear(); updateMemoryUsage(); diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java 
b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java index e6b8c7ba58a3..24fcfc6250e5 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java @@ -74,6 +74,8 @@ public class PipelineContext private final AtomicReference lastExecutionStartTime = new AtomicReference<>(); private final AtomicReference lastExecutionEndTime = new AtomicReference<>(); + private final CounterStat spilledDataSize = new CounterStat(); + private final Distribution queuedTime = new Distribution(); private final Distribution elapsedTime = new Distribution(); @@ -208,6 +210,8 @@ public void driverFinished(DriverContext driverContext) completedSplitsWeight.addAndGet(driverContext.getSplitWeight()); } + spilledDataSize.update(driverStats.getSpilledDataSize().toBytes()); + queuedTime.add(driverStats.getQueuedTime().roundTo(NANOSECONDS)); elapsedTime.add(driverStats.getElapsedTime().roundTo(NANOSECONDS)); @@ -392,6 +396,8 @@ public PipelineStats getPipelineStats() int totalDrivers = completedDrivers + driverContexts.size(); + long spilledDataSize = this.spilledDataSize.getTotalCount(); + Distribution queuedTime = this.queuedTime.duplicate(); Distribution elapsedTime = this.elapsedTime.duplicate(); @@ -440,6 +446,8 @@ public PipelineStats getPipelineStats() blockedReasons.addAll(driverStats.getBlockedReasons()); } + spilledDataSize += driverStats.getSpilledDataSize().toBytes(); + queuedTime.add(driverStats.getQueuedTime().roundTo(NANOSECONDS)); elapsedTime.add(driverStats.getElapsedTime().roundTo(NANOSECONDS)); @@ -525,6 +533,8 @@ else if (pipelineLevelMetrics != Metrics.EMPTY) { succinctBytes(pipelineMemoryContext.getUserMemory()), succinctBytes(pipelineMemoryContext.getRevocableMemory()), + succinctBytes(spilledDataSize), + queuedTime.snapshot(), elapsedTime.snapshot(), diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java 
b/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java index 86fca937c1fa..39c7f411de5f 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java @@ -56,6 +56,8 @@ public class PipelineStats private final DataSize userMemoryReservation; private final DataSize revocableMemoryReservation; + private final DataSize spilledDataSize; + private final DistributionSnapshot queuedTime; private final DistributionSnapshot elapsedTime; @@ -114,6 +116,8 @@ public PipelineStats( @JsonProperty("userMemoryReservation") DataSize userMemoryReservation, @JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation, + @JsonProperty("spilledDataSize") DataSize spilledDataSize, + @JsonProperty("queuedTime") DistributionSnapshot queuedTime, @JsonProperty("elapsedTime") DistributionSnapshot elapsedTime, @@ -179,6 +183,8 @@ public PipelineStats( this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null"); this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null"); + this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null"); + this.queuedTime = requireNonNull(queuedTime, "queuedTime is null"); this.elapsedTime = requireNonNull(elapsedTime, "elapsedTime is null"); this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null"); @@ -324,6 +330,12 @@ public DataSize getRevocableMemoryReservation() return revocableMemoryReservation; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @JsonProperty public DistributionSnapshot getQueuedTime() { @@ -482,6 +494,7 @@ public PipelineStats summarize() completedDrivers, userMemoryReservation, revocableMemoryReservation, + spilledDataSize, queuedTime, elapsedTime, totalScheduledTime, @@ -527,6 +540,7 @@ public PipelineStats pruneDigests() 
completedDrivers, userMemoryReservation, revocableMemoryReservation, + spilledDataSize, queuedTime, elapsedTime, totalScheduledTime, diff --git a/core/trino-main/src/main/java/io/trino/operator/SpillMetrics.java b/core/trino-main/src/main/java/io/trino/operator/SpillMetrics.java new file mode 100644 index 000000000000..c7dd33b107c2 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/SpillMetrics.java @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.operator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; +import io.trino.plugin.base.metrics.TDigestHistogram; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; + +import java.util.concurrent.atomic.AtomicLong; + +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class SpillMetrics +{ + private static final String SPILL_TIME_METRIC_NAME = "Spill wall time (s)"; + @VisibleForTesting + public static final String SPILL_COUNT_METRIC_NAME = "Spill count"; + @VisibleForTesting + public static final String SPILL_DATA_SIZE = "Spill data size (MB)"; + private static final String UNSPILL_TIME_METRIC_NAME = "Unspill wall time (s)"; + private static final String UNSPILL_COUNT_METRIC_NAME = "Unspill count"; + private static final String UNSPILL_DATA_SIZE = "Unspill data size (MB)"; + + private final String prefix; + + private final AtomicLong spillTimeNanos = new AtomicLong(); + private final AtomicLong spillCount = new AtomicLong(); + private final AtomicLong spillBytes = new AtomicLong(); + private final AtomicLong unspillTimeNanos = new AtomicLong(); + private final AtomicLong unspillCount = new AtomicLong(); + private final AtomicLong unspillBytes = new AtomicLong(); + + public SpillMetrics() + { + this.prefix = ""; + } + + public SpillMetrics(String prefix) + { + this.prefix = requireNonNull(prefix, "prefix is null") + ": "; + } + + public void recordSpillSince(long startNanos, long spillBytes) + { + spillTimeNanos.addAndGet(System.nanoTime() - startNanos); + spillCount.incrementAndGet(); + this.spillBytes.addAndGet(spillBytes); + } + + public void recordUnspillSince(long startNanos, long unspillBytes) + { + unspillTimeNanos.addAndGet(System.nanoTime() - 
startNanos); + unspillCount.incrementAndGet(); + this.unspillBytes.addAndGet(unspillBytes); + } + + public Metrics getMetrics() + { + ImmutableMap.Builder<String, Metric<?>> metricsBuilder = ImmutableMap.builder(); + if (spillTimeNanos.get() > 0 || spillCount.get() > 0 || spillBytes.get() > 0) { + metricsBuilder.put(prefix + SPILL_TIME_METRIC_NAME, TDigestHistogram.fromValue(new Duration(spillTimeNanos.longValue(), NANOSECONDS).getValue(SECONDS))); + metricsBuilder.put(prefix + SPILL_COUNT_METRIC_NAME, TDigestHistogram.fromValue(spillCount.doubleValue())); + metricsBuilder.put(prefix + SPILL_DATA_SIZE, TDigestHistogram.fromValue(spillBytes.longValue() * (1.0d / MEGABYTE.inBytes()))); + } + if (unspillTimeNanos.get() > 0 || unspillCount.get() > 0 || unspillBytes.get() > 0) { + metricsBuilder.put(prefix + UNSPILL_TIME_METRIC_NAME, TDigestHistogram.fromValue(new Duration(unspillTimeNanos.longValue(), NANOSECONDS).getValue(SECONDS))); + metricsBuilder.put(prefix + UNSPILL_COUNT_METRIC_NAME, TDigestHistogram.fromValue(unspillCount.doubleValue())); + metricsBuilder.put(prefix + UNSPILL_DATA_SIZE, TDigestHistogram.fromValue(unspillBytes.longValue() * (1.0d / MEGABYTE.inBytes()))); + } + return new Metrics(metricsBuilder.buildOrThrow()); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index e72670b2f4c6..55a98f19c56b 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -461,6 +461,8 @@ public TaskStats getTaskStats() int blockedDrivers = 0; int completedDrivers = 0; + long spilledDataSize = 0; + long totalScheduledTime = 0; long totalCpuTime = 0; long totalBlockedTime = 0; @@ -512,6 +514,8 @@ public TaskStats getTaskStats() blockedDrivers += pipeline.getBlockedDrivers(); completedDrivers += pipeline.getCompletedDrivers(); + spilledDataSize += pipeline.getSpilledDataSize().toBytes(); + 
totalScheduledTime += pipeline.getTotalScheduledTime().roundTo(NANOSECONDS); totalCpuTime += pipeline.getTotalCpuTime().roundTo(NANOSECONDS); totalBlockedTime += pipeline.getTotalBlockedTime().roundTo(NANOSECONDS); @@ -600,6 +604,7 @@ public TaskStats getTaskStats() succinctBytes(userMemory), getPeakMemoryReservation().succinct(), succinctBytes(taskMemoryContext.getRevocableMemory()), + succinctBytes(spilledDataSize), new Duration(totalScheduledTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(totalCpuTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(totalBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskStats.java b/core/trino-main/src/main/java/io/trino/operator/TaskStats.java index b8719dd2f7ee..23a14e98769b 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskStats.java @@ -58,6 +58,8 @@ public class TaskStats private final DataSize peakUserMemoryReservation; private final DataSize revocableMemoryReservation; + private final DataSize spilledDataSize; + private final Duration totalScheduledTime; private final Duration totalCpuTime; private final Duration totalBlockedTime; @@ -116,6 +118,7 @@ public TaskStats(DateTime createTime, DateTime endTime) DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), + DataSize.ofBytes(0), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), @@ -168,6 +171,8 @@ public TaskStats( @JsonProperty("peakUserMemoryReservation") DataSize peakUserMemoryReservation, @JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation, + @JsonProperty("spilledDataSize") DataSize spilledDataSize, + @JsonProperty("totalScheduledTime") Duration totalScheduledTime, @JsonProperty("totalCpuTime") Duration totalCpuTime, @JsonProperty("totalBlockedTime") Duration totalBlockedTime, @@ -239,6 +244,8 @@ public 
TaskStats( this.peakUserMemoryReservation = requireNonNull(peakUserMemoryReservation, "peakUserMemoryReservation is null"); this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null"); + this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null"); + this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null"); this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null"); this.totalBlockedTime = requireNonNull(totalBlockedTime, "totalBlockedTime is null"); @@ -388,6 +395,12 @@ public DataSize getRevocableMemoryReservation() return revocableMemoryReservation; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @JsonProperty public Duration getTotalScheduledTime() { @@ -580,6 +593,7 @@ public TaskStats summarize() userMemoryReservation, peakUserMemoryReservation, revocableMemoryReservation, + spilledDataSize, totalScheduledTime, totalCpuTime, totalBlockedTime, @@ -630,6 +644,7 @@ public TaskStats summarizeFinal() userMemoryReservation, peakUserMemoryReservation, revocableMemoryReservation, + spilledDataSize, totalScheduledTime, totalCpuTime, totalBlockedTime, @@ -680,6 +695,7 @@ public TaskStats pruneDigests() userMemoryReservation, peakUserMemoryReservation, revocableMemoryReservation, + spilledDataSize, totalScheduledTime, totalCpuTime, totalBlockedTime, diff --git a/core/trino-main/src/main/java/io/trino/operator/WindowOperator.java b/core/trino-main/src/main/java/io/trino/operator/WindowOperator.java index e7e768816ce9..7e0d5a72024f 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WindowOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/WindowOperator.java @@ -55,6 +55,7 @@ import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterators.peekingIterator; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; 
+import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.checkSuccess; import static io.trino.operator.PositionSearcher.findEndPosition; import static io.trino.operator.WorkProcessor.TransformationState.needsMoreData; @@ -801,7 +802,7 @@ ListenableFuture spill() Page anyPage = sortedPages.peek(); verify(anyPage.getPositionCount() != 0, "PagesIndex.getSortedPages returned an empty page"); currentSpillGroupRowPage = Optional.of(anyPage.getSingleValuePage(/* any */0)); - spillInProgress = Optional.of(spiller.get().spill(sortedPages)); + spillInProgress = Optional.of(asVoid(spiller.get().spill(sortedPages))); return spillInProgress.get(); } diff --git a/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/InMemoryHashAggregationBuilder.java b/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/InMemoryHashAggregationBuilder.java index aa5bff3be714..2c9d20455933 100644 --- a/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/InMemoryHashAggregationBuilder.java +++ b/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/InMemoryHashAggregationBuilder.java @@ -248,12 +248,12 @@ public WorkProcessor buildResult() for (GroupedAggregator groupedAggregator : groupedAggregators) { groupedAggregator.prepareFinal(); } - return buildResult(consecutiveGroupIds()); + return buildResult(consecutiveGroupIds(), new PageBuilder(buildTypes()), false); } - public WorkProcessor buildHashSortedResult() + public WorkProcessor buildSpillResult() { - return buildResult(hashSortedGroupIds()); + return buildResult(hashSortedGroupIds(), new PageBuilder(buildSpillTypes()), true); } public List buildSpillTypes() @@ -262,6 +262,8 @@ public List buildSpillTypes() for (GroupedAggregator groupedAggregator : groupedAggregators) { types.add(groupedAggregator.getSpillType()); } + // raw hash + types.add(BIGINT); return types; } @@ -271,9 +273,9 @@ public int getCapacity() return 
groupByHash.getCapacity(); } - private WorkProcessor buildResult(IntIterator groupIds) + private WorkProcessor buildResult(IntIterator groupIds, PageBuilder pageBuilder, boolean appendRawHash) { - PageBuilder pageBuilder = new PageBuilder(buildTypes()); + int rawHashIndex = groupByChannels.length + groupedAggregators.size(); return WorkProcessor.create(() -> { if (!groupIds.hasNext()) { return ProcessState.finished(); @@ -292,6 +294,10 @@ private WorkProcessor buildResult(IntIterator groupIds) BlockBuilder output = pageBuilder.getBlockBuilder(groupByChannels.length + i); groupedAggregator.evaluate(groupId, output); } + + if (appendRawHash) { + BIGINT.writeLong(pageBuilder.getBlockBuilder(rawHashIndex), groupByHash.getRawHash(groupId)); + } } return ProcessState.ofResult(pageBuilder.build()); diff --git a/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/SpillableHashAggregationBuilder.java b/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/SpillableHashAggregationBuilder.java index 1f9726cc9cdd..8d804fe76517 100644 --- a/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/SpillableHashAggregationBuilder.java +++ b/core/trino-main/src/main/java/io/trino/operator/aggregation/builder/SpillableHashAggregationBuilder.java @@ -27,7 +27,6 @@ import io.trino.operator.aggregation.AggregatorFactory; import io.trino.spi.Page; import io.trino.spi.type.Type; -import io.trino.spi.type.TypeOperators; import io.trino.spiller.Spiller; import io.trino.spiller.SpillerFactory; import io.trino.sql.planner.plan.AggregationNode; @@ -35,13 +34,21 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.Futures.immediateFuture; import 
static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static io.airlift.concurrent.MoreFutures.addSuccessCallback; +import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.getFutureValue; -import static io.trino.operator.Operator.NOT_BLOCKED; +import static io.trino.operator.WorkProcessor.ProcessState.Type.FINISHED; +import static io.trino.operator.WorkProcessor.ProcessState.Type.RESULT; +import static io.trino.operator.WorkProcessor.ProcessState.blocked; +import static io.trino.operator.WorkProcessor.ProcessState.finished; +import static io.trino.operator.WorkProcessor.ProcessState.ofResult; +import static io.trino.operator.WorkProcessor.flatten; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; @@ -64,15 +71,15 @@ public class SpillableHashAggregationBuilder private Optional spiller = Optional.empty(); private Optional merger = Optional.empty(); private Optional mergeHashSort = Optional.empty(); - private ListenableFuture spillInProgress = immediateVoidFuture(); + private ListenableFuture spillInProgress = immediateFuture(DataSize.ofBytes(0L)); private final FlatHashStrategyCompiler hashStrategyCompiler; - private final TypeOperators typeOperators; private final AggregationMetrics aggregationMetrics; // todo get rid of that and only use revocable memory private long emptyHashAggregationBuilderSize; private boolean producingOutput; + private boolean finishing; public SpillableHashAggregationBuilder( List aggregatorFactories, @@ -86,7 +93,6 @@ public SpillableHashAggregationBuilder( DataSize memoryLimitForMergeWithMemory, SpillerFactory spillerFactory, FlatHashStrategyCompiler hashStrategyCompiler, - TypeOperators typeOperators, AggregationMetrics aggregationMetrics) { this.aggregatorFactories = aggregatorFactories; @@ -102,7 +108,6 @@ public SpillableHashAggregationBuilder( this.memoryLimitForMergeWithMemory = memoryLimitForMergeWithMemory.toBytes(); 
this.spillerFactory = spillerFactory; this.hashStrategyCompiler = hashStrategyCompiler; - this.typeOperators = typeOperators; this.aggregationMetrics = requireNonNull(aggregationMetrics, "aggregationMetrics is null"); rebuildHashAggregationBuilder(); @@ -152,18 +157,13 @@ private boolean hasPreviousSpillCompletedSuccessfully() @Override public ListenableFuture startMemoryRevoke() { - if (producingOutput) { - // all revocable memory has been released in buildResult method - verify(localRevocableMemoryContext.getBytes() == 0); - return NOT_BLOCKED; - } - return spillToDisk(); } @Override public void finishMemoryRevoke() { + checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished"); updateMemory(); } @@ -175,33 +175,41 @@ private boolean shouldMergeWithMemory(long memorySize) @Override public WorkProcessor buildResult() { - checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished"); - producingOutput = true; - - // Convert revocable memory to user memory as returned WorkProcessor holds on to memory so we no longer can revoke. 
- if (localRevocableMemoryContext.getBytes() > 0) { - long currentRevocableBytes = localRevocableMemoryContext.getBytes(); - localRevocableMemoryContext.setBytes(0); - if (!localUserMemoryContext.trySetBytes(localUserMemoryContext.getBytes() + currentRevocableBytes)) { - // TODO: this might fail (even though we have just released memory), but we don't - // have a proper way to atomically convert memory reservations - localRevocableMemoryContext.setBytes(currentRevocableBytes); - // spill since revocable memory could not be converted to user memory immediately - // TODO: this should be asynchronous - getFutureValue(spillToDisk()); - updateMemory(); + return flatten(WorkProcessor.create(() -> { + checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished"); + // update memory after potential spill from the previous call to buildResult + updateMemory(); + producingOutput = true; + + if (finishing) { + return finished(); } - } - if (spiller.isEmpty()) { - return hashAggregationBuilder.buildResult(); - } + if (localRevocableMemoryContext.getBytes() > 0) { + // Revocable memory is still held: decide whether it can be kept for producing output or has to be spilled first + if (spiller.isEmpty()) { + // No spill happened, try to build result from memory. Revocable memory needs to be converted to user memory as the output-producing stage is no longer revocable. 
+ long currentRevocableBytes = localRevocableMemoryContext.getBytes(); + localRevocableMemoryContext.setBytes(0); + if (!localUserMemoryContext.trySetBytes(localUserMemoryContext.getBytes() + currentRevocableBytes)) { + // TODO: this might fail (even though we have just released memory), but we don't + // have a proper way to atomically convert memory reservations + localRevocableMemoryContext.setBytes(currentRevocableBytes); + // spill since revocable memory could not be converted to user memory immediately + return blocked(spillToDisk()); + } + } + else if (!shouldMergeWithMemory(getSizeInMemoryWhenUnspilling())) { + return blocked(spillToDisk()); + } + } - if (shouldMergeWithMemory(getSizeInMemoryWhenUnspilling())) { - return mergeFromDiskAndMemory(); - } - getFutureValue(spillToDisk()); - return mergeFromDisk(); + finishing = true; + if (spiller.isEmpty()) { + return ofResult(hashAggregationBuilder.buildResult()); + } + return ofResult(mergeFromDiskAndMemory()); + })); } /** @@ -236,23 +244,42 @@ public void close() private ListenableFuture spillToDisk() { + if (!spillInProgress.isDone()) { + // Spill can be triggered first in SpillableHashAggregationBuilder.buildResult and then by Driver (via HashAggregationOperator#startMemoryRevoke). + // While spill is in progress revocable memory is not released, hence redundant call to spillToDisk might be made. + return asVoid(spillInProgress); + } + + if (localRevocableMemoryContext.getBytes() == 0 || hasNoGroups()) { + // This must be a stale revoke request as hashAggregationBuilder revocable memory could have been converted to user memory or + // spill is completed but updateMemory() was not called yet. 
+ return immediateVoidFuture(); + } + checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished"); hashAggregationBuilder.setSpillOutput(); if (spiller.isEmpty()) { spiller = Optional.of(spillerFactory.create( - hashAggregationBuilder.buildTypes(), + hashAggregationBuilder.buildSpillTypes(), operatorContext.getSpillContext(), operatorContext.newAggregateUserMemoryContext())); } // start spilling process with current content of the hashAggregationBuilder builder... - spillInProgress = spiller.get().spill(hashAggregationBuilder.buildHashSortedResult().iterator()); + long spillStartNanos = System.nanoTime(); + spillInProgress = spiller.get().spill(hashAggregationBuilder.buildSpillResult().iterator()); + addSuccessCallback(spillInProgress, dataSize -> aggregationMetrics.recordSpillSince(spillStartNanos, dataSize.toBytes())); // ... and immediately create new hashAggregationBuilder so effectively memory ownership // over hashAggregationBuilder is transferred from this thread to a spilling thread rebuildHashAggregationBuilder(); - return spillInProgress; + return asVoid(spillInProgress); + } + + private boolean hasNoGroups() + { + return hashAggregationBuilder.getGroupCount() == 0; } private WorkProcessor mergeFromDiskAndMemory() @@ -260,39 +287,36 @@ private WorkProcessor mergeFromDiskAndMemory() checkState(spiller.isPresent()); hashAggregationBuilder.setSpillOutput(); - mergeHashSort = Optional.of(new MergeHashSort(operatorContext.newAggregateUserMemoryContext(), typeOperators)); + mergeHashSort = Optional.of(new MergeHashSort(operatorContext.newAggregateUserMemoryContext())); + List spillTypes = hashAggregationBuilder.buildSpillTypes(); + long unspillStartNanos = System.nanoTime(); + AtomicLong unspillBytes = new AtomicLong(0); WorkProcessor mergedSpilledPages = mergeHashSort.get().merge( - groupByTypes, - hashAggregationBuilder.buildSpillTypes(), + spillTypes, ImmutableList.<WorkProcessor<Page>>builder() .addAll(spiller.get().getSpills().stream() 
.map(WorkProcessor::fromIterator) + .map(processor -> processor.withProcessStateMonitor(state -> { + if (state.getType() == RESULT) { + unspillBytes.addAndGet(state.getResult().getSizeInBytes()); + } + })) .collect(toImmutableList())) - .add(hashAggregationBuilder.buildHashSortedResult()) + .add(hashAggregationBuilder.buildSpillResult()) .build(), - operatorContext.getDriverContext().getYieldSignal()); + operatorContext.getDriverContext().getYieldSignal(), + spillTypes.size() - 1); + mergedSpilledPages = mergedSpilledPages.withProcessStateMonitor(state -> { + if (state.getType() == FINISHED) { + aggregationMetrics.recordUnspillSince(unspillStartNanos, unspillBytes.get()); + } + }); + spiller = Optional.empty(); return mergeSortedPages(mergedSpilledPages, max(memoryLimitForMerge - memoryLimitForMergeWithMemory, 1L)); } - private WorkProcessor mergeFromDisk() - { - checkState(spiller.isPresent()); - - mergeHashSort = Optional.of(new MergeHashSort(operatorContext.newAggregateUserMemoryContext(), typeOperators)); - - WorkProcessor mergedSpilledPages = mergeHashSort.get().merge( - groupByTypes, - hashAggregationBuilder.buildSpillTypes(), - spiller.get().getSpills().stream() - .map(WorkProcessor::fromIterator) - .collect(toImmutableList()), - operatorContext.getDriverContext().getYieldSignal()); - - return mergeSortedPages(mergedSpilledPages, memoryLimitForMerge); - } - private WorkProcessor mergeSortedPages(WorkProcessor sortedPages, long memoryLimitForMerge) { merger = Optional.of(new MergingHashAggregationBuilder( diff --git a/core/trino-main/src/main/java/io/trino/operator/join/DefaultPageJoiner.java b/core/trino-main/src/main/java/io/trino/operator/join/DefaultPageJoiner.java index d737b873feb5..cd8c18865de9 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/DefaultPageJoiner.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/DefaultPageJoiner.java @@ -15,11 +15,13 @@ import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; import io.trino.memory.context.MemoryTrackingContext; import io.trino.operator.DriverYieldSignal; import io.trino.operator.HashGenerator; import io.trino.operator.ProcessorContext; import io.trino.operator.SpillContext; +import io.trino.operator.SpillMetrics; import io.trino.operator.WorkProcessor; import io.trino.operator.exchange.LocalPartitionGenerator; import io.trino.operator.join.JoinProbe.JoinProbeFactory; @@ -43,11 +45,11 @@ import static com.google.common.base.Suppliers.memoize; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.addSuccessCallback; import static io.airlift.concurrent.MoreFutures.checkSuccess; import static io.airlift.concurrent.MoreFutures.getDone; -import static io.trino.operator.Operator.NOT_BLOCKED; import static io.trino.operator.WorkProcessor.TransformationState.blocked; import static io.trino.operator.WorkProcessor.TransformationState.finished; import static io.trino.operator.WorkProcessor.TransformationState.needsMoreData; @@ -76,6 +78,7 @@ public class DefaultPageJoiner private final Map spilledRows = new HashMap<>(); private final boolean probeOnOuterSide; private final boolean outputSingleMatch; + private final SpillMetrics spillMetrics; @Nullable private LookupSourceProvider lookupSourceProvider; @@ -87,7 +90,7 @@ public class DefaultPageJoiner private boolean currentProbePositionProducedRow; private Optional spiller = Optional.empty(); - private ListenableFuture spillInProgress = NOT_BLOCKED; + private ListenableFuture spillInProgress = immediateFuture(DataSize.ofBytes(0)); public DefaultPageJoiner( ProcessorContext processorContext, @@ -95,6 +98,7 @@ public DefaultPageJoiner( List 
buildOutputTypes, JoinType joinType, boolean outputSingleMatch, + SpillMetrics spillMetrics, HashGenerator hashGenerator, JoinProbeFactory joinProbeFactory, LookupSourceFactory lookupSourceFactory, @@ -116,6 +120,7 @@ public DefaultPageJoiner( this.partitionGenerator = memoize(() -> new LocalPartitionGenerator(hashGenerator, lookupSourceFactory.partitions())); this.pageBuilder = new LookupJoinPageBuilder(buildOutputTypes); this.outputSingleMatch = outputSingleMatch; + this.spillMetrics = requireNonNull(spillMetrics, "spillMetrics is null"); // Cannot use switch case here, because javac will synthesize an inner class and cause IllegalAccessError probeOnOuterSide = joinType == PROBE_OUTER || joinType == FULL_OUTER; @@ -158,7 +163,7 @@ else if (savedRows.hasNext()) { } else if (!spillInProgress.isDone()) { // block on remaining spill before finishing - return blocked(spillInProgress); + return blocked(asVoid(spillInProgress)); } else { checkSuccess(spillInProgress, "spilling failed"); @@ -185,7 +190,7 @@ else if (!spillInProgress.isDone()) { if (spillInfoSnapshotIfSpillChanged.isPresent()) { if (!spillInProgress.isDone()) { // block on previous spill - return blocked(spillInProgress); + return blocked(asVoid(spillInProgress)); } checkSuccess(spillInProgress, "spilling failed"); @@ -374,7 +379,9 @@ private Page spillAndMaskSpilledPositions(Page page, SpillInfoSnapshot spillInfo } PartitioningSpiller.PartitioningSpillResult result = spiller.get().partitionAndSpill(page, spillInfoSnapshot.getSpillMask()); + long spillStartNanos = System.nanoTime(); spillInProgress = result.getSpillingFuture(); + addSuccessCallback(spillInProgress, dataSize -> spillMetrics.recordSpillSince(spillStartNanos, dataSize.toBytes())); return result.getRetained(); } diff --git a/core/trino-main/src/main/java/io/trino/operator/join/HashBuilderOperator.java b/core/trino-main/src/main/java/io/trino/operator/join/HashBuilderOperator.java index 3dd83cadcd80..3ffeb7c29246 100644 --- 
a/core/trino-main/src/main/java/io/trino/operator/join/HashBuilderOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/HashBuilderOperator.java @@ -15,11 +15,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.errorprone.annotations.ThreadSafe; +import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; +import io.airlift.units.DataSize; import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.DriverContext; import io.trino.operator.HashArraySizeSupplier; @@ -27,7 +29,10 @@ import io.trino.operator.OperatorContext; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; +import io.trino.operator.SpillMetrics; import io.trino.spi.Page; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; import io.trino.spiller.SingleStreamSpiller; import io.trino.spiller.SingleStreamSpillerFactory; import io.trino.sql.gen.JoinFilterFunctionCompiler.JoinFilterFunctionFactory; @@ -45,8 +50,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.airlift.concurrent.MoreFutures.addSuccessCallback; +import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.checkSuccess; import static io.airlift.concurrent.MoreFutures.getDone; import static io.airlift.units.DataSize.succinctBytes; @@ -219,11 +226,14 @@ public 
enum State private final boolean spillEnabled; private final SingleStreamSpillerFactory singleStreamSpillerFactory; + private final SpillMetrics inputSpillMetrics = new SpillMetrics("Build input"); + private final SpillMetrics indexSpillMetrics = new SpillMetrics("Index"); + private State state = State.CONSUMING_INPUT; private Optional> lookupSourceNotNeeded = Optional.empty(); private final SpilledLookupSourceHandle spilledLookupSourceHandle = new SpilledLookupSourceHandle(); private Optional spiller = Optional.empty(); - private ListenableFuture spillInProgress = NOT_BLOCKED; + private ListenableFuture spillInProgress = immediateFuture(DataSize.ofBytes(0)); private Optional>> unspillInProgress = Optional.empty(); @Nullable private LookupSourceSupplier lookupSourceSupplier; @@ -287,20 +297,15 @@ public ListenableFuture isBlocked() { return switch (state) { case CONSUMING_INPUT -> NOT_BLOCKED; - case SPILLING_INPUT -> spillInProgress; + case SPILLING_INPUT -> asVoid(spillInProgress); case LOOKUP_SOURCE_BUILT -> lookupSourceNotNeeded.orElseThrow(() -> new IllegalStateException("Lookup source built, but disposal future not set")); case INPUT_SPILLED -> spilledLookupSourceHandle.getUnspillingOrDisposeRequested(); - case INPUT_UNSPILLING -> unspillInProgress.map(HashBuilderOperator::asVoid).orElseThrow(() -> new IllegalStateException("Unspilling in progress, but unspilling future not set")); + case INPUT_UNSPILLING -> unspillInProgress.map(MoreFutures::asVoid).orElseThrow(() -> new IllegalStateException("Unspilling in progress, but unspilling future not set")); case INPUT_UNSPILLED_AND_BUILT -> spilledLookupSourceHandle.getDisposeRequested(); case CLOSED -> NOT_BLOCKED; }; } - private static ListenableFuture asVoid(ListenableFuture future) - { - return Futures.transform(future, v -> null, directExecutor()); - } - @Override public boolean needsInput() { @@ -349,7 +354,12 @@ private void spillInput(Page page) { checkState(spillInProgress.isDone(), "Previous spill still 
in progress"); checkSuccess(spillInProgress, "spilling failed"); + long spillStartNanos = System.nanoTime(); spillInProgress = getSpiller().spill(page); + addSuccessCallback(spillInProgress, dataSize -> { + inputSpillMetrics.recordSpillSince(spillStartNanos, dataSize.toBytes()); + updateMetrics(); + }); } @Override @@ -404,7 +414,13 @@ private ListenableFuture spillIndex() index.getTypes(), operatorContext.getSpillContext().newLocalSpillContext(), operatorContext.newLocalUserMemoryContext(HashBuilderOperator.class.getSimpleName()))); - return getSpiller().spill(index.getPages()); + long spillStartNanos = System.nanoTime(); + ListenableFuture spillFuture = getSpiller().spill(index.getPages()); + addSuccessCallback(spillFuture, dataSize -> { + indexSpillMetrics.recordSpillSince(spillStartNanos, dataSize.toBytes()); + updateMetrics(); + }); + return asVoid(spillFuture); } @Override @@ -528,8 +544,14 @@ private void unspillLookupSourceIfRequested() verify(spiller.isPresent()); verify(unspillInProgress.isEmpty()); - localUserMemoryContext.setBytes(getSpiller().getSpilledPagesInMemorySize() + index.getEstimatedSize().toBytes()); + long spilledPagesInMemorySize = getSpiller().getSpilledPagesInMemorySize(); + localUserMemoryContext.setBytes(spilledPagesInMemorySize + index.getEstimatedSize().toBytes()); + long unspillStartNanos = System.nanoTime(); unspillInProgress = Optional.of(getSpiller().getAllSpilledPages()); + addSuccessCallback(unspillInProgress.get(), ignored -> { + indexSpillMetrics.recordUnspillSince(unspillStartNanos, spilledPagesInMemorySize); + updateMetrics(); + }); state = State.INPUT_UNSPILLING; } @@ -599,6 +621,14 @@ private LookupSourceSupplier buildLookupSource() return partition; } + private void updateMetrics() + { + operatorContext.setLatestMetrics(new Metrics(ImmutableMap.>builder() + .putAll(inputSpillMetrics.getMetrics().getMetrics()) + .putAll(indexSpillMetrics.getMetrics().getMetrics()) + .buildOrThrow())); + } + @Override public boolean 
isFinished() { diff --git a/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperator.java b/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperator.java index d00e59a1c3e3..678f6ef98182 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperator.java @@ -17,12 +17,14 @@ import io.trino.operator.HashGenerator; import io.trino.operator.OperatorInfo; import io.trino.operator.ProcessorContext; +import io.trino.operator.SpillMetrics; import io.trino.operator.WorkProcessor; import io.trino.operator.WorkProcessorOperator; import io.trino.operator.join.JoinProbe.JoinProbeFactory; import io.trino.operator.join.LookupJoinOperatorFactory.JoinType; import io.trino.operator.join.PageJoiner.PageJoinerFactory; import io.trino.spi.Page; +import io.trino.spi.metrics.Metrics; import io.trino.spi.type.Type; import io.trino.spiller.PartitioningSpillerFactory; @@ -41,6 +43,7 @@ public class LookupJoinOperator private final WorkProcessor pages; private final SpillingJoinProcessor joinProcessor; private final JoinStatisticsCounter statisticsCounter; + private final SpillMetrics spillMetrics = new SpillMetrics("Probe"); LookupJoinOperator( List probeTypes, @@ -66,6 +69,7 @@ public class LookupJoinOperator buildOutputTypes, joinType, outputSingleMatch, + spillMetrics, hashGenerator, joinProbeFactory, lookupSourceFactory, @@ -80,6 +84,7 @@ public class LookupJoinOperator lookupSourceFactory, lookupSourceProviderFuture, partitioningSpillerFactory, + spillMetrics, pageJoinerFactory, sourcePages); WorkProcessor pages = flatten(WorkProcessor.create(joinProcessor)); @@ -102,6 +107,12 @@ public Optional getOperatorInfo() return Optional.of(statisticsCounter.get()); } + @Override + public Metrics getMetrics() + { + return spillMetrics.getMetrics(); + } + @Override public void close() { diff --git 
a/core/trino-main/src/main/java/io/trino/operator/join/SpillingJoinProcessor.java b/core/trino-main/src/main/java/io/trino/operator/join/SpillingJoinProcessor.java index 3d34924866b2..664d320ef9d7 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/SpillingJoinProcessor.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/SpillingJoinProcessor.java @@ -16,6 +16,7 @@ import com.google.common.io.Closer; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.trino.operator.SpillMetrics; import io.trino.operator.WorkProcessor; import io.trino.operator.join.DefaultPageJoiner.SavedRow; import io.trino.operator.join.PageJoiner.PageJoinerFactory; @@ -27,11 +28,14 @@ import java.util.Iterator; import java.util.Optional; import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static com.google.common.collect.Iterators.singletonIterator; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getDone; +import static io.trino.operator.WorkProcessor.ProcessState.Type.FINISHED; +import static io.trino.operator.WorkProcessor.ProcessState.Type.RESULT; import static java.util.Collections.emptyIterator; import static java.util.Objects.requireNonNull; @@ -43,6 +47,7 @@ public class SpillingJoinProcessor private final boolean waitForBuild; private final LookupSourceFactory lookupSourceFactory; private final ListenableFuture lookupSourceProvider; + private final SpillMetrics spillMetrics; private final PageJoinerFactory pageJoinerFactory; private final PageJoiner sourcePagesJoiner; private final WorkProcessor joinedSourcePages; @@ -65,6 +70,7 @@ public SpillingJoinProcessor( LookupSourceFactory lookupSourceFactory, ListenableFuture lookupSourceProvider, PartitioningSpillerFactory partitioningSpillerFactory, + SpillMetrics spillMetrics, PageJoinerFactory 
pageJoinerFactory, WorkProcessor sourcePages) { @@ -73,6 +79,7 @@ public SpillingJoinProcessor( this.waitForBuild = waitForBuild; this.lookupSourceFactory = requireNonNull(lookupSourceFactory, "lookupSourceFactory is null"); this.lookupSourceProvider = requireNonNull(lookupSourceProvider, "lookupSourceProvider is null"); + this.spillMetrics = requireNonNull(spillMetrics, "spillMetrics is null"); this.pageJoinerFactory = requireNonNull(pageJoinerFactory, "pageJoinerFactory is null"); sourcePagesJoiner = pageJoinerFactory.getPageJoiner( lookupSourceProvider, @@ -155,9 +162,25 @@ private static ListenableFuture asVoid(ListenableFuture future) private WorkProcessor joinUnspilledPages(PartitionedConsumption.Partition> partition) { int partitionNumber = partition.number(); - WorkProcessor unspilledInputPages = WorkProcessor.fromIterator(sourcePagesJoiner.getSpiller() - .map(spiller -> spiller.getSpilledPages(partitionNumber)) - .orElse(emptyIterator())); + + WorkProcessor unspilledInputPages; + if (sourcePagesJoiner.getSpiller().isPresent()) { + long unspillStartNanos = System.nanoTime(); + AtomicLong unspillBytes = new AtomicLong(0); + unspilledInputPages = WorkProcessor.fromIterator(sourcePagesJoiner.getSpiller().get().getSpilledPages(partitionNumber)) + .withProcessStateMonitor(state -> { + if (state.getType() == FINISHED) { + spillMetrics.recordUnspillSince(unspillStartNanos, unspillBytes.get()); + } + else if (state.getType() == RESULT) { + unspillBytes.addAndGet(state.getResult().getSizeInBytes()); + } + }); + } + else { + unspilledInputPages = WorkProcessor.of(); + } + Iterator savedRow = Optional.ofNullable(sourcePagesJoiner.getSpilledRows().remove(partitionNumber)) .map(row -> (Iterator) singletonIterator(row)) .orElse(emptyIterator()); diff --git a/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java b/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java index d8859002e1cf..4a7f0e49b225 100644 --- 
a/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java @@ -23,6 +23,7 @@ import io.airlift.slice.OutputStreamSliceOutput; import io.airlift.slice.Slice; import io.airlift.slice.SliceOutput; +import io.airlift.units.DataSize; import io.trino.annotation.NotThreadSafe; import io.trino.execution.buffer.PageDeserializer; import io.trino.execution.buffer.PageSerializer; @@ -44,9 +45,11 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_PREFIX; import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_SUFFIX; @@ -71,9 +74,9 @@ public class FileSingleStreamSpiller private final ListeningExecutorService executor; - private boolean writable = true; - private long spilledPagesInMemorySize; - private ListenableFuture spillInProgress = immediateVoidFuture(); + private final AtomicBoolean writable = new AtomicBoolean(true); + private final AtomicLong spilledPagesInMemorySize = new AtomicLong(); + private ListenableFuture spillInProgress = immediateFuture(DataSize.ofBytes(0L)); private final Runnable fileSystemErrorHandler; @@ -116,7 +119,7 @@ public FileSingleStreamSpiller( } @Override - public ListenableFuture spill(Iterator pageIterator) + public ListenableFuture spill(Iterator pageIterator) { requireNonNull(pageIterator, "pageIterator is null"); checkNoSpillInProgress(); @@ -127,7 +130,7 @@ public ListenableFuture spill(Iterator pageIterator) @Override public long 
getSpilledPagesInMemorySize() { - return spilledPagesInMemorySize; + return spilledPagesInMemorySize.longValue(); } @Override @@ -143,17 +146,20 @@ public ListenableFuture> getAllSpilledPages() return executor.submit(() -> ImmutableList.copyOf(getSpilledPages())); } - private void writePages(Iterator pageIterator) + private DataSize writePages(Iterator pageIterator) { - checkState(writable, "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent"); + checkState(writable.get(), "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent"); Optional encryptionKey = this.encryptionKey; checkState(encrypted == encryptionKey.isPresent(), "encryptionKey has been discarded"); PageSerializer serializer = serdeFactory.createSerializer(encryptionKey); + long spilledPagesBytes = 0; try (SliceOutput output = new OutputStreamSliceOutput(targetFile.newOutputStream(APPEND), BUFFER_SIZE)) { while (pageIterator.hasNext()) { Page page = pageIterator.next(); - spilledPagesInMemorySize += page.getSizeInBytes(); + long pageSizeInBytes = page.getSizeInBytes(); + spilledPagesBytes += pageSizeInBytes; + spilledPagesInMemorySize.addAndGet(pageSizeInBytes); Slice serializedPage = serializer.serialize(page); long pageSize = serializedPage.length(); localSpillContext.updateBytes(pageSize); @@ -165,12 +171,12 @@ private void writePages(Iterator pageIterator) fileSystemErrorHandler.run(); throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to spill pages", e); } + return DataSize.ofBytes(spilledPagesBytes); } private Iterator readPages() { - checkState(writable, "Repeated reads are disallowed to prevent potential resource leaks"); - writable = false; + checkState(writable.getAndSet(false), "Repeated reads are disallowed to prevent potential resource leaks"); try { Optional encryptionKey = this.encryptionKey; diff --git 
a/core/trino-main/src/main/java/io/trino/spiller/GenericPartitioningSpiller.java b/core/trino-main/src/main/java/io/trino/spiller/GenericPartitioningSpiller.java index eb56a954aeaf..832e61a6ece1 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/GenericPartitioningSpiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/GenericPartitioningSpiller.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.errorprone.annotations.ThreadSafe; +import io.airlift.units.DataSize; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.operator.PartitionFunction; import io.trino.operator.SpillContext; @@ -40,7 +41,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static java.util.Objects.requireNonNull; @@ -114,7 +115,7 @@ public synchronized PartitioningSpillResult partitionAndSpill(Page page, IntPred checkState(!readingStarted, "reading already started"); IntArrayList unspilledPositions = partitionPage(page, spillPartitionMask); - ListenableFuture future = flushFullBuilders(); + ListenableFuture future = flushFullBuilders(); return new PartitioningSpillResult(future, page.getPositions(unspilledPositions.elements(), 0, unspilledPositions.size())); } @@ -143,21 +144,21 @@ private synchronized IntArrayList partitionPage(Page page, IntPredicate spillPar return unspilledPositions; } - private ListenableFuture flushFullBuilders() + private ListenableFuture flushFullBuilders() { return flush(PageBuilder::isFull); } 
@VisibleForTesting - ListenableFuture flush() + ListenableFuture flush() { return flush(pageBuilder -> true); } - private synchronized ListenableFuture flush(Predicate flushCondition) + private synchronized ListenableFuture flush(Predicate flushCondition) { requireNonNull(flushCondition, "flushCondition is null"); - ImmutableList.Builder> futures = ImmutableList.builder(); + ImmutableList.Builder> futures = ImmutableList.builder(); for (int partition = 0; partition < spillers.size(); partition++) { PageBuilder pageBuilder = pageBuilders.get(partition); @@ -166,19 +167,21 @@ private synchronized ListenableFuture flush(Predicate flushCo } } - return asVoid(Futures.allAsList(futures.build())); + return Futures.transform(Futures.allAsList(futures.build()), + result -> { + long totalBytes = 0; + for (DataSize size : result) { + totalBytes += size.toBytes(); + } + return DataSize.ofBytes(totalBytes); + }, directExecutor()); } - private static ListenableFuture asVoid(ListenableFuture future) - { - return Futures.transform(future, v -> null, directExecutor()); - } - - private synchronized ListenableFuture flush(int partition) + private synchronized ListenableFuture flush(int partition) { PageBuilder pageBuilder = pageBuilders.get(partition); if (pageBuilder.isEmpty()) { - return immediateVoidFuture(); + return immediateFuture(DataSize.ofBytes(0)); } Page page = pageBuilder.build(); pageBuilder.reset(); diff --git a/core/trino-main/src/main/java/io/trino/spiller/GenericSpiller.java b/core/trino-main/src/main/java/io/trino/spiller/GenericSpiller.java index 719311863e95..77fba86fbbaf 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/GenericSpiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/GenericSpiller.java @@ -15,6 +15,7 @@ import com.google.common.io.Closer; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; import io.trino.annotation.NotThreadSafe; import io.trino.memory.context.AggregatedMemoryContext; 
import io.trino.operator.SpillContext; @@ -27,7 +28,7 @@ import java.util.List; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -40,7 +41,7 @@ public class GenericSpiller private final AggregatedMemoryContext aggregatedMemoryContext; private final SingleStreamSpillerFactory singleStreamSpillerFactory; private final Closer closer = Closer.create(); - private ListenableFuture previousSpill = immediateVoidFuture(); + private ListenableFuture previousSpill = immediateFuture(DataSize.ofBytes(0)); private final List singleStreamSpillers = new ArrayList<>(); public GenericSpiller( @@ -56,7 +57,7 @@ public GenericSpiller( } @Override - public ListenableFuture spill(Iterator pageIterator) + public ListenableFuture spill(Iterator pageIterator) { checkNoSpillInProgress(); SingleStreamSpiller singleStreamSpiller = singleStreamSpillerFactory.create(types, spillContext, aggregatedMemoryContext.newLocalMemoryContext(GenericSpiller.class.getSimpleName())); diff --git a/core/trino-main/src/main/java/io/trino/spiller/PartitioningSpiller.java b/core/trino-main/src/main/java/io/trino/spiller/PartitioningSpiller.java index ca65fd0a48e5..8520371cf103 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/PartitioningSpiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/PartitioningSpiller.java @@ -14,6 +14,7 @@ package io.trino.spiller; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; import io.trino.spi.Page; import java.io.Closeable; @@ -56,16 +57,16 @@ void close() class PartitioningSpillResult { - private final ListenableFuture spillingFuture; + private final ListenableFuture spillingFuture; private final Page retained; - public 
PartitioningSpillResult(ListenableFuture spillingFuture, Page retained) + public PartitioningSpillResult(ListenableFuture spillingFuture, Page retained) { this.spillingFuture = requireNonNull(spillingFuture, "spillingFuture is null"); this.retained = requireNonNull(retained, "retained is null"); } - public ListenableFuture getSpillingFuture() + public ListenableFuture getSpillingFuture() { return spillingFuture; } diff --git a/core/trino-main/src/main/java/io/trino/spiller/SingleStreamSpiller.java b/core/trino-main/src/main/java/io/trino/spiller/SingleStreamSpiller.java index 93905d56927f..4878472e0780 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/SingleStreamSpiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/SingleStreamSpiller.java @@ -14,6 +14,7 @@ package io.trino.spiller; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; import io.trino.spi.Page; import java.io.Closeable; @@ -27,16 +28,16 @@ public interface SingleStreamSpiller extends Closeable { /** - * Initiate spilling of pages stream. Returns completed future once spilling has finished. + * Initiate spilling of pages stream. Returns completed future (with spilled pages data size) once spilling has finished. * Next spill can be initiated as soon as previous one completes. */ - ListenableFuture spill(Iterator page); + ListenableFuture spill(Iterator page); /** - * Initiate spilling of single page. Returns completed future once spilling has finished. + * Initiate spilling of single page. Returns completed future (with spilled pages data size) once spilling has finished. * Next spill can be initiated as soon as previous one completes. 
*/ - default ListenableFuture spill(Page page) + default ListenableFuture spill(Page page) { return spill(singletonIterator(page)); } diff --git a/core/trino-main/src/main/java/io/trino/spiller/Spiller.java b/core/trino-main/src/main/java/io/trino/spiller/Spiller.java index be35d7ac9a5c..e6669add5600 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/Spiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/Spiller.java @@ -14,6 +14,7 @@ package io.trino.spiller; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; import io.trino.spi.Page; import java.io.Closeable; @@ -26,7 +27,7 @@ public interface Spiller /** * Initiate spilling of pages stream. Returns completed future once spilling has finished. */ - ListenableFuture spill(Iterator pageIterator); + ListenableFuture spill(Iterator pageIterator); /** * Returns list of previously spilled Pages streams. diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 7b47890003f5..8ab43823a937 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -4158,7 +4158,6 @@ private OperatorFactory createHashAggregationOperatorFactory( unspillMemoryLimit, spillerFactory, hashStrategyCompiler, - typeOperators, createPartialAggregationController(maxPartialAggregationMemorySize, step, session)); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java index 30f342a0dcd8..a1ae0607792b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java @@ -212,6 +212,8 @@ public QueryInfo getFullQueryInfo() 
DataSize.ofBytes(25), DataSize.ofBytes(26), + DataSize.ofBytes(27), + !state.isDone(), state.isDone() ? OptionalDouble.empty() : OptionalDouble.of(8.88), state.isDone() ? OptionalDouble.empty() : OptionalDouble.of(0), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java index 9552107593c9..72cf76bcf1ad 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java @@ -187,6 +187,7 @@ public void testQueryInfoToResultQueryInfoConversion() assertThat(queryStats.getTotalMemoryReservation()).isEqualTo(basicQueryStats.getTotalMemoryReservation()); assertThat(queryStats.getPeakUserMemoryReservation()).isEqualTo(basicQueryStats.getPeakUserMemoryReservation()); assertThat(queryStats.getPeakTotalMemoryReservation()).isEqualTo(basicQueryStats.getPeakTotalMemoryReservation()); + assertThat(queryStats.getSpilledDataSize()).isEqualTo(basicQueryStats.getSpilledDataSize()); assertThat(queryStats.getTotalCpuTime()).isEqualTo(basicQueryStats.getTotalCpuTime()); assertThat(queryStats.isFullyBlocked()).isEqualTo(basicQueryStats.isFullyBlocked()); assertThat(queryStats.getTotalCpuTime()).isEqualTo(basicQueryStats.getTotalCpuTime()); @@ -286,6 +287,7 @@ private static StageStats createStageStats(int value) succinctBytes(value), succinctBytes(value), succinctBytes(value), + succinctBytes(value), Duration.succinctDuration(value, SECONDS), Duration.succinctDuration(value, SECONDS), Duration.succinctDuration(value, SECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index c389d31bc680..7a1b1bc5aa06 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -217,6 +217,7 @@ public class 
TestQueryStats DataSize.ofBytes(25), DataSize.ofBytes(26), DataSize.ofBytes(27), + DataSize.ofBytes(28), true, OptionalDouble.of(8.88), @@ -331,7 +332,7 @@ public static void assertExpectedQueryStats(QueryStats actual) assertThat(actual.getPeakTaskUserMemory()).isEqualTo(DataSize.ofBytes(25)); assertThat(actual.getPeakTaskRevocableMemory()).isEqualTo(DataSize.ofBytes(26)); assertThat(actual.getPeakTaskTotalMemory()).isEqualTo(DataSize.ofBytes(27)); - assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(693)); + assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(28)); assertThat(actual.getTotalScheduledTime()).isEqualTo(new Duration(28, NANOSECONDS)); assertThat(actual.getFailedScheduledTime()).isEqualTo(new Duration(29, NANOSECONDS)); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java index b0f6114c08e4..70a30a4ce635 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java @@ -264,7 +264,7 @@ public void testGetBasicStageInfo() assertThat(stats.getFailedScheduledTime()).isEqualTo(succinctDuration(1, MILLISECONDS)); assertThat(stats.getRunningPercentage()).isEmpty(); assertThat(stats.getProgressPercentage()).isEmpty(); - assertThat(stats.getSpilledDataSize()).isEqualTo(succinctBytes(0)); + assertThat(stats.getSpilledDataSize()).isEqualTo(succinctBytes(expectedStatsValue)); } private static TaskStats taskStats(List pipelineContexts) @@ -295,6 +295,7 @@ private static TaskStats taskStats(List pipelineContexts, int b DataSize.ofBytes(baseValue), DataSize.ofBytes(baseValue), DataSize.ofBytes(baseValue), + DataSize.ofBytes(baseValue), new Duration(baseValue, MILLISECONDS), new Duration(baseValue, MILLISECONDS), new Duration(baseValue, MILLISECONDS), diff --git 
a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java index df8cde38f918..ea4912546ce0 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java @@ -57,6 +57,7 @@ public class TestStageStats DataSize.ofBytes(16), DataSize.ofBytes(17), DataSize.ofBytes(18), + DataSize.ofBytes(19), new Duration(19, NANOSECONDS), new Duration(20, NANOSECONDS), @@ -152,6 +153,8 @@ private static void assertExpectedStageStats(StageStats actual) assertThat(actual.getPeakUserMemoryReservation()).isEqualTo(DataSize.ofBytes(17)); assertThat(actual.getPeakRevocableMemoryReservation()).isEqualTo(DataSize.ofBytes(18)); + assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(19)); + assertThat(actual.getTotalScheduledTime()).isEqualTo(new Duration(19, NANOSECONDS)); assertThat(actual.getFailedScheduledTime()).isEqualTo(new Duration(20, NANOSECONDS)); assertThat(actual.getTotalCpuTime()).isEqualTo(new Duration(21, NANOSECONDS)); diff --git a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java index 4917316a2958..e59f8ad5969e 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java @@ -307,6 +307,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, TaskState state, Duration s DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), + DataSize.ofBytes(0), scheduledTime, new Duration(0, MILLISECONDS), blockedTime, diff --git a/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java 
b/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java index 60f2552432dd..8f3ad3e288f3 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java @@ -282,6 +282,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, boolean speculative) DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), + DataSize.ofBytes(0), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), diff --git a/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java b/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java index 8c4b45134e37..8daf3c09e3b5 100644 --- a/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java +++ b/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java @@ -262,7 +262,6 @@ private OperatorFactory createHashAggregationOperatorFactory( succinctBytes(Integer.MAX_VALUE), spillerFactory, new FlatHashStrategyCompiler(TYPE_OPERATORS), - TYPE_OPERATORS, Optional.empty()); } diff --git a/core/trino-main/src/test/java/io/trino/operator/DummySpillerFactory.java b/core/trino-main/src/test/java/io/trino/operator/DummySpillerFactory.java index 38daed5bcedb..f98227da34da 100644 --- a/core/trino-main/src/test/java/io/trino/operator/DummySpillerFactory.java +++ b/core/trino-main/src/test/java/io/trino/operator/DummySpillerFactory.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.spi.Page; import io.trino.spi.type.Type; @@ -26,7 +27,7 @@ import java.util.List; 
import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; public class DummySpillerFactory implements SpillerFactory @@ -41,11 +42,11 @@ public Spiller create(List types, SpillContext spillContext, AggregatedMem private final List> spills = new ArrayList<>(); @Override - public ListenableFuture spill(Iterator pageIterator) + public ListenableFuture spill(Iterator pageIterator) { spillsCount++; spills.add(ImmutableList.copyOf(pageIterator)); - return immediateVoidFuture(); + return immediateFuture(DataSize.ofBytes(0)); } @Override diff --git a/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java b/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java index 8e9d4c4a6fda..426cd4d2c424 100644 --- a/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java +++ b/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java @@ -14,6 +14,7 @@ package io.trino.operator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.Duration; @@ -346,7 +347,7 @@ public static void assertPagesEqualIgnoreOrder( actualPages = dropChannel(actualPages, ImmutableList.of(hashChannel.get())); } MaterializedResult actual = toMaterializedResult(driverContext.getSession(), expected.getTypes(), actualPages); - assertThat(actual.getMaterializedRows()).containsExactlyInAnyOrderElementsOf(expected.getMaterializedRows()); + assertThat(ImmutableMultiset.copyOf(actual.getMaterializedRows())).isEqualTo(ImmutableMultiset.copyOf(expected.getMaterializedRows())); } public static void assertOperatorIsBlocked(Operator operator) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java 
b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java index 58f671a05a1c..a98c50630775 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java @@ -39,6 +39,8 @@ public class TestDriverStats DataSize.ofBytes(6), DataSize.ofBytes(7), + DataSize.ofBytes(8), + new Duration(9, NANOSECONDS), new Duration(10, NANOSECONDS), new Duration(12, NANOSECONDS), @@ -92,6 +94,8 @@ public static void assertExpectedDriverStats(DriverStats actual) assertThat(actual.getUserMemoryReservation()).isEqualTo(DataSize.ofBytes(6)); assertThat(actual.getRevocableMemoryReservation()).isEqualTo(DataSize.ofBytes(7)); + assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(8)); + assertThat(actual.getTotalScheduledTime()).isEqualTo(new Duration(9, NANOSECONDS)); assertThat(actual.getTotalCpuTime()).isEqualTo(new Duration(10, NANOSECONDS)); assertThat(actual.getTotalBlockedTime()).isEqualTo(new Duration(12, NANOSECONDS)); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java index 1503ca948238..3a328a5e29f7 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.airlift.slice.Slices; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; @@ -29,9 +30,12 @@ import io.trino.operator.aggregation.builder.InMemoryHashAggregationBuilder; import io.trino.operator.aggregation.partial.PartialAggregationController; import io.trino.plugin.base.metrics.LongCount; +import 
io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.Page; import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.PageBuilderStatus; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; import io.trino.spi.type.Type; import io.trino.spi.type.TypeOperators; import io.trino.spiller.Spiller; @@ -75,6 +79,8 @@ import static io.trino.operator.OperatorAssertion.dropChannel; import static io.trino.operator.OperatorAssertion.toMaterializedResult; import static io.trino.operator.OperatorAssertion.toPages; +import static io.trino.operator.SpillMetrics.SPILL_COUNT_METRIC_NAME; +import static io.trino.operator.SpillMetrics.SPILL_DATA_SIZE; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DoubleType.DOUBLE; @@ -108,8 +114,7 @@ public class TestHashAggregationOperator private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed(getClass().getSimpleName() + "-%s")); private final ScheduledExecutorService scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed(getClass().getSimpleName() + "-scheduledExecutor-%s")); - private final TypeOperators typeOperators = new TypeOperators(); - private final FlatHashStrategyCompiler hashStrategyCompiler = new FlatHashStrategyCompiler(typeOperators); + private final FlatHashStrategyCompiler hashStrategyCompiler = new FlatHashStrategyCompiler(new TypeOperators()); @AfterAll public void tearDown() @@ -172,7 +177,6 @@ private void testHashAggregation(boolean hashEnabled, boolean spillEnabled, bool succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, hashStrategyCompiler, - typeOperators, Optional.empty()); DriverContext driverContext = createDriverContext(memoryLimitForMerge); @@ -243,7 +247,6 @@ private void testHashAggregationWithGlobals(boolean hashEnabled, boolean spillEn succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, hashStrategyCompiler, - typeOperators, 
Optional.empty()); DriverContext driverContext = createDriverContext(memoryLimitForMerge); @@ -305,7 +308,6 @@ private void testHashAggregationMemoryReservation(boolean hashEnabled, boolean s succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, hashStrategyCompiler, - typeOperators, Optional.empty()); Operator operator = operatorFactory.createOperator(driverContext); @@ -359,7 +361,6 @@ private void testMemoryLimit(boolean hashEnabled) 100_000, Optional.of(DataSize.of(16, MEGABYTE)), hashStrategyCompiler, - typeOperators, Optional.empty()); toPages(operatorFactory, driverContext, input); @@ -415,7 +416,6 @@ private void testHashBuilderResize(boolean hashEnabled, boolean spillEnabled, bo succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, hashStrategyCompiler, - typeOperators, Optional.empty()); toPages(operatorFactory, driverContext, input, revokeMemoryWhenAddingPages); @@ -444,7 +444,6 @@ public void testMemoryReservationYield(Type type) 1, Optional.of(DataSize.of(16, MEGABYTE)), hashStrategyCompiler, - typeOperators, Optional.empty()); // get result with yield; pick a relatively small buffer for aggregator's memory usage @@ -508,7 +507,6 @@ private void testHashBuilderResizeLimit(boolean hashEnabled) 100_000, Optional.of(DataSize.of(16, MEGABYTE)), hashStrategyCompiler, - typeOperators, Optional.empty()); toPages(operatorFactory, driverContext, input); @@ -549,7 +547,6 @@ private void testMultiSliceAggregationOutput(boolean hashEnabled) 100_000, Optional.of(DataSize.of(16, MEGABYTE)), hashStrategyCompiler, - typeOperators, Optional.empty()); assertThat(toPages(operatorFactory, createDriverContext(), input)).hasSize(2); @@ -588,7 +585,6 @@ private void testMultiplePartialFlushes(boolean hashEnabled) 100_000, Optional.of(DataSize.of(1, KILOBYTE)), hashStrategyCompiler, - typeOperators, Optional.empty()); DriverContext driverContext = createDriverContext(1024); @@ -676,7 +672,6 @@ public void testMergeWithMemorySpill() 
succinctBytes(Integer.MAX_VALUE), spillerFactory, hashStrategyCompiler, - typeOperators, Optional.empty()); DriverContext driverContext = createDriverContext(smallPagesSpillThresholdSize); @@ -689,6 +684,67 @@ public void testMergeWithMemorySpill() assertOperatorEqualsIgnoreOrder(operatorFactory, driverContext, input, resultBuilder.build()); } + @Test + public void testSpillMetricsRecorded() + { + /* + * Force the operator to spill by setting ridiculous per-operator memory + * limits (8 bytes) and feeding it more rows than can possibly stay in + * memory. After the run we assert that the driver-level metric map + * contains the histogram entries produced by SpillMetrics – and that + * their counts/values are strictly positive. + */ + + DummySpillerFactory spillerFactory = new DummySpillerFactory(); + + RowPagesBuilder pages = rowPagesBuilder(BIGINT) + // ~0.8 MB of data – comfortably larger than the 8 B quota + .addSequencePage(50_000, 0); + + HashAggregationOperatorFactory factory = new HashAggregationOperatorFactory( + 0, + new PlanNodeId("spill-metrics"), + ImmutableList.of(BIGINT), + ImmutableList.of(0), + ImmutableList.of(), + SINGLE, + false, + ImmutableList.of(LONG_MIN.createAggregatorFactory(SINGLE, ImmutableList.of(0), OptionalInt.empty())), + pages.getHashChannel(), + Optional.empty(), + 10, + Optional.of(DataSize.of(16, MEGABYTE)), + /* spill enabled */ true, + /* memoryLimitForMerge */ DataSize.ofBytes(8), + /* memoryLimitForMergeWithMemory */ succinctBytes(Integer.MAX_VALUE), + spillerFactory, + hashStrategyCompiler, + Optional.empty()); + + DriverContext context = createDriverContext(8); + + // run the operator; we don’t care about the output pages here + toPages(factory, context, pages.build()); + + Metrics metrics = context.getDriverStats().getOperatorStats().get(0).getMetrics(); + Metric spillCountMetric = metrics.getMetrics().get(SPILL_COUNT_METRIC_NAME); + Metric spillSizeMetric = metrics.getMetrics().get(SPILL_DATA_SIZE); + + 
assertThat(spillCountMetric).describedAs("metric present").isNotNull(); + assertThat(spillSizeMetric).describedAs("metric present").isNotNull(); + + TDigestHistogram spillCountHistogram = (TDigestHistogram) spillCountMetric; + TDigestHistogram spillSizeHist = (TDigestHistogram) spillSizeMetric; + + assertThat(spillCountHistogram.getDigest().getCount()) + .describedAs("exact number of spills recorded") + .isEqualTo(spillerFactory.getSpillsCount()); + + assertThat(spillSizeHist.getDigest().getCount()) + .describedAs("histogram contains at least one entry") + .isGreaterThan(0); + } + @Test public void testSpillerFailure() { @@ -732,7 +788,6 @@ public void testSpillerFailure() succinctBytes(Integer.MAX_VALUE), new FailingSpillerFactory(), hashStrategyCompiler, - typeOperators, Optional.empty()); assertThatThrownBy(() -> toPages(operatorFactory, driverContext, input)) @@ -762,7 +817,6 @@ public void testMemoryTracking() 100_000, Optional.of(DataSize.of(16, MEGABYTE)), hashStrategyCompiler, - typeOperators, Optional.empty()); DriverContext driverContext = createDriverContext(1024); @@ -800,7 +854,6 @@ public void testAdaptivePartialAggregation() 100, Optional.of(maxPartialMemory), // this setting makes operator to flush after each page hashStrategyCompiler, - typeOperators, // 1 byte maxPartialMemory causes adaptive partial aggregation to be triggered after each page flush Optional.of(partialAggregationController)); @@ -882,7 +935,6 @@ public void testAdaptivePartialAggregationTriggeredOnlyOnFlush() 10, Optional.of(DataSize.of(16, MEGABYTE)), // this setting makes operator to flush only after all pages hashStrategyCompiler, - typeOperators, // 1 byte maxPartialMemory causes adaptive partial aggregation to be triggered after each page flush Optional.of(partialAggregationController)); @@ -916,6 +968,197 @@ public void testAdaptivePartialAggregationTriggeredOnlyOnFlush() assertInputRowsWithPartialAggregationDisabled(driverContext, 20); } + @Test + public void 
testAsyncSpillBlocksAndUnblocksDriver() + throws Exception + { + /* + * – force: revocable bytes > 0 + * spiller.present == true + * shouldMergeWithMemory(size) == false + * + * so buildResult() will hit `blocked(spillToDisk())` + */ + SlowSpiller spiller = new SlowSpiller(); + SlowSpillerFactory spillerFactory = new SlowSpillerFactory(spiller); + + // tiny memory limits → convert-to-user will fail + long memoryLimitForMerge = 8; + long memoryLimitForMergeWithMemory = 0; + + // plenty of rows → revocable mem > + RowPagesBuilder pages = rowPagesBuilder(false, Ints.asList(0), BIGINT) + .addSequencePage(5_000, 0); + + HashAggregationOperatorFactory factory = + new HashAggregationOperatorFactory( + 0, + new PlanNodeId("async"), + ImmutableList.of(BIGINT), + Ints.asList(0), + ImmutableList.of(), + SINGLE, + false, + ImmutableList.of(COUNT.createAggregatorFactory(SINGLE, ImmutableList.of(0), OptionalInt.empty())), + Optional.empty(), + Optional.empty(), + /* expectedGroups */ 1, + Optional.of(DataSize.of(16, MEGABYTE)), + /* spill enabled */ true, + succinctBytes(memoryLimitForMerge), + succinctBytes(memoryLimitForMergeWithMemory), + spillerFactory, + hashStrategyCompiler, + Optional.empty()); + + DriverContext context = createDriverContext(memoryLimitForMerge); + + try (Operator operator = factory.createOperator(context)) { + // feed all input + for (Page page : pages.build()) { + assertThat(operator.needsInput()).isTrue(); + operator.addInput(page); + } + operator.finish(); + + // first call returns null, operator is now blocked + assertThat(operator.getOutput()).isNull(); + ListenableFuture blocked = operator.isBlocked(); + assertThat(blocked.isDone()).isFalse(); + + // unblock the spiller + spiller.complete(); + + // driver sees the unblock + blocked.get(); + // drive operator to completion + toPages(operator, emptyIterator()); + assertThat(operator.isFinished()) + .as("operator must finish after async spill") + .isTrue(); + } + } + + @Test + public void 
testRevocableMemoryConvertedAfterAsyncSpill() + throws Exception + { + long memoryLimitForMerge = DataSize.of(64, KILOBYTE).toBytes(); // force spill early + long memoryLimitForMergeWithMemory = 0; // make shouldMergeWithMemory() return false + + // plenty of rows to allocate >64 kB in the hash builder + RowPagesBuilder pagesBuilder = rowPagesBuilder(false, Ints.asList(0), BIGINT) + .addSequencePage(50_000, 0); + + SlowSpiller slowSpiller = new SlowSpiller(); + SlowSpillerFactory slowSpillerFactory = new SlowSpillerFactory(slowSpiller); + + HashAggregationOperatorFactory factory = new HashAggregationOperatorFactory( + 0, + new PlanNodeId("async-spill"), + ImmutableList.of(BIGINT), + Ints.asList(0), + ImmutableList.of(), + SINGLE, + false, + ImmutableList.of( + COUNT.createAggregatorFactory(SINGLE, + ImmutableList.of(0), + OptionalInt.empty())), + Optional.empty(), + Optional.empty(), + 10, + Optional.of(DataSize.of(16, MEGABYTE)), + /* spill enabled */ true, + DataSize.ofBytes(memoryLimitForMerge), + DataSize.ofBytes(memoryLimitForMergeWithMemory), + slowSpillerFactory, + hashStrategyCompiler, + Optional.empty()); + + DriverContext context = createDriverContext(memoryLimitForMerge); + + try (Operator operator = factory.createOperator(context)) { + for (Page page : pagesBuilder.build()) { + operator.addInput(page); + } + operator.finish(); + + // first call returns null, operator is now blocked + assertThat(operator.getOutput()).isNull(); + ListenableFuture blocked = operator.isBlocked(); + assertThat(blocked.isDone()).isFalse(); + + assertThat(context.getRevocableMemoryUsage()) + .as("revocable bytes should be > 0 while spill is running") + .isGreaterThan(0L); + + // complete the spill asynchronously + slowSpiller.complete(); // finish spill + blocked.get(); // wait until operator is unblocked + + // advance state + operator.getOutput(); + // revocable memory must have been cleared by updateMemory() + long revocableAfterSpill = context.getRevocableMemoryUsage(); + 
assertThat(revocableAfterSpill) + .as("revocable bytes must be 0 right after spill completion") + .isZero(); + + // drive operator to completion + toPages(operator, emptyIterator()); + + assertThat(operator.isFinished()).isTrue(); + + // all reservations are released at the very end + assertThat(context.getRevocableMemoryUsage()).isZero(); + assertThat(context.getMemoryUsage()).isZero(); + } + } + + private static class SlowSpillerFactory + implements SpillerFactory + { + private final SlowSpiller spiller; + + SlowSpillerFactory(SlowSpiller spiller) + { + this.spiller = spiller; + } + + @Override + public Spiller create(List t, SpillContext sc, AggregatedMemoryContext mc) + { + return spiller; + } + } + + private static class SlowSpiller + implements Spiller + { + private final SettableFuture future = SettableFuture.create(); + + @Override + public ListenableFuture spill(Iterator i) + { + return future; + } + + @Override + public List> getSpills() + { + return ImmutableList.of(); + } + + @Override + public void close() {} + + void complete() + { + future.set(DataSize.ofBytes(0)); + } + } + private void assertInputRowsWithPartialAggregationDisabled(DriverContext context, long expectedRowCount) { LongCount metric = ((LongCount) context.getDriverStats().getOperatorStats().get(0).getMetrics().getMetrics().get(INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME)); @@ -974,7 +1217,7 @@ public Spiller create(List types, SpillContext spillContext, AggregatedMem return new Spiller() { @Override - public ListenableFuture spill(Iterator pageIterator) + public ListenableFuture spill(Iterator pageIterator) { return immediateFailedFuture(new IOException("Failed to spill")); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestMergeHashSort.java b/core/trino-main/src/test/java/io/trino/operator/TestMergeHashSort.java index c73fca995557..ae34c5bc45b3 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestMergeHashSort.java +++ 
b/core/trino-main/src/test/java/io/trino/operator/TestMergeHashSort.java @@ -21,26 +21,27 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; +import static io.trino.operator.InterpretedHashGenerator.createPagePrefixHashGenerator; import static io.trino.operator.WorkProcessorAssertion.assertFinishes; import static io.trino.spi.type.BigintType.BIGINT; import static org.assertj.core.api.Assertions.assertThat; public class TestMergeHashSort { - private final TypeOperators typeOperators = new TypeOperators(); + private final InterpretedHashGenerator hashGenerator = createPagePrefixHashGenerator(ImmutableList.of(BIGINT), new TypeOperators()); @Test public void testBinaryMergeIteratorOverEmptyPage() { Page emptyPage = new Page(0, BIGINT.createFixedSizeBlockBuilder(0).build()); - WorkProcessor mergedPage = new MergeHashSort(newSimpleAggregatedMemoryContext(), typeOperators).merge( - ImmutableList.of(BIGINT), + WorkProcessor mergedPage = new MergeHashSort(newSimpleAggregatedMemoryContext()).merge( ImmutableList.of(BIGINT), ImmutableList.of(ImmutableList.of(emptyPage).iterator()).stream() .map(WorkProcessor::fromIterator) .collect(toImmutableList()), - new DriverYieldSignal()); + new DriverYieldSignal(), + hashGenerator); assertFinishes(mergedPage); } @@ -51,13 +52,13 @@ public void testBinaryMergeIteratorOverEmptyPageAndNonEmptyPage() Page emptyPage = new Page(0, BIGINT.createFixedSizeBlockBuilder(0).build()); Page page = rowPagesBuilder(BIGINT).row(42).build().get(0); - WorkProcessor mergedPage = new MergeHashSort(newSimpleAggregatedMemoryContext(), typeOperators).merge( - ImmutableList.of(BIGINT), + WorkProcessor mergedPage = new MergeHashSort(newSimpleAggregatedMemoryContext()).merge( ImmutableList.of(BIGINT), ImmutableList.of(ImmutableList.of(emptyPage, page).iterator()).stream() 
.map(WorkProcessor::fromIterator) .collect(toImmutableList()), - new DriverYieldSignal()); + new DriverYieldSignal(), + hashGenerator); assertThat(mergedPage.process()).isTrue(); Page actualPage = mergedPage.getResult(); @@ -74,13 +75,13 @@ public void testBinaryMergeIteratorOverPageWith() Page emptyPage = new Page(0, BIGINT.createFixedSizeBlockBuilder(0).build()); Page page = rowPagesBuilder(BIGINT).row(42).build().get(0); - WorkProcessor mergedPage = new MergeHashSort(newSimpleAggregatedMemoryContext(), typeOperators).merge( - ImmutableList.of(BIGINT), + WorkProcessor mergedPage = new MergeHashSort(newSimpleAggregatedMemoryContext()).merge( ImmutableList.of(BIGINT), ImmutableList.of(ImmutableList.of(emptyPage, page).iterator()).stream() .map(WorkProcessor::fromIterator) .collect(toImmutableList()), - new DriverYieldSignal()); + new DriverYieldSignal(), + hashGenerator); assertThat(mergedPage.process()).isTrue(); Page actualPage = mergedPage.getResult(); @@ -101,13 +102,13 @@ public void testBinaryMergeIteratorOverPageWithDifferentHashes() .row(60) .build().get(0); - WorkProcessor mergedPages = new MergeHashSort(newSimpleAggregatedMemoryContext(), typeOperators).merge( - ImmutableList.of(BIGINT), + WorkProcessor mergedPages = new MergeHashSort(newSimpleAggregatedMemoryContext()).merge( ImmutableList.of(BIGINT), ImmutableList.of(ImmutableList.of(page).iterator()).stream() .map(WorkProcessor::fromIterator) .collect(toImmutableList()), - new DriverYieldSignal()); + new DriverYieldSignal(), + hashGenerator); assertThat(mergedPages.process()).isTrue(); Page resultPage = mergedPages.getResult(); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java b/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java index 7c622f61a95c..6ff880cf0638 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java @@ -54,6 +54,8 @@ public class 
TestPipelineStats DataSize.ofBytes(5), DataSize.ofBytes(6), + DataSize.ofBytes(7), + getTestDistribution(8), getTestDistribution(9), @@ -120,6 +122,8 @@ public static void assertExpectedPipelineStats(PipelineStats actual) assertThat(actual.getUserMemoryReservation()).isEqualTo(DataSize.ofBytes(5)); assertThat(actual.getRevocableMemoryReservation()).isEqualTo(DataSize.ofBytes(6)); + assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(7)); + assertThat(actual.getQueuedTime().getCount()).isEqualTo(8.0); assertThat(actual.getElapsedTime().getCount()).isEqualTo(9.0); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java b/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java index bffb372a199b..c6ec3566e9a6 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java @@ -54,6 +54,7 @@ public class TestTaskStats DataSize.ofBytes(12), DataSize.ofBytes(120), DataSize.ofBytes(13), + DataSize.ofBytes(14), new Duration(15, NANOSECONDS), new Duration(16, NANOSECONDS), new Duration(18, NANOSECONDS), @@ -126,6 +127,8 @@ public static void assertExpectedTaskStats(TaskStats actual) assertThat(actual.getPeakUserMemoryReservation()).isEqualTo(DataSize.ofBytes(120)); assertThat(actual.getRevocableMemoryReservation()).isEqualTo(DataSize.ofBytes(13)); + assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(14)); + assertThat(actual.getTotalScheduledTime()).isEqualTo(new Duration(15, NANOSECONDS)); assertThat(actual.getTotalCpuTime()).isEqualTo(new Duration(16, NANOSECONDS)); assertThat(actual.getTotalBlockedTime()).isEqualTo(new Duration(18, NANOSECONDS)); diff --git a/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java b/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java index e585b93f1181..72c5851b0605 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java 
+++ b/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java @@ -58,7 +58,6 @@ import static com.google.common.collect.Iterators.unmodifiableIterator; import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; import static io.trino.operator.JoinOperatorType.innerJoin; @@ -336,14 +335,14 @@ public SingleStreamSpiller create(List types, SpillContext spillContext, L private final List spills = new ArrayList<>(); @Override - public ListenableFuture spill(Iterator pageIterator) + public ListenableFuture spill(Iterator pageIterator) { checkState(writing, "writing already finished"); if (failSpill) { return immediateFailedFuture(new TrinoException(GENERIC_INTERNAL_ERROR, "Spill failed")); } Iterators.addAll(spills, pageIterator); - return immediateVoidFuture(); + return immediateFuture(DataSize.ofBytes(0L)); } @Override diff --git a/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java index 7ed900f42200..51cc8ffd225e 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java @@ -44,8 +44,10 @@ import io.trino.operator.join.JoinTestUtils.BuildSideSetup; import io.trino.operator.join.JoinTestUtils.DummySpillerFactory; import io.trino.operator.join.JoinTestUtils.TestInternalJoinFilterFunction; +import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.Page; import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.metrics.Metrics; import io.trino.spi.type.Type; import 
io.trino.spi.type.TypeOperators; import io.trino.spiller.GenericPartitioningSpillerFactory; @@ -86,6 +88,7 @@ import static io.trino.operator.OperatorAssertion.dropChannel; import static io.trino.operator.OperatorAssertion.without; import static io.trino.operator.OperatorFactories.spillingJoin; +import static io.trino.operator.SpillMetrics.SPILL_COUNT_METRIC_NAME; import static io.trino.operator.join.JoinTestUtils.buildLookupSource; import static io.trino.operator.join.JoinTestUtils.getHashChannelAsInt; import static io.trino.operator.join.JoinTestUtils.innerJoinOperatorFactory; @@ -484,6 +487,16 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List whenSp .build(); assertThat(getProperColumns(joinOperator, concat(probePages.getTypes(), buildPages.getTypes()), probePages, actualPages).getMaterializedRows()).containsExactlyInAnyOrderElementsOf(expected.getMaterializedRows()); + + Metrics probeMetrics = joinOperator.getOperatorContext() + .getOperatorStats() + .getMetrics(); + + // The lookup-join (probe side) keeps its own SpillMetrics. 
+ if (!whenSpill.stream().allMatch(when -> when == WhenSpill.NEVER)) { + TDigestHistogram probeSpillCount = (TDigestHistogram) probeMetrics.getMetrics().get("Probe: " + SPILL_COUNT_METRIC_NAME); + assertThat(probeSpillCount.getDigest().getMax()).isNotNegative(); + } } finally { joinOperatorFactory.noMoreOperators(); @@ -580,6 +593,16 @@ public void testBuildGracefulSpill() lookupSourceFactory.destroy(); assertThat(hashBuilderOperator.isFinished()).isTrue(); + + // Take the latest metrics the HashBuilderOperator reported + Metrics metrics = hashBuilderOperator.getOperatorContext().getOperatorStats().getMetrics(); + + TDigestHistogram spillCount = (TDigestHistogram) metrics.getMetrics().get("Index: " + SPILL_COUNT_METRIC_NAME); + + // The test triggers exactly one graceful spill + assertThat(spillCount.getDigest().getMax()) + .describedAs("exact number of spills recorded") + .isEqualTo(1); } @Test diff --git a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java index 063200e96818..adbb3c89a364 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java @@ -90,6 +90,7 @@ public void testConstructor() DataSize.valueOf("29GB"), DataSize.valueOf("30GB"), DataSize.valueOf("31GB"), + DataSize.valueOf("32GB"), true, OptionalDouble.of(100), OptionalDouble.of(0), @@ -189,6 +190,7 @@ public void testConstructor() assertThat(basicInfo.getQueryStats().getUserMemoryReservation()).isEqualTo(DataSize.valueOf("23GB")); assertThat(basicInfo.getQueryStats().getTotalMemoryReservation()).isEqualTo(DataSize.valueOf("25GB")); assertThat(basicInfo.getQueryStats().getPeakUserMemoryReservation()).isEqualTo(DataSize.valueOf("26GB")); + assertThat(basicInfo.getQueryStats().getSpilledDataSize()).isEqualTo(DataSize.valueOf("32GB")); assertThat(basicInfo.getQueryStats().getTotalScheduledTime()).isEqualTo(new 
Duration(32, MINUTES)); assertThat(basicInfo.getQueryStats().getFailedScheduledTime()).isEqualTo(new Duration(33, MINUTES)); assertThat(basicInfo.getQueryStats().getTotalCpuTime()).isEqualTo(new Duration(34, MINUTES)); diff --git a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java index e218e762314d..8e03c6922fe3 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java @@ -142,6 +142,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query DataSize.valueOf("27GB"), DataSize.valueOf("28GB"), DataSize.valueOf("29GB"), + DataSize.valueOf("30GB"), true, OptionalDouble.of(8.88), OptionalDouble.of(0), diff --git a/docs/src/main/sphinx/admin/properties-spilling.md b/docs/src/main/sphinx/admin/properties-spilling.md index 0f8d4508cd86..d792c7afb7ff 100644 --- a/docs/src/main/sphinx/admin/properties-spilling.md +++ b/docs/src/main/sphinx/admin/properties-spilling.md @@ -39,7 +39,8 @@ this spill path is not eligible for spilling. ## `spiller-threads` - **Type:** {ref}`prop-type-integer` -- **Default value:** `4` +- **Default value:** The number of spill directories multiplied by 2, with a minimum + value of 4. Number of spiller threads. Increase this value if the default is not able to saturate the underlying spilling device (for example, when using RAID).