Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class BasicStageExecutionStats
new DataSize(0, BYTE),
0,

0,
0.0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),

Expand All @@ -63,7 +63,7 @@ public class BasicStageExecutionStats
private final int completedDrivers;
private final DataSize rawInputDataSize;
private final long rawInputPositions;
private final long cumulativeUserMemory;
private final double cumulativeUserMemory;
private final DataSize userMemoryReservation;
private final DataSize totalMemoryReservation;
private final Duration totalCpuTime;
Expand All @@ -84,7 +84,7 @@ public BasicStageExecutionStats(
DataSize rawInputDataSize,
long rawInputPositions,

long cumulativeUserMemory,
double cumulativeUserMemory,
DataSize userMemoryReservation,
DataSize totalMemoryReservation,

Expand Down Expand Up @@ -151,7 +151,7 @@ public long getRawInputPositions()
return rawInputPositions;
}

public long getCumulativeUserMemory()
public double getCumulativeUserMemory()
{
return cumulativeUserMemory;
}
Expand Down Expand Up @@ -203,7 +203,7 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
int runningDrivers = 0;
int completedDrivers = 0;

long cumulativeUserMemory = 0;
double cumulativeUserMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public static QueryStats create(
int blockedDrivers = 0;
int completedDrivers = 0;

long cumulativeUserMemory = 0;
double cumulativeUserMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static StageExecutionInfo create(
int blockedDrivers = 0;
int completedDrivers = 0;

long cumulativeUserMemory = 0;
double cumulativeUserMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public BasicStageExecutionStats getBasicStageStats(Supplier<Iterable<TaskInfo>>
int runningDrivers = 0;
int completedDrivers = 0;

long cumulativeUserMemory = 0;
double cumulativeUserMemory = 0;
long userMemoryReservation = 0;
long totalMemoryReservation = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public BasicStageExecutionStats toBasicStageStats(StageExecutionState stageExecu
completedDrivers,
rawInputDataSize,
rawInputPositions,
(long) cumulativeUserMemory,
cumulativeUserMemory,
userMemoryReservation,
totalMemoryReservation,
totalCpuTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ public TaskStats getTaskStats()
updatePeakMemory();

synchronized (cumulativeMemoryLock) {
if (lastTaskStatCallNanos == 0) {
lastTaskStatCallNanos = startNanos;
}
double sinceLastPeriodMillis = (System.nanoTime() - lastTaskStatCallNanos) / 1_000_000.0;
long averageMemoryForLastPeriod = (userMemory + lastUserMemoryReservation) / 2;
cumulativeUserMemory.addAndGet(averageMemoryForLastPeriod * sinceLastPeriodMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,22 @@ public void testDestroy()
assertOperatorMemoryAllocations(operatorContext.getOperatorMemoryContext(), 0, 0, 0);
}

@Test
public void testCumulativeUserMemoryEstimation()
{
LocalMemoryContext userMemory = operatorContext.localUserMemoryContext();
long userMemoryBytes = 100_000_000;
userMemory.setBytes(userMemoryBytes);
long startTime = System.nanoTime();
double cumulativeUserMemory = taskContext.getTaskStats().getCumulativeUserMemory();
long endTime = System.nanoTime();

double elapsedTimeInMillis = (endTime - startTime) / 1_000_000.0;
long averageMemoryForLastPeriod = userMemoryBytes / 2;

assertTrue(cumulativeUserMemory < elapsedTimeInMillis * averageMemoryForLastPeriod);
}

private void assertStats(
OperatorStats operatorStats,
DriverStats driverStats,
Expand Down