Skip to content

Commit b5819f4

Browse files
committed
Add spilling metrics for aggregation and join operator
Spilling metrics for: number of spills, spill wall time, spilled data, number of unspills, unspill wall time, and unspilled data.
1 parent 7acf2b7 commit b5819f4

19 files changed

+366
-68
lines changed

core/trino-main/src/main/java/io/trino/operator/AggregationMetrics.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.airlift.units.Duration;
1919
import io.trino.plugin.base.metrics.DurationTiming;
2020
import io.trino.plugin.base.metrics.LongCount;
21+
import io.trino.spi.metrics.Metric;
2122
import io.trino.spi.metrics.Metrics;
2223

2324
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -29,6 +30,8 @@ public class AggregationMetrics
2930
private static final String ACCUMULATOR_TIME_METRIC_NAME = "Accumulator update CPU time";
3031
private static final String GROUP_BY_HASH_TIME_METRIC_NAME = "Group by hash update CPU time";
3132

33+
private final SpillMetrics spillMetrics = new SpillMetrics();
34+
3235
private long accumulatorTimeNanos;
3336
private long groupByHashTimeNanos;
3437
private long inputRowsProcessedWithPartialAggregationDisabled;
@@ -48,11 +51,23 @@ public void recordInputRowsProcessedWithPartialAggregationDisabled(long rows)
4851
inputRowsProcessedWithPartialAggregationDisabled += rows;
4952
}
5053

54+
/**
 * Records one spill by forwarding both arguments unchanged to {@code spillMetrics}.
 *
 * @param startNanos start timestamp of the spill; {@code SpillMetrics} subtracts it from
 *         {@code System.nanoTime()}, so it is expected to be an earlier {@code System.nanoTime()} reading
 * @param spillBytes number of bytes to add to the spilled-bytes total
 */
public void recordSpillSince(long startNanos, long spillBytes)
55+
{
56+
spillMetrics.recordSpillSince(startNanos, spillBytes);
57+
}
58+
59+
/**
 * Records one unspill by forwarding both arguments unchanged to {@code spillMetrics}.
 *
 * @param startNanos start timestamp of the unspill; {@code SpillMetrics} subtracts it from
 *         {@code System.nanoTime()}, so it is expected to be an earlier {@code System.nanoTime()} reading
 * @param unspillBytes number of bytes to add to the unspilled-bytes total
 */
public void recordUnspillSince(long startNanos, long unspillBytes)
60+
{
61+
spillMetrics.recordUnspillSince(startNanos, unspillBytes);
62+
}
63+
5164
/**
 * Builds a {@link Metrics} snapshot of this operator's aggregation metrics.
 *
 * <p>The snapshot holds the count of input rows processed with partial aggregation disabled,
 * the accumulator update time and the group-by-hash update time (both converted from
 * nanoseconds to a {@code Duration}). In the added (builder) form it also holds every entry
 * returned by {@code spillMetrics.getMetrics()}.
 *
 * <p>NOTE(review): this span is rendered from a diff, so the removed {@code ImmutableMap.of(...)}
 * statement and the added builder statement both appear below; only the builder form is current.
 * {@code buildOrThrow()} is expected to reject duplicate keys, so spill metric names must not
 * collide with the three names put here - confirm against Guava's documentation.
 *
 * @return a new {@code Metrics} instance wrapping an immutable map of metric name to value
 */
public Metrics getMetrics()
5265
{
53-
return new Metrics(ImmutableMap.of(
54-
INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME, new LongCount(inputRowsProcessedWithPartialAggregationDisabled),
55-
ACCUMULATOR_TIME_METRIC_NAME, new DurationTiming(new Duration(accumulatorTimeNanos, NANOSECONDS)),
56-
GROUP_BY_HASH_TIME_METRIC_NAME, new DurationTiming(new Duration(groupByHashTimeNanos, NANOSECONDS))));
66+
return new Metrics(ImmutableMap.<String, Metric<?>>builder()
67+
.put(INPUT_ROWS_WITH_PARTIAL_AGGREGATION_DISABLED_METRIC_NAME, new LongCount(inputRowsProcessedWithPartialAggregationDisabled))
68+
.put(ACCUMULATOR_TIME_METRIC_NAME, new DurationTiming(new Duration(accumulatorTimeNanos, NANOSECONDS)))
69+
.put(GROUP_BY_HASH_TIME_METRIC_NAME, new DurationTiming(new Duration(groupByHashTimeNanos, NANOSECONDS)))
70+
.putAll(spillMetrics.getMetrics().getMetrics())
71+
.buildOrThrow());
5772
}
5873
}

