diff --git a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java b/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java new file mode 100644 index 000000000000..5323c5f08a54 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java @@ -0,0 +1,153 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.base.MoreObjects.ToStringHelper; +import io.trino.operator.OperatorStats; +import io.trino.spi.metrics.Distribution; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; + +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NONE) // Do not add @class property +@JsonSerialize +public record DistributionSnapshot(long total, double min, double max, double p01, double p05, double p10, double p25, double p50, double p75, double p90, double p95, double p99) + implements Metric +{ + public static List pruneOperatorStats(List operatorStats) + { + requireNonNull(operatorStats, "operatorStats is null"); + return operatorStats.stream() + .map(DistributionSnapshot::pruneOperatorStats) + .collect(toImmutableList()); + } + + public static OperatorStats pruneOperatorStats(OperatorStats operatorStats) + { + requireNonNull(operatorStats, "operatorStats is null"); + return new OperatorStats( + operatorStats.getStageId(), + operatorStats.getPipelineId(), + operatorStats.getOperatorId(), + operatorStats.getPlanNodeId(), + operatorStats.getOperatorType(), + operatorStats.getTotalDrivers(), + operatorStats.getAddInputCalls(), + operatorStats.getAddInputWall(), + operatorStats.getAddInputCpu(), + operatorStats.getPhysicalInputDataSize(), + operatorStats.getPhysicalInputPositions(), + operatorStats.getPhysicalInputReadTime(), + operatorStats.getInternalNetworkInputDataSize(), + operatorStats.getInternalNetworkInputPositions(), + operatorStats.getRawInputDataSize(), + operatorStats.getInputDataSize(), + operatorStats.getInputPositions(), + operatorStats.getSumSquaredInputPositions(), + operatorStats.getGetOutputCalls(), + operatorStats.getGetOutputWall(), + operatorStats.getGetOutputCpu(), + operatorStats.getOutputDataSize(), + operatorStats.getOutputPositions(), + operatorStats.getDynamicFilterSplitsProcessed(), + pruneMetrics(operatorStats.getMetrics()), + pruneMetrics(operatorStats.getConnectorMetrics()), + pruneMetrics(operatorStats.getPipelineMetrics()), + operatorStats.getPhysicalWrittenDataSize(), + operatorStats.getBlockedWall(), + operatorStats.getFinishCalls(), + operatorStats.getFinishWall(), + operatorStats.getFinishCpu(), + operatorStats.getUserMemoryReservation(), + operatorStats.getRevocableMemoryReservation(), + operatorStats.getPeakUserMemoryReservation(), + operatorStats.getPeakRevocableMemoryReservation(), + operatorStats.getPeakTotalMemoryReservation(), + operatorStats.getSpilledDataSize(), + operatorStats.getBlockedReason(), + operatorStats.getInfo()); + } + + private static Metrics pruneMetrics(Metrics metrics) + { + return new Metrics(metrics.getMetrics().entrySet().stream() + .collect(toImmutableMap( + Map.Entry::getKey, + entry -> { + Metric metric = entry.getValue(); + if (metric instanceof Distribution) { + return new DistributionSnapshot((Distribution) metric); + } + return metric; + }))); + } + + public DistributionSnapshot(Distribution distribution) + { + this( + distribution.getTotal(), + distribution.getMin(), + distribution.getMax(), + distribution.getPercentile(0.01), + distribution.getPercentile(0.05), + distribution.getPercentile(0.10), + distribution.getPercentile(0.25), + distribution.getPercentile(0.50), + distribution.getPercentile(0.75), + distribution.getPercentile(0.90), + distribution.getPercentile(0.95), + distribution.getPercentile(0.99)); + } + + @Override + public String toString() + { + ToStringHelper helper = toStringHelper("") + .add("count", total) + .add("p01", formatDouble(p01)) + .add("p05", formatDouble(p05)) + .add("p10", formatDouble(p10)) + .add("p25", formatDouble(p25)) + .add("p50", formatDouble(p50)) + .add("p75", formatDouble(p75)) + .add("p90", formatDouble(p90)) + .add("p95", formatDouble(p95)) + .add("p99", formatDouble(p99)) + .add("min", formatDouble(min)) + .add("max", formatDouble(max)); + return helper.toString(); + } + + @Override + public DistributionSnapshot mergeWith(DistributionSnapshot other) + { + throw new UnsupportedOperationException("Merging of DistributionSnapshot is not supported"); + } + + private static String formatDouble(double value) + { + return format(Locale.US, "%.2f", value); + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java index 68d5d7fb77b8..ad590980def7 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java @@ -33,6 +33,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.succinctBytes; +import static io.trino.execution.DistributionSnapshot.pruneOperatorStats; import static io.trino.server.DynamicFilterService.DynamicFiltersStats; import static java.util.Objects.requireNonNull; @@ -332,7 +333,7 @@ public QueryStats( this.dynamicFiltersStats = requireNonNull(dynamicFiltersStats, "dynamicFiltersStats is null"); - this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null")); + this.operatorSummaries = pruneOperatorStats(requireNonNull(operatorSummaries, "operatorSummaries is null")); this.optimizerRulesSummaries = ImmutableList.copyOf(requireNonNull(optimizerRulesSummaries, "optimizerRulesSummaries is null")); } diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index 1ec1ad61c9e8..1ca770947773 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.succinctBytes; +import static io.trino.execution.DistributionSnapshot.pruneOperatorStats; import static io.trino.execution.StageState.RUNNING; import static java.lang.Math.min; import static java.util.Objects.requireNonNull; @@ -281,7 +282,7 @@ public StageStats( this.gcInfo = requireNonNull(gcInfo, "gcInfo is null"); - this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null")); + this.operatorSummaries = pruneOperatorStats(requireNonNull(operatorSummaries, "operatorSummaries is null")); } @JsonProperty diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index b3434ff998b3..0a6990831f06 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -513,6 +513,14 @@ "lastEndTimeScaledDistribution" added new stats + + java.method.addedToInterface + method double io.trino.spi.metrics.Distribution<T>::getMax() + + + java.method.addedToInterface + method double io.trino.spi.metrics.Distribution<T>::getMin() + java.method.addedToInterface method java.util.Optional<io.trino.spi.block.ByteArrayBlock> io.trino.spi.block.ValueBlock::getNulls() diff --git a/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java b/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java index e66700548cce..3c7a6ae3f670 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java +++ b/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java @@ -18,5 +18,9 @@ public interface Distribution { long getTotal(); + double getMin(); + + double getMax(); + double getPercentile(double percentile); } diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java index 3baef2615c3a..0c668f1e0583 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java @@ -104,20 +104,22 @@ public synchronized long getTotal() return (long) digest.getCount(); } - // Below are extra properties that make it easy to read and parse serialized distribution - // in operator summaries and event listener. + @Override @JsonProperty public synchronized double getMin() { return digest.getMin(); } + @Override @JsonProperty public synchronized double getMax() { return digest.getMax(); } + // Below are extra properties that make it easy to read and parse serialized distribution + // in operator summaries and event listener. @JsonProperty public synchronized double getP01() {