From 96acec2367275ecfce2545445c764109c99c53fb Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Mon, 18 Jul 2022 15:54:50 +0200 Subject: [PATCH] Update retained size without causing lock congestion --- .../trino/operator/DirectExchangeClient.java | 53 +++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java index 5808845c96a..32415aa1d3f 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java +++ b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java @@ -41,6 +41,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Sets.newConcurrentHashSet; @@ -79,7 +82,12 @@ public class DirectExchangeClient @GuardedBy("this") private boolean closed; - private final LocalMemoryContext memoryContext; + @GuardedBy("memoryContextLock") + @Nullable + private LocalMemoryContext memoryContext; + private final ReadWriteLock memoryContextLock = new ReentrantReadWriteLock(); + private final Lock memoryContextReadLock = memoryContextLock.readLock(); + private final Lock memoryContextWriteLock = memoryContextLock.writeLock(); private final Executor pageBufferClientCallbackExecutor; private final TaskFailureListener taskFailureListener; @@ -217,12 +225,9 @@ public Slice pollPage() return null; } - synchronized (this) { - if (!closed) { - memoryContext.setBytes(buffer.getRetainedSizeInBytes()); - scheduleRequestIfNecessary(); - } - } + // updating retained memory might be expensive, therefore it needs to be updated outside of exclusive lock + updateRetainedMemory(); + scheduleRequestIfNecessary(); // Return the page even if the client is closed. // A concurrent thread may have responded to the `isFinished` change @@ -253,7 +258,7 @@ public synchronized void close() log.warn(e, "error closing buffer"); } finally { - memoryContext.setBytes(0); + releaseMemoryContext(); } } @@ -300,6 +305,8 @@ private boolean addPages(HttpPageBufferClient client, List pages) } // Buffer may already be closed at this point. In such situation the buffer is expected to simply ignore this call. buffer.addPages(client.getRemoteTaskId(), pages); + // updating retained memory might be expensive, therefore it needs to be updated outside of exclusive lock + updateRetainedMemory(); } synchronized (this) { @@ -310,13 +317,39 @@ private boolean addPages(HttpPageBufferClient client, List pages) successfulRequests++; // AVG_n = AVG_(n-1) * (n-1)/n + VALUE_n / n averageBytesPerRequest = (long) (1.0 * averageBytesPerRequest * (successfulRequests - 1) / successfulRequests + responseSize / successfulRequests); - - memoryContext.setBytes(buffer.getRetainedSizeInBytes()); } return true; } + private void updateRetainedMemory() + { + memoryContextReadLock.lock(); + try { + if (memoryContext != null) { + memoryContext.setBytes(buffer.getRetainedSizeInBytes()); + } + } + finally { + memoryContextReadLock.unlock(); + } + } + + private void releaseMemoryContext() + { + memoryContextWriteLock.lock(); + try { + if (memoryContext != null) { + memoryContext.setBytes(0); + // prevent further memory allocations + memoryContext = null; + } + } + finally { + memoryContextWriteLock.unlock(); + } + } + private synchronized void requestComplete(HttpPageBufferClient client) { if (!completedClients.contains(client) && !queuedClients.contains(client)) {