core/trino-main/src/main/java/io/trino/operator/OrderByOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static com.google.common.collect.ImmutableList.toImmutableList;
3737
import static com.google.common.collect.Iterators.transform;
3838
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
39+
import static io.airlift.concurrent.MoreFutures.asVoid;
3940
import static io.airlift.concurrent.MoreFutures.checkSuccess;
4041
import static io.airlift.concurrent.MoreFutures.getFutureValue;
4142
import static io.trino.util.MergeSortedPages.mergeSortedPages;
@@ -315,7 +316,7 @@ private ListenableFuture<Void> spillToDisk()
315316
}
316317

317318
pageIndex.sort(sortChannels, sortOrder);
318-
spillInProgress = spiller.get().spill(pageIndex.getSortedPages());
319+
spillInProgress = asVoid(spiller.get().spill(pageIndex.getSortedPages()));
319320
finishMemoryRevoke = Optional.of(() -> {
320321
pageIndex.clear();
321322
updateMemoryUsage();
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.operator;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.common.collect.ImmutableMap;
18+
import io.airlift.units.Duration;
19+
import io.trino.plugin.base.metrics.TDigestHistogram;
20+
import io.trino.spi.metrics.Metric;
21+
import io.trino.spi.metrics.Metrics;
22+
23+
import java.util.concurrent.atomic.AtomicLong;
24+
25+
import static io.airlift.units.DataSize.Unit.MEGABYTE;
26+
import static java.util.Objects.requireNonNull;
27+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
28+
import static java.util.concurrent.TimeUnit.SECONDS;
29+
30+
/**
 * Accumulates spill and unspill statistics (total wall time, number of operations, total bytes)
 * and exposes them as a {@link Metrics} map.
 *
 * <p>Every counter is an {@link AtomicLong}, so each individual update is atomic. The three
 * counters touched by a single {@code record...Since} call are updated one after another, not
 * as one atomic unit, so a concurrent {@link #getMetrics()} may observe a partially applied record.
 */
public class SpillMetrics
31+
{
32+
// Metric names. The unit shown in each name matches the conversion applied in getMetrics().
private static final String SPILL_TIME_METRIC_NAME = "Spill wall time (s)";
33+
@VisibleForTesting
34+
public static final String SPILL_COUNT_METRIC_NAME = "Spill count";
35+
@VisibleForTesting
36+
public static final String SPILL_DATA_SIZE = "Spill data size (MB)";
37+
private static final String UNSPILL_TIME_METRIC_NAME = "Unspill wall time (s)";
38+
private static final String UNSPILL_COUNT_METRIC_NAME = "Unspill count";
39+
private static final String UNSPILL_DATA_SIZE = "Unspill data size (MB)";
40+
41+
// Prepended verbatim to every metric name; either empty or "<prefix>: ".
private final String prefix;
42+
43+
// Running totals; time is kept in nanoseconds and size in bytes until getMetrics() converts them.
private final AtomicLong spillTimeNanos = new AtomicLong();
44+
private final AtomicLong spillCount = new AtomicLong();
45+
private final AtomicLong spillBytes = new AtomicLong();
46+
private final AtomicLong unspillTimeNanos = new AtomicLong();
47+
private final AtomicLong unspillCount = new AtomicLong();
48+
private final AtomicLong unspillBytes = new AtomicLong();
49+
50+
/**
 * Creates an instance whose metric names carry no prefix.
 */
public SpillMetrics()
51+
{
52+
this.prefix = "";
53+
}
54+
55+
/**
 * Creates an instance whose metric names are reported as {@code "<prefix>: <metric name>"}.
 *
 * @param prefix label placed in front of every metric name; {@code ": "} is appended to it here
 * @throws NullPointerException if {@code prefix} is null
 */
public SpillMetrics(String prefix)
56+
{
57+
this.prefix = requireNonNull(prefix, "prefix is null") + ": ";
58+
}
59+
60+
/**
 * Records one spill: adds the time elapsed since {@code startNanos} to the spill wall time,
 * increments the spill count and adds {@code spillBytes} to the spilled-bytes total.
 *
 * @param startNanos start timestamp; it is subtracted from {@code System.nanoTime()}, so it is
 *         expected to be an earlier {@code System.nanoTime()} reading
 * @param spillBytes number of bytes to add to the spilled-bytes total
 */
public void recordSpillSince(long startNanos, long spillBytes)
61+
{
62+
spillTimeNanos.addAndGet(System.nanoTime() - startNanos);
63+
spillCount.incrementAndGet();
64+
this.spillBytes.addAndGet(spillBytes);
65+
}
66+
67+
/**
 * Records one unspill: adds the time elapsed since {@code startNanos} to the unspill wall time,
 * increments the unspill count and adds {@code unspillBytes} to the unspilled-bytes total.
 *
 * @param startNanos start timestamp; it is subtracted from {@code System.nanoTime()}, so it is
 *         expected to be an earlier {@code System.nanoTime()} reading
 * @param unspillBytes number of bytes to add to the unspilled-bytes total
 */
public void recordUnspillSince(long startNanos, long unspillBytes)
68+
{
69+
unspillTimeNanos.addAndGet(System.nanoTime() - startNanos);
70+
unspillCount.incrementAndGet();
71+
this.unspillBytes.addAndGet(unspillBytes);
72+
}
73+
74+
/**
 * Builds a {@link Metrics} snapshot of the recorded spill and unspill statistics.
 *
 * <p>The spill group (time, count, size) is included only when at least one of its three
 * counters is positive; the same rule applies independently to the unspill group. An instance
 * with nothing recorded therefore returns an empty metrics map. Times are converted from
 * nanoseconds to seconds and sizes from bytes to {@code MEGABYTE} units; every value is wrapped
 * with {@code TDigestHistogram.fromValue}.
 *
 * @return a new {@code Metrics} instance keyed by {@code prefix + metric name}
 */
public Metrics getMetrics()
75+
{
76+
ImmutableMap.Builder<String, Metric<?>> metricsBuilder = ImmutableMap.builder();
77+
// Skip the whole spill group when nothing was ever recorded for it.
if (spillTimeNanos.get() > 0 || spillCount.get() > 0 || spillBytes.get() > 0) {
78+
metricsBuilder.put(prefix + SPILL_TIME_METRIC_NAME, TDigestHistogram.fromValue(new Duration(spillTimeNanos.longValue(), NANOSECONDS).getValue(SECONDS)));
79+
metricsBuilder.put(prefix + SPILL_COUNT_METRIC_NAME, TDigestHistogram.fromValue(spillCount.doubleValue()));
80+
metricsBuilder.put(prefix + SPILL_DATA_SIZE, TDigestHistogram.fromValue(spillBytes.longValue() * (1.0d / MEGABYTE.inBytes())));
81+
}
82+
// Same rule for the unspill group.
if (unspillTimeNanos.get() > 0 || unspillCount.get() > 0 || unspillBytes.get() > 0) {
83+
metricsBuilder.put(prefix + UNSPILL_TIME_METRIC_NAME, TDigestHistogram.fromValue(new Duration(unspillTimeNanos.longValue(), NANOSECONDS).getValue(SECONDS)));
84+
metricsBuilder.put(prefix + UNSPILL_COUNT_METRIC_NAME, TDigestHistogram.fromValue(unspillCount.doubleValue()));
85+
metricsBuilder.put(prefix + UNSPILL_DATA_SIZE, TDigestHistogram.fromValue(unspillBytes.longValue() * (1.0d / MEGABYTE.inBytes())));
86+
}
87+
return new Metrics(metricsBuilder.buildOrThrow());
88+
}
89+
}

core/trino-main/src/main/java/io/trino/operator/WindowOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import static com.google.common.collect.Iterables.concat;
5656
import static com.google.common.collect.Iterators.peekingIterator;
5757
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
58+
import static io.airlift.concurrent.MoreFutures.asVoid;
5859
import static io.airlift.concurrent.MoreFutures.checkSuccess;
5960
import static io.trino.operator.PositionSearcher.findEndPosition;
6061
import static io.trino.operator.WorkProcessor.TransformationState.needsMoreData;
@@ -801,7 +802,7 @@ ListenableFuture<Void> spill()
801802
Page anyPage = sortedPages.peek();
802803
verify(anyPage.getPositionCount() != 0, "PagesIndex.getSortedPages returned an empty page");
803804
currentSpillGroupRowPage = Optional.of(anyPage.getSingleValuePage(/* any */0));
804-
spillInProgress = Optional.of(spiller.get().spill(sortedPages));
805+
spillInProgress = Optional.of(asVoid(spiller.get().spill(sortedPages)));
805806

806807
return spillInProgress.get();
807808
}

core/trino-main/src/main/java/io/trino/operator/aggregation/builder/SpillableHashAggregationBuilder.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,17 @@
3434
import java.io.IOException;
3535
import java.util.List;
3636
import java.util.Optional;
37+
import java.util.concurrent.atomic.AtomicLong;
3738

3839
import static com.google.common.base.Preconditions.checkState;
3940
import static com.google.common.collect.ImmutableList.toImmutableList;
41+
import static com.google.common.util.concurrent.Futures.immediateFuture;
4042
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
43+
import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
44+
import static io.airlift.concurrent.MoreFutures.asVoid;
4145
import static io.airlift.concurrent.MoreFutures.getFutureValue;
46+
import static io.trino.operator.WorkProcessor.ProcessState.Type.FINISHED;
47+
import static io.trino.operator.WorkProcessor.ProcessState.Type.RESULT;
4248
import static io.trino.operator.WorkProcessor.ProcessState.blocked;
4349
import static io.trino.operator.WorkProcessor.ProcessState.finished;
4450
import static io.trino.operator.WorkProcessor.ProcessState.ofResult;
@@ -65,7 +71,7 @@ public class SpillableHashAggregationBuilder
6571
private Optional<Spiller> spiller = Optional.empty();
6672
private Optional<MergingHashAggregationBuilder> merger = Optional.empty();
6773
private Optional<MergeHashSort> mergeHashSort = Optional.empty();
68-
private ListenableFuture<Void> spillInProgress = immediateVoidFuture();
74+
private ListenableFuture<DataSize> spillInProgress = immediateFuture(DataSize.ofBytes(0L));
6975
private final FlatHashStrategyCompiler hashStrategyCompiler;
7076
private final AggregationMetrics aggregationMetrics;
7177

@@ -241,7 +247,7 @@ private ListenableFuture<Void> spillToDisk()
241247
if (!spillInProgress.isDone()) {
242248
// Spill can be triggered first in SpillableHashAggregationBuilder.buildResult and then by Driver (via HashAggregationOperator#startMemoryRevoke).
243249
// While spill is in progress revocable memory is not released, hence redundant call to spillToDisk might be made.
244-
return spillInProgress;
250+
return asVoid(spillInProgress);
245251
}
246252

247253
if (localRevocableMemoryContext.getBytes() == 0 || hasNoGroups()) {
@@ -261,12 +267,14 @@ private ListenableFuture<Void> spillToDisk()
261267
}
262268

263269
// start spilling process with current content of the hashAggregationBuilder builder...
270+
long spillStartNanos = System.nanoTime();
264271
spillInProgress = spiller.get().spill(hashAggregationBuilder.buildSpillResult().iterator());
272+
addSuccessCallback(spillInProgress, dataSize -> aggregationMetrics.recordSpillSince(spillStartNanos, dataSize.toBytes()));
265273
// ... and immediately create new hashAggregationBuilder so effectively memory ownership
266274
// over hashAggregationBuilder is transferred from this thread to a spilling thread
267275
rebuildHashAggregationBuilder();
268276

269-
return spillInProgress;
277+
return asVoid(spillInProgress);
270278
}
271279

