Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import com.facebook.presto.spi.eventlistener.QueryMetadata;
import com.facebook.presto.spi.eventlistener.QueryOutputMetadata;
import com.facebook.presto.spi.eventlistener.QueryStatistics;
import com.facebook.presto.spi.eventlistener.StageCpuDistribution;
import com.facebook.presto.spi.eventlistener.ResourceDistribution;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.transaction.TransactionId;
Expand Down Expand Up @@ -169,6 +169,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
0,
true,
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of()),
createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId()),
new QueryIOMetadata(ImmutableList.of(), Optional.empty()),
Expand Down Expand Up @@ -205,6 +206,26 @@ public void queryCompletedEvent(QueryInfo queryInfo)
logQueryTimeline(queryInfo);
}

/**
 * Builds the {@link ResourceDistribution} record for a single stage from a distribution snapshot.
 *
 * @param stageId id of the stage the snapshot was collected for
 * @param tasks number of tasks reported for that stage
 * @param distributionSnapshot snapshot supplying the percentiles, min, max, total and count
 * @return a record holding the stage id, task count, p25/p50/p75/p90/p95/p99, min, max,
 *         the total truncated to {@code long}, and the total divided by the count
 */
public static ResourceDistribution createResourceDistribution(
        int stageId,
        int tasks,
        DistributionSnapshot distributionSnapshot)
{
    // Read the total once; it feeds both the truncated total and the average.
    double total = distributionSnapshot.getTotal();
    // NOTE(review): if the snapshot count is zero (stage with no tasks) this quotient is
    // presumably NaN from floating-point division -- confirm consumers tolerate that.
    double average = total / distributionSnapshot.getCount();

    return new ResourceDistribution(
            stageId,
            tasks,
            distributionSnapshot.getP25(),
            distributionSnapshot.getP50(),
            distributionSnapshot.getP75(),
            distributionSnapshot.getP90(),
            distributionSnapshot.getP95(),
            distributionSnapshot.getP99(),
            distributionSnapshot.getMin(),
            distributionSnapshot.getMax(),
            (long) total,
            average);
}

private QueryMetadata createQueryMetadata(QueryInfo queryInfo)
{
return new QueryMetadata(
Expand All @@ -226,6 +247,13 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
}

QueryStats queryStats = queryInfo.getQueryStats();

ImmutableList.Builder<ResourceDistribution> cpuDistributionBuilder = ImmutableList.builder();
ImmutableList.Builder<ResourceDistribution> memoryDistributionBuilder = ImmutableList.builder();
if (queryInfo.getOutputStage().isPresent()) {
computeCpuAndMemoryDistributions(queryInfo.getOutputStage().get(), cpuDistributionBuilder, memoryDistributionBuilder);
}

return new QueryStatistics(
ofMillis(queryStats.getTotalCpuTime().toMillis()),
ofMillis(queryStats.getTotalScheduledTime().toMillis()),
Expand All @@ -247,7 +275,8 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
queryStats.getStageGcStatistics(),
queryStats.getCompletedDrivers(),
queryInfo.isCompleteInfo(),
getCpuDistributions(queryInfo),
cpuDistributionBuilder.build(),
memoryDistributionBuilder.build(),
operatorSummaries.build());
}

Expand Down Expand Up @@ -506,49 +535,32 @@ private static void logQueryTimeline(
queryEndTime);
}

/**
 * Collects the CPU distribution of every stage reachable from the query's output stage.
 *
 * @param queryInfo query whose stage tree is inspected
 * @return one entry per stage, or an empty list when the query has no output stage
 */
private static List<StageCpuDistribution> getCpuDistributions(QueryInfo queryInfo)
{
    ImmutableList.Builder<StageCpuDistribution> distributions = ImmutableList.builder();

    // Without an output stage there is nothing to walk; the builder simply stays empty.
    if (queryInfo.getOutputStage().isPresent()) {
        populateDistribution(queryInfo.getOutputStage().get(), distributions);
    }

    return distributions.build();
}

/**
 * Appends the CPU distribution of {@code stageInfo} and then, recursively, of each of its
 * sub-stages, so a parent stage always precedes its children in the output.
 *
 * @param stageInfo root of the stage subtree to visit
 * @param distributions builder receiving one entry per visited stage
 */
private static void populateDistribution(StageInfo stageInfo, ImmutableList.Builder<StageCpuDistribution> distributions)
{
    StageCpuDistribution current = computeCpuDistribution(stageInfo);
    distributions.add(current);

    stageInfo.getSubStages().forEach(child -> populateDistribution(child, distributions));
}

