Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet;
Expand Down Expand Up @@ -79,7 +82,12 @@ public class DirectExchangeClient
@GuardedBy("this")
private boolean closed;

private final LocalMemoryContext memoryContext;
@GuardedBy("memoryContextLock")
@Nullable
private LocalMemoryContext memoryContext;
private final ReadWriteLock memoryContextLock = new ReentrantReadWriteLock();
private final Lock memoryContextReadLock = memoryContextLock.readLock();
private final Lock memoryContextWriteLock = memoryContextLock.writeLock();
private final Executor pageBufferClientCallbackExecutor;
private final TaskFailureListener taskFailureListener;

Expand Down Expand Up @@ -217,12 +225,9 @@ public Slice pollPage()
return null;
}

synchronized (this) {
if (!closed) {
memoryContext.setBytes(buffer.getRetainedSizeInBytes());
scheduleRequestIfNecessary();
}
}
// updating retained memory might be expensive, therefore it needs to be updated outside of exclusive lock
updateRetainedMemory();
scheduleRequestIfNecessary();

// Return the page even if the client is closed.
// A concurrent thread may have responded to the `isFinished` change
Expand Down Expand Up @@ -253,7 +258,7 @@ public synchronized void close()
log.warn(e, "error closing buffer");
}
finally {
memoryContext.setBytes(0);
releaseMemoryContext();
}
}

Expand Down Expand Up @@ -300,6 +305,8 @@ private boolean addPages(HttpPageBufferClient client, List<Slice> pages)
}
// Buffer may already be closed at this point. In such situation the buffer is expected to simply ignore this call.
buffer.addPages(client.getRemoteTaskId(), pages);
// updating retained memory might be expensive, therefore it needs to be updated outside of exclusive lock
updateRetainedMemory();
}

synchronized (this) {
Expand All @@ -310,13 +317,39 @@ private boolean addPages(HttpPageBufferClient client, List<Slice> pages)
successfulRequests++;
// AVG_n = AVG_(n-1) * (n-1)/n + VALUE_n / n
averageBytesPerRequest = (long) (1.0 * averageBytesPerRequest * (successfulRequests - 1) / successfulRequests + responseSize / successfulRequests);

memoryContext.setBytes(buffer.getRetainedSizeInBytes());
}

return true;
}

private void updateRetainedMemory()
{
memoryContextReadLock.lock();
try {
if (memoryContext != null) {
memoryContext.setBytes(buffer.getRetainedSizeInBytes());
}
}
finally {
memoryContextReadLock.unlock();
}
}

private void releaseMemoryContext()
{
memoryContextWriteLock.lock();
try {
if (memoryContext != null) {
memoryContext.setBytes(0);
// prevent further memory allocations
memoryContext = null;
}
}
finally {
memoryContextWriteLock.unlock();
}
}

private synchronized void requestComplete(HttpPageBufferClient client)
{
if (!completedClients.contains(client) && !queuedClients.contains(client)) {
Expand Down