diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 1ac8e0e564d5..6dc8479fce2f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -24,11 +24,13 @@ import io.trino.operator.OperatorStats; import io.trino.operator.PipelineStats; import io.trino.operator.TaskStats; +import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.eventlistener.StageGcStatistics; import io.trino.sql.planner.PlanFragment; import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.TableScanNode; import io.trino.util.Failures; +import io.trino.util.Optionals; import org.joda.time.DateTime; import javax.annotation.concurrent.ThreadSafe; @@ -420,6 +422,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) long failedInputBlockedTime = 0; long bufferedDataSize = 0; + Optional outputBufferUtilization = Optional.empty(); long outputDataSize = 0; long failedOutputDataSize = 0; long outputPositions = 0; @@ -495,6 +498,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) inputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS); bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes(); + outputBufferUtilization = Optionals.combine(outputBufferUtilization, taskInfo.getOutputBuffers().getUtilization(), TDigestHistogram::mergeWith); outputDataSize += taskStats.getOutputDataSize().toBytes(); outputPositions += taskStats.getOutputPositions(); @@ -596,6 +600,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) succinctDuration(inputBlockedTime, NANOSECONDS), succinctDuration(failedInputBlockedTime, NANOSECONDS), succinctBytes(bufferedDataSize), + outputBufferUtilization, succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index b783283c9f71..60fa4a84bf3b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -22,12 +22,14 @@ import io.airlift.units.Duration; import io.trino.operator.BlockedReason; import io.trino.operator.OperatorStats; +import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.eventlistener.StageGcStatistics; import org.joda.time.DateTime; import javax.annotation.concurrent.Immutable; import java.util.List; +import java.util.Optional; import java.util.OptionalDouble; import java.util.Set; @@ -96,6 +98,7 @@ public class StageStats private final Duration failedInputBlockedTime; private final DataSize bufferedDataSize; + private final Optional outputBufferUtilization; private final DataSize outputDataSize; private final DataSize failedOutputDataSize; private final long outputPositions; @@ -170,6 +173,7 @@ public StageStats( @JsonProperty("failedInputBlockedTime") Duration failedInputBlockedTime, @JsonProperty("bufferedDataSize") DataSize bufferedDataSize, + @JsonProperty("outputBufferUtilization") Optional outputBufferUtilization, @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize, @JsonProperty("outputPositions") long outputPositions, @@ -258,6 +262,7 @@ public StageStats( this.failedInputBlockedTime = requireNonNull(failedInputBlockedTime, "failedInputBlockedTime is null"); this.bufferedDataSize = requireNonNull(bufferedDataSize, "bufferedDataSize is null"); + this.outputBufferUtilization = requireNonNull(outputBufferUtilization, "outputBufferUtilization is null"); this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); @@ -552,6 +557,12 @@ public DataSize getBufferedDataSize() return bufferedDataSize; } + @JsonProperty + public Optional getOutputBufferUtilization() + { + return outputBufferUtilization; + } + @JsonProperty public DataSize getOutputDataSize() { diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskInfo.java b/core/trino-main/src/main/java/io/trino/execution/TaskInfo.java index 45721bba43f6..1558095b53f2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskInfo.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskInfo.java @@ -111,7 +111,7 @@ public boolean isNeedsPlan() public TaskInfo summarize() { if (taskStatus.getState().isDone()) { - return new TaskInfo(taskStatus, lastHeartbeat, outputBuffers.summarize(), noMoreSplits, stats.summarizeFinal(), estimatedMemory, needsPlan); + return new TaskInfo(taskStatus, lastHeartbeat, outputBuffers.summarizeFinal(), noMoreSplits, stats.summarizeFinal(), estimatedMemory, needsPlan); } return new TaskInfo(taskStatus, lastHeartbeat, outputBuffers.summarize(), noMoreSplits, stats.summarize(), estimatedMemory, needsPlan); } @@ -130,7 +130,7 @@ public static TaskInfo createInitialTask(TaskId taskId, URI location, String nod return new TaskInfo( initialTaskStatus(taskId, location, nodeId), DateTime.now(), - new OutputBufferInfo("UNINITIALIZED", OPEN, true, true, 0, 0, 0, 0, bufferStates), + new OutputBufferInfo("UNINITIALIZED", OPEN, true, true, 0, 0, 0, 0, bufferStates, Optional.empty()), ImmutableSet.of(), taskStats, Optional.empty(), diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java index 2e7799e45ef5..d3262645c7c1 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java @@ -25,6 +25,7 @@ import io.trino.execution.buffer.OutputBuffers.OutputBufferId; import io.trino.execution.buffer.SerializedPageReference.PagesReleasedListener; import io.trino.memory.context.LocalMemoryContext; +import io.trino.plugin.base.metrics.TDigestHistogram; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -150,7 +151,8 @@ public OutputBufferInfo getInfo() totalBufferedPages, totalRowsAdded.get(), totalPagesAdded.get(), - infos.build()); + infos.build(), + Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram()))); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java index 95dbff4a4d03..5b40b9ac447b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java @@ -24,6 +24,7 @@ import io.trino.execution.buffer.OutputBuffers.OutputBufferId; import io.trino.execution.buffer.SerializedPageReference.PagesReleasedListener; import io.trino.memory.context.LocalMemoryContext; +import io.trino.plugin.base.metrics.TDigestHistogram; import javax.annotation.concurrent.GuardedBy; @@ -142,7 +143,8 @@ public OutputBufferInfo getInfo() totalPagesAdded.get(), buffers.stream() .map(ClientBuffer::getInfo) - .collect(toImmutableList())); + .collect(toImmutableList()), + Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram()))); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java index b9f26a3412fa..9f257b50499a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java @@ -137,7 +137,8 @@ public OutputBufferInfo getInfo() 0, 0, 0, - ImmutableList.of()); + ImmutableList.of(), + Optional.empty()); } return outputBuffer.getInfo(); } diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferInfo.java b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferInfo.java index f956fa150c8a..ca4371edeb24 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferInfo.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferInfo.java @@ -16,9 +16,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.trino.plugin.base.metrics.TDigestHistogram; import java.util.List; import java.util.Objects; +import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; @@ -33,6 +35,7 @@ public final class OutputBufferInfo private final long totalRowsSent; private final long totalPagesSent; private final List buffers; + private final Optional utilization; @JsonCreator public OutputBufferInfo( @@ -44,7 +47,8 @@ public OutputBufferInfo( @JsonProperty("totalBufferedPages") long totalBufferedPages, @JsonProperty("totalRowsSent") long totalRowsSent, @JsonProperty("totalPagesSent") long totalPagesSent, - @JsonProperty("buffers") List buffers) + @JsonProperty("buffers") List buffers, + @JsonProperty("utilization") Optional utilization) { this.type = type; this.state = state; @@ -55,6 +59,7 @@ public OutputBufferInfo( this.totalRowsSent = totalRowsSent; this.totalPagesSent = totalPagesSent; this.buffers = ImmutableList.copyOf(buffers); + this.utilization = utilization; } @JsonProperty @@ -111,9 +116,20 @@ public long getTotalPagesSent() return totalPagesSent; } + @JsonProperty + public Optional getUtilization() + { + return utilization; + } + public OutputBufferInfo summarize() { - return new OutputBufferInfo(type, state, canAddBuffers, canAddPages, totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, ImmutableList.of()); + return new OutputBufferInfo(type, state, canAddBuffers, canAddPages, totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, ImmutableList.of(), Optional.empty()); + } + + public OutputBufferInfo summarizeFinal() + { + return new OutputBufferInfo(type, state, canAddBuffers, canAddPages, totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, ImmutableList.of(), utilization); } @Override @@ -134,13 +150,14 @@ public boolean equals(Object o) Objects.equals(totalRowsSent, that.totalRowsSent) && Objects.equals(totalPagesSent, that.totalPagesSent) && state == that.state && - Objects.equals(buffers, that.buffers); + Objects.equals(buffers, that.buffers) && + Objects.equals(utilization, that.utilization); } @Override public int hashCode() { - return Objects.hash(state, canAddBuffers, canAddPages, totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, buffers); + return Objects.hash(state, canAddBuffers, canAddPages, totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, buffers, utilization); } @Override @@ -156,6 +173,7 @@ public String toString() .add("totalRowsSent", totalRowsSent) .add("totalPagesSent", totalPagesSent) .add("buffers", buffers) + .add("bufferUtilization", utilization) .toString(); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java index b805b86780e2..4c8c0801bc25 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java @@ -15,8 +15,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; +import com.google.common.base.Ticker; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.stats.TDigest; import io.trino.memory.context.LocalMemoryContext; import javax.annotation.Nullable; @@ -54,11 +56,20 @@ class OutputBufferMemoryManager @GuardedBy("this") private ListenableFuture blockedOnMemory = NOT_BLOCKED; + private final Ticker ticker = Ticker.systemTicker(); + private final AtomicBoolean blockOnFull = new AtomicBoolean(true); private final Supplier memoryContextSupplier; private final Executor notificationExecutor; + @GuardedBy("this") + private final TDigest bufferUtilization = new TDigest(); + @GuardedBy("this") + private long lastBufferUtilizationRecordTime; + @GuardedBy("this") + private double lastBufferUtilization; + public OutputBufferMemoryManager(long maxBufferedBytes, Supplier memoryContextSupplier, Executor notificationExecutor) { requireNonNull(memoryContextSupplier, "memoryContextSupplier is null"); @@ -66,6 +77,8 @@ public OutputBufferMemoryManager(long maxBufferedBytes, Supplier getBufferBlockedFuture() { if (bufferBlockedFuture == null) { @@ -155,6 +177,13 @@ public double getUtilization() return bufferedBytes.get() / (double) maxBufferedBytes; } + public synchronized TDigest getUtilizationHistogram() + { + // always get most up to date histogram + recordBufferUtilization(); + return TDigest.copyOf(bufferUtilization); + } + public boolean isOverutilized() { return isBufferFull(); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java index b5b3677d8735..0a7dfcac1d09 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java @@ -22,6 +22,7 @@ import io.trino.execution.buffer.OutputBuffers.OutputBufferId; import io.trino.execution.buffer.SerializedPageReference.PagesReleasedListener; import io.trino.memory.context.LocalMemoryContext; +import io.trino.plugin.base.metrics.TDigestHistogram; import java.util.List; import java.util.Optional; @@ -127,7 +128,8 @@ public OutputBufferInfo getInfo() totalBufferedPages, totalRowsAdded.get(), totalPagesAdded.get(), - infos.build()); + infos.build(), + Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram()))); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java index 2d5f0bb7146e..e52860fcc7dc 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java @@ -86,7 +86,8 @@ public OutputBufferInfo getInfo() totalPagesAdded.get(), totalRowsAdded.get(), totalPagesAdded.get(), - ImmutableList.of()); + ImmutableList.of(), + Optional.empty()); } @Override diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java index ba743d7244c3..c2ecc8ca631a 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java @@ -18,12 +18,16 @@ import io.airlift.json.JsonCodec; import io.airlift.stats.Distribution; import io.airlift.stats.Distribution.DistributionSnapshot; +import io.airlift.stats.TDigest; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.eventlistener.StageGcStatistics; import org.joda.time.DateTime; import org.testng.annotations.Test; +import java.util.Optional; + import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.testng.Assert.assertEquals; @@ -87,6 +91,7 @@ public class TestStageStats new Duration(202, NANOSECONDS), DataSize.ofBytes(34), + Optional.of(getTDigestHistogram(10)), DataSize.ofBytes(35), DataSize.ofBytes(36), 37, @@ -177,6 +182,7 @@ private static void assertExpectedStageStats(StageStats actual) assertEquals(actual.getFailedInputBlockedTime(), new Duration(202, NANOSECONDS)); assertEquals(actual.getBufferedDataSize(), DataSize.ofBytes(34)); + assertEquals(actual.getOutputBufferUtilization().get().getMax(), 9.0); assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(35)); assertEquals(actual.getFailedOutputDataSize(), DataSize.ofBytes(36)); assertEquals(actual.getOutputPositions(), 37); @@ -205,4 +211,13 @@ private static DistributionSnapshot getTestDistribution(int count) } return distribution.snapshot(); } + + private static TDigestHistogram getTDigestHistogram(int count) + { + TDigest digest = new TDigest(); + for (int i = 0; i < count; i++) { + digest.add(i); + } + return new TDigestHistogram(digest); + } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java index db478391af51..6c6324ecea5c 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestingRemoteTaskFactory.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.stats.TDigest; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.Session; @@ -31,6 +32,7 @@ import io.trino.metadata.InternalNode; import io.trino.metadata.Split; import io.trino.operator.TaskStats; +import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.sql.planner.PlanFragment; import io.trino.sql.planner.plan.DynamicFilterId; import io.trino.sql.planner.plan.PlanNodeId; @@ -146,7 +148,8 @@ public TaskInfo getTaskInfo() 0, 0, 0, - ImmutableList.of()), + ImmutableList.of(), + Optional.of(new TDigestHistogram(new TDigest()))), ImmutableSet.copyOf(noMoreSplits), new TaskStats(DateTime.now(), null), Optional.empty(), diff --git a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java index b35a14e1fc99..60b602dcb87b 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.stats.TDigest; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.execution.TaskId; @@ -26,6 +27,7 @@ import io.trino.execution.buffer.BufferState; import io.trino.execution.buffer.OutputBufferInfo; import io.trino.operator.TaskStats; +import io.trino.plugin.base.metrics.TDigestHistogram; import org.joda.time.DateTime; import org.testng.annotations.Test; @@ -247,7 +249,8 @@ private static TaskInfo buildTaskInfo(TaskId taskId, TaskState state, Duration s 0, 0, 0, - ImmutableList.of()), + ImmutableList.of(), + Optional.of(new TDigestHistogram(new TDigest()))), ImmutableSet.of(), new TaskStats(DateTime.now(), null,