diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 93a88804ec0d..4b0accd72a28 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -449,15 +449,27 @@ public void updateMemoryUsage( long taskRevocableMemoryInBytes, long taskTotalMemoryInBytes) { - currentUserMemory.addAndGet(deltaUserMemoryInBytes); - currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes); - currentTotalMemory.addAndGet(deltaTotalMemoryInBytes); - peakUserMemory.updateAndGet(currentPeakValue -> Math.max(currentUserMemory.get(), currentPeakValue)); - peakRevocableMemory.updateAndGet(currentPeakValue -> Math.max(currentRevocableMemory.get(), currentPeakValue)); - peakTotalMemory.updateAndGet(currentPeakValue -> Math.max(currentTotalMemory.get(), currentPeakValue)); - peakTaskUserMemory.accumulateAndGet(taskUserMemoryInBytes, Math::max); - peakTaskRevocableMemory.accumulateAndGet(taskRevocableMemoryInBytes, Math::max); - peakTaskTotalMemory.accumulateAndGet(taskTotalMemoryInBytes, Math::max); + long currentUserMemory = this.currentUserMemory.addAndGet(deltaUserMemoryInBytes); + long currentRevocableMemory = this.currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes); + long currentTotalMemory = this.currentTotalMemory.addAndGet(deltaTotalMemoryInBytes); + if (currentUserMemory > peakUserMemory.get()) { + peakUserMemory.accumulateAndGet(currentUserMemory, Math::max); + } + if (currentRevocableMemory > peakRevocableMemory.get()) { + peakRevocableMemory.accumulateAndGet(currentRevocableMemory, Math::max); + } + if (currentTotalMemory > peakTotalMemory.get()) { + peakTotalMemory.accumulateAndGet(currentTotalMemory, Math::max); + } + if (taskUserMemoryInBytes > peakTaskUserMemory.get()) { + peakTaskUserMemory.accumulateAndGet(taskUserMemoryInBytes, Math::max); + } + if (taskRevocableMemoryInBytes > peakTaskRevocableMemory.get()) { + peakTaskRevocableMemory.accumulateAndGet(taskRevocableMemoryInBytes, Math::max); + } + if (taskTotalMemoryInBytes > peakTaskTotalMemory.get()) { + peakTaskTotalMemory.accumulateAndGet(taskTotalMemoryInBytes, Math::max); + } } public BasicQueryInfo getBasicQueryInfo(Optional rootStage) diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index f6ca17ab64d3..0d3b9d8d187e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -243,11 +243,15 @@ public long getTotalMemoryReservation() public void updateMemoryUsage(long deltaUserMemoryInBytes, long deltaRevocableMemoryInBytes, long deltaTotalMemoryInBytes) { - currentUserMemory.addAndGet(deltaUserMemoryInBytes); - currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes); + long currentUserMemory = this.currentUserMemory.addAndGet(deltaUserMemoryInBytes); + long currentRevocableMemory = this.currentRevocableMemory.addAndGet(deltaRevocableMemoryInBytes); currentTotalMemory.addAndGet(deltaTotalMemoryInBytes); - peakUserMemory.updateAndGet(currentPeakValue -> max(currentUserMemory.get(), currentPeakValue)); - peakRevocableMemory.updateAndGet(currentPeakValue -> max(currentRevocableMemory.get(), currentPeakValue)); + if (currentUserMemory > peakUserMemory.get()) { + peakUserMemory.accumulateAndGet(currentUserMemory, Math::max); + } + if (currentRevocableMemory > peakRevocableMemory.get()) { + peakRevocableMemory.accumulateAndGet(currentRevocableMemory, Math::max); + } } public BasicStageStats getBasicStageStats(Supplier> taskInfosSupplier) diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java index 2431cc5f01f5..7953eae1efc0 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBufferMemoryManager.java @@ -39,7 +39,7 @@ * - the memory pool is exhausted */ @ThreadSafe -class OutputBufferMemoryManager +final class OutputBufferMemoryManager { private static final ListenableFuture NOT_BLOCKED = immediateVoidFuture(); @@ -90,7 +90,7 @@ public void updateMemoryUsage(long bytesAdded) ListenableFuture waitForMemory = null; SettableFuture notifyUnblocked = null; - long currentBufferedBytes; + final long currentBufferedBytes; synchronized (this) { // If closed is true, that means the task is completed. In that state, // the output buffers already ignore the newly added pages, and therefore @@ -99,9 +99,9 @@ public void updateMemoryUsage(long bytesAdded) return; } - currentBufferedBytes = bufferedBytes.updateAndGet(bytes -> { - long result = bytes + bytesAdded; - checkArgument(result >= 0, "bufferedBytes (%s) plus delta (%s) would be negative", bytes, bytesAdded); + currentBufferedBytes = bufferedBytes.accumulateAndGet(bytesAdded, (bufferedBytes, delta) -> { + long result = bufferedBytes + delta; + checkArgument(result >= 0, "bufferedBytes (%s) plus delta (%s) would be negative", bufferedBytes, delta); return result; }); ListenableFuture blockedOnMemory = memoryContext.setBytes(currentBufferedBytes); @@ -121,9 +121,12 @@ public void updateMemoryUsage(long bytesAdded) this.bufferBlockedFuture = null; } } - recordBufferUtilization(); + recordBufferUtilization(currentBufferedBytes); + } + // Reduce contention by reading first and only updating if the new value might become the maximum (uncommon) + if (currentBufferedBytes > peakMemoryUsage.get()) { + peakMemoryUsage.accumulateAndGet(currentBufferedBytes, Math::max); } - peakMemoryUsage.accumulateAndGet(currentBufferedBytes, Math::max); // Notify listeners outside of the critical section notifyListener(notifyUnblocked); if (waitForMemory != null) { @@ -131,13 +134,13 @@ public void updateMemoryUsage(long bytesAdded) } } - private synchronized void recordBufferUtilization() + private synchronized void recordBufferUtilization(long currentBufferedBytes) { long recordTime = ticker.read(); if (lastBufferUtilizationRecordTime != -1) { bufferUtilization.add(lastBufferUtilization, (double) recordTime - this.lastBufferUtilizationRecordTime); } - double utilization = getUtilization(); + double utilization = getUtilization(currentBufferedBytes); // skip recording of buffer utilization until data is put into buffer if (lastBufferUtilizationRecordTime != -1 || utilization != 0.0) { lastBufferUtilizationRecordTime = recordTime; @@ -187,13 +190,18 @@ public long getBufferedBytes() public double getUtilization() { - return bufferedBytes.get() / (double) maxBufferedBytes; + return getUtilization(bufferedBytes.get()); + } + + private double getUtilization(long currentBufferedBytes) + { + return currentBufferedBytes / (double) maxBufferedBytes; } public synchronized TDigest getUtilizationHistogram() { // always get most up to date histogram - recordBufferUtilization(); + recordBufferUtilization(bufferedBytes.get()); return TDigest.copyOf(bufferUtilization); } diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index ffa1053a5efd..18926bd39334 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -350,9 +350,15 @@ private void updatePeakMemoryReservations() // Here, the total memory used to be user+system, and sans revocable. This apparent inconsistency should be removed. // Perhaps, we don't need to track "total memory" here. long totalMemory = userMemory; - peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max); - peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max); - peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max); + if (userMemory > peakUserMemoryReservation.get()) { + peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max); + } + if (revocableMemory > peakRevocableMemoryReservation.get()) { + peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max); + } + if (totalMemory > peakTotalMemoryReservation.get()) { + peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max); + } } public long getReservedRevocableBytes() diff --git a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java index a16b66075ebd..8733907d2c34 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java @@ -414,7 +414,9 @@ private void updateMemoryUsage() { long pageSinkMemoryUsage = pageSink.getMemoryUsage(); pageSinkMemoryContext.setBytes(pageSinkMemoryUsage); - pageSinkPeakMemoryUsage.accumulateAndGet(pageSinkMemoryUsage, Math::max); + if (pageSinkMemoryUsage > pageSinkPeakMemoryUsage.get()) { + pageSinkPeakMemoryUsage.accumulateAndGet(pageSinkMemoryUsage, Math::max); + } } @VisibleForTesting diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index cc5a29d5c474..4d9a71875a03 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -270,8 +270,11 @@ public DataSize getMemoryReservation() public DataSize getPeakMemoryReservation() { long userMemory = taskMemoryContext.getUserMemory(); - currentPeakUserMemoryReservation.updateAndGet(oldValue -> max(oldValue, userMemory)); - return DataSize.ofBytes(currentPeakUserMemoryReservation.get()); + long currentPeakUserMemoryReservation = this.currentPeakUserMemoryReservation.get(); + if (userMemory > currentPeakUserMemoryReservation) { + currentPeakUserMemoryReservation = this.currentPeakUserMemoryReservation.accumulateAndGet(userMemory, Math::max); + } + return DataSize.ofBytes(currentPeakUserMemoryReservation); } public DataSize getRevocableMemoryReservation()