private static StageCpuDistribution computeCpuDistribution(StageInfo stageInfo)
private static void computeCpuAndMemoryDistributions(
StageInfo stageInfo,
ImmutableList.Builder<ResourceDistribution> cpuDistributionBuilder,
ImmutableList.Builder<ResourceDistribution> memoryDistributionBuilder)
{
Distribution cpuDistribution = new Distribution();
Distribution memoryDistribution = new Distribution();

for (TaskInfo taskInfo : stageInfo.getLatestAttemptExecutionInfo().getTasks()) {
cpuDistribution.add(taskInfo.getStats().getTotalCpuTime().toMillis());
memoryDistribution.add(taskInfo.getStats().getPeakTotalMemoryInBytes());
}

DistributionSnapshot snapshot = cpuDistribution.snapshot();
cpuDistributionBuilder.add(createResourceDistribution(
stageInfo.getStageId().getId(),
stageInfo.getLatestAttemptExecutionInfo().getTasks().size(),
cpuDistribution.snapshot()));

return new StageCpuDistribution(
memoryDistributionBuilder.add(createResourceDistribution(
stageInfo.getStageId().getId(),
stageInfo.getLatestAttemptExecutionInfo().getTasks().size(),
snapshot.getP25(),
snapshot.getP50(),
snapshot.getP75(),
snapshot.getP90(),
snapshot.getP95(),
snapshot.getP99(),
snapshot.getMin(),
snapshot.getMax(),
(long) snapshot.getTotal(),
snapshot.getTotal() / snapshot.getCount());
memoryDistribution.snapshot()));

for (StageInfo subStage : stageInfo.getSubStages()) {
computeCpuAndMemoryDistributions(subStage, cpuDistributionBuilder, memoryDistributionBuilder);
}
}

public static class JsonPlanFragment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class TaskContext