272280
private boolean hasNoGroups()
@@ -282,16 +290,28 @@ private WorkProcessor<Page> mergeFromDiskAndMemory()
282290
mergeHashSort = Optional.of(new MergeHashSort(operatorContext.newAggregateUserMemoryContext()));
283291

284292
List<Type> spillTypes = hashAggregationBuilder.buildSpillTypes();
293+
long unspillStartNanos = System.nanoTime();
294+
AtomicLong unspillBytes = new AtomicLong(0);
285295
WorkProcessor<Page> mergedSpilledPages = mergeHashSort.get().merge(
286296
spillTypes,
287297
ImmutableList.<WorkProcessor<Page>>builder()
288298
.addAll(spiller.get().getSpills().stream()
289299
.map(WorkProcessor::fromIterator)
300+
.map(processor -> processor.withProcessStateMonitor(state -> {
301+
if (state.getType() == RESULT) {
302+
unspillBytes.addAndGet(state.getResult().getSizeInBytes());
303+
}
304+
}))
290305
.collect(toImmutableList()))
291306
.add(hashAggregationBuilder.buildSpillResult())
292307
.build(),
293308
operatorContext.getDriverContext().getYieldSignal(),
294309
spillTypes.size() - 1);
310+
mergedSpilledPages = mergedSpilledPages.withProcessStateMonitor(state -> {
311+
if (state.getType() == FINISHED) {
312+
aggregationMetrics.recordUnspillSince(unspillStartNanos, unspillBytes.get());
313+
}
314+
});
295315

