From aeaef23ba10e03fcc64c9bb8b9bb9f68b535fdbc Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Mon, 1 Jul 2024 21:01:31 -0700 Subject: [PATCH] Push target inside reclaim functions in arbitrator --- velox/common/memory/SharedArbitrator.cpp | 118 +++++++++++------------ velox/common/memory/SharedArbitrator.h | 26 ++--- 2 files changed, 68 insertions(+), 76 deletions(-) diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index 2e6096dc199..9569d52a6d0 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -238,11 +238,11 @@ int64_t SharedArbitrator::minGrowCapacity(const MemoryPool& pool) const { uint64_t SharedArbitrator::growCapacity( MemoryPool* pool, - uint64_t targetBytes) { + uint64_t requestBytes) { std::lock_guard l(mutex_); ++numReserves_; const int64_t maxBytesToReserve = - std::min(maxGrowCapacity(*pool), targetBytes); + std::min(maxGrowCapacity(*pool), requestBytes); const int64_t minBytesToReserve = minGrowCapacity(*pool); uint64_t reservedBytes = decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); @@ -283,45 +283,44 @@ uint64_t SharedArbitrator::decrementFreeCapacityLocked( uint64_t SharedArbitrator::shrinkCapacity( MemoryPool* pool, - uint64_t targetBytes) { + uint64_t requestBytes) { std::lock_guard l(mutex_); ++numReleases_; - const uint64_t freedBytes = shrinkPool(pool, targetBytes); + const uint64_t freedBytes = shrinkPool(pool, requestBytes); incrementFreeCapacityLocked(freedBytes); return freedBytes; } uint64_t SharedArbitrator::shrinkCapacity( const std::vector>& pools, - uint64_t targetBytes, + uint64_t requestBytes, bool allowSpill, bool allowAbort) { incrementGlobalArbitrationCount(); - ArbitrationOperation op(targetBytes, pools); + requestBytes = requestBytes == 0 ? capacity_ : requestBytes; + ArbitrationOperation op(requestBytes, pools); ScopedArbitration scopedArbitration(this, &op); - if (targetBytes == 0) { - targetBytes = capacity_; - } else { - targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes); - } + + uint64_t fastReclaimTargetBytes = + std::max(memoryPoolTransferCapacity_, requestBytes); + std::lock_guard exclusiveLock(arbitrationLock_); getCandidateStats(&op); uint64_t freedBytes = - reclaimFreeMemoryFromCandidates(&op, targetBytes, false); + reclaimFreeMemoryFromCandidates(&op, fastReclaimTargetBytes, false); auto freeGuard = folly::makeGuard([&]() { // Returns the freed memory capacity back to the arbitrator. if (freedBytes > 0) { incrementFreeCapacity(freedBytes); } }); - if (freedBytes >= targetBytes) { + if (freedBytes >= op.requestBytes) { return freedBytes; } RECORD_METRIC_VALUE(kMetricArbitratorSlowGlobalArbitrationCount); if (allowSpill) { - freedBytes += - reclaimUsedMemoryFromCandidatesBySpill(&op, targetBytes - freedBytes); - if (freedBytes >= targetBytes) { + reclaimUsedMemoryFromCandidatesBySpill(&op, freedBytes); + if (freedBytes >= op.requestBytes) { return freedBytes; } if (allowAbort) { @@ -330,8 +329,7 @@ uint64_t SharedArbitrator::shrinkCapacity( } } if (allowAbort) { - freedBytes += - reclaimUsedMemoryFromCandidatesByAbort(&op, targetBytes - freedBytes); + reclaimUsedMemoryFromCandidatesByAbort(&op, freedBytes); } return freedBytes; } @@ -348,8 +346,8 @@ uint64_t SharedArbitrator::testingNumRequests() const { bool SharedArbitrator::growCapacity( MemoryPool* pool, const std::vector>& candidatePools, - uint64_t targetBytes) { - ArbitrationOperation op(pool, targetBytes, candidatePools); + uint64_t requestBytes) { + ArbitrationOperation op(pool, requestBytes, candidatePools); ScopedArbitration scopedArbitration(this, &op); bool needGlobalArbitration{false}; @@ -390,12 +388,12 @@ bool SharedArbitrator::runLocalArbitration( VELOX_MEM_LOG(ERROR) << "Can't grow " << op->requestRoot->name() << " capacity to " << succinctBytes( - op->requestRoot->capacity() + op->targetBytes) + op->requestRoot->capacity() + op->requestBytes) << " which exceeds its max capacity " << succinctBytes(op->requestRoot->maxCapacity()) << ", current capacity " << succinctBytes(op->requestRoot->capacity()) - << ", request " << succinctBytes(op->targetBytes); + << ", request " << succinctBytes(op->requestBytes); return false; } VELOX_CHECK(!op->requestRoot->aborted()); @@ -415,8 +413,8 @@ bool SharedArbitrator::runLocalArbitration( incrementFreeCapacity(freedBytes); } }); - if (freedBytes >= op->targetBytes) { - checkedGrow(op->requestRoot, freedBytes, op->targetBytes); + if (freedBytes >= op->requestBytes) { + checkedGrow(op->requestRoot, freedBytes, op->requestBytes); freedBytes = 0; return true; } @@ -425,9 +423,9 @@ bool SharedArbitrator::runLocalArbitration( getCandidateStats(op, true); freedBytes += reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, true); - if (freedBytes >= op->targetBytes) { + if (freedBytes >= op->requestBytes) { const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes); - checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes); + checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes); freedBytes -= bytesToGrow; return true; } @@ -438,9 +436,9 @@ bool SharedArbitrator::runLocalArbitration( } checkIfAborted(op); - if (freedBytes >= op->targetBytes) { + if (freedBytes >= op->requestBytes) { const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes); - checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes); + checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes); freedBytes -= bytesToGrow; return true; } @@ -482,7 +480,7 @@ bool SharedArbitrator::runGlobalArbitration(ArbitrationOperation* op) { VELOX_MEM_LOG(ERROR) << "Failed to arbitrate sufficient memory for memory pool " << op->requestRoot->name() << ", request " - << succinctBytes(op->targetBytes) << " after " << attempts + << succinctBytes(op->requestBytes) << " after " << attempts << " attempts, Arbitrator state: " << toString(); updateArbitrationFailureStats(); return false; @@ -494,7 +492,7 @@ void SharedArbitrator::getGrowTargets( uint64_t& minGrowTarget) { maxGrowTarget = std::min( maxGrowCapacity(*op->requestRoot), - std::max(memoryPoolTransferCapacity_, op->targetBytes)); + std::max(memoryPoolTransferCapacity_, op->requestBytes)); minGrowTarget = minGrowCapacity(*op->requestRoot); } @@ -506,8 +504,8 @@ void SharedArbitrator::checkIfAborted(ArbitrationOperation* op) { } bool SharedArbitrator::maybeGrowFromSelf(ArbitrationOperation* op) { - if (op->requestRoot->freeBytes() >= op->targetBytes) { - if (growPool(op->requestRoot, 0, op->targetBytes)) { + if (op->requestRoot->freeBytes() >= op->requestBytes) { + if (growPool(op->requestRoot, 0, op->requestBytes)) { return true; } } @@ -515,13 +513,13 @@ bool SharedArbitrator::maybeGrowFromSelf(ArbitrationOperation* op) { } bool SharedArbitrator::checkCapacityGrowth(ArbitrationOperation* op) const { - return (maxGrowCapacity(*op->requestRoot) >= op->targetBytes) && - (capacityAfterGrowth(*op->requestRoot, op->targetBytes) <= capacity_); + return (maxGrowCapacity(*op->requestRoot) >= op->requestBytes) && + (capacityAfterGrowth(*op->requestRoot, op->requestBytes) <= capacity_); } bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) { - if ((op->targetBytes > capacity_) || - (op->targetBytes > op->requestRoot->maxCapacity())) { + if ((op->requestBytes > capacity_) || + (op->requestBytes > op->requestRoot->maxCapacity())) { return false; } if (checkCapacityGrowth(op)) { @@ -529,7 +527,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) { } const uint64_t reclaimedBytes = - reclaim(op->requestRoot, op->targetBytes, true); + reclaim(op->requestRoot, op->requestBytes, true); // NOTE: return the reclaimed bytes back to the arbitrator and let the memory // arbitration process to grow the requestor's memory capacity accordingly. incrementFreeCapacity(reclaimedBytes); @@ -543,7 +541,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) { bool SharedArbitrator::handleOOM(ArbitrationOperation* op) { MemoryPool* victim = findCandidateWithLargestCapacity( - op->requestRoot, op->targetBytes, op->candidates) + op->requestRoot, op->requestBytes, op->candidates) .pool; if (op->requestRoot == victim) { VELOX_MEM_LOG(ERROR) @@ -557,10 +555,10 @@ bool SharedArbitrator::handleOOM(ArbitrationOperation* op) { try { if (victim == op->requestRoot) { VELOX_MEM_POOL_CAP_EXCEEDED( - memoryPoolAbortMessage(victim, op->requestRoot, op->targetBytes)); + memoryPoolAbortMessage(victim, op->requestRoot, op->requestBytes)); } else { VELOX_MEM_POOL_ABORTED( - memoryPoolAbortMessage(victim, op->requestRoot, op->targetBytes)); + memoryPoolAbortMessage(victim, op->requestRoot, op->requestBytes)); } } catch (VeloxRuntimeError&) { abort(victim, std::current_exception()); @@ -597,8 +595,8 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) { incrementFreeCapacity(freedBytes); } }); - if (freedBytes >= op->targetBytes) { - checkedGrow(op->requestRoot, freedBytes, op->targetBytes); + if (freedBytes >= op->requestBytes) { + checkedGrow(op->requestRoot, freedBytes, op->requestBytes); freedBytes = 0; return true; } @@ -609,31 +607,30 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) { freedBytes += reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, false); - if (freedBytes >= op->targetBytes) { + if (freedBytes >= op->requestBytes) { const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes); - checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes); + checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes); freedBytes -= bytesToGrow; return true; } VELOX_CHECK_LT(freedBytes, maxGrowTarget); RECORD_METRIC_VALUE(kMetricArbitratorSlowGlobalArbitrationCount); - freedBytes += - reclaimUsedMemoryFromCandidatesBySpill(op, maxGrowTarget - freedBytes); + reclaimUsedMemoryFromCandidatesBySpill(op, freedBytes); checkIfAborted(op); - if (freedBytes < op->targetBytes) { + if (freedBytes < op->requestBytes) { VELOX_MEM_LOG(WARNING) << "Failed to arbitrate sufficient memory for memory pool " << op->requestRoot->name() << ", request " - << succinctBytes(op->targetBytes) << ", only " + << succinctBytes(op->requestBytes) << ", only " << succinctBytes(freedBytes) << " has been freed, Arbitrator state: " << toString(); return false; } const uint64_t bytesToGrow = std::min(freedBytes, maxGrowTarget); - checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes); + checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes); freedBytes -= bytesToGrow; return true; } @@ -676,36 +673,32 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( return reclaimedBytes; } -uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill( +void SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill( ArbitrationOperation* op, - uint64_t reclaimTargetBytes) { + uint64_t& freedBytes) { // Sort candidate memory pools based on their reclaimable used capacity. sortCandidatesByReclaimableUsedCapacity(op->candidates); - uint64_t reclaimedBytes{0}; for (const auto& candidate : op->candidates) { - VELOX_CHECK_LT(reclaimedBytes, reclaimTargetBytes); + VELOX_CHECK_LT(freedBytes, op->requestBytes); if (candidate.reclaimableBytes == 0) { break; } - reclaimedBytes += - reclaim(candidate.pool, reclaimTargetBytes - reclaimedBytes, false); - if ((reclaimedBytes >= reclaimTargetBytes) || + freedBytes += reclaim(candidate.pool, op->requestBytes - freedBytes, false); + if ((freedBytes >= op->requestBytes) || (op->requestRoot != nullptr && op->requestRoot->aborted())) { break; } } - return reclaimedBytes; } -uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort( +void SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort( ArbitrationOperation* op, - uint64_t reclaimTargetBytes) { + uint64_t& freedBytes) { sortCandidatesByUsage(op->candidates); - uint64_t freedBytes{0}; for (const auto& candidate : op->candidates) { - VELOX_CHECK_LT(freedBytes, reclaimTargetBytes); + VELOX_CHECK_LT(freedBytes, op->requestBytes); if (candidate.pool->capacity() == 0) { break; } @@ -720,11 +713,10 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort( abort(candidate.pool, std::current_exception()); } freedBytes += shrinkPool(candidate.pool, 0); - if (freedBytes >= reclaimTargetBytes) { + if (freedBytes >= op->requestBytes) { break; } } - return freedBytes; } uint64_t SharedArbitrator::reclaim( diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index c4d8630d3c2..c7b7bbce26f 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -43,18 +43,18 @@ class SharedArbitrator : public memory::MemoryArbitrator { static void unregisterFactory(); - uint64_t growCapacity(MemoryPool* pool, uint64_t targetBytes) final; + uint64_t growCapacity(MemoryPool* pool, uint64_t requestBytes) final; bool growCapacity( MemoryPool* pool, const std::vector>& candidatePools, - uint64_t targetBytes) final; + uint64_t requestBytes) final; - uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) final; + uint64_t shrinkCapacity(MemoryPool* pool, uint64_t requestBytes) final; uint64_t shrinkCapacity( const std::vector>& pools, - uint64_t targetBytes, + uint64_t requestBytes, bool allowSpill = true, bool force = false) override final; @@ -108,7 +108,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { MemoryPool* const requestPool; MemoryPool* const requestRoot; const std::vector>& candidatePools; - const uint64_t targetBytes; + const uint64_t requestBytes; // The start time of this arbitration operation. const std::chrono::steady_clock::time_point startTime; @@ -125,18 +125,18 @@ class SharedArbitrator : public memory::MemoryArbitrator { uint64_t globalArbitrationLockWaitTimeUs{0}; ArbitrationOperation( - uint64_t targetBytes, + uint64_t requestBytes, const std::vector>& candidatePools) - : ArbitrationOperation(nullptr, targetBytes, candidatePools) {} + : ArbitrationOperation(nullptr, requestBytes, candidatePools) {} ArbitrationOperation( MemoryPool* _requestor, - uint64_t _targetBytes, + uint64_t _requestBytes, const std::vector>& _candidatePools) : requestPool(_requestor), requestRoot(_requestor == nullptr ? nullptr : _requestor->root()), candidatePools(_candidatePools), - targetBytes(_targetBytes), + requestBytes(_requestBytes), startTime(std::chrono::steady_clock::now()) {} uint64_t waitTimeUs() const { @@ -251,15 +251,15 @@ class SharedArbitrator : public memory::MemoryArbitrator { // // NOTE: the function might sort 'candidates' based on each candidate's // reclaimable memory internally. - uint64_t reclaimUsedMemoryFromCandidatesBySpill( + void reclaimUsedMemoryFromCandidatesBySpill( ArbitrationOperation* op, - uint64_t reclaimTargetBytes); + uint64_t& freedBytes); // Invoked to reclaim used memory capacity from 'candidates' by aborting the // top memory users' queries. - uint64_t reclaimUsedMemoryFromCandidatesByAbort( + void reclaimUsedMemoryFromCandidatesByAbort( ArbitrationOperation* op, - uint64_t reclaimTargetBytes); + uint64_t& freedBytes); // Checks if request pool has been aborted or not. void checkIfAborted(ArbitrationOperation* op);