private final Object cumulativeMemoryLock = new Object();
private final AtomicDouble cumulativeUserMemory = new AtomicDouble(0.0);
private final AtomicLong peakTotalMemoryInBytes = new AtomicLong(0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • cumulativeUserMemory is stored as double; peakTotalMemoryInBytes seems similar, but it is stored as long; why the discrepancy?
  • peakTotalMemoryInBytes name is not consistent with cumulativeUserMemory; shouldn't it be peakTotalMemory?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • cumulativeUserMemory uses double because its unit is Bytes * Time; this explains the difference in names.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viczhang861 Vic, thanks for explaining. It makes sense now.


@GuardedBy("cumulativeMemoryLock")
private long lastUserMemoryReservation;
Expand Down Expand Up @@ -475,6 +476,9 @@ public TaskStats getTaskStats()
Duration fullGcTime = getFullGcTime();

long userMemory = taskMemoryContext.getUserMemory();
long systemMemory = taskMemoryContext.getSystemMemory();

peakTotalMemoryInBytes.accumulateAndGet(userMemory + systemMemory, Math::max);

synchronized (cumulativeMemoryLock) {
double sinceLastPeriodMillis = (System.nanoTime() - lastTaskStatCallNanos) / 1_000_000.0;
Expand Down Expand Up @@ -512,7 +516,8 @@ public TaskStats getTaskStats()
cumulativeUserMemory.get(),
succinctBytes(userMemory),
succinctBytes(taskMemoryContext.getRevocableMemory()),
succinctBytes(taskMemoryContext.getSystemMemory()),
succinctBytes(systemMemory),
peakTotalMemoryInBytes.get(),
succinctNanos(totalScheduledTime),
succinctNanos(totalCpuTime),
succinctNanos(totalBlockedTime),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class TaskStats
private final DataSize userMemoryReservation;
private final DataSize revocableMemoryReservation;
private final DataSize systemMemoryReservation;
private final long peakTotalMemoryInBytes;

private final Duration totalScheduledTime;
private final Duration totalCpuTime;
Expand Down Expand Up @@ -98,6 +99,7 @@ public TaskStats(DateTime createTime, DateTime endTime)
new DataSize(0, BYTE),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
0,
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
Expand Down Expand Up @@ -137,6 +139,7 @@ public TaskStats(
@JsonProperty("userMemoryReservation") DataSize userMemoryReservation,
@JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation,
@JsonProperty("systemMemoryReservation") DataSize systemMemoryReservation,
@JsonProperty("peakTotalMemoryInBytes") long peakTotalMemoryInBytes,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at cumulativeUserMemory, I'd expect

@JsonProperty("peakTotalMemory") double peakTotalMemory,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or peakTotalMemoryReservation and use DataSize as type? No strong opinion here.


@JsonProperty("totalScheduledTime") Duration totalScheduledTime,
@JsonProperty("totalCpuTime") Duration totalCpuTime,
Expand Down Expand Up @@ -190,6 +193,7 @@ public TaskStats(
this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null");
this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null");
this.systemMemoryReservation = requireNonNull(systemMemoryReservation, "systemMemoryReservation is null");
this.peakTotalMemoryInBytes = peakTotalMemoryInBytes;

this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null");
this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null");
Expand Down Expand Up @@ -318,6 +322,12 @@ public DataSize getSystemMemoryReservation()
return systemMemoryReservation;
}

/**
 * Returns the peak total memory recorded for the task, in bytes, exactly as it was
 * supplied to the constructor.
 */
@JsonProperty
public long getPeakTotalMemoryInBytes()
{
    return this.peakTotalMemoryInBytes;
}

@JsonProperty
public Duration getTotalScheduledTime()
{
Expand Down Expand Up @@ -441,6 +451,7 @@ public TaskStats summarize()
userMemoryReservation,
revocableMemoryReservation,
systemMemoryReservation,
peakTotalMemoryInBytes,
totalScheduledTime,
totalCpuTime,
totalBlockedTime,
Expand Down Expand Up @@ -479,6 +490,7 @@ public TaskStats summarizeFinal()
userMemoryReservation,
revocableMemoryReservation,
systemMemoryReservation,
peakTotalMemoryInBytes,
totalScheduledTime,
totalCpuTime,
totalBlockedTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TestTaskStats
new DataSize(12, BYTE),
new DataSize(13, BYTE),
new DataSize(14, BYTE),
26,
new Duration(15, NANOSECONDS),
new Duration(16, NANOSECONDS),
new Duration(18, NANOSECONDS),
Expand Down Expand Up @@ -105,6 +106,7 @@ public static void assertExpectedTaskStats(TaskStats actual)
assertEquals(actual.getUserMemoryReservation(), new DataSize(12, BYTE));
assertEquals(actual.getRevocableMemoryReservation(), new DataSize(13, BYTE));
assertEquals(actual.getSystemMemoryReservation(), new DataSize(14, BYTE));
assertEquals(actual.getPeakTotalMemoryInBytes(), 26);

assertEquals(actual.getTotalScheduledTime(), new Duration(15, NANOSECONDS));
assertEquals(actual.getTotalCpuTime(), new Duration(16, NANOSECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class QueryStatistics
private final int completedSplits;
private final boolean complete;

private final List<StageCpuDistribution> cpuTimeDistribution;
private final List<ResourceDistribution> cpuTimeDistribution;
private final List<ResourceDistribution> peakMemoryDistribution;

private final List<String> operatorSummaries;

Expand All @@ -72,7 +73,8 @@ public QueryStatistics(
List<StageGcStatistics> stageGcStatistics,
int completedSplits,
boolean complete,
List<StageCpuDistribution> cpuTimeDistribution,
List<ResourceDistribution> cpuTimeDistribution,
List<ResourceDistribution> peakMemoryDistribution,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about taskPeakMemoryDistribution? No strong opinion, since peakMemoryDistribution is parallel to cpuTimeDistribution.

List<String> operatorSummaries)
{
this.cpuTime = requireNonNull(cpuTime, "cpuTime is null");
Expand All @@ -96,6 +98,7 @@ public QueryStatistics(
this.completedSplits = completedSplits;
this.complete = complete;
this.cpuTimeDistribution = requireNonNull(cpuTimeDistribution, "cpuTimeDistribution is null");
this.peakMemoryDistribution = requireNonNull(peakMemoryDistribution, "peakMemoryDistribution is null");
this.operatorSummaries = requireNonNull(operatorSummaries, "operatorSummaries is null");
}

Expand Down Expand Up @@ -199,11 +202,16 @@ public boolean isComplete()
return complete;
}

public List<StageCpuDistribution> getCpuTimeDistribution()
public List<ResourceDistribution> getCpuTimeDistribution()
{
return cpuTimeDistribution;
}

/**
 * Returns the peak-memory distributions supplied at construction; the list is never null.
 */
public List<ResourceDistribution> getPeakMemoryDistribution()
{
    return this.peakMemoryDistribution;
}

public List<String> getOperatorSummaries()
{
return operatorSummaries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class StageCpuDistribution
public class ResourceDistribution
{
private final int stageId;
private final int tasks;
Expand All @@ -32,7 +32,7 @@ public class StageCpuDistribution
private final double average;

@JsonCreator
public StageCpuDistribution(
public ResourceDistribution(
@JsonProperty("stageId") int stageId,
@JsonProperty("tasks") int tasks,
@JsonProperty("p25") long p25,
Expand Down