296316
spiller = Optional.empty();
297317
return mergeSortedPages(mergedSpilledPages, max(memoryLimitForMerge - memoryLimitForMergeWithMemory, 1L));

core/trino-main/src/main/java/io/trino/operator/join/DefaultPageJoiner.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515

1616
import com.google.common.util.concurrent.Futures;
1717
import com.google.common.util.concurrent.ListenableFuture;
18+
import io.airlift.units.DataSize;
1819
import io.trino.memory.context.MemoryTrackingContext;
1920
import io.trino.operator.DriverYieldSignal;
2021
import io.trino.operator.HashGenerator;
2122
import io.trino.operator.ProcessorContext;
2223
import io.trino.operator.SpillContext;
24+
import io.trino.operator.SpillMetrics;
2325
import io.trino.operator.WorkProcessor;
2426
import io.trino.operator.exchange.LocalPartitionGenerator;
2527
import io.trino.operator.join.JoinProbe.JoinProbeFactory;
@@ -43,11 +45,11 @@
4345
import static com.google.common.base.Suppliers.memoize;
4446
import static com.google.common.base.Verify.verify;
4547
import static com.google.common.base.Verify.verifyNotNull;
48+
import static com.google.common.util.concurrent.Futures.immediateFuture;
4649
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
4750
import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
4851
import static io.airlift.concurrent.MoreFutures.checkSuccess;
4952
import static io.airlift.concurrent.MoreFutures.getDone;
50-
import static io.trino.operator.Operator.NOT_BLOCKED;
5153
import static io.trino.operator.WorkProcessor.TransformationState.blocked;
5254
import static io.trino.operator.WorkProcessor.TransformationState.finished;
5355
import static io.trino.operator.WorkProcessor.TransformationState.needsMoreData;
@@ -76,6 +78,7 @@ public class DefaultPageJoiner
7678
private final Map<Integer, SavedRow> spilledRows = new HashMap<>();
7779
private final boolean probeOnOuterSide;
7880
private final boolean outputSingleMatch;
81+
private final SpillMetrics spillMetrics;
7982

