From 706e6f32cd9f31f01d987f44d42825bb712fd069 Mon Sep 17 00:00:00 2001
From: Jialiang Tan
Date: Mon, 24 Jul 2023 15:57:01 -0700
Subject: [PATCH] =?UTF-8?q?Remove=20inheritance=20relationship=20between?=
 =?UTF-8?q?=20AsyncDataCache=20and=20MemoryAllo=E2=80=A6=20(#5503)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Summary:
X-link: https://github.com/prestodb/presto/pull/20372

Remove the inheritance relationship between AsyncDataCache and
MemoryAllocator. The two now depend on each other through a registration
mechanism: the cache is created against an allocator and registers itself
on it. Related tests are refactored to be consistent with the new
structure. A sketch of the new wiring follows the diffstat below.

Pull Request resolved: https://github.com/facebookincubator/velox/pull/5503

Reviewed By: xiaoxmeng

Differential Revision: D47536273

Pulled By: tanjialiang

fbshipit-source-id: 9a02e8dc5e1f05a00d4cd7e2828fdd5e8648669e
---
 velox/benchmarks/tpch/TpchBenchmark.cpp       |  24 ++-
 velox/common/caching/AsyncDataCache.cpp       |  98 ++++------
 velox/common/caching/AsyncDataCache.h         | 179 +++++++-----------
 .../caching/tests/AsyncDataCacheTest.cpp      |  25 ++-
 velox/common/caching/tests/SsdFileTest.cpp    |   6 +-
 velox/common/memory/MallocAllocator.cpp       |  29 ++-
 velox/common/memory/MallocAllocator.h         |  65 +++----
 velox/common/memory/MemoryAllocator.cpp       |  67 +++++++
 velox/common/memory/MemoryAllocator.h         | 105 +++++++---
 velox/common/memory/MmapAllocator.cpp         |  32 +++-
 velox/common/memory/MmapAllocator.h           |  83 ++++----
 velox/common/memory/tests/MemoryPoolTest.cpp  |  13 +-
 .../memory/tests/SharedArbitratorTest.cpp     |   2 +-
 velox/connectors/Connector.h                  |  13 +-
 velox/connectors/hive/HiveConnector.h         |   2 +-
 velox/connectors/hive/HiveDataSource.cpp      |   8 +-
 velox/connectors/hive/HiveDataSource.h        |   4 +-
 velox/core/CMakeLists.txt                     |   1 +
 velox/core/QueryCtx.cpp                       |   8 +-
 velox/core/QueryCtx.h                         |  14 +-
 velox/dwio/common/CachedBufferedInput.cpp     |   5 +-
 velox/dwio/dwrf/test/CacheInputTest.cpp       |  12 +-
 velox/examples/CMakeLists.txt                 |   4 +-
 velox/exec/Operator.cpp                       |   2 +-
 velox/exec/tests/TableScanTest.cpp            |   3 +-
 velox/exec/tests/ThreadDebugInfoTest.cpp      |   2 +-
 velox/exec/tests/utils/AssertQueryBuilder.cpp |   2 +-
 velox/exec/tests/utils/Cursor.cpp             |   2 +-
 velox/exec/tests/utils/OperatorTestBase.cpp   |  30 +--
 velox/exec/tests/utils/OperatorTestBase.h     |   9 +-
 30 files changed, 487 insertions(+), 362 deletions(-)
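For reference, the new setup and teardown sequence looks roughly as follows.
This is a sketch distilled from the TpchBenchmark changes in this diff, with
illustrative variable names, not a complete program:

    // The cache no longer is-a MemoryAllocator; it is created against one and
    // registers itself on it, so allocations can retry after cache eviction.
    auto allocator = std::make_shared<memory::MmapAllocator>(options);
    auto cache = cache::AsyncDataCache::create(allocator.get()); // registers the cache
    cache::AsyncDataCache::setInstance(cache.get());
    memory::MemoryAllocator::setDefaultInstance(allocator.get());
    // ... run queries ...
    cache->prepareShutdown(); // release cache-held memory before teardown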
diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp
index 53ba99feb6b..8890a50a8e1 100644
--- a/velox/benchmarks/tpch/TpchBenchmark.cpp
+++ b/velox/benchmarks/tpch/TpchBenchmark.cpp
@@ -230,9 +230,10 @@ class TpchBenchmark {
           static_cast<uint64_t>(FLAGS_ssd_checkpoint_interval_gb) << 30);
     }
-    auto allocator = std::make_shared<memory::MmapAllocator>(options);
-    allocator_ = std::make_shared<cache::AsyncDataCache>(
-        allocator, memoryBytes, std::move(ssdCache));
+    allocator_ = std::make_shared<memory::MmapAllocator>(options);
+    cache_ =
+        cache::AsyncDataCache::create(allocator_.get(), std::move(ssdCache));
+    cache::AsyncDataCache::setInstance(cache_.get());
     memory::MemoryAllocator::setDefaultInstance(allocator_.get());
   }
   functions::prestosql::registerAllScalarFunctions();
@@ -261,6 +262,10 @@
     connector::registerConnector(hiveConnector);
   }
 
+  void shutdown() {
+    cache_->prepareShutdown();
+  }
+
   std::pair<std::unique_ptr<TaskCursor>, std::vector<RowVectorPtr>> run(
       const TpchPlan& tpchPlan) {
     int32_t repeat = 0;
@@ -382,15 +387,13 @@
 #endif
 
-      auto cache = dynamic_cast<cache::AsyncDataCache*>(allocator_.get());
-      if (cache) {
-        cache->clear();
+      if (cache_) {
+        cache_->clear();
       }
     }
     if (FLAGS_clear_ssd_cache) {
-      auto cache = dynamic_cast<cache::AsyncDataCache*>(allocator_.get());
-      if (cache) {
-        auto ssdCache = cache->ssdCache();
+      if (cache_) {
+        auto ssdCache = cache_->ssdCache();
         if (ssdCache) {
           ssdCache->clear();
         }
@@ -462,7 +465,7 @@
   std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
   std::unique_ptr<folly::IOThreadPoolExecutor> cacheExecutor_;
   std::shared_ptr<memory::MemoryAllocator> allocator_;
-
+  std::shared_ptr<cache::AsyncDataCache> cache_;
   // Parameter combinations to try. Each element specifies a flag and possible
   // values. All permutations are tried.
   std::vector<ParameterDim> parameters_;
@@ -578,6 +581,7 @@ int tpchBenchmarkMain() {
   } else {
     benchmark.runAllCombinations();
   }
+  benchmark.shutdown();
   queryBuilder.reset();
   return 0;
 }

diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp
index 92d54deebc8..3e0ff2db0c7 100644
--- a/velox/common/caching/AsyncDataCache.cpp
+++ b/velox/common/caching/AsyncDataCache.cpp
@@ -32,7 +32,7 @@ AsyncDataCacheEntry::AsyncDataCacheEntry(CacheShard* shard) : shard_(shard) {
 }
 
 AsyncDataCacheEntry::~AsyncDataCacheEntry() {
-  shard_->cache()->freeNonContiguous(data_);
+  shard_->cache()->allocator()->freeNonContiguous(data_);
 }
 
 void AsyncDataCacheEntry::setExclusiveToShared() {
@@ -108,7 +108,7 @@ void AsyncDataCacheEntry::initialize(FileCacheKey key) {
   tinyData_.clear();
   auto sizePages = bits::roundUp(size_, memory::AllocationTraits::kPageSize) /
       memory::AllocationTraits::kPageSize;
-  if (cache->allocateNonContiguous(sizePages, data_)) {
+  if (cache->allocator()->allocateNonContiguous(sizePages, data_)) {
     cache->incrementCachedPages(data().numPages());
   } else {
     // No memory to cover 'this'.
@@ -324,7 +324,7 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) {
     auto numPages = entry->data().numPages();
     if (numPages) {
       cache_->incrementCachedPages(-numPages);
-      cache_->freeNonContiguous(entry->data());
+      cache_->allocator()->freeNonContiguous(entry->data());
     }
   }
 }
@@ -412,7 +412,7 @@ void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {
 
 void CacheShard::freeAllocations(std::vector<memory::Allocation>& allocations) {
   for (auto& allocation : allocations) {
-    cache_->freeNonContiguous(allocation);
+    cache_->allocator()->freeNonContiguous(allocation);
   }
   allocations.clear();
 }
@@ -495,8 +495,7 @@ void CacheShard::appendSsdSaveable(std::vector<CachePin>& pins) {
 }
 
 AsyncDataCache::AsyncDataCache(
-    const std::shared_ptr<memory::MemoryAllocator>& allocator,
-    uint64_t /* maxBytes */,
+    memory::MemoryAllocator* allocator,
     std::unique_ptr<SsdCache> ssdCache)
     : allocator_(allocator), ssdCache_(std::move(ssdCache)), cachedPages_(0) {
   for (auto i = 0; i < kNumShards; ++i) {
     shards_.push_back(std::make_unique<CacheShard>(this));
   }
 }
 
@@ -504,15 +503,44 @@
-AsyncDataCache::AsyncDataCache(
-    const std::shared_ptr<memory::MemoryAllocator>& allocator,
-    std::unique_ptr<SsdCache> ssdCache)
-    : allocator_(allocator), ssdCache_(std::move(ssdCache)), cachedPages_(0) {
-  for (auto i = 0; i < kNumShards; ++i) {
-    shards_.push_back(std::make_unique<CacheShard>(this));
+AsyncDataCache::~AsyncDataCache() {}
+
+// static
+std::shared_ptr<AsyncDataCache> AsyncDataCache::create(
+    memory::MemoryAllocator* allocator,
+    std::unique_ptr<SsdCache> ssdCache) {
+  auto cache = std::make_shared<AsyncDataCache>(allocator, std::move(ssdCache));
+  allocator->registerCache(cache);
+  return cache;
+}
+
+// static
+AsyncDataCache* AsyncDataCache::getInstance() {
+  return *getInstancePtr();
+}
+
+// static
+void AsyncDataCache::setInstance(AsyncDataCache* asyncDataCache) {
+  *getInstancePtr() = asyncDataCache;
+}
+
+// static
+AsyncDataCache** AsyncDataCache::getInstancePtr() {
+  static AsyncDataCache* cache_{nullptr};
+  return &cache_;
+}
+
+void AsyncDataCache::prepareShutdown() {
+  for (auto& shard : shards_) {
+    shard->prepareShutdown();
   }
 }
 
+void CacheShard::prepareShutdown() {
+  entries_.clear();
+  freeEntries_.clear();
+}
+
 CachePin AsyncDataCache::findOrCreate(
     RawFileCacheKey key,
     uint64_t size,
@@ -611,50 +639,6 @@ void AsyncDataCache::backoff(int32_t counter) {
   std::this_thread::sleep_for(std::chrono::microseconds(usec)); // NOLINT
 }
 
-bool AsyncDataCache::allocateNonContiguous(
-    MachinePageCount numPages,
-    memory::Allocation& out,
-    ReservationCallback reservationCB,
-    MachinePageCount minSizeClass) {
-  return makeSpace(numPages, [&]() {
-    return allocator_->allocateNonContiguous(
-        numPages, out, reservationCB, minSizeClass);
-  });
-}
-
-bool AsyncDataCache::allocateContiguous(
-    memory::MachinePageCount numPages,
-    memory::Allocation* collateral,
-    memory::ContiguousAllocation& allocation,
-    ReservationCallback reservationCB,
-    memory::MachinePageCount maxPages) {
-  return makeSpace(numPages, [&]() {
-    return allocator_->allocateContiguous(
-        numPages, collateral, allocation, reservationCB, maxPages);
-  });
-}
-
-bool AsyncDataCache::growContiguous(
-    MachinePageCount increment,
-    memory::ContiguousAllocation& allocation,
-    ReservationCallback reservationCB) {
-  return makeSpace(increment, [&]() {
-    return allocator_->growContiguous(increment, allocation, reservationCB);
-  });
-}
-
-void* AsyncDataCache::allocateBytes(uint64_t bytes, uint16_t alignment) {
-  void* result = nullptr;
-  makeSpace(
-      bits::roundUp(bytes, memory::AllocationTraits::kPageSize) /
-          memory::AllocationTraits::kPageSize,
-      [&]() {
-        result = allocator_->allocateBytes(bytes, alignment);
-        return result != nullptr;
-      });
-  return result;
-}
-
 void AsyncDataCache::incrementNew(uint64_t size) {
   newBytes_ += size;
   if (!ssdCache_) {
@@ -725,7 +709,7 @@ std::string AsyncDataCache::toString() const {
       << " read pins " << stats.numShared << " write pins "
       << stats.numExclusive << " unused prefetch " << stats.numPrefetch
       << " Alloc Megaclocks " << (stats.allocClocks >> 20)
-      << " allocated pages " << numAllocated() << " cached pages "
+      << " allocated pages " << allocator_->numAllocated() << " cached pages "
       << cachedPages_;
   out << "\nBacking: " << allocator_->toString();
   if (ssdCache_) {
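A note on the shutdown contract introduced above: AsyncDataCacheEntry's
destructor frees its backing Allocation through shard_->cache()->allocator(),
so the registered allocator must outlive all cache entries. A hedged sketch of
the required teardown order, with illustrative variable names:

    cache->prepareShutdown(); // drops CacheShard entries; frees via cache->allocator()
    cache.reset();            // safe now that the shards hold no memory
    allocator.reset();        // the allocator strictly outlives the cache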
diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h
index 2dd8a9098b2..2a60f8f273f 100644
--- a/velox/common/caching/AsyncDataCache.h
+++ b/velox/common/caching/AsyncDataCache.h
@@ -506,30 +506,36 @@ struct CacheStats {
   std::shared_ptr<SsdCacheStats> ssdStats = nullptr;
 };
 
+
+/// Collection of cache entries whose key hashes to the same shard of
+/// the hash number space. The cache population is divided into shards
+/// to decrease contention on the mutex for the key to entry mapping
+/// and other housekeeping.
 class CacheShard {
  public:
  explicit CacheShard(AsyncDataCache* FOLLY_NONNULL cache) : cache_(cache) {}
 
-  // See AsyncDataCache::findOrCreate.
+  /// See AsyncDataCache::findOrCreate.
   CachePin findOrCreate(
       RawFileCacheKey key,
       uint64_t size,
       folly::SemiFuture<bool>* readyFuture);
 
-  // Returns true if there is an entry for 'key'. Updates access time.
+  /// Returns true if there is an entry for 'key'. Updates access time.
   bool exists(RawFileCacheKey key) const;
 
-  AsyncDataCache* cache() {
+  AsyncDataCache* cache() const {
     return cache_;
   }
 
   std::mutex& mutex() {
     return mutex_;
   }
 
+  /// Release any resources that consume memory from this 'CacheShard' for a
+  /// graceful shutdown. The shard will no longer be valid after this call.
+  void prepareShutdown();
+
   // removes 'bytesToFree' worth of entries or as many entries as are
   // not pinned. This favors first removing older and less frequently
   // used entries. If 'evictAllUnpinned' is true, anything that is
@@ -609,101 +615,63 @@ class CacheShard {
   std::atomic<uint64_t> allocClocks_;
 };
 
-class AsyncDataCache : public memory::MemoryAllocator {
+class AsyncDataCache : public memory::Cache {
  public:
-  // TODO(jtan6): Remove this constructor after Presto Native switches to below
-  // constructor
   AsyncDataCache(
-      const std::shared_ptr<memory::MemoryAllocator>& allocator,
-      uint64_t maxBytes,
+      memory::MemoryAllocator* allocator,
       std::unique_ptr<SsdCache> ssdCache = nullptr);
 
-  AsyncDataCache(
-      const std::shared_ptr<memory::MemoryAllocator>& allocator,
-      std::unique_ptr<SsdCache> ssdCache = nullptr);
+  ~AsyncDataCache() override;
+
+  static std::shared_ptr<AsyncDataCache> create(
+      memory::MemoryAllocator* allocator,
+      std::unique_ptr<SsdCache> ssdCache = nullptr);
 
-  // Finds or creates a cache entry corresponding to 'key'. The entry
-  // is returned in 'pin'. If the entry is new, it is pinned in
-  // exclusive mode and its 'data_' has uninitialized space for at
-  // least 'size' bytes. If the entry is in cache and already filled,
-  // the pin is in shared mode. If the entry is in exclusive mode for
-  // some other pin, the pin is empty. If 'waitFuture' is not nullptr
-  // and the pin is exclusive on some other pin, this is set to a
-  // future that is realized when the pin is no longer exclusive. When
-  // the future is realized, the caller may retry findOrCreate().
-  // runtime error with code kNoCacheSpace if there is no space to create the
-  // new entry after evicting any unpinned content.
+  static AsyncDataCache* getInstance();
+
+  static void setInstance(AsyncDataCache* asyncDataCache);
+
+  /// Release any resources that consume memory from 'allocator_' for a
+  /// graceful shutdown. The cache will no longer be valid after this call.
+  void prepareShutdown();
+
+  /// Calls 'allocate' until it returns true, and returns true if 'allocate'
+  /// succeeds. Tries to evict at least 'numPages' of cache after each failed
+  /// call to 'allocate'. May pause to wait for SSD cache flush if 'ssdCache_'
+  /// is set and is busy writing. Does random back-off after several failures
+  /// and eventually gives up. Allocation must not be serialized by a mutex
+  /// for memory arbitration to work.
+  bool makeSpace(
+      memory::MachinePageCount numPages,
+      std::function<bool()> allocate) override;
+
+  memory::MemoryAllocator* allocator() const override {
+    return allocator_;
+  }
+
+  /// Finds or creates a cache entry corresponding to 'key'. The entry
+  /// is returned in 'pin'. If the entry is new, it is pinned in
+  /// exclusive mode and its 'data_' has uninitialized space for at
+  /// least 'size' bytes. If the entry is in cache and already filled,
+  /// the pin is in shared mode. If the entry is in exclusive mode for
+  /// some other pin, the pin is empty. If 'waitFuture' is not nullptr
+  /// and the pin is exclusive on some other pin, this is set to a
+  /// future that is realized when the pin is no longer exclusive. When
+  /// the future is realized, the caller may retry findOrCreate(). Throws a
+  /// runtime error with code kNoCacheSpace if there is no space to create the
+  /// new entry after evicting any unpinned content.
   CachePin findOrCreate(
       RawFileCacheKey key,
       uint64_t size,
       folly::SemiFuture<bool>* waitFuture = nullptr);
 
-  // Returns true if there is an entry for 'key'. Updates access time.
+  /// Returns true if there is an entry for 'key'. Updates access time.
   bool exists(RawFileCacheKey key) const;
 
-  Kind kind() const override {
-    return allocator_->kind();
-  }
-
-  size_t capacity() const override {
-    return allocator_->capacity();
-  }
-
-  bool allocateNonContiguous(
-      memory::MachinePageCount numPages,
-      memory::Allocation& out,
-      ReservationCallback reservationCB = nullptr,
-      memory::MachinePageCount minSizeClass = 0) override;
-
-  int64_t freeNonContiguous(memory::Allocation& allocation) override {
-    return allocator_->freeNonContiguous(allocation);
-  }
-
-  bool allocateContiguous(
-      memory::MachinePageCount numPages,
-      memory::Allocation* FOLLY_NULLABLE collateral,
-      memory::ContiguousAllocation& allocation,
-      ReservationCallback reservationCB = nullptr,
-      memory::MachinePageCount maxPages = 0) override;
-
-  void freeContiguous(memory::ContiguousAllocation& allocation) override {
-    allocator_->freeContiguous(allocation);
-  }
-
-  bool growContiguous(
-      memory::MachinePageCount increment,
-      memory::ContiguousAllocation& allocation,
-      ReservationCallback reservationCB = nullptr) override;
-
-  void* allocateBytes(uint64_t bytes, uint16_t alignment) override;
-
-  void freeBytes(void* p, uint64_t size) noexcept override {
-    allocator_->freeBytes(p, size);
-  }
-
-  bool checkConsistency() const override {
-    return allocator_->checkConsistency();
-  }
-
-  const std::vector<memory::MachinePageCount>& sizeClasses() const override {
-    return allocator_->sizeClasses();
-  }
-
-  size_t totalUsedBytes() const override {
-    return allocator_->totalUsedBytes();
-  }
-
-  memory::MachinePageCount numAllocated() const override {
-    return allocator_->numAllocated();
-  }
-
-  memory::MachinePageCount numMapped() const override {
-    return allocator_->numMapped();
-  }
-
   CacheStats refreshStats() const;
 
-  std::string toString() const override;
+  std::string toString() const;
 
   memory::MachinePageCount incrementCachedPages(int64_t pages) {
     // The counter is unsigned and the increment is signed.
@@ -719,19 +687,19 @@
     return ssdCache_.get();
   }
 
-  // Updates stats for creation of a new cache entry of 'size' bytes,
-  // i.e. a cache miss. Periodically updates SSD admission criteria,
-  // i.e. reconsider criteria every half cache capacity worth of misses.
+  /// Updates stats for creation of a new cache entry of 'size' bytes,
+  /// i.e. a cache miss. Periodically updates SSD admission criteria,
+  /// i.e. reconsider criteria every half cache capacity worth of misses.
   void incrementNew(uint64_t size);
 
-  // Updates statistics after bringing in 'bytes' worth of data that
-  // qualifies for SSD save and is not backed by SSD. Periodically
-  // triggers a background write of eligible entries to SSD.
+  /// Updates statistics after bringing in 'bytes' worth of data that
+  /// qualifies for SSD save and is not backed by SSD. Periodically
+  /// triggers a background write of eligible entries to SSD.
   void possibleSsdSave(uint64_t bytes);
 
-  // Sets a callback applied to new entries at the point where
-  // they are set to shared mode. Used for testing and can be used for
-  // e.g. checking checksums.
+  /// Sets a callback applied to new entries at the point where
+  /// they are set to shared mode. Used for testing and can be used for
+  /// e.g. checking checksums.
   void setVerifyHook(std::function<void(const AsyncDataCacheEntry&)> hook) {
     verifyHook_ = hook;
   }
@@ -769,29 +737,16 @@
     return numSkippedSaves_;
   }
 
-  memory::Stats stats() const override {
-    return allocator_->stats();
-  }
-
  private:
   static constexpr int32_t kNumShards = 4; // Must be power of 2.
   static constexpr int32_t kShardMask = kNumShards - 1;
 
+  static AsyncDataCache** getInstancePtr();
+
   // Waits a pseudorandom delay times 'counter'.
   void backoff(int32_t counter);
 
-  // Calls 'allocate' until this returns true. Returns true if
-  // allocate returns true. and Tries to evict at least 'numPages' of
-  // cache after each failed call to 'allocate'. May pause to wait
-  // for SSD cache flush if ''ssdCache_' is set and is busy
-  // writing. Does random back-off after several failures and
-  // eventually gives up. Allocation must not be serialized by a mutex
-  // for memory arbitration to work.
-  bool makeSpace(
-      memory::MachinePageCount numPages,
-      std::function<bool()> allocate);
-
-  std::shared_ptr<memory::MemoryAllocator> allocator_;
+  memory::MemoryAllocator* const allocator_;
   std::unique_ptr<SsdCache> ssdCache_;
   std::vector<std::unique_ptr<CacheShard>> shards_;
   std::atomic<int32_t> shardCounter_{0};
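The findOrCreate() contract documented in AsyncDataCache.h above reduces to
the following caller-side protocol. This is an illustrative sketch, not code
from this diff; 'cache', 'key', and 'size' are assumed inputs:

    folly::SemiFuture<bool> wait(false);
    auto pin = cache->findOrCreate(key, size, &wait);
    if (pin.empty()) {
      // Another thread holds the entry exclusively: wait, then retry.
      std::move(wait).wait();
      // ... call findOrCreate() again ...
    } else if (pin.entry()->isExclusive()) {
      // New entry: fill pin.entry()->data() with 'size' bytes, then publish it.
      pin.entry()->setExclusiveToShared();
    } else {
      // Shared pin: the cached data is already filled and readable.
    }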
diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp
index 1c3580a9d86..66bb6dd65cb 100644
--- a/velox/common/caching/tests/AsyncDataCacheTest.cpp
+++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp
@@ -81,6 +81,7 @@ class AsyncDataCacheTest : public testing::Test {
       if (ssdCache) {
         ssdCache->deleteFiles();
       }
+      cache_->prepareShutdown();
     }
   }
 
@@ -104,18 +105,21 @@
     }
     memory::MmapAllocator::Options options;
     options.capacity = maxBytes;
-    cache_ = std::make_shared<AsyncDataCache>(
-        std::make_shared<memory::MmapAllocator>(options),
-        maxBytes,
-        std::move(ssdCache));
+    if (cache_) {
+      cache_->prepareShutdown();
+    }
+    cache_.reset();
+    allocator_.reset();
+    allocator_ = std::make_shared<memory::MmapAllocator>(options);
+    cache_ = AsyncDataCache::create(allocator_.get(), std::move(ssdCache));
     if (filenames_.empty()) {
       for (auto i = 0; i < kNumFiles; ++i) {
         auto name = fmt::format("testing_file_{}", i);
         filenames_.push_back(StringIdLease(fileIds(), name));
       }
     }
-    ASSERT_EQ(cache_->kind(), MemoryAllocator::Kind::kMmap);
-    ASSERT_EQ(MemoryAllocator::kindString(cache_->kind()), "MMAP");
+    ASSERT_EQ(cache_->allocator()->kind(), MemoryAllocator::Kind::kMmap);
+    ASSERT_EQ(MemoryAllocator::kindString(cache_->allocator()->kind()), "MMAP");
   }
 
   // Finds one entry from RAM, SSD or storage. Throws if the data
@@ -222,12 +226,13 @@
 
   void clearAllocations(std::deque<memory::Allocation>& allocations) {
     while (!allocations.empty()) {
-      cache_->freeNonContiguous(allocations.front());
+      allocator_->freeNonContiguous(allocations.front());
       allocations.pop_front();
     }
   }
 
   std::shared_ptr<exec::test::TempDirectoryPath> tempDirectory_;
+  std::shared_ptr<memory::MemoryAllocator> allocator_;
   std::shared_ptr<AsyncDataCache> cache_;
   std::vector<StringIdLease> filenames_;
   std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
@@ -588,7 +593,7 @@ TEST_F(AsyncDataCacheTest, outOfCapacity) {
     pins.pop_front();
   }
   memory::Allocation allocation;
-  ASSERT_FALSE(cache_->allocateNonContiguous(kSizeInPages, allocation));
+  ASSERT_FALSE(allocator_->allocateNonContiguous(kSizeInPages, allocation));
   // One 4 page entry below the max size of 4K 4 page entries in 16MB of
   // capacity.
   ASSERT_EQ(16384, cache_->incrementCachedPages(0));
@@ -597,14 +602,14 @@
   // We allocate the full capacity and expect the cache entries to go.
   for (;;) {
-    if (!cache_->allocateNonContiguous(kSizeInPages, allocation)) {
+    if (!allocator_->allocateNonContiguous(kSizeInPages, allocation)) {
       break;
     }
     allocations.push_back(std::move(allocation));
   }
   EXPECT_EQ(0, cache_->incrementCachedPages(0));
   EXPECT_EQ(0, cache_->incrementPrefetchPages(0));
-  EXPECT_EQ(16384, cache_->numAllocated());
+  EXPECT_EQ(16384, allocator_->numAllocated());
   clearAllocations(allocations);
 }

diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp
index 33ed1cf7f7c..c4d7c93df2b 100644
--- a/velox/common/caching/tests/SsdFileTest.cpp
+++ b/velox/common/caching/tests/SsdFileTest.cpp
@@ -45,6 +45,9 @@ class SsdFileTest : public testing::Test {
     if (ssdFile_) {
       ssdFile_->deleteFile();
     }
+    if (cache_) {
+      cache_->prepareShutdown();
+    }
   }
 
   void initializeCache(
@@ -53,8 +56,7 @@
       bool setNoCowFlag = false) {
     // tmpfs does not support O_DIRECT, so turn this off for testing.
     FLAGS_ssd_odirect = false;
-    cache_ = std::make_shared<AsyncDataCache>(
-        MemoryAllocator::createDefaultInstance(), maxBytes);
+    cache_ = AsyncDataCache::create(MemoryAllocator::getInstance());
 
     fileName_ = StringIdLease(fileIds(), "fileInStorage");

diff --git a/velox/common/memory/MallocAllocator.cpp b/velox/common/memory/MallocAllocator.cpp
index df63562916c..73c35c18a3f 100644
--- a/velox/common/memory/MallocAllocator.cpp
+++ b/velox/common/memory/MallocAllocator.cpp
@@ -23,7 +23,7 @@ namespace facebook::velox::memory {
 MallocAllocator::MallocAllocator(size_t capacity)
     : kind_(MemoryAllocator::Kind::kMalloc), capacity_(capacity) {}
 
-bool MallocAllocator::allocateNonContiguous(
+bool MallocAllocator::allocateNonContiguousWithoutRetry(
     MachinePageCount numPages,
     Allocation& out,
     ReservationCallback reservationCB,
@@ -109,6 +109,20 @@
   return true;
 }
 
+bool MallocAllocator::allocateContiguousWithoutRetry(
+    MachinePageCount numPages,
+    Allocation* collateral,
+    ContiguousAllocation& allocation,
+    ReservationCallback reservationCB,
+    MachinePageCount maxPages) {
+  bool result;
+  stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() {
+    result = allocateContiguousImpl(
+        numPages, collateral, allocation, reservationCB, maxPages);
+  });
+  return result;
+}
+
 bool MallocAllocator::allocateContiguousImpl(
     MachinePageCount numPages,
     Allocation* collateral,
@@ -216,6 +230,11 @@ int64_t MallocAllocator::freeNonContiguous(Allocation& allocation) {
   return freedBytes;
 }
 
+void MallocAllocator::freeContiguous(ContiguousAllocation& allocation) {
+  stats_.recordFree(
+      allocation.size(), [&]() { freeContiguousImpl(allocation); });
+}
+
 void MallocAllocator::freeContiguousImpl(ContiguousAllocation& allocation) {
   if (allocation.empty()) {
     return;
@@ -233,7 +252,7 @@ void MallocAllocator::freeContiguousImpl(ContiguousAllocation& allocation) {
   allocation.clear();
 }
 
-bool MallocAllocator::growContiguous(
+bool MallocAllocator::growContiguousWithoutRetry(
     MachinePageCount increment,
     ContiguousAllocation& allocation,
     ReservationCallback reservationCB) {
@@ -259,7 +278,9 @@
   return true;
 }
 
-void* MallocAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) {
+void* MallocAllocator::allocateBytesWithoutRetry(
+    uint64_t bytes,
+    uint16_t alignment) {
   if (!incrementUsage(bytes)) {
     return nullptr;
   }
@@ -279,7 +300,7 @@
   return result;
 }
-void* MallocAllocator::allocateZeroFilled(uint64_t bytes) {
+void* MallocAllocator::allocateZeroFilledWithoutRetry(uint64_t bytes) {
   if (!incrementUsage(bytes)) {
     return nullptr;
   }

diff --git a/velox/common/memory/MallocAllocator.h b/velox/common/memory/MallocAllocator.h
index 24503c4570a..debc07cdc96 100644
--- a/velox/common/memory/MallocAllocator.h
+++ b/velox/common/memory/MallocAllocator.h
@@ -37,6 +37,17 @@ class MallocAllocator : public MemoryAllocator {
     }
   }
 
+  void registerCache(const std::shared_ptr<Cache>& cache) override {
+    VELOX_CHECK_NULL(cache_);
+    VELOX_CHECK_NOT_NULL(cache);
+    VELOX_CHECK(cache->allocator() == this);
+    cache_ = cache;
+  }
+
+  Cache* cache() const override {
+    return cache_.get();
+  }
+
   Kind kind() const override {
     return kind_;
   }
@@ -45,42 +56,15 @@
   size_t capacity() const override {
     return capacity_;
   }
 
-  bool allocateNonContiguous(
-      MachinePageCount numPages,
-      Allocation& out,
-      ReservationCallback reservationCB = nullptr,
-      MachinePageCount minSizeClass = 0) override;
+  void freeContiguous(ContiguousAllocation& allocation) override;
 
   int64_t freeNonContiguous(Allocation& allocation) override;
 
-  bool allocateContiguous(
-      MachinePageCount numPages,
-      Allocation* collateral,
-      ContiguousAllocation& allocation,
-      ReservationCallback reservationCB = nullptr,
-      MachinePageCount maxPages = 0) override {
-    bool result;
-    stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() {
-      result = allocateContiguousImpl(
-          numPages, collateral, allocation, reservationCB, maxPages);
-    });
-    return result;
-  }
-
-  void freeContiguous(ContiguousAllocation& allocation) override {
-    stats_.recordFree(
-        allocation.size(), [&]() { freeContiguousImpl(allocation); });
-  }
-
-  bool growContiguous(
+  bool growContiguousWithoutRetry(
       MachinePageCount increment,
       ContiguousAllocation& allocation,
       ReservationCallback reservationCB = nullptr) override;
 
-  void* allocateBytes(uint64_t bytes, uint16_t alignment) override;
-
-  void* allocateZeroFilled(uint64_t bytes) override;
-
   void freeBytes(void* p, uint64_t bytes) noexcept override;
 
   size_t totalUsedBytes() const override {
@@ -95,15 +79,24 @@
     return numMapped_;
   }
 
-  Stats stats() const override {
-    return stats_;
-  }
-
   bool checkConsistency() const override;
 
   std::string toString() const override;
 
  private:
+  bool allocateNonContiguousWithoutRetry(
+      MachinePageCount numPages,
+      Allocation& out,
+      ReservationCallback reservationCB = nullptr,
+      MachinePageCount minSizeClass = 0) override;
+
+  bool allocateContiguousWithoutRetry(
+      MachinePageCount numPages,
+      Allocation* FOLLY_NULLABLE collateral,
+      ContiguousAllocation& allocation,
+      ReservationCallback reservationCB = nullptr,
+      MachinePageCount maxPages = 0) override;
+
   bool allocateContiguousImpl(
       MachinePageCount numPages,
       Allocation* FOLLY_NULLABLE collateral,
@@ -113,6 +106,10 @@
 
   void freeContiguousImpl(ContiguousAllocation& allocation);
 
+  void* allocateBytesWithoutRetry(uint64_t bytes, uint16_t alignment) override;
+
+  void* allocateZeroFilledWithoutRetry(uint64_t bytes) override;
+
   /// Increment current usage and check current allocator consistency to make
   /// sure current usage does not go above 'capacity_'. If it goes above
   /// 'capacity_', the increment will not be applied. Returns true if within
@@ -161,6 +158,6 @@
   /// Tracks malloc'd pointers to detect bad frees.
   std::unordered_set<void*> mallocs_;
 
-  Stats stats_;
+  std::shared_ptr<Cache> cache_;
 };
 } // namespace facebook::velox::memory

diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp
index ccdbb4c448d..a4a684e3384 100644
--- a/velox/common/memory/MemoryAllocator.cpp
+++ b/velox/common/memory/MemoryAllocator.cpp
@@ -160,7 +160,74 @@ MachinePageCount MemoryAllocator::roundUpToSizeClassSize(
   return *std::lower_bound(sizes.begin(), sizes.end(), pages);
 }
 
+bool MemoryAllocator::allocateNonContiguous(
+    MachinePageCount numPages,
+    Allocation& out,
+    ReservationCallback reservationCB,
+    MachinePageCount minSizeClass) {
+  if (cache() == nullptr) {
+    return allocateNonContiguousWithoutRetry(
+        numPages, out, reservationCB, minSizeClass);
+  }
+  return cache()->makeSpace(numPages, [&]() {
+    return allocateNonContiguousWithoutRetry(
+        numPages, out, reservationCB, minSizeClass);
+  });
+}
+
+bool MemoryAllocator::allocateContiguous(
+    MachinePageCount numPages,
+    Allocation* collateral,
+    ContiguousAllocation& allocation,
+    ReservationCallback reservationCB,
+    MachinePageCount maxPages) {
+  if (cache() == nullptr) {
+    return allocateContiguousWithoutRetry(
+        numPages, collateral, allocation, reservationCB, maxPages);
+  }
+  return cache()->makeSpace(numPages, [&]() {
+    return allocateContiguousWithoutRetry(
+        numPages, collateral, allocation, reservationCB, maxPages);
+  });
+}
+
+bool MemoryAllocator::growContiguous(
+    MachinePageCount increment,
+    ContiguousAllocation& allocation,
+    ReservationCallback reservationCB) {
+  if (cache() == nullptr) {
+    return growContiguousWithoutRetry(increment, allocation, reservationCB);
+  }
+  return cache()->makeSpace(increment, [&]() {
+    return growContiguousWithoutRetry(increment, allocation, reservationCB);
+  });
+}
+
+void* MemoryAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) {
+  if (cache() == nullptr) {
+    return allocateBytesWithoutRetry(bytes, alignment);
+  }
+  void* result = nullptr;
+  cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() {
+    result = allocateBytesWithoutRetry(bytes, alignment);
+    return result != nullptr;
+  });
+  return result;
+}
+
 void* MemoryAllocator::allocateZeroFilled(uint64_t bytes) {
+  if (cache() == nullptr) {
+    return allocateZeroFilledWithoutRetry(bytes);
+  }
+  void* result = nullptr;
+  cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() {
+    result = allocateZeroFilledWithoutRetry(bytes);
+    return result != nullptr;
+  });
+  return result;
+}
+
+void* MemoryAllocator::allocateZeroFilledWithoutRetry(uint64_t bytes) {
   void* result = allocateBytes(bytes);
   if (result != nullptr) {
     ::memset(result, 0, bytes);

diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h
index 6cd0c76d6a6..cae01562025 100644
--- a/velox/common/memory/MemoryAllocator.h
+++ b/velox/common/memory/MemoryAllocator.h
@@ -139,6 +139,24 @@ struct Stats {
   int64_t numAdvise{0};
 };
 
+class MemoryAllocator;
+
+/// A general cache interface that uses 'MemoryAllocator' to allocate memory
+/// and is also able to free up memory upon request by shrinking itself.
+class Cache {
+ public:
+  virtual ~Cache() = default;
+
+  /// This method should be implemented so that it tries to accommodate the
+  /// passed in 'allocate' by freeing up space from 'this' if needed.
+  /// 'numPages' is the number of pages 'allocate' tries to allocate. It
+  /// should return true if 'allocate' succeeds, and false otherwise.
+  virtual bool makeSpace(
+      memory::MachinePageCount numPages,
+      std::function<bool()> allocate) = 0;
+
+  virtual MemoryAllocator* allocator() const = 0;
+};
+
 /// This class provides interface for the actual memory allocations from memory
 /// pool. It allocates runs of machine pages from predefined size classes, and
 /// supports both contiguous and non-contiguous memory allocations. An
@@ -193,6 +211,11 @@
 /// the kind of the delegated memory allocator underneath.
   virtual Kind kind() const = 0;
 
+  /// Registers a 'Cache' that is used for freeing up space when this allocator
+  /// is under memory pressure. The allocator of the registered 'Cache' needs
+  /// to be the same as 'this'.
+  virtual void registerCache(const std::shared_ptr<Cache>& cache) = 0;
+
   using ReservationCallback = std::function<void(int64_t, bool)>;
 
   /// Returns the capacity of the allocator in bytes.
@@ -210,18 +233,20 @@
   /// 'reservationCB' before the actual memory allocation so it needs to release
   /// the reservation if the actual allocation fails halfway. The function
   /// returns true if the allocation succeeded. If it returns false, 'out'
-  /// references no memory and any partially allocated memory is freed.
+  /// references no memory and any partially allocated memory is freed. The
+  /// function may retry a failed allocation by making space through 'cache()'
+  /// if one is registered, but sufficient space is not guaranteed.
   ///
   /// NOTE:
   /// - 'out' is guaranteed to be freed if it's not empty.
   /// - Allocation is not guaranteed even if collateral 'out' is larger than
   ///   'numPages', because this method is not atomic.
   /// - Throws if allocation exceeds capacity.
-  virtual bool allocateNonContiguous(
+  bool allocateNonContiguous(
       MachinePageCount numPages,
       Allocation& out,
       ReservationCallback reservationCB = nullptr,
-      MachinePageCount minSizeClass = 0) = 0;
+      MachinePageCount minSizeClass = 0);
 
   /// Frees non-contiguous 'allocation'. 'allocation' is empty on return. The
   /// function returns the actual freed bytes.
@@ -241,19 +266,18 @@
   /// cleared.
   ///
   /// NOTE: - 'collateral' and passed in 'allocation' are guaranteed
-  /// to be freed. If 'maxPages' is non-0, 'maxPages' worth of
-  /// address space is mapped but the utilization in the allocator and
-  /// pool is incremented by 'numPages'. This allows reserving
-  /// a large range of addresses for use with huge pages without
-  /// declaring the whole range as held by the query. The reservation
-  /// will be increased as and if addresses in the range are used. See
-  /// growContiguous().
-  virtual bool allocateContiguous(
+  /// to be freed. If 'maxPages' is non-0, 'maxPages' worth of address space is
+  /// mapped but the utilization in the allocator and pool is incremented by
+  /// 'numPages'. This allows reserving a large range of addresses for use with
+  /// huge pages without declaring the whole range as held by the query. The
+  /// reservation will be increased as and if addresses in the range are used.
+  /// See growContiguous().
+  bool allocateContiguous(
       MachinePageCount numPages,
       Allocation* collateral,
       ContiguousAllocation& allocation,
       ReservationCallback reservationCB = nullptr,
-      MachinePageCount maxPages = 0) = 0;
+      MachinePageCount maxPages = 0);
 
   /// Frees contiguous 'allocation'. 'allocation' is empty on return.
   virtual void freeContiguous(ContiguousAllocation& allocation) = 0;
@@ -262,24 +286,26 @@
   /// 'increment'. false if would exceed capacity, Throws if size
   /// would exceed maxSize given in allocateContiguous(). Calls reservationCB
   /// before increasing the utilization and returns false with no effect if this
-  /// fails.
-  virtual bool growContiguous(
+  /// fails. The function may retry a failed allocation by making space
+  /// through 'cache()' if one is registered, but sufficient space is not
+  /// guaranteed.
+  bool growContiguous(
       MachinePageCount increment,
       ContiguousAllocation& allocation,
-      ReservationCallback reservationCB = nullptr) = 0;
+      ReservationCallback reservationCB = nullptr);
 
   /// Allocates contiguous 'bytes' and return the first byte. Returns nullptr if
-  /// there is no space.
+  /// there is no space. The function may retry a failed allocation by making
+  /// space through 'cache()' if one is registered, but sufficient space is not
+  /// guaranteed.
   ///
   /// NOTE: 'alignment' must be power of two and in range of
   /// [kMinAlignment, kMaxAlignment].
-  virtual void* allocateBytes(
-      uint64_t bytes,
-      uint16_t alignment = kMinAlignment) = 0;
+  void* allocateBytes(uint64_t bytes, uint16_t alignment = kMinAlignment);
 
   /// Allocates a zero-filled contiguous bytes. Returns nullptr if there is no
-  /// space
-  virtual void* allocateZeroFilled(uint64_t bytes);
+  /// space. The function may retry a failed allocation by making space through
+  /// 'cache()' if one is registered, but sufficient space is not guaranteed.
+  void* allocateZeroFilled(uint64_t bytes);
 
   /// Frees contiguous memory allocated by allocateBytes, allocateZeroFilled,
   /// reallocateBytes.
@@ -306,7 +332,7 @@
   virtual MachinePageCount numMapped() const = 0;
 
   virtual Stats stats() const {
-    return Stats();
+    return stats_;
   }
 
   virtual std::string toString() const = 0;
@@ -356,7 +382,38 @@
   }
 
  protected:
-  MemoryAllocator() = default;
+  explicit MemoryAllocator() = default;
+
+  /// The actual memory allocation implementations, which do not retry by
+  /// making space from the cache.
+  virtual bool allocateContiguousWithoutRetry(
+      MachinePageCount numPages,
+      Allocation* collateral,
+      ContiguousAllocation& allocation,
+      ReservationCallback reservationCB = nullptr,
+      MachinePageCount maxPages = 0) = 0;
+
+  virtual bool allocateNonContiguousWithoutRetry(
+      MachinePageCount numPages,
+      Allocation& out,
+      ReservationCallback reservationCB,
+      MachinePageCount minSizeClass) = 0;
+
+  virtual void* allocateBytesWithoutRetry(
+      uint64_t bytes,
+      uint16_t alignment) = 0;
+
+  virtual void* allocateZeroFilledWithoutRetry(uint64_t bytes);
+
+  virtual bool growContiguousWithoutRetry(
+      MachinePageCount increment,
+      ContiguousAllocation& allocation,
+      ReservationCallback reservationCB = nullptr) = 0;
+
+  // 'Cache' getter. The cache is only responsible for freeing up memory space
+  // by shrinking itself when there is not enough space upon allocating;
+  // freeing enough space is not guaranteed.
+  virtual Cache* cache() const = 0;
 
   // Returns the size class size that corresponds to 'bytes'.
  static MachinePageCount roundUpToSizeClassSize(
@@ -422,6 +479,8 @@
   InjectedFailure injectedFailure_{InjectedFailure::kNone};
   bool isPersistentFailureInjection_{false};
 
+  Stats stats_;
+
  private:
   static std::mutex initMutex_;
   // Singleton instance.
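The Cache interface above is what AsyncDataCache implements. To make the
contract concrete from the other side, here is a minimal hypothetical
implementation; ToyCache, evictAtLeast(), and kMaxAttempts are invented for
illustration, only the Cache and MemoryAllocator interfaces come from this
diff:

    // A toy cache: on allocation failure it frees its own memory and retries.
    class ToyCache : public memory::Cache {
     public:
      explicit ToyCache(memory::MemoryAllocator* allocator)
          : allocator_(allocator) {}

      bool makeSpace(
          memory::MachinePageCount numPages,
          std::function<bool()> allocate) override {
        constexpr int32_t kMaxAttempts = 3;
        for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) {
          if (allocate()) {
            return true;
          }
          evictAtLeast(numPages); // hypothetical eviction helper
        }
        return false;
      }

      memory::MemoryAllocator* allocator() const override {
        return allocator_;
      }

     private:
      void evictAtLeast(memory::MachinePageCount numPages);
      memory::MemoryAllocator* const allocator_;
    };

Registration ties the two together; registerCache() (see MallocAllocator.h
above and MmapAllocator.h below) checks that the cache's allocator() is the
registering allocator itself.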
diff --git a/velox/common/memory/MmapAllocator.cpp b/velox/common/memory/MmapAllocator.cpp
index 8da35bc5bd5..3964b2268cb 100644
--- a/velox/common/memory/MmapAllocator.cpp
+++ b/velox/common/memory/MmapAllocator.cpp
@@ -51,7 +51,7 @@ MmapAllocator::~MmapAllocator() {
       (numAllocated_ == 0) && (numExternalMapped_ == 0), "{}", toString());
 }
 
-bool MmapAllocator::allocateNonContiguous(
+bool MmapAllocator::allocateNonContiguousWithoutRetry(
     MachinePageCount numPages,
     Allocation& out,
     ReservationCallback reservationCB,
@@ -211,6 +211,20 @@ MachinePageCount MmapAllocator::freeInternal(Allocation& allocation) {
   return numFreed;
 }
 
+bool MmapAllocator::allocateContiguousWithoutRetry(
+    MachinePageCount numPages,
+    Allocation* collateral,
+    ContiguousAllocation& allocation,
+    ReservationCallback reservationCB,
+    MachinePageCount maxPages) {
+  bool result;
+  stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() {
+    result = allocateContiguousImpl(
+        numPages, collateral, allocation, reservationCB, maxPages);
+  });
+  return result;
+}
+
 bool MmapAllocator::allocateContiguousImpl(
     MachinePageCount numPages,
     Allocation* collateral,
@@ -364,6 +378,11 @@
   return true;
 }
 
+void MmapAllocator::freeContiguous(ContiguousAllocation& allocation) {
+  stats_.recordFree(
+      allocation.size(), [&]() { freeContiguousImpl(allocation); });
+}
+
 void MmapAllocator::freeContiguousImpl(ContiguousAllocation& allocation) {
   if (allocation.empty()) {
     return;
@@ -384,7 +403,7 @@
   allocation.clear();
 }
 
-bool MmapAllocator::growContiguous(
+bool MmapAllocator::growContiguousWithoutRetry(
     MachinePageCount increment,
     ContiguousAllocation& allocation,
     ReservationCallback reservationCB) {
@@ -430,7 +449,9 @@
   return true;
 }
 
-void* MmapAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) {
+void* MmapAllocator::allocateBytesWithoutRetry(
+    uint64_t bytes,
+    uint16_t alignment) {
   alignmentCheck(bytes, alignment);
 
   if (useMalloc(bytes)) {
@@ -448,7 +469,8 @@
   if (bytes <= AllocationTraits::pageBytes(sizeClassSizes_.back())) {
     Allocation allocation;
     const auto numPages = roundUpToSizeClassSize(bytes, sizeClassSizes_);
-    if (!allocateNonContiguous(numPages, allocation, nullptr, numPages)) {
+    if (!allocateNonContiguousWithoutRetry(
+            numPages, allocation, nullptr, numPages)) {
       return nullptr;
     }
     auto run = allocation.runAt(0);
@@ -463,7 +485,7 @@
   ContiguousAllocation allocation;
   auto numPages = bits::roundUp(bytes, AllocationTraits::kPageSize) /
       AllocationTraits::kPageSize;
-  if (!allocateContiguous(numPages, nullptr, allocation)) {
+  if (!allocateContiguousWithoutRetry(numPages, nullptr, allocation)) {
     return nullptr;
   }

diff --git a/velox/common/memory/MmapAllocator.h b/velox/common/memory/MmapAllocator.h
index 0ecc872e17c..43863af9f53 100644
--- a/velox/common/memory/MmapAllocator.h
+++ b/velox/common/memory/MmapAllocator.h
@@ -85,56 +85,29 @@
     return kind_;
   }
 
-  size_t capacity() const override {
-    return AllocationTraits::pageBytes(capacity_);
+  void registerCache(const std::shared_ptr<Cache>& cache) override {
+    VELOX_CHECK_NULL(cache_);
+    VELOX_CHECK_NOT_NULL(cache);
+    VELOX_CHECK(cache->allocator() == this);
+    cache_ = cache;
   }
 
-  bool allocateNonContiguous(
-      MachinePageCount numPages,
-      Allocation& out,
-      ReservationCallback reservationCB = nullptr,
-      MachinePageCount minSizeClass = 0) override;
-
-  int64_t freeNonContiguous(Allocation& allocation) override;
-
-  bool allocateContiguous(
-      MachinePageCount numPages,
-      Allocation* collateral,
-      ContiguousAllocation& allocation,
-      ReservationCallback reservationCB = nullptr,
-      MachinePageCount maxPages = 0) override {
-    bool result;
-    stats_.recordAllocate(numPages * AllocationTraits::kPageSize, 1, [&]() {
-      result = allocateContiguousImpl(
-          numPages, collateral, allocation, reservationCB, maxPages);
-    });
-    return result;
+  Cache* cache() const override {
+    return cache_.get();
   }
 
-  void freeContiguous(ContiguousAllocation& allocation) override {
-    stats_.recordFree(
-        allocation.size(), [&]() { freeContiguousImpl(allocation); });
+  size_t capacity() const override {
+    return AllocationTraits::pageBytes(capacity_);
   }
 
-  bool growContiguous(
+  bool growContiguousWithoutRetry(
       MachinePageCount increment,
       ContiguousAllocation& allocation,
       ReservationCallback reservationCB = nullptr) override;
 
-  /// Allocates 'bytes' contiguous bytes and returns the pointer to the first
-  /// byte. If 'bytes' is less than 'maxMallocBytes_', delegates the allocation
-  /// to malloc. If the size is above that and below the largest size classes'
-  /// size, allocates one element of the next size classes' size. If 'size' is
-  /// greater than the largest size classes' size, calls allocateContiguous().
-  /// Returns nullptr if there is no space. The amount to allocate is subject to
-  /// the size limit of 'this'. This function is not virtual but calls the
-  /// virtual functions allocateNonContiguous and allocateContiguous, which can
-  /// track sizes and enforce caps etc. If 'alignment' is not kMinAlignment,
-  /// then 'bytes' must be a multiple of 'alignment'.
-  ///
-  /// NOTE: 'alignment' must be power of two and in range of [kMinAlignment,
-  /// kMaxAlignment].
-  void* allocateBytes(uint64_t bytes, uint16_t alignment) override;
+  void freeContiguous(ContiguousAllocation& allocation) override;
+
+  int64_t freeNonContiguous(Allocation& allocation) override;
 
   void freeBytes(void* p, uint64_t bytes) noexcept override;
@@ -339,6 +312,19 @@
     uint64_t numAdvisedAway_ = 0;
   };
 
+  bool allocateNonContiguousWithoutRetry(
+      MachinePageCount numPages,
+      Allocation& out,
+      ReservationCallback reservationCB = nullptr,
+      MachinePageCount minSizeClass = 0) override;
+
+  bool allocateContiguousWithoutRetry(
+      MachinePageCount numPages,
+      Allocation* collateral,
+      ContiguousAllocation& allocation,
+      ReservationCallback reservationCB = nullptr,
+      MachinePageCount maxPages = 0) override;
+
   bool allocateContiguousImpl(
       MachinePageCount numPages,
       Allocation* collateral,
@@ -348,6 +334,21 @@
 
   void freeContiguousImpl(ContiguousAllocation& allocation);
 
+  // Allocates 'bytes' contiguous bytes and returns the pointer to the first
+  // byte. If 'bytes' is less than 'maxMallocBytes_', delegates the allocation
+  // to malloc. If the size is above that and below the largest size classes'
+  // size, allocates one element of the next size classes' size. If 'size' is
+  // greater than the largest size classes' size, calls allocateContiguous().
+  // Returns nullptr if there is no space. The amount to allocate is subject to
+  // the size limit of 'this'. This function is not virtual but calls the
+  // virtual functions allocateNonContiguous and allocateContiguous, which can
+  // track sizes and enforce caps etc. If 'alignment' is not kMinAlignment,
+  // then 'bytes' must be a multiple of 'alignment'.
+  //
+  // NOTE: 'alignment' must be power of two and in range of [kMinAlignment,
+  // kMaxAlignment].
+  void* allocateBytesWithoutRetry(uint64_t bytes, uint16_t alignment) override;
   // Ensures that there are at least 'newMappedNeeded' pages that are
   // not backing any existing allocation. If capacity_ - numMapped_ <
   // newMappedNeeded, advises away enough pages backing freed slots in
@@ -418,7 +419,7 @@
   std::mutex arenaMutex_;
   std::unique_ptr<ManagedArenas> managedArenas_;
 
-  Stats stats_;
+  std::shared_ptr<Cache> cache_;
 };
 } // namespace facebook::velox::memory

diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp
index e2e71c3496c..eabc39a3a95 100644
--- a/velox/common/memory/tests/MemoryPoolTest.cpp
+++ b/velox/common/memory/tests/MemoryPoolTest.cpp
@@ -88,18 +88,16 @@ class MemoryPoolTest : public testing::TestWithParam {
       MmapAllocator::Options opts{8UL << 30};
       allocator_ = std::make_shared<MmapAllocator>(opts);
       if (useCache_) {
-        cache_ =
-            std::make_shared<AsyncDataCache>(allocator_, kCapacity, nullptr);
-        MemoryAllocator::setDefaultInstance(cache_.get());
+        cache_ = AsyncDataCache::create(allocator_.get());
+        MemoryAllocator::setDefaultInstance(allocator_.get());
       } else {
         MemoryAllocator::setDefaultInstance(allocator_.get());
       }
     } else {
       allocator_ = MemoryAllocator::createDefaultInstance();
       if (useCache_) {
-        cache_ =
-            std::make_shared<AsyncDataCache>(allocator_, kCapacity, nullptr);
-        MemoryAllocator::setDefaultInstance(cache_.get());
+        cache_ = AsyncDataCache::create(allocator_.get());
+        MemoryAllocator::setDefaultInstance(allocator_.get());
       } else {
         MemoryAllocator::setDefaultInstance(allocator_.get());
       }
     }
@@ -111,6 +109,9 @@
   }
 
   void TearDown() override {
+    if (useCache_) {
+      cache_->prepareShutdown();
+    }
     allocator_->testingClearFailureInjection();
     MmapAllocator::setDefaultInstance(nullptr);
   }
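The test fixtures in this diff all follow one lifecycle, since the cache must
now be shut down explicitly before its allocator goes away. A hedged sketch of
the pattern, with member names as in MemoryPoolTest above:

    void SetUp() override {
      allocator_ = std::make_shared<MmapAllocator>(opts);
      cache_ = AsyncDataCache::create(allocator_.get());
      // The allocator, not the cache, is now the default MemoryAllocator.
      MemoryAllocator::setDefaultInstance(allocator_.get());
    }

    void TearDown() override {
      cache_->prepareShutdown(); // before the allocator is torn down
      MmapAllocator::setDefaultInstance(nullptr);
    }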
diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp
index 63638ddf074..660974cd5d9 100644
--- a/velox/common/memory/tests/SharedArbitratorTest.cpp
+++ b/velox/common/memory/tests/SharedArbitratorTest.cpp
@@ -312,7 +312,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
         executor_.get(),
         std::unordered_map<std::string, std::string>{},
         configs,
-        memory::MemoryAllocator::getInstance(),
+        cache::AsyncDataCache::getInstance(),
         std::move(pool));
     return queryCtx;
   }

diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h
index e11d5f60089..b8fa93f191d 100644
--- a/velox/connectors/Connector.h
+++ b/velox/connectors/Connector.h
@@ -17,6 +17,7 @@
 
 #include "velox/common/base/AsyncSource.h"
 #include "velox/common/base/RuntimeMetrics.h"
+#include "velox/common/caching/AsyncDataCache.h"
 #include "velox/common/caching/ScanTracker.h"
 #include "velox/common/future/VeloxPromise.h"
 #include "velox/core/ExpressionEvaluator.h"
@@ -223,7 +224,7 @@ class ConnectorQueryCtx {
       memory::MemoryPool* connectorPool,
       const Config* connectorConfig,
       std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator,
-      memory::MemoryAllocator* FOLLY_NONNULL allocator,
+      cache::AsyncDataCache* cache,
       const std::string& queryId,
       const std::string& taskId,
       const std::string& planNodeId,
@@ -232,7 +233,7 @@
         connectorPool_(connectorPool),
         config_(connectorConfig),
         expressionEvaluator_(std::move(expressionEvaluator)),
-        allocator_(allocator),
+        cache_(cache),
         scanId_(fmt::format("{}.{}", taskId, planNodeId)),
         queryId_(queryId),
         taskId_(taskId),
@@ -260,10 +261,8 @@
     return expressionEvaluator_.get();
   }
 
-  // MemoryAllocator for large allocations. Used for caching with
-  // CachedBufferedInput if this implements cache::AsyncDataCache.
-  memory::MemoryAllocator* FOLLY_NONNULL allocator() const {
-    return allocator_;
+  cache::AsyncDataCache* cache() const {
+    return cache_;
   }
 
   // This is a combination of task id and the scan's PlanNodeId. This is an id
@@ -295,7 +294,7 @@
   memory::MemoryPool* connectorPool_;
   const Config* FOLLY_NONNULL config_;
   std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
-  memory::MemoryAllocator* FOLLY_NONNULL allocator_;
+  cache::AsyncDataCache* cache_;
   const std::string scanId_;
   const std::string queryId_;
   const std::string taskId_;

diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h
index 5970b9df80c..f1756255684 100644
--- a/velox/connectors/hive/HiveConnector.h
+++ b/velox/connectors/hive/HiveConnector.h
@@ -52,7 +52,7 @@ class HiveConnector : public Connector {
         columnHandles,
         &fileHandleFactory_,
         connectorQueryCtx->expressionEvaluator(),
-        connectorQueryCtx->allocator(),
+        connectorQueryCtx->cache(),
         connectorQueryCtx->scanId(),
         HiveConfig::isFileColumnNamesReadAsLowerCase(
             connectorQueryCtx->config()),

diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp
index 74b9f56f42d..7f58d125af0 100644
--- a/velox/connectors/hive/HiveDataSource.cpp
+++ b/velox/connectors/hive/HiveDataSource.cpp
@@ -358,7 +358,7 @@ HiveDataSource::HiveDataSource(
         std::shared_ptr<connector::ColumnHandle>>& columnHandles,
     FileHandleFactory* fileHandleFactory,
     core::ExpressionEvaluator* expressionEvaluator,
-    memory::MemoryAllocator* allocator,
+    cache::AsyncDataCache* cache,
     const std::string& scanId,
     bool fileColumnNamesReadAsLowerCase,
     folly::Executor* executor,
@@ -368,7 +368,7 @@
       pool_(&options.getMemoryPool()),
       outputType_(outputType),
       expressionEvaluator_(expressionEvaluator),
-      allocator_(allocator),
+      cache_(cache),
       scanId_(scanId),
       executor_(executor) {
   // Column handled keyed on the column alias, the name used in the query.
@@ -835,12 +835,12 @@
 std::unique_ptr<dwio::common::BufferedInput>
 HiveDataSource::createBufferedInput(
     const FileHandle& fileHandle,
     const dwio::common::ReaderOptions& readerOpts) {
-  if (auto* asyncCache = dynamic_cast<cache::AsyncDataCache*>(allocator_)) {
+  if (cache_) {
     return std::make_unique<dwio::common::CachedBufferedInput>(
         fileHandle.file,
         dwio::common::MetricsLog::voidLog(),
         fileHandle.uuid.id(),
-        asyncCache,
+        cache_,
         Connector::getTracker(scanId_, readerOpts.loadQuantum()),
         fileHandle.groupId.id(),
         ioStats_,

diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h
index c5db04e897c..1b822eb9746 100644
--- a/velox/connectors/hive/HiveDataSource.h
+++ b/velox/connectors/hive/HiveDataSource.h
@@ -37,7 +37,7 @@ class HiveDataSource : public DataSource {
         std::shared_ptr<connector::ColumnHandle>>& columnHandles,
     FileHandleFactory* fileHandleFactory,
     core::ExpressionEvaluator* expressionEvaluator,
-      memory::MemoryAllocator* allocator,
+      cache::AsyncDataCache* cache,
       const std::string& scanId,
       bool fileColumnNamesReadAsLowerCase,
       folly::Executor* executor,
@@ -162,7 +162,7 @@
   SelectivityVector filterRows_;
   exec::FilterEvalCtx filterEvalCtx_;
 
-  memory::MemoryAllocator* const allocator_;
+  cache::AsyncDataCache* const cache_{nullptr};
   const std::string& scanId_;
   folly::Executor* executor_;
 };

diff --git a/velox/core/CMakeLists.txt b/velox/core/CMakeLists.txt
index 86d171485ed..f4db34c8f38 100644
--- a/velox/core/CMakeLists.txt
+++ b/velox/core/CMakeLists.txt
@@ -23,6 +23,7 @@ add_library(velox_core Expressions.cpp PlanFragment.cpp PlanNode.cpp
 
 target_link_libraries(
   velox_core
+  velox_caching
   velox_config
   velox_expression_functions
   velox_type

diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp
index cb06600b2fb..2bfa52f6d9f 100644
--- a/velox/core/QueryCtx.cpp
+++ b/velox/core/QueryCtx.cpp
@@ -21,13 +21,13 @@ QueryCtx::QueryCtx(
     folly::Executor* executor,
     std::unordered_map<std::string, std::string> queryConfigValues,
     std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs,
-    memory::MemoryAllocator* allocator,
+    cache::AsyncDataCache* cache,
     std::shared_ptr<memory::MemoryPool> pool,
    std::shared_ptr<folly::Executor> spillExecutor,
    const std::string& queryId)
    : queryId_(queryId),
      connectorConfigs_(connectorConfigs),
-      allocator_(allocator),
+      cache_(cache),
      pool_(std::move(pool)),
      executor_(executor),
      queryConfig_{std::move(queryConfigValues)},
@@ -39,12 +39,12 @@
    folly::Executor::KeepAlive<> executorKeepalive,
    std::unordered_map<std::string, std::string> queryConfigValues,
    std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs,
-    memory::MemoryAllocator* allocator,
+    cache::AsyncDataCache* cache,
    std::shared_ptr<memory::MemoryPool> pool,
    const std::string& queryId)
    : queryId_(queryId),
      connectorConfigs_(connectorConfigs),
-      allocator_(allocator),
+      cache_(cache),
      pool_(std::move(pool)),
      executorKeepalive_(std::move(executorKeepalive)),
      queryConfig_{std::move(queryConfigValues)} {
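With QueryCtx now carrying the cache instead of an allocator (QueryCtx.h
follows), a typical construction mirrors the SharedArbitratorTest call above;
roughly, with 'executor' and 'pool' as assumed inputs:

    auto queryCtx = std::make_shared<core::QueryCtx>(
        executor.get(),
        std::unordered_map<std::string, std::string>{},
        std::unordered_map<std::string, std::shared_ptr<Config>>{},
        cache::AsyncDataCache::getInstance(), // was MemoryAllocator::getInstance()
        std::move(pool));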
diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h
index d4f50874a99..7b0f178b5c7 100644
--- a/velox/core/QueryCtx.h
+++ b/velox/core/QueryCtx.h
@@ -17,8 +17,8 @@
 
 #include
 #include
+#include "velox/common/caching/AsyncDataCache.h"
 #include "velox/common/memory/Memory.h"
-#include "velox/common/memory/MemoryAllocator.h"
 #include "velox/core/QueryConfig.h"
 #include "velox/vector/DecodedVector.h"
 #include "velox/vector/VectorPool.h"
@@ -39,8 +39,7 @@
       std::unordered_map<std::string, std::string> queryConfigValues = {},
       std::unordered_map<std::string, std::shared_ptr<Config>>
           connectorConfigs = {},
-      memory::MemoryAllocator* allocator =
-          memory::MemoryAllocator::getInstance(),
+      cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
       std::shared_ptr<memory::MemoryPool> pool = nullptr,
       std::shared_ptr<folly::Executor> spillExecutor = nullptr,
       const std::string& queryId = "");
@@ -54,8 +53,7 @@
       std::unordered_map<std::string, std::string> queryConfigValues = {},
       std::unordered_map<std::string, std::shared_ptr<Config>>
           connectorConfigs = {},
-      memory::MemoryAllocator* allocator =
-          memory::MemoryAllocator::getInstance(),
+      cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
       std::shared_ptr<memory::MemoryPool> pool = nullptr,
       const std::string& queryId = "");
@@ -65,8 +63,8 @@
     return pool_.get();
   }
 
-  memory::MemoryAllocator* allocator() const {
-    return allocator_;
+  cache::AsyncDataCache* cache() const {
+    return cache_;
   }
 
   folly::Executor* executor() const {
@@ -135,7 +133,7 @@
   const std::string queryId_;
   std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs_;
-  memory::MemoryAllocator* allocator_;
+  cache::AsyncDataCache* cache_;
   std::shared_ptr<memory::MemoryPool> pool_;
   folly::Executor* executor_;
   folly::Executor::KeepAlive<> executorKeepalive_;

diff --git a/velox/dwio/common/CachedBufferedInput.cpp b/velox/dwio/common/CachedBufferedInput.cpp
index 9c20aa58049..f8fbf33f68e 100644
--- a/velox/dwio/common/CachedBufferedInput.cpp
+++ b/velox/dwio/common/CachedBufferedInput.cpp
@@ -87,8 +87,9 @@ bool CachedBufferedInput::shouldPreload(int32_t numPages) {
         memory::AllocationTraits::kPageSize;
   }
   auto cachePages = cache_->incrementCachedPages(0);
-  auto maxPages = memory::AllocationTraits::numPages(cache_->capacity());
-  auto allocatedPages = cache_->numAllocated();
+  auto allocator = cache_->allocator();
+  auto maxPages = memory::AllocationTraits::numPages(allocator->capacity());
+  auto allocatedPages = allocator->numAllocated();
   if (numPages < maxPages - allocatedPages) {
     // There is free space for the read-ahead.
     return true;

diff --git a/velox/dwio/dwrf/test/CacheInputTest.cpp b/velox/dwio/dwrf/test/CacheInputTest.cpp
index a31d8214659..3bb9c588d2c 100644
--- a/velox/dwio/dwrf/test/CacheInputTest.cpp
+++ b/velox/dwio/dwrf/test/CacheInputTest.cpp
@@ -114,6 +114,9 @@ class CacheTest : public testing::Test {
     if (ssdCache) {
       ssdCache->deleteFiles();
     }
+    if (cache_) {
+      cache_->prepareShutdown();
+    }
   }
 
   void initializeCache(uint64_t maxBytes, uint64_t ssdBytes = 0) {
@@ -130,10 +133,8 @@
     }
     memory::MmapAllocator::Options options;
     options.capacity = maxBytes;
-    cache_ = std::make_shared<AsyncDataCache>(
-        std::make_shared<memory::MmapAllocator>(options),
-        maxBytes,
-        std::move(ssd));
+    allocator_ = std::make_shared<memory::MmapAllocator>(options);
+    cache_ = AsyncDataCache::create(allocator_.get(), std::move(ssd));
     cache_->setVerifyHook(checkEntry);
     for (auto i = 0; i < kMaxStreams; ++i) {
       streamIds_.push_back(std::make_unique(
@@ -424,6 +425,7 @@
   folly::F14FastMap> pathToInput_;
   std::shared_ptr<exec::test::TempDirectoryPath> tempDirectory_;
   cache::FileGroupStats* FOLLY_NULLABLE groupStats_ = nullptr;
+  std::shared_ptr<memory::MemoryAllocator> allocator_;
   std::shared_ptr<AsyncDataCache> cache_;
   std::shared_ptr<IoStatistics> ioStats_;
   std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
@@ -471,7 +473,7 @@ TEST_F(CacheTest, window) {
   auto cacheInput = dynamic_cast<CacheInputStream*>(stream.get());
   EXPECT_TRUE(cacheInput != nullptr);
   auto maxSize =
-      cache_->sizeClasses().back() * memory::AllocationTraits::kPageSize;
+      allocator_->sizeClasses().back() * memory::AllocationTraits::kPageSize;
   const void* buffer;
   int32_t size;
   int32_t numRead = 0;
 target_link_libraries(velox_example_expression_eval velox_type velox_vector
-                      velox_memory velox_expression)
+                      velox_caching velox_memory velox_expression)
 
 add_executable(velox_example_opaque_type OpaqueType.cpp)
 target_link_libraries(velox_example_opaque_type velox_type velox_vector
-                      velox_expression velox_memory)
+                      velox_caching velox_expression velox_memory)
 
 # This is disabled temporarily until we figure out why g++ is crashing linking
 # it on linux builds.
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index d0f44c53669..0f3114fbe91 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -54,7 +54,7 @@ OperatorCtx::createConnectorQueryCtx(
       driverCtx_->task->queryCtx()->getConnectorConfig(connectorId),
       std::make_unique<SimpleExpressionEvaluator>(
           execCtx()->queryCtx(), execCtx()->pool()),
-      driverCtx_->task->queryCtx()->allocator(),
+      driverCtx_->task->queryCtx()->cache(),
       driverCtx_->task->queryCtx()->queryId(),
       taskId(),
       planNodeId,
diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp
index e6df3e94b56..78164dcdc3a 100644
--- a/velox/exec/tests/TableScanTest.cpp
+++ b/velox/exec/tests/TableScanTest.cpp
@@ -2492,8 +2492,7 @@ TEST_F(TableScanTest, addSplitsToFailedTask) {
 }
 
 TEST_F(TableScanTest, errorInLoadLazy) {
-  auto cache = dynamic_cast<cache::AsyncDataCache*>(
-      memory::MemoryAllocator::getInstance());
+  auto cache = cache::AsyncDataCache::getInstance();
   VELOX_CHECK_NOT_NULL(cache);
   auto vectors = makeVectors(10, 1'000);
   auto filePath = TempFilePath::create();
diff --git a/velox/exec/tests/ThreadDebugInfoTest.cpp b/velox/exec/tests/ThreadDebugInfoTest.cpp
index eea12376af1..8721f5ee61a 100644
--- a/velox/exec/tests/ThreadDebugInfoTest.cpp
+++ b/velox/exec/tests/ThreadDebugInfoTest.cpp
@@ -91,7 +91,7 @@ TEST_F(ThreadDebugInfoDeathTest, withinTheCallingThread) {
       executor_.get(),
       std::unordered_map<std::string, std::string>{},
       std::unordered_map<std::string, std::shared_ptr<Config>>{},
-      memory::MemoryAllocator::getInstance(),
+      cache::AsyncDataCache::getInstance(),
       nullptr,
       nullptr,
       "TaskCursorQuery_0");
diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp
index 74c52d72696..0e3146eed0f 100644
--- a/velox/exec/tests/utils/AssertQueryBuilder.cpp
+++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp
@@ -216,7 +216,7 @@ AssertQueryBuilder::readCursor() {
       executor_.get(),
       std::unordered_map<std::string, std::string>{},
       std::unordered_map<std::string, std::shared_ptr<Config>>{},
-      memory::MemoryAllocator::getInstance(),
+      cache::AsyncDataCache::getInstance(),
       nullptr,
       nullptr,
       fmt::format("TaskCursorQuery_{}", cursorQueryId++));
diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp
index 59e0dad5f94..0495a15a367 100644
--- a/velox/exec/tests/utils/Cursor.cpp
+++ b/velox/exec/tests/utils/Cursor.cpp
@@ -123,7 +123,7 @@ TaskCursor::TaskCursor(const CursorParameters& params)
       executor_.get(),
       std::unordered_map<std::string, std::string>{},
       std::unordered_map<std::string, std::shared_ptr<Config>>{},
-      memory::MemoryAllocator::getInstance(),
+      cache::AsyncDataCache::getInstance(),
       nullptr,
       nullptr,
       fmt::format("TaskCursorQuery_{}", cursorQueryId++));
diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp
index 8f378b70c36..a3f5e0ff4d0 100644
--- a/velox/exec/tests/utils/OperatorTestBase.cpp
+++ b/velox/exec/tests/utils/OperatorTestBase.cpp
@@ -33,9 +33,6 @@ using namespace facebook::velox::common::testutil;
 
 namespace facebook::velox::exec::test {
 
-// static
-std::shared_ptr<cache::AsyncDataCache> OperatorTestBase::asyncDataCache_;
-
 OperatorTestBase::OperatorTestBase() {
   using memory::MemoryAllocator;
   facebook::velox::exec::ExchangeSource::registerFactory();
@@ -51,29 +48,34 @@ OperatorTestBase::~OperatorTestBase() {
   memory::MemoryAllocator::setDefaultInstance(nullptr);
 }
 
+void OperatorTestBase::SetUpTestCase() {
+  functions::prestosql::registerAllScalarFunctions();
+  aggregate::prestosql::registerAllAggregateFunctions();
+  TestValue::enable();
+}
+
 void OperatorTestBase::TearDownTestCase() {
   Task::testingWaitForAllTasksToBeDeleted();
 }
 
 void OperatorTestBase::SetUp() {
-  // Sets the process default MemoryAllocator to an async cache of up
-  // to 4GB backed by a default MemoryAllocator
-  if (!asyncDataCache_) {
-    asyncDataCache_ = std::make_shared<cache::AsyncDataCache>(
-        memory::MemoryAllocator::createDefaultInstance(), 4UL << 30);
-  }
-  memory::MemoryAllocator::setDefaultInstance(asyncDataCache_.get());
   if (!isRegisteredVectorSerde()) {
     this->registerVectorSerde();
   }
   driverExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(3);
   ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(3);
+  allocator_ = memory::MemoryAllocator::createDefaultInstance();
+  if (!asyncDataCache_) {
+    asyncDataCache_ = cache::AsyncDataCache::create(allocator_.get());
+    cache::AsyncDataCache::setInstance(asyncDataCache_.get());
+  }
+  memory::MemoryAllocator::setDefaultInstance(allocator_.get());
 }
 
-void OperatorTestBase::SetUpTestCase() {
-  functions::prestosql::registerAllScalarFunctions();
-  aggregate::prestosql::registerAllAggregateFunctions();
-  TestValue::enable();
+void OperatorTestBase::TearDown() {
+  if (asyncDataCache_ != nullptr) {
+    asyncDataCache_->prepareShutdown();
+  }
 }
 
 std::shared_ptr<Task> OperatorTestBase::assertQuery(
diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h
index e5a40d3f8f4..7900b94f95a 100644
--- a/velox/exec/tests/utils/OperatorTestBase.h
+++ b/velox/exec/tests/utils/OperatorTestBase.h
@@ -37,6 +37,8 @@ class OperatorTestBase : public testing::Test,
 
   void SetUp() override;
 
+  void TearDown() override;
+
   /// Allow base classes to register custom vector serde.
   /// By default, registers Presto-compatible serde.
   virtual void registerVectorSerde();
@@ -139,8 +141,11 @@ class OperatorTestBase : public testing::Test,
  protected:
   DuckDbQueryRunner duckDbQueryRunner_;
 
-  // Used as default MappedMemory. Created on first use.
-  static std::shared_ptr<cache::AsyncDataCache> asyncDataCache_;
+  // Used as default MemoryAllocator.
+  std::shared_ptr<memory::MemoryAllocator> allocator_;
+
+  // Used as default AsyncDataCache.
+  std::shared_ptr<cache::AsyncDataCache> asyncDataCache_;
 
   // Used for driver thread execution.
   std::unique_ptr<folly::CPUThreadPoolExecutor> driverExecutor_;
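
Usage note: after this change, callers wire the cache and the allocator together
explicitly instead of treating the cache as an allocator. A minimal sketch of
the new setup and teardown, assuming an MmapAllocator-backed cache (the 4GB
capacity is illustrative; create(), setInstance(), setDefaultInstance(), and
prepareShutdown() are the calls this patch introduces or retargets):

    // Build the allocator first; the cache no longer owns or extends it.
    memory::MmapAllocator::Options options;
    options.capacity = 4UL << 30; // Illustrative 4GB capacity.
    auto allocator = std::make_shared<memory::MmapAllocator>(options);

    // create() constructs the cache and registers it with the allocator,
    // replacing the former inheritance relationship.
    auto cache = cache::AsyncDataCache::create(allocator.get());
    cache::AsyncDataCache::setInstance(cache.get());
    memory::MemoryAllocator::setDefaultInstance(allocator.get());

    // ... run queries ...

    // Detach the cache from the allocator before tearing either down.
    cache->prepareShutdown();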