Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 102 additions & 61 deletions presto-main/src/main/java/com/facebook/presto/memory/MemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.weakref.jmx.Managed;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.facebook.presto.memory.context.AbstractAggregatedMemoryContext.FORCE_FREE_TAG;
Expand All @@ -49,25 +49,19 @@ public class MemoryPool
private final MemoryPoolId id;
private final long maxBytes;

@GuardedBy("this")
private long reservedBytes;
@GuardedBy("this")
private long reservedRevocableBytes;
private volatile long reservedBytes;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to mark it with @GuardedBy("this")? cc @aweisberg , @arhimondr

private volatile long reservedRevocableBytes;

@Nullable
@GuardedBy("this")
private NonCancellableMemoryFuture<?> future;

@GuardedBy("this")
// TODO: It would be better if we just tracked QueryContexts, but their lifecycle is managed by a weak reference, so we can't do that
private final Map<QueryId, Long> queryMemoryReservations = new HashMap<>();
private final Map<QueryId, Long> queryMemoryReservations = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three maps (queryMemoryReservations, taggedMemoryAllocations, queryMemoryRevocableReservations) are here to track memory reservations per query. These maps are quite orthogonal to the main function of the MemoryPool class. Instead of trying to optimize locking here by applying various "ninja" techniques i would strongly recommend moving the "per query" related memory tracking to the QueryContext. QueryContext is synchronized on the per query basis, thus the locking impact should be lower. The MemoryPool class should remain only with two long fields that have to be updated under a lock (reservedBytes, reservedRevocableBytes). Updating a long under a global lock should be very lightweight operation, and thus shouldn't cause significant locking contention.

CC @nezihyigitbasi

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arhimondr : This makes sense. But will this be a lot of additional work? -- maybe we can merge this as it is (since it brings some incremental value) and leave the query level syntonization as future work? :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the high risk nature of this change and very limited improvement it brings I would rather postpone it, and do the right refactoring once we have more time.


// This map keeps track of all the tagged allocations, e.g., query-1 -> ['TableScanOperator': 10MB, 'LazyOutputBuffer': 5MB, ...]
@GuardedBy("this")
private final Map<QueryId, Map<String, Long>> taggedMemoryAllocations = new HashMap<>();
private final Map<QueryId, Map<String, Long>> taggedMemoryAllocations = new ConcurrentHashMap<>();

@GuardedBy("this")
private final Map<QueryId, Long> queryMemoryRevocableReservations = new HashMap<>();
private final Map<QueryId, Long> queryMemoryRevocableReservations = new ConcurrentHashMap<>();

private final List<MemoryPoolListener> listeners = new CopyOnWriteArrayList<>();

Expand All @@ -83,7 +77,7 @@ public MemoryPoolId getId()
return id;
}

