Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
0,
0,
0,
0,
true),
createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId()),
new QueryIOMetadata(ImmutableList.of(), Optional.empty()),
Expand Down Expand Up @@ -310,6 +311,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
queryStats.getWrittenIntermediatePhysicalDataSize().toBytes(),
queryStats.getSpilledDataSize().toBytes(),
queryStats.getCumulativeUserMemory(),
queryStats.getCumulativeTotalMemory(),
queryStats.getCompletedDrivers(),
queryInfo.isCompleteInfo());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class BasicStageExecutionStats
new DataSize(0, BYTE),
0,

0.0,
0.0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand All @@ -64,6 +65,7 @@ public class BasicStageExecutionStats
private final DataSize rawInputDataSize;
private final long rawInputPositions;
private final double cumulativeUserMemory;
private final double cumulativeTotalMemory;
private final DataSize userMemoryReservation;
private final DataSize totalMemoryReservation;
private final Duration totalCpuTime;
Expand All @@ -85,6 +87,7 @@ public BasicStageExecutionStats(
long rawInputPositions,

double cumulativeUserMemory,
double cumulativeTotalMemory,
DataSize userMemoryReservation,
DataSize totalMemoryReservation,

Expand All @@ -106,6 +109,7 @@ public BasicStageExecutionStats(
this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null");
this.rawInputPositions = rawInputPositions;
this.cumulativeUserMemory = cumulativeUserMemory;
this.cumulativeTotalMemory = cumulativeTotalMemory;
this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null");
this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null");
this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null");
Expand Down Expand Up @@ -156,6 +160,11 @@ public double getCumulativeUserMemory()
return cumulativeUserMemory;
}

public double getCumulativeTotalMemory()
{
return cumulativeTotalMemory;
}

public DataSize getUserMemoryReservation()
{
return userMemoryReservation;
Expand Down Expand Up @@ -204,6 +213,7 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
int completedDrivers = 0;

double cumulativeUserMemory = 0;
double cumulativeTotalMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand All @@ -227,6 +237,7 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
completedDrivers += stageStats.getCompletedDrivers();

cumulativeUserMemory += stageStats.getCumulativeUserMemory();
cumulativeTotalMemory += stageStats.getCumulativeTotalMemory();
userMemoryReservation += stageStats.getUserMemoryReservation().toBytes();
totalMemoryReservation += stageStats.getTotalMemoryReservation().toBytes();

Expand Down Expand Up @@ -261,6 +272,7 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
rawInputPositions,

cumulativeUserMemory,
cumulativeTotalMemory,
succinctBytes(userMemoryReservation),
succinctBytes(totalMemoryReservation),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageExecutionStats> rootS
stageStats.getRawInputPositions(),

stageStats.getCumulativeUserMemory(),
stageStats.getCumulativeTotalMemory(),
stageStats.getUserMemoryReservation(),
stageStats.getTotalMemoryReservation(),
succinctBytes(getPeakUserMemoryInBytes()),
Expand Down Expand Up @@ -969,6 +970,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getBlockedDrivers(),
queryStats.getCompletedDrivers(),
queryStats.getCumulativeUserMemory(),
queryStats.getCumulativeTotalMemory(),
queryStats.getUserMemoryReservation(),
queryStats.getTotalMemoryReservation(),
queryStats.getPeakUserMemoryReservation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class QueryStats
private final int completedDrivers;

private final double cumulativeUserMemory;
private final double cumulativeTotalMemory;
private final DataSize userMemoryReservation;
private final DataSize totalMemoryReservation;
private final DataSize peakUserMemoryReservation;
Expand Down Expand Up @@ -139,6 +140,7 @@ public QueryStats(
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("cumulativeUserMemory") double cumulativeUserMemory,
@JsonProperty("cumulativeTotalMemory") double cumulativeTotalMemory,
@JsonProperty("userMemoryReservation") DataSize userMemoryReservation,
@JsonProperty("totalMemoryReservation") DataSize totalMemoryReservation,
@JsonProperty("peakUserMemoryReservation") DataSize peakUserMemoryReservation,
Expand Down Expand Up @@ -212,6 +214,8 @@ public QueryStats(
this.completedDrivers = completedDrivers;
checkArgument(cumulativeUserMemory >= 0, "cumulativeUserMemory is negative");
this.cumulativeUserMemory = cumulativeUserMemory;
checkArgument(cumulativeTotalMemory >= 0, "cumulativeTotalMemory is negative");
this.cumulativeTotalMemory = cumulativeTotalMemory;
this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null");
this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null");
this.peakUserMemoryReservation = requireNonNull(peakUserMemoryReservation, "peakUserMemoryReservation is null");
Expand Down Expand Up @@ -273,6 +277,7 @@ public static QueryStats create(
int completedDrivers = 0;

double cumulativeUserMemory = 0;
double cumulativeTotalMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down Expand Up @@ -318,6 +323,7 @@ public static QueryStats create(
completedDrivers += stageExecutionStats.getCompletedDrivers();

cumulativeUserMemory += stageExecutionStats.getCumulativeUserMemory();
cumulativeTotalMemory += stageExecutionStats.getCumulativeTotalMemory();
userMemoryReservation += stageExecutionStats.getUserMemoryReservation().toBytes();
totalMemoryReservation += stageExecutionStats.getTotalMemoryReservation().toBytes();
totalScheduledTime += stageExecutionStats.getTotalScheduledTime().roundTo(MILLISECONDS);
Expand Down Expand Up @@ -397,6 +403,7 @@ public static QueryStats create(
completedDrivers,

cumulativeUserMemory,
cumulativeTotalMemory,
succinctBytes(userMemoryReservation),
succinctBytes(totalMemoryReservation),
peakUserMemoryReservation,
Expand Down Expand Up @@ -481,6 +488,7 @@ public static QueryStats immediateFailureQueryStats()
0,
0,
0,
0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand Down Expand Up @@ -649,6 +657,12 @@ public double getCumulativeUserMemory()
return cumulativeUserMemory;
}

@JsonProperty
public double getCumulativeTotalMemory()
{
return cumulativeTotalMemory;
}

@JsonProperty
public DataSize getUserMemoryReservation()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public static StageExecutionInfo create(
int completedDrivers = 0;

double cumulativeUserMemory = 0;
double cumulativeTotalMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down Expand Up @@ -122,6 +123,7 @@ public static StageExecutionInfo create(
completedDrivers += taskStats.getCompletedDrivers();

cumulativeUserMemory += taskStats.getCumulativeUserMemory();
cumulativeTotalMemory += taskStats.getCumulativeTotalMemory();

long taskUserMemory = taskStats.getUserMemoryReservationInBytes();
long taskSystemMemory = taskStats.getSystemMemoryReservationInBytes();
Expand Down Expand Up @@ -187,6 +189,7 @@ public static StageExecutionInfo create(
completedDrivers,

cumulativeUserMemory,
cumulativeTotalMemory,
succinctBytes(userMemoryReservation),
succinctBytes(totalMemoryReservation),
peakUserMemoryReservation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public BasicStageExecutionStats getBasicStageStats(Supplier<Iterable<TaskInfo>>
int completedDrivers = 0;

double cumulativeUserMemory = 0;
double cumulativeTotalMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down Expand Up @@ -300,6 +301,7 @@ public BasicStageExecutionStats getBasicStageStats(Supplier<Iterable<TaskInfo>>
rawInputPositions,

cumulativeUserMemory,
cumulativeTotalMemory,
succinctBytes(userMemoryReservation),
succinctBytes(totalMemoryReservation),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class StageExecutionStats
private final int completedDrivers;

private final double cumulativeUserMemory;
private final double cumulativeTotalMemory;
private final DataSize userMemoryReservation;
private final DataSize totalMemoryReservation;
private final DataSize peakUserMemoryReservation;
Expand Down Expand Up @@ -110,6 +111,7 @@ public StageExecutionStats(
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("cumulativeUserMemory") double cumulativeUserMemory,
@JsonProperty("cumulativeTotalMemory") double cumulativeTotalMemory,
@JsonProperty("userMemoryReservation") DataSize userMemoryReservation,
@JsonProperty("totalMemoryReservation") DataSize totalMemoryReservation,
@JsonProperty("peakUserMemoryReservation") DataSize peakUserMemoryReservation,
Expand Down Expand Up @@ -167,6 +169,8 @@ public StageExecutionStats(
this.completedDrivers = completedDrivers;
checkArgument(cumulativeUserMemory >= 0, "cumulativeUserMemory is negative");
this.cumulativeUserMemory = cumulativeUserMemory;
checkArgument(cumulativeTotalMemory >= 0, "cumulativeTotalMemory is negative");
this.cumulativeTotalMemory = cumulativeTotalMemory;
this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null");
this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null");
this.peakUserMemoryReservation = requireNonNull(peakUserMemoryReservation, "peakUserMemoryReservation is null");
Expand Down Expand Up @@ -279,6 +283,12 @@ public double getCumulativeUserMemory()
return cumulativeUserMemory;
}

@JsonProperty
public double getCumulativeTotalMemory()
{
return cumulativeTotalMemory;
}

@JsonProperty
public DataSize getUserMemoryReservation()
{
Expand Down Expand Up @@ -423,6 +433,7 @@ public BasicStageExecutionStats toBasicStageStats(StageExecutionState stageExecu
rawInputDataSize,
rawInputPositions,
cumulativeUserMemory,
cumulativeTotalMemory,
userMemoryReservation,
totalMemoryReservation,
totalCpuTime,
Expand All @@ -449,6 +460,7 @@ public static StageExecutionStats zero(int stageId)
0,
0,
0,
0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,17 @@ public class TaskContext

private final Object cumulativeMemoryLock = new Object();
private final AtomicDouble cumulativeUserMemory = new AtomicDouble(0.0);
private final AtomicDouble cumulativeTotalMemory = new AtomicDouble(0.0);

private final AtomicLong peakTotalMemoryInBytes = new AtomicLong(0);
private final AtomicLong peakUserMemoryInBytes = new AtomicLong(0);

@GuardedBy("cumulativeMemoryLock")
private long lastUserMemoryReservation;

@GuardedBy("cumulativeMemoryLock")
private long lastTotalMemoryReservation;

@GuardedBy("cumulativeMemoryLock")
private long lastTaskStatCallNanos;

Expand Down Expand Up @@ -507,11 +511,14 @@ public TaskStats getTaskStats()
lastTaskStatCallNanos = startNanos;
}
double sinceLastPeriodMillis = (System.nanoTime() - lastTaskStatCallNanos) / 1_000_000.0;
long averageMemoryForLastPeriod = (userMemory + lastUserMemoryReservation) / 2;
cumulativeUserMemory.addAndGet(averageMemoryForLastPeriod * sinceLastPeriodMillis);
long averageUserMemoryForLastPeriod = (userMemory + lastUserMemoryReservation) / 2;
long averageTotalMemoryForLastPeriod = (userMemory + systemMemory + lastTotalMemoryReservation) / 2;
cumulativeUserMemory.addAndGet(averageUserMemoryForLastPeriod * sinceLastPeriodMillis);
cumulativeTotalMemory.addAndGet(averageTotalMemoryForLastPeriod * sinceLastPeriodMillis);

lastTaskStatCallNanos = System.nanoTime();
lastUserMemoryReservation = userMemory;
lastTotalMemoryReservation = systemMemory + userMemory;
}

Set<PipelineStats> runningPipelineStats = pipelineStats.stream()
Expand Down Expand Up @@ -539,6 +546,7 @@ public TaskStats getTaskStats()
blockedDrivers,
completedDrivers,
cumulativeUserMemory.get(),
cumulativeTotalMemory.get(),
userMemory,
taskMemoryContext.getRevocableMemory(),
systemMemory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class TaskStats
private final int completedDrivers;

private final double cumulativeUserMemory;
private final double cumulativeTotalMemory;
private final long userMemoryReservationInBytes;
private final long revocableMemoryReservationInBytes;
private final long systemMemoryReservationInBytes;
Expand Down Expand Up @@ -98,6 +99,7 @@ public TaskStats(DateTime createTime, DateTime endTime)
0,
0,
0.0,
0.0,
0L,
0L,
0L,
Expand Down Expand Up @@ -141,6 +143,7 @@ public TaskStats(
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("cumulativeUserMemory") double cumulativeUserMemory,
@JsonProperty("cumulativeTotalMemory") double cumulativeTotalMemory,
@JsonProperty("userMemoryReservation") long userMemoryReservationInBytes,
@JsonProperty("revocableMemoryReservationInBytes") long revocableMemoryReservationInBytes,
@JsonProperty("systemMemoryReservationInBytes") long systemMemoryReservationInBytes,
Expand Down Expand Up @@ -200,6 +203,7 @@ public TaskStats(
this.completedDrivers = completedDrivers;

this.cumulativeUserMemory = cumulativeUserMemory;
this.cumulativeTotalMemory = cumulativeTotalMemory;
this.userMemoryReservationInBytes = userMemoryReservationInBytes;
this.revocableMemoryReservationInBytes = revocableMemoryReservationInBytes;
this.systemMemoryReservationInBytes = systemMemoryReservationInBytes;
Expand Down Expand Up @@ -319,6 +323,12 @@ public double getCumulativeUserMemory()
return cumulativeUserMemory;
}

@JsonProperty
public double getCumulativeTotalMemory()
{
return cumulativeTotalMemory;
}

@JsonProperty
public long getUserMemoryReservationInBytes()
{
Expand Down Expand Up @@ -481,6 +491,7 @@ public TaskStats summarize()
blockedDrivers,
completedDrivers,
cumulativeUserMemory,
cumulativeTotalMemory,
userMemoryReservationInBytes,
revocableMemoryReservationInBytes,
systemMemoryReservationInBytes,
Expand Down Expand Up @@ -523,6 +534,7 @@ public TaskStats summarizeFinal()
blockedDrivers,
completedDrivers,
cumulativeUserMemory,
cumulativeTotalMemory,
userMemoryReservationInBytes,
revocableMemoryReservationInBytes,
systemMemoryReservationInBytes,
Expand Down
Loading