Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -449,15 +449,27 @@ public void updateMemoryUsage(
long taskRevocableMemoryInBytes,
long taskTotalMemoryInBytes)
{
currentUserMemory.addAndGet(deltaUserMemoryInBytes);
currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes);
currentTotalMemory.addAndGet(deltaTotalMemoryInBytes);
peakUserMemory.updateAndGet(currentPeakValue -> Math.max(currentUserMemory.get(), currentPeakValue));
peakRevocableMemory.updateAndGet(currentPeakValue -> Math.max(currentRevocableMemory.get(), currentPeakValue));
peakTotalMemory.updateAndGet(currentPeakValue -> Math.max(currentTotalMemory.get(), currentPeakValue));
peakTaskUserMemory.accumulateAndGet(taskUserMemoryInBytes, Math::max);
peakTaskRevocableMemory.accumulateAndGet(taskRevocableMemoryInBytes, Math::max);
peakTaskTotalMemory.accumulateAndGet(taskTotalMemoryInBytes, Math::max);
long currentUserMemory = this.currentUserMemory.addAndGet(deltaUserMemoryInBytes);
long currentRevocableMemory = this.currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes);
long currentTotalMemory = this.currentTotalMemory.addAndGet(deltaTotalMemoryInBytes);
if (currentUserMemory > peakUserMemory.get()) {
peakUserMemory.accumulateAndGet(currentUserMemory, Math::max);
}
if (currentRevocableMemory > peakRevocableMemory.get()) {
peakRevocableMemory.accumulateAndGet(currentRevocableMemory, Math::max);
}
if (currentTotalMemory > peakTotalMemory.get()) {
peakTotalMemory.accumulateAndGet(currentTotalMemory, Math::max);
}
if (taskUserMemoryInBytes > peakTaskUserMemory.get()) {
peakTaskUserMemory.accumulateAndGet(taskUserMemoryInBytes, Math::max);
}
if (taskRevocableMemoryInBytes > peakTaskRevocableMemory.get()) {
peakTaskRevocableMemory.accumulateAndGet(taskRevocableMemoryInBytes, Math::max);
}
if (taskTotalMemoryInBytes > peakTaskTotalMemory.get()) {
peakTaskTotalMemory.accumulateAndGet(taskTotalMemoryInBytes, Math::max);
}
}

public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,15 @@ public long getTotalMemoryReservation()

public void updateMemoryUsage(long deltaUserMemoryInBytes, long deltaRevocableMemoryInBytes, long deltaTotalMemoryInBytes)
{
currentUserMemory.addAndGet(deltaUserMemoryInBytes);
currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes);
long currentUserMemory = this.currentUserMemory.addAndGet(deltaUserMemoryInBytes);
long currentRevocableMemory = this.currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes);
currentTotalMemory.addAndGet(deltaTotalMemoryInBytes);
peakUserMemory.updateAndGet(currentPeakValue -> max(currentUserMemory.get(), currentPeakValue));
peakRevocableMemory.updateAndGet(currentPeakValue -> max(currentRevocableMemory.get(), currentPeakValue));
if (currentUserMemory > peakUserMemory.get()) {
peakUserMemory.accumulateAndGet(currentUserMemory, Math::max);
}
if (currentRevocableMemory > peakRevocableMemory.get()) {
peakRevocableMemory.accumulateAndGet(currentRevocableMemory, Math::max);
}
}

