Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 55 additions & 63 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ int64_t SharedArbitrator::minGrowCapacity(const MemoryPool& pool) const {

uint64_t SharedArbitrator::growCapacity(
MemoryPool* pool,
uint64_t targetBytes) {
uint64_t requestBytes) {
std::lock_guard<std::mutex> l(mutex_);
++numReserves_;
const int64_t maxBytesToReserve =
std::min<int64_t>(maxGrowCapacity(*pool), targetBytes);
std::min<int64_t>(maxGrowCapacity(*pool), requestBytes);
const int64_t minBytesToReserve = minGrowCapacity(*pool);
uint64_t reservedBytes =
decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve);
Expand Down Expand Up @@ -283,45 +283,44 @@ uint64_t SharedArbitrator::decrementFreeCapacityLocked(

uint64_t SharedArbitrator::shrinkCapacity(
MemoryPool* pool,
uint64_t targetBytes) {
uint64_t requestBytes) {
std::lock_guard<std::mutex> l(mutex_);
++numReleases_;
const uint64_t freedBytes = shrinkPool(pool, targetBytes);
const uint64_t freedBytes = shrinkPool(pool, requestBytes);
incrementFreeCapacityLocked(freedBytes);
return freedBytes;
}

uint64_t SharedArbitrator::shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes,
uint64_t requestBytes,
bool allowSpill,
bool allowAbort) {
incrementGlobalArbitrationCount();
ArbitrationOperation op(targetBytes, pools);
requestBytes = requestBytes == 0 ? capacity_ : requestBytes;
ArbitrationOperation op(requestBytes, pools);
ScopedArbitration scopedArbitration(this, &op);
if (targetBytes == 0) {
targetBytes = capacity_;
} else {
targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes);
}

uint64_t fastReclaimTargetBytes =
std::max(memoryPoolTransferCapacity_, requestBytes);

std::lock_guard<std::shared_mutex> exclusiveLock(arbitrationLock_);
getCandidateStats(&op);
uint64_t freedBytes =
reclaimFreeMemoryFromCandidates(&op, targetBytes, false);
reclaimFreeMemoryFromCandidates(&op, fastReclaimTargetBytes, false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::max(memoryPoolTransferCapacity_, requestBytes)? directly use it here?

auto freeGuard = folly::makeGuard([&]() {
// Returns the freed memory capacity back to the arbitrator.
if (freedBytes > 0) {
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= targetBytes) {
if (freedBytes >= op.requestBytes) {
return freedBytes;
}
RECORD_METRIC_VALUE(kMetricArbitratorSlowGlobalArbitrationCount);
if (allowSpill) {
freedBytes +=
reclaimUsedMemoryFromCandidatesBySpill(&op, targetBytes - freedBytes);
if (freedBytes >= targetBytes) {
reclaimUsedMemoryFromCandidatesBySpill(&op, freedBytes);
if (freedBytes >= op.requestBytes) {
return freedBytes;
}
if (allowAbort) {
Expand All @@ -330,8 +329,7 @@ uint64_t SharedArbitrator::shrinkCapacity(
}
}
if (allowAbort) {
freedBytes +=
reclaimUsedMemoryFromCandidatesByAbort(&op, targetBytes - freedBytes);
reclaimUsedMemoryFromCandidatesByAbort(&op, freedBytes);
}
return freedBytes;
}
Expand All @@ -348,8 +346,8 @@ uint64_t SharedArbitrator::testingNumRequests() const {
bool SharedArbitrator::growCapacity(
MemoryPool* pool,
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) {
ArbitrationOperation op(pool, targetBytes, candidatePools);
uint64_t requestBytes) {
ArbitrationOperation op(pool, requestBytes, candidatePools);
ScopedArbitration scopedArbitration(this, &op);

bool needGlobalArbitration{false};
Expand Down Expand Up @@ -390,12 +388,12 @@ bool SharedArbitrator::runLocalArbitration(
VELOX_MEM_LOG(ERROR) << "Can't grow " << op->requestRoot->name()
<< " capacity to "
<< succinctBytes(
op->requestRoot->capacity() + op->targetBytes)
op->requestRoot->capacity() + op->requestBytes)
<< " which exceeds its max capacity "
<< succinctBytes(op->requestRoot->maxCapacity())
<< ", current capacity "
<< succinctBytes(op->requestRoot->capacity())
<< ", request " << succinctBytes(op->targetBytes);
<< ", request " << succinctBytes(op->requestBytes);
return false;
}
VELOX_CHECK(!op->requestRoot->aborted());
Expand All @@ -415,8 +413,8 @@ bool SharedArbitrator::runLocalArbitration(
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= op->targetBytes) {
checkedGrow(op->requestRoot, freedBytes, op->targetBytes);
if (freedBytes >= op->requestBytes) {
checkedGrow(op->requestRoot, freedBytes, op->requestBytes);
freedBytes = 0;
return true;
}
Expand All @@ -425,9 +423,9 @@ bool SharedArbitrator::runLocalArbitration(
getCandidateStats(op, true);
freedBytes +=
reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, true);
if (freedBytes >= op->targetBytes) {
if (freedBytes >= op->requestBytes) {
const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
Expand All @@ -438,9 +436,9 @@ bool SharedArbitrator::runLocalArbitration(
}
checkIfAborted(op);

if (freedBytes >= op->targetBytes) {
if (freedBytes >= op->requestBytes) {
const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
Expand Down Expand Up @@ -482,7 +480,7 @@ bool SharedArbitrator::runGlobalArbitration(ArbitrationOperation* op) {
VELOX_MEM_LOG(ERROR)
<< "Failed to arbitrate sufficient memory for memory pool "
<< op->requestRoot->name() << ", request "
<< succinctBytes(op->targetBytes) << " after " << attempts
<< succinctBytes(op->requestBytes) << " after " << attempts
<< " attempts, Arbitrator state: " << toString();
updateArbitrationFailureStats();
return false;
Expand All @@ -494,7 +492,7 @@ void SharedArbitrator::getGrowTargets(
uint64_t& minGrowTarget) {
maxGrowTarget = std::min(
maxGrowCapacity(*op->requestRoot),
std::max(memoryPoolTransferCapacity_, op->targetBytes));
std::max(memoryPoolTransferCapacity_, op->requestBytes));
minGrowTarget = minGrowCapacity(*op->requestRoot);
}

Expand All @@ -506,30 +504,30 @@ void SharedArbitrator::checkIfAborted(ArbitrationOperation* op) {
}

bool SharedArbitrator::maybeGrowFromSelf(ArbitrationOperation* op) {
if (op->requestRoot->freeBytes() >= op->targetBytes) {
if (growPool(op->requestRoot, 0, op->targetBytes)) {
if (op->requestRoot->freeBytes() >= op->requestBytes) {
if (growPool(op->requestRoot, 0, op->requestBytes)) {
return true;
}
}
return false;
}

bool SharedArbitrator::checkCapacityGrowth(ArbitrationOperation* op) const {
return (maxGrowCapacity(*op->requestRoot) >= op->targetBytes) &&
(capacityAfterGrowth(*op->requestRoot, op->targetBytes) <= capacity_);
return (maxGrowCapacity(*op->requestRoot) >= op->requestBytes) &&
(capacityAfterGrowth(*op->requestRoot, op->requestBytes) <= capacity_);
}

bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) {
if ((op->targetBytes > capacity_) ||
(op->targetBytes > op->requestRoot->maxCapacity())) {
if ((op->requestBytes > capacity_) ||
(op->requestBytes > op->requestRoot->maxCapacity())) {
return false;
}
if (checkCapacityGrowth(op)) {
return true;
}

const uint64_t reclaimedBytes =
reclaim(op->requestRoot, op->targetBytes, true);
reclaim(op->requestRoot, op->requestBytes, true);
// NOTE: return the reclaimed bytes back to the arbitrator and let the memory
// arbitration process to grow the requestor's memory capacity accordingly.
incrementFreeCapacity(reclaimedBytes);
Expand All @@ -543,7 +541,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) {

bool SharedArbitrator::handleOOM(ArbitrationOperation* op) {
MemoryPool* victim = findCandidateWithLargestCapacity(
op->requestRoot, op->targetBytes, op->candidates)
op->requestRoot, op->requestBytes, op->candidates)
.pool;
if (op->requestRoot == victim) {
VELOX_MEM_LOG(ERROR)
Expand All @@ -557,10 +555,10 @@ bool SharedArbitrator::handleOOM(ArbitrationOperation* op) {
try {
if (victim == op->requestRoot) {
VELOX_MEM_POOL_CAP_EXCEEDED(
memoryPoolAbortMessage(victim, op->requestRoot, op->targetBytes));
memoryPoolAbortMessage(victim, op->requestRoot, op->requestBytes));
} else {
VELOX_MEM_POOL_ABORTED(
memoryPoolAbortMessage(victim, op->requestRoot, op->targetBytes));
memoryPoolAbortMessage(victim, op->requestRoot, op->requestBytes));
}
} catch (VeloxRuntimeError&) {
abort(victim, std::current_exception());
Expand Down Expand Up @@ -597,8 +595,8 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) {
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= op->targetBytes) {
checkedGrow(op->requestRoot, freedBytes, op->targetBytes);
if (freedBytes >= op->requestBytes) {
checkedGrow(op->requestRoot, freedBytes, op->requestBytes);
freedBytes = 0;
return true;
}
Expand All @@ -609,31 +607,30 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) {

freedBytes +=
reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, false);
if (freedBytes >= op->targetBytes) {
if (freedBytes >= op->requestBytes) {
const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
VELOX_CHECK_LT(freedBytes, maxGrowTarget);

RECORD_METRIC_VALUE(kMetricArbitratorSlowGlobalArbitrationCount);
freedBytes +=
reclaimUsedMemoryFromCandidatesBySpill(op, maxGrowTarget - freedBytes);
reclaimUsedMemoryFromCandidatesBySpill(op, freedBytes);
checkIfAborted(op);

if (freedBytes < op->targetBytes) {
if (freedBytes < op->requestBytes) {
VELOX_MEM_LOG(WARNING)
<< "Failed to arbitrate sufficient memory for memory pool "
<< op->requestRoot->name() << ", request "
<< succinctBytes(op->targetBytes) << ", only "
<< succinctBytes(op->requestBytes) << ", only "
<< succinctBytes(freedBytes)
<< " has been freed, Arbitrator state: " << toString();
return false;
}

const uint64_t bytesToGrow = std::min(freedBytes, maxGrowTarget);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
Expand Down Expand Up @@ -676,36 +673,32 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
return reclaimedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill(
void SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill(
ArbitrationOperation* op,
uint64_t reclaimTargetBytes) {
uint64_t& freedBytes) {
// Sort candidate memory pools based on their reclaimable used capacity.
sortCandidatesByReclaimableUsedCapacity(op->candidates);

uint64_t reclaimedBytes{0};
for (const auto& candidate : op->candidates) {
VELOX_CHECK_LT(reclaimedBytes, reclaimTargetBytes);
VELOX_CHECK_LT(freedBytes, op->requestBytes);
if (candidate.reclaimableBytes == 0) {
break;
}
reclaimedBytes +=
reclaim(candidate.pool, reclaimTargetBytes - reclaimedBytes, false);
if ((reclaimedBytes >= reclaimTargetBytes) ||
freedBytes += reclaim(candidate.pool, op->requestBytes - freedBytes, false);
if ((freedBytes >= op->requestBytes) ||
(op->requestRoot != nullptr && op->requestRoot->aborted())) {
break;
}
}
return reclaimedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
void SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
ArbitrationOperation* op,
uint64_t reclaimTargetBytes) {
uint64_t& freedBytes) {
sortCandidatesByUsage(op->candidates);

uint64_t freedBytes{0};
for (const auto& candidate : op->candidates) {
VELOX_CHECK_LT(freedBytes, reclaimTargetBytes);
VELOX_CHECK_LT(freedBytes, op->requestBytes);
if (candidate.pool->capacity() == 0) {
break;
}
Expand All @@ -720,11 +713,10 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
abort(candidate.pool, std::current_exception());
}
freedBytes += shrinkPool(candidate.pool, 0);
if (freedBytes >= reclaimTargetBytes) {
if (freedBytes >= op->requestBytes) {
break;
}
}
return freedBytes;
}

uint64_t SharedArbitrator::reclaim(
Expand Down
Loading