diff --git a/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java b/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java index 0dc5c516901b7..d38b4949fea64 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java +++ b/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java @@ -48,7 +48,7 @@ import com.facebook.presto.spi.eventlistener.QueryMetadata; import com.facebook.presto.spi.eventlistener.QueryOutputMetadata; import com.facebook.presto.spi.eventlistener.QueryStatistics; -import com.facebook.presto.spi.eventlistener.StageCpuDistribution; +import com.facebook.presto.spi.eventlistener.ResourceDistribution; import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.transaction.TransactionId; @@ -169,6 +169,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur 0, true, ImmutableList.of(), + ImmutableList.of(), ImmutableList.of()), createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId()), new QueryIOMetadata(ImmutableList.of(), Optional.empty()), @@ -205,6 +206,26 @@ public void queryCompletedEvent(QueryInfo queryInfo) logQueryTimeline(queryInfo); } + public static ResourceDistribution createResourceDistribution( + int stageId, + int tasks, + DistributionSnapshot distributionSnapshot) + { + return new ResourceDistribution( + stageId, + tasks, + distributionSnapshot.getP25(), + distributionSnapshot.getP50(), + distributionSnapshot.getP75(), + distributionSnapshot.getP90(), + distributionSnapshot.getP95(), + distributionSnapshot.getP99(), + distributionSnapshot.getMin(), + distributionSnapshot.getMax(), + (long) distributionSnapshot.getTotal(), + distributionSnapshot.getTotal() / distributionSnapshot.getCount()); + } + private QueryMetadata createQueryMetadata(QueryInfo queryInfo) { return new QueryMetadata( @@ -226,6 +247,13 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) } QueryStats queryStats = queryInfo.getQueryStats(); + + ImmutableList.Builder cpuDistributionBuilder = ImmutableList.builder(); + ImmutableList.Builder memoryDistributionBuilder = ImmutableList.builder(); + if (queryInfo.getOutputStage().isPresent()) { + computeCpuAndMemoryDistributions(queryInfo.getOutputStage().get(), cpuDistributionBuilder, memoryDistributionBuilder); + } + return new QueryStatistics( ofMillis(queryStats.getTotalCpuTime().toMillis()), ofMillis(queryStats.getTotalScheduledTime().toMillis()), @@ -247,7 +275,8 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) queryStats.getStageGcStatistics(), queryStats.getCompletedDrivers(), queryInfo.isCompleteInfo(), - getCpuDistributions(queryInfo), + cpuDistributionBuilder.build(), + memoryDistributionBuilder.build(), operatorSummaries.build()); } @@ -506,49 +535,32 @@ private static void logQueryTimeline( queryEndTime); } - private static List getCpuDistributions(QueryInfo queryInfo) - { - if (!queryInfo.getOutputStage().isPresent()) { - return ImmutableList.of(); - } - - ImmutableList.Builder builder = ImmutableList.builder(); - populateDistribution(queryInfo.getOutputStage().get(), builder); - - return builder.build(); - } - - private static void populateDistribution(StageInfo stageInfo, ImmutableList.Builder distributions) - { - distributions.add(computeCpuDistribution(stageInfo)); - for (StageInfo subStage : stageInfo.getSubStages()) { - populateDistribution(subStage, distributions); - } - } - - private static StageCpuDistribution computeCpuDistribution(StageInfo stageInfo) + private static void computeCpuAndMemoryDistributions( + StageInfo stageInfo, + ImmutableList.Builder cpuDistributionBuilder, + ImmutableList.Builder memoryDistributionBuilder) { Distribution cpuDistribution = new Distribution(); + Distribution memoryDistribution = new Distribution(); for (TaskInfo taskInfo : stageInfo.getLatestAttemptExecutionInfo().getTasks()) { cpuDistribution.add(taskInfo.getStats().getTotalCpuTime().toMillis()); + memoryDistribution.add(taskInfo.getStats().getPeakTotalMemoryInBytes()); } - DistributionSnapshot snapshot = cpuDistribution.snapshot(); + cpuDistributionBuilder.add(createResourceDistribution( + stageInfo.getStageId().getId(), + stageInfo.getLatestAttemptExecutionInfo().getTasks().size(), + cpuDistribution.snapshot())); - return new StageCpuDistribution( + memoryDistributionBuilder.add(createResourceDistribution( stageInfo.getStageId().getId(), stageInfo.getLatestAttemptExecutionInfo().getTasks().size(), - snapshot.getP25(), - snapshot.getP50(), - snapshot.getP75(), - snapshot.getP90(), - snapshot.getP95(), - snapshot.getP99(), - snapshot.getMin(), - snapshot.getMax(), - (long) snapshot.getTotal(), - snapshot.getTotal() / snapshot.getCount()); + memoryDistribution.snapshot())); + + for (StageInfo subStage : stageInfo.getSubStages()) { + computeCpuAndMemoryDistributions(subStage, cpuDistributionBuilder, memoryDistributionBuilder); + } } public static class JsonPlanFragment diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java index 3db93884156e0..980f7c769be7a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java @@ -96,6 +96,7 @@ public class TaskContext private final Object cumulativeMemoryLock = new Object(); private final AtomicDouble cumulativeUserMemory = new AtomicDouble(0.0); + private final AtomicLong peakTotalMemoryInBytes = new AtomicLong(0); @GuardedBy("cumulativeMemoryLock") private long lastUserMemoryReservation; @@ -475,6 +476,9 @@ public TaskStats getTaskStats() Duration fullGcTime = getFullGcTime(); long userMemory = taskMemoryContext.getUserMemory(); + long systemMemory = taskMemoryContext.getSystemMemory(); + + peakTotalMemoryInBytes.accumulateAndGet(userMemory + systemMemory, Math::max); synchronized (cumulativeMemoryLock) { double sinceLastPeriodMillis = (System.nanoTime() - lastTaskStatCallNanos) / 1_000_000.0; @@ -512,7 +516,8 @@ public TaskStats getTaskStats() cumulativeUserMemory.get(), succinctBytes(userMemory), succinctBytes(taskMemoryContext.getRevocableMemory()), - succinctBytes(taskMemoryContext.getSystemMemory()), + succinctBytes(systemMemory), + peakTotalMemoryInBytes.get(), succinctNanos(totalScheduledTime), succinctNanos(totalCpuTime), succinctNanos(totalBlockedTime), diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java index 88ce4423a5ae0..a66faf8d14980 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java @@ -55,6 +55,7 @@ public class TaskStats private final DataSize userMemoryReservation; private final DataSize revocableMemoryReservation; private final DataSize systemMemoryReservation; + private final long peakTotalMemoryInBytes; private final Duration totalScheduledTime; private final Duration totalCpuTime; @@ -98,6 +99,7 @@ public TaskStats(DateTime createTime, DateTime endTime) new DataSize(0, BYTE), new DataSize(0, BYTE), new DataSize(0, BYTE), + 0, new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), @@ -137,6 +139,7 @@ public TaskStats( @JsonProperty("userMemoryReservation") DataSize userMemoryReservation, @JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation, @JsonProperty("systemMemoryReservation") DataSize systemMemoryReservation, + @JsonProperty("peakTotalMemoryInBytes") long peakTotalMemoryInBytes, @JsonProperty("totalScheduledTime") Duration totalScheduledTime, @JsonProperty("totalCpuTime") Duration totalCpuTime, @@ -190,6 +193,7 @@ public TaskStats( this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null"); this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null"); this.systemMemoryReservation = requireNonNull(systemMemoryReservation, "systemMemoryReservation is null"); + this.peakTotalMemoryInBytes = peakTotalMemoryInBytes; this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null"); this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null"); @@ -318,6 +322,12 @@ public DataSize getSystemMemoryReservation() return systemMemoryReservation; } + @JsonProperty + public long getPeakTotalMemoryInBytes() + { + return peakTotalMemoryInBytes; + } + @JsonProperty public Duration getTotalScheduledTime() { @@ -441,6 +451,7 @@ public TaskStats summarize() userMemoryReservation, revocableMemoryReservation, systemMemoryReservation, + peakTotalMemoryInBytes, totalScheduledTime, totalCpuTime, totalBlockedTime, @@ -479,6 +490,7 @@ public TaskStats summarizeFinal() userMemoryReservation, revocableMemoryReservation, systemMemoryReservation, + peakTotalMemoryInBytes, totalScheduledTime, totalCpuTime, totalBlockedTime, diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java index a8056a860cbeb..cbcb44c2f676e 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java @@ -50,6 +50,7 @@ public class TestTaskStats new DataSize(12, BYTE), new DataSize(13, BYTE), new DataSize(14, BYTE), + 26, new Duration(15, NANOSECONDS), new Duration(16, NANOSECONDS), new Duration(18, NANOSECONDS), @@ -105,6 +106,7 @@ public static void assertExpectedTaskStats(TaskStats actual) assertEquals(actual.getUserMemoryReservation(), new DataSize(12, BYTE)); assertEquals(actual.getRevocableMemoryReservation(), new DataSize(13, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(14, BYTE)); + assertEquals(actual.getPeakTotalMemoryInBytes(), 26); assertEquals(actual.getTotalScheduledTime(), new Duration(15, NANOSECONDS)); assertEquals(actual.getTotalCpuTime(), new Duration(16, NANOSECONDS)); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryStatistics.java b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryStatistics.java index 8edbf49ea4944..8c2e3e1eab57e 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryStatistics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryStatistics.java @@ -47,7 +47,8 @@ public class QueryStatistics private final int completedSplits; private final boolean complete; - private final List cpuTimeDistribution; + private final List cpuTimeDistribution; + private final List peakMemoryDistribution; private final List operatorSummaries; @@ -72,7 +73,8 @@ public QueryStatistics( List stageGcStatistics, int completedSplits, boolean complete, - List cpuTimeDistribution, + List cpuTimeDistribution, + List peakMemoryDistribution, List operatorSummaries) { this.cpuTime = requireNonNull(cpuTime, "cpuTime is null"); @@ -96,6 +98,7 @@ public QueryStatistics( this.completedSplits = completedSplits; this.complete = complete; this.cpuTimeDistribution = requireNonNull(cpuTimeDistribution, "cpuTimeDistribution is null"); + this.peakMemoryDistribution = requireNonNull(peakMemoryDistribution, "peakMemoryDistribution is null"); this.operatorSummaries = requireNonNull(operatorSummaries, "operatorSummaries is null"); } @@ -199,11 +202,16 @@ public boolean isComplete() return complete; } - public List getCpuTimeDistribution() + public List getCpuTimeDistribution() { return cpuTimeDistribution; } + public List getPeakMemoryDistribution() + { + return peakMemoryDistribution; + } + public List getOperatorSummaries() { return operatorSummaries; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/StageCpuDistribution.java b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/ResourceDistribution.java similarity index 97% rename from presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/StageCpuDistribution.java rename to presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/ResourceDistribution.java index f3c20a84f7d79..8102de8af8907 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/StageCpuDistribution.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/ResourceDistribution.java @@ -16,7 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -public class StageCpuDistribution +public class ResourceDistribution { private final int stageId; private final int tasks; @@ -32,7 +32,7 @@ public class StageCpuDistribution private final double average; @JsonCreator - public StageCpuDistribution( + public ResourceDistribution( @JsonProperty("stageId") int stageId, @JsonProperty("tasks") int tasks, @JsonProperty("p25") long p25,