8083
@Nullable
8184
private LookupSourceProvider lookupSourceProvider;
@@ -87,14 +90,15 @@ public class DefaultPageJoiner
8790
private boolean currentProbePositionProducedRow;
8891

8992
private Optional<PartitioningSpiller> spiller = Optional.empty();
90-
private ListenableFuture<Void> spillInProgress = NOT_BLOCKED;
93+
private ListenableFuture<DataSize> spillInProgress = immediateFuture(DataSize.ofBytes(0));
9194

9295
public DefaultPageJoiner(
9396
ProcessorContext processorContext,
9497
List<Type> probeTypes,
9598
List<Type> buildOutputTypes,
9699
JoinType joinType,
97100
boolean outputSingleMatch,
101+
SpillMetrics spillMetrics,
98102
HashGenerator hashGenerator,
99103
JoinProbeFactory joinProbeFactory,
100104
LookupSourceFactory lookupSourceFactory,
@@ -116,6 +120,7 @@ public DefaultPageJoiner(
116120
this.partitionGenerator = memoize(() -> new LocalPartitionGenerator(hashGenerator, lookupSourceFactory.partitions()));
117121
this.pageBuilder = new LookupJoinPageBuilder(buildOutputTypes);
118122
this.outputSingleMatch = outputSingleMatch;
123+
this.spillMetrics = requireNonNull(spillMetrics, "spillMetrics is null");
119124

120125
// Cannot use switch case here, because javac will synthesize an inner class and cause IllegalAccessError
121126
probeOnOuterSide = joinType == PROBE_OUTER || joinType == FULL_OUTER;
@@ -158,7 +163,7 @@ else if (savedRows.hasNext()) {
158163
}
159164
else if (!spillInProgress.isDone()) {
160165
// block on remaining spill before finishing
161-
return blocked(spillInProgress);
166+
return blocked(asVoid(spillInProgress));
162167
}
163168
else {
164169
checkSuccess(spillInProgress, "spilling failed");
@@ -185,7 +190,7 @@ else if (!spillInProgress.isDone()) {
185190
if (spillInfoSnapshotIfSpillChanged.isPresent()) {
186191
if (!spillInProgress.isDone()) {
187192
// block on previous spill
188-
return blocked(spillInProgress);
193+
return blocked(asVoid(spillInProgress));
189194
}
190195
checkSuccess(spillInProgress, "spilling failed");
191196

@@ -374,7 +379,9 @@ private Page spillAndMaskSpilledPositions(Page page, SpillInfoSnapshot spillInfo
374379
}
375380

376381
PartitioningSpiller.PartitioningSpillResult result = spiller.get().partitionAndSpill(page, spillInfoSnapshot.getSpillMask());
382+
long spillStartNanos = System.nanoTime();
377383
spillInProgress = result.getSpillingFuture();
384+
addSuccessCallback(spillInProgress, dataSize -> spillMetrics.recordSpillSince(spillStartNanos, dataSize.toBytes()));
378385
return result.getRetained();
379386
}
380387

0 commit comments

Comments
 (0)