public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* - the memory pool is exhausted
*/
@ThreadSafe
class OutputBufferMemoryManager
final class OutputBufferMemoryManager
{
private static final ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();

Expand Down Expand Up @@ -90,7 +90,7 @@ public void updateMemoryUsage(long bytesAdded)

ListenableFuture<Void> waitForMemory = null;
SettableFuture<Void> notifyUnblocked = null;
long currentBufferedBytes;
final long currentBufferedBytes;
synchronized (this) {
// If closed is true, that means the task is completed. In that state,
// the output buffers already ignore the newly added pages, and therefore
Expand All @@ -99,9 +99,9 @@ public void updateMemoryUsage(long bytesAdded)
return;
}

currentBufferedBytes = bufferedBytes.updateAndGet(bytes -> {
long result = bytes + bytesAdded;
checkArgument(result >= 0, "bufferedBytes (%s) plus delta (%s) would be negative", bytes, bytesAdded);
currentBufferedBytes = bufferedBytes.accumulateAndGet(bytesAdded, (bufferedBytes, delta) -> {
long result = bufferedBytes + delta;
checkArgument(result >= 0, "bufferedBytes (%s) plus delta (%s) would be negative", bufferedBytes, delta);
return result;
});
ListenableFuture<Void> blockedOnMemory = memoryContext.setBytes(currentBufferedBytes);
Expand All @@ -121,23 +121,26 @@ public void updateMemoryUsage(long bytesAdded)
this.bufferBlockedFuture = null;
}
}
recordBufferUtilization();
recordBufferUtilization(currentBufferedBytes);
}
// Reduce contention by reading first and only updating if the new value might become the maximum (uncommon)
if (currentBufferedBytes > peakMemoryUsage.get()) {
peakMemoryUsage.accumulateAndGet(currentBufferedBytes, Math::max);
}
peakMemoryUsage.accumulateAndGet(currentBufferedBytes, Math::max);
// Notify listeners outside of the critical section
notifyListener(notifyUnblocked);
if (waitForMemory != null) {
waitForMemory.addListener(this::onMemoryAvailable, notificationExecutor);
}
}

private synchronized void recordBufferUtilization()
private synchronized void recordBufferUtilization(long currentBufferedBytes)
{
long recordTime = ticker.read();
if (lastBufferUtilizationRecordTime != -1) {
bufferUtilization.add(lastBufferUtilization, (double) recordTime - this.lastBufferUtilizationRecordTime);
}
double utilization = getUtilization();
double utilization = getUtilization(currentBufferedBytes);
// skip recording of buffer utilization until data is put into buffer
if (lastBufferUtilizationRecordTime != -1 || utilization != 0.0) {
lastBufferUtilizationRecordTime = recordTime;
Expand Down Expand Up @@ -187,13 +190,18 @@ public long getBufferedBytes()

public double getUtilization()
{
return bufferedBytes.get() / (double) maxBufferedBytes;
return getUtilization(bufferedBytes.get());
}

private double getUtilization(long currentBufferedBytes)
{
return currentBufferedBytes / (double) maxBufferedBytes;
}

public synchronized TDigest getUtilizationHistogram()
{
// always get most up to date histogram
recordBufferUtilization();
recordBufferUtilization(bufferedBytes.get());
return TDigest.copyOf(bufferUtilization);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,15 @@ private void updatePeakMemoryReservations()
// Here, the total memory used to be user+system, and sans revocable. This apparent inconsistency should be removed.
// Perhaps, we don't need to track "total memory" here.
long totalMemory = userMemory;
peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max);
peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max);
if (userMemory > peakUserMemoryReservation.get()) {
peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
}
if (revocableMemory > peakRevocableMemoryReservation.get()) {
peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max);
}
if (totalMemory > peakTotalMemoryReservation.get()) {
peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max);
}
}

public long getReservedRevocableBytes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ private void updateMemoryUsage()
{
long pageSinkMemoryUsage = pageSink.getMemoryUsage();
pageSinkMemoryContext.setBytes(pageSinkMemoryUsage);
pageSinkPeakMemoryUsage.accumulateAndGet(pageSinkMemoryUsage, Math::max);
if (pageSinkMemoryUsage > pageSinkPeakMemoryUsage.get()) {
pageSinkPeakMemoryUsage.accumulateAndGet(pageSinkMemoryUsage, Math::max);
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,11 @@ public DataSize getMemoryReservation()
public DataSize getPeakMemoryReservation()
{
long userMemory = taskMemoryContext.getUserMemory();
currentPeakUserMemoryReservation.updateAndGet(oldValue -> max(oldValue, userMemory));
return DataSize.ofBytes(currentPeakUserMemoryReservation.get());
long currentPeakUserMemoryReservation = this.currentPeakUserMemoryReservation.get();
if (userMemory > currentPeakUserMemoryReservation) {
currentPeakUserMemoryReservation = this.currentPeakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
}
return DataSize.ofBytes(currentPeakUserMemoryReservation);
}

public DataSize getRevocableMemoryReservation()
Expand Down