public synchronized MemoryPoolInfo getInfo()
public MemoryPoolInfo getInfo()
{
Map<QueryId, List<MemoryAllocation>> memoryAllocations = new HashMap<>();
for (Entry<QueryId, Map<String, Long>> entry : taggedMemoryAllocations.entrySet()) {
Expand Down Expand Up @@ -114,11 +108,26 @@ public ListenableFuture<?> reserve(QueryId queryId, String allocationTag, long b
checkArgument(bytes >= 0, "bytes is negative");

ListenableFuture<?> result;
synchronized (this) {
if (bytes != 0) {
queryMemoryReservations.merge(queryId, bytes, Long::sum);
updateTaggedMemoryAllocations(queryId, allocationTag, bytes);
if (bytes != 0) {
while (true) {
if (queryMemoryReservations.computeIfPresent(queryId, (ignored, value) -> value + bytes) != null) {
// queryMemoryReservations is updated,
// use flag putQueryIdAbsent = false to prevent adding new queryId to taggedMemoryAllocations
// in race condition when queryId has been removed
updateTaggedMemoryAllocations(queryId, allocationTag, bytes, false);
break;
}
synchronized (this) {
if (queryMemoryReservations.putIfAbsent(queryId, bytes) == null) {
// queryId is successfully put into queryMemoryReservations,
// safe to update taggedMemoryAllocations
updateTaggedMemoryAllocations(queryId, allocationTag, bytes, true);
break;
}
}
}
}
synchronized (this) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this now needs to be synchronized?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to use synchronization whenever write operation is performed, and let read operation become lock free. Thus, reading is not @GuardedBy("this")

reservedBytes += bytes;
if (getFreeBytes() <= 0) {
if (future == null) {
Expand Down Expand Up @@ -146,10 +155,11 @@ public ListenableFuture<?> reserveRevocable(QueryId queryId, long bytes)
checkArgument(bytes >= 0, "bytes is negative");

ListenableFuture<?> result;

if (bytes != 0) {
queryMemoryRevocableReservations.merge(queryId, bytes, Long::sum);
}
synchronized (this) {
if (bytes != 0) {
queryMemoryRevocableReservations.merge(queryId, bytes, Long::sum);
}
reservedRevocableBytes += bytes;
if (getFreeBytes() <= 0) {
if (future == null) {
Expand Down Expand Up @@ -178,17 +188,32 @@ public boolean tryReserve(QueryId queryId, String allocationTag, long bytes)
return false;
}
reservedBytes += bytes;
if (bytes != 0) {
queryMemoryReservations.merge(queryId, bytes, Long::sum);
updateTaggedMemoryAllocations(queryId, allocationTag, bytes);
}
if (bytes != 0) {
while (true) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i understand what this tries to do (basically either one of the following operation succeed, we are good). But it's a bit mind-twisting :). is this a common practice Java concurrency programming? ;) cc @aweisberg , @arhimondr

if (queryMemoryReservations.computeIfPresent(queryId, (ignored, value) -> value + bytes) != null) {
// queryMemoryReservations is updated,
// use flag putQueryIdAbsent = false to prevent adding new queryId to taggedMemoryAllocations
// in race condition when queryId has been removed
updateTaggedMemoryAllocations(queryId, allocationTag, bytes, false);
break;
}
synchronized (this) {
if (queryMemoryReservations.putIfAbsent(queryId, bytes) == null) {
// queryId is successfully put into queryMemoryReservations,
// safe to update taggedMemoryAllocations
updateTaggedMemoryAllocations(queryId, allocationTag, bytes, true);
break;
}
}
}
}

onMemoryReserved();
return true;
}

public synchronized void free(QueryId queryId, String allocationTag, long bytes)
public void free(QueryId queryId, String allocationTag, long bytes)
Comment thread
aweisberg marked this conversation as resolved.
Outdated
{
checkArgument(bytes >= 0, "bytes is negative");
checkArgument(reservedBytes >= bytes, "tried to free more memory than is reserved");
Expand All @@ -197,26 +222,32 @@ public synchronized void free(QueryId queryId, String allocationTag, long bytes)
return;
}

Long queryReservation = queryMemoryReservations.get(queryId);
requireNonNull(queryReservation, "queryReservation is null");
checkArgument(queryReservation - bytes >= 0, "tried to free more memory than is reserved by query");
queryReservation -= bytes;
Long queryReservation = queryMemoryReservations.merge(queryId, -bytes, Long::sum);
checkArgument(queryReservation != -bytes, "queryId does not exist in queryMemoryReservations");
checkArgument(queryReservation >= 0, "tried to free more memory than is reserved by query");
if (queryReservation == 0) {
queryMemoryReservations.remove(queryId);
taggedMemoryAllocations.remove(queryId);
synchronized (this) {
if (queryMemoryReservations.remove(queryId, queryReservation)) {
// queryId is successfully removed from queryMemoryReservations,
// safe to update taggedMemoryAllocations
taggedMemoryAllocations.remove(queryId);
}
}
}
else {
queryMemoryReservations.put(queryId, queryReservation);
updateTaggedMemoryAllocations(queryId, allocationTag, -bytes);
updateTaggedMemoryAllocations(queryId, allocationTag, -bytes, false);
}
reservedBytes -= bytes;
if (getFreeBytes() > 0 && future != null) {
future.set(null);
future = null;

synchronized (this) {
reservedBytes -= bytes;
if (getFreeBytes() > 0 && future != null) {
future.set(null);
future = null;
}
}
}

public synchronized void freeRevocable(QueryId queryId, long bytes)
public void freeRevocable(QueryId queryId, long bytes)
{
checkArgument(bytes >= 0, "bytes is negative");
checkArgument(reservedRevocableBytes >= bytes, "tried to free more revocable memory than is reserved");
Expand All @@ -225,20 +256,19 @@ public synchronized void freeRevocable(QueryId queryId, long bytes)
return;
}

Long queryReservation = queryMemoryRevocableReservations.get(queryId);
requireNonNull(queryReservation, "queryReservation is null");
checkArgument(queryReservation - bytes >= 0, "tried to free more revocable memory than is reserved by query");
queryReservation -= bytes;
Long queryReservation = queryMemoryRevocableReservations.merge(queryId, -bytes, Long::sum);
checkArgument(queryReservation != -bytes, "queryId does not exist in queryMemoryRevocableReservations");
checkArgument(queryReservation >= 0, "tried to free more revocable memory than is reserved by query");
if (queryReservation == 0) {
queryMemoryRevocableReservations.remove(queryId);
queryMemoryRevocableReservations.remove(queryId, queryReservation);
}
else {
queryMemoryRevocableReservations.put(queryId, queryReservation);
}
reservedRevocableBytes -= bytes;
if (getFreeBytes() > 0 && future != null) {
future.set(null);
future = null;

Comment thread
viczhang861 marked this conversation as resolved.
Outdated
synchronized (this) {
reservedRevocableBytes -= bytes;
if (getFreeBytes() > 0 && future != null) {
future.set(null);
future = null;
}
}
}

Expand All @@ -256,49 +286,51 @@ synchronized ListenableFuture<?> moveQuery(QueryId queryId, MemoryPool targetMem
free(queryId, MOVE_QUERY_TAG, originalReserved);
targetMemoryPool.reserveRevocable(queryId, originalRevocableReserved);
freeRevocable(queryId, originalRevocableReserved);
targetMemoryPool.taggedMemoryAllocations.put(queryId, taggedAllocations);
if (taggedAllocations != null) {
targetMemoryPool.taggedMemoryAllocations.put(queryId, taggedAllocations);
}
return future;
}

/**
* Returns the number of free bytes. This value may be negative, which indicates that the pool is over-committed.
*/
@Managed
public synchronized long getFreeBytes()
public long getFreeBytes()
{
return maxBytes - reservedBytes - reservedRevocableBytes;
}

@Managed
public synchronized long getMaxBytes()
public long getMaxBytes()
{
return maxBytes;
}

@Managed
public synchronized long getReservedBytes()
public long getReservedBytes()
{
return reservedBytes;
}

@Managed
public synchronized long getReservedRevocableBytes()
public long getReservedRevocableBytes()
{
return reservedRevocableBytes;
}

synchronized long getQueryMemoryReservation(QueryId queryId)
long getQueryMemoryReservation(QueryId queryId)
{
return queryMemoryReservations.getOrDefault(queryId, 0L);
}

synchronized long getQueryRevocableMemoryReservation(QueryId queryId)
long getQueryRevocableMemoryReservation(QueryId queryId)
{
return queryMemoryRevocableReservations.getOrDefault(queryId, 0L);
}

@Override
public synchronized String toString()
public String toString()
{
return toStringHelper(this)
.add("id", id)
Expand Down Expand Up @@ -331,13 +363,22 @@ public boolean cancel(boolean mayInterruptIfRunning)
}
}

private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String allocationTag, long delta)
private void updateTaggedMemoryAllocations(QueryId queryId, String allocationTag, long delta, boolean putQueryIdIfAbsent)
{
if (delta == 0) {
return;
}

Map<String, Long> allocations = taggedMemoryAllocations.computeIfAbsent(queryId, ignored -> new HashMap<>());
Map<String, Long> allocations;
if (putQueryIdIfAbsent) {
allocations = taggedMemoryAllocations.computeIfAbsent(queryId, ignored -> new ConcurrentHashMap<>());
}
else {
allocations = taggedMemoryAllocations.get(queryId);
if (allocations == null) {
return;
}
}
allocations.compute(allocationTag, (ignored, oldValue) -> {
if (oldValue == null) {
return delta;
Expand All @@ -351,14 +392,14 @@ private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String
}

@VisibleForTesting
synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations()
Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations()
{
return taggedMemoryAllocations.keySet().stream()
.collect(toImmutableMap(identity(), this::getTaggedMemoryAllocations));
}

@VisibleForTesting
synchronized Map<String, Long> getTaggedMemoryAllocations(QueryId targetQueryId)
Map<String, Long> getTaggedMemoryAllocations(QueryId targetQueryId)
{
if (taggedMemoryAllocations.get(targetQueryId) == null) {
return null;
Expand Down