diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp index 53ba99feb6b..8890a50a8e1 100644 --- a/velox/benchmarks/tpch/TpchBenchmark.cpp +++ b/velox/benchmarks/tpch/TpchBenchmark.cpp @@ -230,9 +230,10 @@ class TpchBenchmark { static_cast(FLAGS_ssd_checkpoint_interval_gb) << 30); } - auto allocator = std::make_shared(options); - allocator_ = std::make_shared( - allocator, memoryBytes, std::move(ssdCache)); + allocator_ = std::make_shared(options); + cache_ = + cache::AsyncDataCache::create(allocator_.get(), std::move(ssdCache)); + cache::AsyncDataCache::setInstance(cache_.get()); memory::MemoryAllocator::setDefaultInstance(allocator_.get()); } functions::prestosql::registerAllScalarFunctions(); @@ -261,6 +262,10 @@ class TpchBenchmark { connector::registerConnector(hiveConnector); } + void shutdown() { + cache_->prepareShutdown(); + } + std::pair, std::vector> run( const TpchPlan& tpchPlan) { int32_t repeat = 0; @@ -382,15 +387,13 @@ class TpchBenchmark { } #endif - auto cache = dynamic_cast(allocator_.get()); - if (cache) { - cache->clear(); + if (cache_) { + cache_->clear(); } } if (FLAGS_clear_ssd_cache) { - auto cache = dynamic_cast(allocator_.get()); - if (cache) { - auto ssdCache = cache->ssdCache(); + if (cache_) { + auto ssdCache = cache_->ssdCache(); if (ssdCache) { ssdCache->clear(); } @@ -462,7 +465,7 @@ class TpchBenchmark { std::unique_ptr ioExecutor_; std::unique_ptr cacheExecutor_; std::shared_ptr allocator_; - + std::shared_ptr cache_; // Parameter combinations to try. Each element specifies a flag and possible // values. All permutations are tried. std::vector parameters_; @@ -578,6 +581,7 @@ int tpchBenchmarkMain() { } else { benchmark.runAllCombinations(); } + benchmark.shutdown(); queryBuilder.reset(); return 0; } diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp index 92d54deebc8..3e0ff2db0c7 100644 --- a/velox/common/caching/AsyncDataCache.cpp +++ b/velox/common/caching/AsyncDataCache.cpp @@ -32,7 +32,7 @@ AsyncDataCacheEntry::AsyncDataCacheEntry(CacheShard* shard) : shard_(shard) { } AsyncDataCacheEntry::~AsyncDataCacheEntry() { - shard_->cache()->freeNonContiguous(data_); + shard_->cache()->allocator()->freeNonContiguous(data_); } void AsyncDataCacheEntry::setExclusiveToShared() { @@ -108,7 +108,7 @@ void AsyncDataCacheEntry::initialize(FileCacheKey key) { tinyData_.clear(); auto sizePages = bits::roundUp(size_, memory::AllocationTraits::kPageSize) / memory::AllocationTraits::kPageSize; - if (cache->allocateNonContiguous(sizePages, data_)) { + if (cache->allocator()->allocateNonContiguous(sizePages, data_)) { cache->incrementCachedPages(data().numPages()); } else { // No memory to cover 'this'. @@ -324,7 +324,7 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) { auto numPages = entry->data().numPages(); if (numPages) { cache_->incrementCachedPages(-numPages); - cache_->freeNonContiguous(entry->data()); + cache_->allocator()->freeNonContiguous(entry->data()); } } } @@ -412,7 +412,7 @@ void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) { void CacheShard::freeAllocations(std::vector& allocations) { for (auto& allocation : allocations) { - cache_->freeNonContiguous(allocation); + cache_->allocator()->freeNonContiguous(allocation); } allocations.clear(); } @@ -495,8 +495,7 @@ void CacheShard::appendSsdSaveable(std::vector& pins) { } AsyncDataCache::AsyncDataCache( - const std::shared_ptr& allocator, - uint64_t /* maxBytes */, + memory::MemoryAllocator* allocator, std::unique_ptr ssdCache) : allocator_(allocator), ssdCache_(std::move(ssdCache)), cachedPages_(0) { for (auto i = 0; i < kNumShards; ++i) { @@ -504,15 +503,44 @@ AsyncDataCache::AsyncDataCache( } } -AsyncDataCache::AsyncDataCache( - const std::shared_ptr& allocator, - std::unique_ptr ssdCache) - : allocator_(allocator), ssdCache_(std::move(ssdCache)), cachedPages_(0) { - for (auto i = 0; i < kNumShards; ++i) { - shards_.push_back(std::make_unique(this)); +AsyncDataCache::~AsyncDataCache() {} + +// static +std::shared_ptr AsyncDataCache::create( + memory::MemoryAllocator* allocator, + std::unique_ptr ssdCache) { + auto cache = std::make_shared(allocator, std::move(ssdCache)); + allocator->registerCache(cache); + return cache; +} + +// static +AsyncDataCache* AsyncDataCache::getInstance() { + return *getInstancePtr(); +} + +// static +void AsyncDataCache::setInstance(AsyncDataCache* asyncDataCache) { + *getInstancePtr() = asyncDataCache; +} + +// static +AsyncDataCache** AsyncDataCache::getInstancePtr() { + static AsyncDataCache* cache_{nullptr}; + return &cache_; +} + +void AsyncDataCache::prepareShutdown() { + for (auto& shard : shards_) { + shard->prepareShutdown(); } } +void CacheShard::prepareShutdown() { + entries_.clear(); + freeEntries_.clear(); +} + CachePin AsyncDataCache::findOrCreate( RawFileCacheKey key, uint64_t size, @@ -611,50 +639,6 @@ void AsyncDataCache::backoff(int32_t counter) { std::this_thread::sleep_for(std::chrono::microseconds(usec)); // NOLINT } -bool AsyncDataCache::allocateNonContiguous( - MachinePageCount numPages, - memory::Allocation& out, - ReservationCallback reservationCB, - MachinePageCount minSizeClass) { - return makeSpace(numPages, [&]() { - return allocator_->allocateNonContiguous( - numPages, out, reservationCB, minSizeClass); - }); -} - -bool AsyncDataCache::allocateContiguous( - memory::MachinePageCount numPages, - memory::Allocation* collateral, - memory::ContiguousAllocation& allocation, - ReservationCallback reservationCB, - memory::MachinePageCount maxPages) { - return makeSpace(numPages, [&]() { - return allocator_->allocateContiguous( - numPages, collateral, allocation, reservationCB, maxPages); - }); -} - -bool AsyncDataCache::growContiguous( - MachinePageCount increment, - memory::ContiguousAllocation& allocation, - ReservationCallback reservationCB) { - return makeSpace(increment, [&]() { - return allocator_->growContiguous(increment, allocation, reservationCB); - }); -} - -void* AsyncDataCache::allocateBytes(uint64_t bytes, uint16_t alignment) { - void* result = nullptr; - makeSpace( - bits::roundUp(bytes, memory::AllocationTraits::kPageSize) / - memory::AllocationTraits::kPageSize, - [&]() { - result = allocator_->allocateBytes(bytes, alignment); - return result != nullptr; - }); - return result; -} - void AsyncDataCache::incrementNew(uint64_t size) { newBytes_ += size; if (!ssdCache_) { @@ -725,7 +709,7 @@ std::string AsyncDataCache::toString() const { << " read pins " << stats.numShared << " write pins " << stats.numExclusive << " unused prefetch " << stats.numPrefetch << " Alloc Megaclocks " << (stats.allocClocks >> 20) - << " allocated pages " << numAllocated() << " cached pages " + << " allocated pages " << allocator_->numAllocated() << " cached pages " << cachedPages_; out << "\nBacking: " << allocator_->toString(); if (ssdCache_) { diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h index 2dd8a9098b2..2a60f8f273f 100644 --- a/velox/common/caching/AsyncDataCache.h +++ b/velox/common/caching/AsyncDataCache.h @@ -506,30 +506,36 @@ struct CacheStats { std::shared_ptr ssdStats = nullptr; }; -// Collection of cache entries whose key hashes to the same shard of -// the hash number space. The cache population is divided into shards -// to decrease contention on the mutex for the key to entry mapping -// and other housekeeping. + +/// Collection of cache entries whose key hashes to the same shard of +/// the hash number space. The cache population is divided into shards +/// to decrease contention on the mutex for the key to entry mapping +/// and other housekeeping. class CacheShard { public: explicit CacheShard(AsyncDataCache* FOLLY_NONNULL cache) : cache_(cache) {} - // See AsyncDataCache::findOrCreate. + /// See AsyncDataCache::findOrCreate. CachePin findOrCreate( RawFileCacheKey key, uint64_t size, folly::SemiFuture* readyFuture); - // Returns true if there is an entry for 'key'. Updates access time. + /// Returns true if there is an entry for 'key'. Updates access time. bool exists(RawFileCacheKey key) const; - AsyncDataCache* cache() { + AsyncDataCache* cache() const { return cache_; } + std::mutex& mutex() { return mutex_; } + /// Release any resources that consume memory from this 'CacheShard' for a + /// graceful shutdown. The shard will no longer be valid after this call. + void prepareShutdown(); + // removes 'bytesToFree' worth of entries or as many entries as are // not pinned. This favors first removing older and less frequently // used entries. If 'evictAllUnpinned' is true, anything that is @@ -609,101 +615,63 @@ class CacheShard { std::atomic allocClocks_; }; -class AsyncDataCache : public memory::MemoryAllocator { +class AsyncDataCache : public memory::Cache { public: - // TODO(jtan6): Remove this constructor after Presto Native switches to below - // constructor AsyncDataCache( - const std::shared_ptr& allocator, - uint64_t maxBytes, + memory::MemoryAllocator* allocator, std::unique_ptr ssdCache = nullptr); - AsyncDataCache( - const std::shared_ptr& allocator, + ~AsyncDataCache() override; + + static std::shared_ptr create( + memory::MemoryAllocator* allocator, std::unique_ptr ssdCache = nullptr); - // Finds or creates a cache entry corresponding to 'key'. The entry - // is returned in 'pin'. If the entry is new, it is pinned in - // exclusive mode and its 'data_' has uninitialized space for at - // least 'size' bytes. If the entry is in cache and already filled, - // the pin is in shared mode. If the entry is in exclusive mode for - // some other pin, the pin is empty. If 'waitFuture' is not nullptr - // and the pin is exclusive on some other pin, this is set to a - // future that is realized when the pin is no longer exclusive. When - // the future is realized, the caller may retry findOrCreate(). - // runtime error with code kNoCacheSpace if there is no space to create the - // new entry after evicting any unpinned content. + static AsyncDataCache* getInstance(); + + static void setInstance(AsyncDataCache* asyncDataCache); + + /// Release any resources that consume memory from 'allocator_' for a graceful + /// shutdown. The cache will no longer be valid after this call. + void prepareShutdown(); + + /// Calls 'allocate' until this returns true. Returns true if + /// allocate returns true. and Tries to evict at least 'numPages' of + /// cache after each failed call to 'allocate'. May pause to wait + /// for SSD cache flush if ''ssdCache_' is set and is busy + /// writing. Does random back-off after several failures and + /// eventually gives up. Allocation must not be serialized by a mutex + /// for memory arbitration to work. + bool makeSpace( + memory::MachinePageCount numPages, + std::function allocate) override; + + memory::MemoryAllocator* allocator() const override { + return allocator_; + } + + /// Finds or creates a cache entry corresponding to 'key'. The entry + /// is returned in 'pin'. If the entry is new, it is pinned in + /// exclusive mode and its 'data_' has uninitialized space for at + /// least 'size' bytes. If the entry is in cache and already filled, + /// the pin is in shared mode. If the entry is in exclusive mode for + /// some other pin, the pin is empty. If 'waitFuture' is not nullptr + /// and the pin is exclusive on some other pin, this is set to a + /// future that is realized when the pin is no longer exclusive. When + /// the future is realized, the caller may retry findOrCreate(). + /// runtime error with code kNoCacheSpace if there is no space to create the + /// new entry after evicting any unpinned content. CachePin findOrCreate( RawFileCacheKey key, uint64_t size, folly::SemiFuture* waitFuture = nullptr); - // Returns true if there is an entry for 'key'. Updates access time. + /// Returns true if there is an entry for 'key'. Updates access time. bool exists(RawFileCacheKey key) const; - Kind kind() const override { - return allocator_->kind(); - } - - size_t capacity() const override { - return allocator_->capacity(); - } - - bool allocateNonContiguous( - memory::MachinePageCount numPages, - memory::Allocation& out, - ReservationCallback reservationCB = nullptr, - memory::MachinePageCount minSizeClass = 0) override; - - int64_t freeNonContiguous(memory::Allocation& allocation) override { - return allocator_->freeNonContiguous(allocation); - } - - bool allocateContiguous( - memory::MachinePageCount numPages, - memory::Allocation* FOLLY_NULLABLE collateral, - memory::ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr, - memory::MachinePageCount maxPages = 0) override; - - void freeContiguous(memory::ContiguousAllocation& allocation) override { - allocator_->freeContiguous(allocation); - } - - bool growContiguous( - memory::MachinePageCount increment, - memory::ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr) override; - - void* allocateBytes(uint64_t bytes, uint16_t alignment) override; - - void freeBytes(void* p, uint64_t size) noexcept override { - allocator_->freeBytes(p, size); - } - - bool checkConsistency() const override { - return allocator_->checkConsistency(); - } - - const std::vector& sizeClasses() const override { - return allocator_->sizeClasses(); - } - - size_t totalUsedBytes() const override { - return allocator_->totalUsedBytes(); - } - - memory::MachinePageCount numAllocated() const override { - return allocator_->numAllocated(); - } - - memory::MachinePageCount numMapped() const override { - return allocator_->numMapped(); - } - CacheStats refreshStats() const; - std::string toString() const override; + std::string toString() const; memory::MachinePageCount incrementCachedPages(int64_t pages) { // The counter is unsigned and the increment is signed. @@ -719,19 +687,19 @@ class AsyncDataCache : public memory::MemoryAllocator { return ssdCache_.get(); } - // Updates stats for creation of a new cache entry of 'size' bytes, - // i.e. a cache miss. Periodically updates SSD admission criteria, - // i.e. reconsider criteria every half cache capacity worth of misses. + /// Updates stats for creation of a new cache entry of 'size' bytes, + /// i.e. a cache miss. Periodically updates SSD admission criteria, + /// i.e. reconsider criteria every half cache capacity worth of misses. void incrementNew(uint64_t size); - // Updates statistics after bringing in 'bytes' worth of data that - // qualifies for SSD save and is not backed by SSD. Periodically - // triggers a background write of eligible entries to SSD. + /// Updates statistics after bringing in 'bytes' worth of data that + /// qualifies for SSD save and is not backed by SSD. Periodically + /// triggers a background write of eligible entries to SSD. void possibleSsdSave(uint64_t bytes); - // Sets a callback applied to new entries at the point where - // they are set to shared mode. Used for testing and can be used for - // e.g. checking checksums. + /// Sets a callback applied to new entries at the point where + /// they are set to shared mode. Used for testing and can be used for + /// e.g. checking checksums. void setVerifyHook(std::function hook) { verifyHook_ = hook; } @@ -769,29 +737,16 @@ class AsyncDataCache : public memory::MemoryAllocator { return numSkippedSaves_; } - memory::Stats stats() const override { - return allocator_->stats(); - } - private: static constexpr int32_t kNumShards = 4; // Must be power of 2. static constexpr int32_t kShardMask = kNumShards - 1; + static AsyncDataCache** getInstancePtr(); + // Waits a pseudorandom delay times 'counter'. void backoff(int32_t counter); - // Calls 'allocate' until this returns true. Returns true if - // allocate returns true. and Tries to evict at least 'numPages' of - // cache after each failed call to 'allocate'. May pause to wait - // for SSD cache flush if ''ssdCache_' is set and is busy - // writing. Does random back-off after several failures and - // eventually gives up. Allocation must not be serialized by a mutex - // for memory arbitration to work. - bool makeSpace( - memory::MachinePageCount numPages, - std::function allocate); - - std::shared_ptr allocator_; + memory::MemoryAllocator* const allocator_; std::unique_ptr ssdCache_; std::vector> shards_; std::atomic shardCounter_{0}; diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index 1c3580a9d86..66bb6dd65cb 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -81,6 +81,7 @@ class AsyncDataCacheTest : public testing::Test { if (ssdCache) { ssdCache->deleteFiles(); } + cache_->prepareShutdown(); } } @@ -104,18 +105,21 @@ class AsyncDataCacheTest : public testing::Test { } memory::MmapAllocator::Options options; options.capacity = maxBytes; - cache_ = std::make_shared( - std::make_shared(options), - maxBytes, - std::move(ssdCache)); + if (cache_) { + cache_->prepareShutdown(); + } + cache_.reset(); + allocator_.reset(); + allocator_ = std::make_shared(options); + cache_ = AsyncDataCache::create(allocator_.get(), std::move(ssdCache)); if (filenames_.empty()) { for (auto i = 0; i < kNumFiles; ++i) { auto name = fmt::format("testing_file_{}", i); filenames_.push_back(StringIdLease(fileIds(), name)); } } - ASSERT_EQ(cache_->kind(), MemoryAllocator::Kind::kMmap); - ASSERT_EQ(MemoryAllocator::kindString(cache_->kind()), "MMAP"); + ASSERT_EQ(cache_->allocator()->kind(), MemoryAllocator::Kind::kMmap); + ASSERT_EQ(MemoryAllocator::kindString(cache_->allocator()->kind()), "MMAP"); } // Finds one entry from RAM, SSD or storage. Throws if the data @@ -222,12 +226,13 @@ class AsyncDataCacheTest : public testing::Test { void clearAllocations(std::deque& allocations) { while (!allocations.empty()) { - cache_->freeNonContiguous(allocations.front()); + allocator_->freeNonContiguous(allocations.front()); allocations.pop_front(); } } std::shared_ptr tempDirectory_; + std::shared_ptr allocator_; std::shared_ptr cache_; std::vector filenames_; std::unique_ptr executor_; @@ -588,7 +593,7 @@ TEST_F(AsyncDataCacheTest, outOfCapacity) { pins.pop_front(); } memory::Allocation allocation; - ASSERT_FALSE(cache_->allocateNonContiguous(kSizeInPages, allocation)); + ASSERT_FALSE(allocator_->allocateNonContiguous(kSizeInPages, allocation)); // One 4 page entry below the max size of 4K 4 page entries in 16MB of // capacity. ASSERT_EQ(16384, cache_->incrementCachedPages(0)); @@ -597,14 +602,14 @@ TEST_F(AsyncDataCacheTest, outOfCapacity) { // We allocate the full capacity and expect the cache entries to go. for (;;) { - if (!cache_->allocateNonContiguous(kSizeInPages, allocation)) { + if (!allocator_->allocateNonContiguous(kSizeInPages, allocation)) { break; } allocations.push_back(std::move(allocation)); } EXPECT_EQ(0, cache_->incrementCachedPages(0)); EXPECT_EQ(0, cache_->incrementPrefetchPages(0)); - EXPECT_EQ(16384, cache_->numAllocated()); + EXPECT_EQ(16384, allocator_->numAllocated()); clearAllocations(allocations); } diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 33ed1cf7f7c..c4d7c93df2b 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -45,6 +45,9 @@ class SsdFileTest : public testing::Test { if (ssdFile_) { ssdFile_->deleteFile(); } + if (cache_) { + cache_->prepareShutdown(); + } } void initializeCache( @@ -53,8 +56,7 @@ class SsdFileTest : public testing::Test { bool setNoCowFlag = false) { // tmpfs does not support O_DIRECT, so turn this off for testing. FLAGS_ssd_odirect = false; - cache_ = std::make_shared( - MemoryAllocator::createDefaultInstance(), maxBytes); + cache_ = AsyncDataCache::create(MemoryAllocator::getInstance()); fileName_ = StringIdLease(fileIds(), "fileInStorage"); diff --git a/velox/common/memory/MallocAllocator.cpp b/velox/common/memory/MallocAllocator.cpp index df63562916c..73c35c18a3f 100644 --- a/velox/common/memory/MallocAllocator.cpp +++ b/velox/common/memory/MallocAllocator.cpp @@ -23,7 +23,7 @@ namespace facebook::velox::memory { MallocAllocator::MallocAllocator(size_t capacity) : kind_(MemoryAllocator::Kind::kMalloc), capacity_(capacity) {} -bool MallocAllocator::allocateNonContiguous( +bool MallocAllocator::allocateNonContiguousWithoutRetry( MachinePageCount numPages, Allocation& out, ReservationCallback reservationCB, @@ -109,6 +109,20 @@ bool MallocAllocator::allocateNonContiguous( return true; } +bool MallocAllocator::allocateContiguousWithoutRetry( + MachinePageCount numPages, + Allocation* collateral, + ContiguousAllocation& allocation, + ReservationCallback reservationCB, + MachinePageCount maxPages) { + bool result; + stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() { + result = allocateContiguousImpl( + numPages, collateral, allocation, reservationCB, maxPages); + }); + return result; +} + bool MallocAllocator::allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, @@ -216,6 +230,11 @@ int64_t MallocAllocator::freeNonContiguous(Allocation& allocation) { return freedBytes; } +void MallocAllocator::freeContiguous(ContiguousAllocation& allocation) { + stats_.recordFree( + allocation.size(), [&]() { freeContiguousImpl(allocation); }); +} + void MallocAllocator::freeContiguousImpl(ContiguousAllocation& allocation) { if (allocation.empty()) { return; @@ -233,7 +252,7 @@ void MallocAllocator::freeContiguousImpl(ContiguousAllocation& allocation) { allocation.clear(); } -bool MallocAllocator::growContiguous( +bool MallocAllocator::growContiguousWithoutRetry( MachinePageCount increment, ContiguousAllocation& allocation, ReservationCallback reservationCB) { @@ -259,7 +278,9 @@ bool MallocAllocator::growContiguous( return true; } -void* MallocAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { +void* MallocAllocator::allocateBytesWithoutRetry( + uint64_t bytes, + uint16_t alignment) { if (!incrementUsage(bytes)) { return nullptr; } @@ -279,7 +300,7 @@ void* MallocAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { return result; } -void* MallocAllocator::allocateZeroFilled(uint64_t bytes) { +void* MallocAllocator::allocateZeroFilledWithoutRetry(uint64_t bytes) { if (!incrementUsage(bytes)) { return nullptr; } diff --git a/velox/common/memory/MallocAllocator.h b/velox/common/memory/MallocAllocator.h index 24503c4570a..debc07cdc96 100644 --- a/velox/common/memory/MallocAllocator.h +++ b/velox/common/memory/MallocAllocator.h @@ -37,6 +37,17 @@ class MallocAllocator : public MemoryAllocator { } } + void registerCache(const std::shared_ptr& cache) override { + VELOX_CHECK_NULL(cache_); + VELOX_CHECK_NOT_NULL(cache); + VELOX_CHECK(cache->allocator() == this); + cache_ = cache; + } + + Cache* cache() const override { + return cache_.get(); + } + Kind kind() const override { return kind_; } @@ -45,42 +56,15 @@ class MallocAllocator : public MemoryAllocator { return capacity_; } - bool allocateNonContiguous( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB = nullptr, - MachinePageCount minSizeClass = 0) override; + void freeContiguous(ContiguousAllocation& allocation) override; int64_t freeNonContiguous(Allocation& allocation) override; - bool allocateContiguous( - MachinePageCount numPages, - Allocation* collateral, - ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr, - MachinePageCount maxPages = 0) override { - bool result; - stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() { - result = allocateContiguousImpl( - numPages, collateral, allocation, reservationCB, maxPages); - }); - return result; - } - - void freeContiguous(ContiguousAllocation& allocation) override { - stats_.recordFree( - allocation.size(), [&]() { freeContiguousImpl(allocation); }); - } - - bool growContiguous( + bool growContiguousWithoutRetry( MachinePageCount increment, ContiguousAllocation& allocation, ReservationCallback reservationCB = nullptr) override; - void* allocateBytes(uint64_t bytes, uint16_t alignment) override; - - void* allocateZeroFilled(uint64_t bytes) override; - void freeBytes(void* p, uint64_t bytes) noexcept override; size_t totalUsedBytes() const override { @@ -95,15 +79,24 @@ class MallocAllocator : public MemoryAllocator { return numMapped_; } - Stats stats() const override { - return stats_; - } - bool checkConsistency() const override; std::string toString() const override; private: + bool allocateNonContiguousWithoutRetry( + MachinePageCount numPages, + Allocation& out, + ReservationCallback reservationCB = nullptr, + MachinePageCount minSizeClass = 0) override; + + bool allocateContiguousWithoutRetry( + MachinePageCount numPages, + Allocation* FOLLY_NULLABLE collateral, + ContiguousAllocation& allocation, + ReservationCallback reservationCB = nullptr, + MachinePageCount maxPages = 0) override; + bool allocateContiguousImpl( MachinePageCount numPages, Allocation* FOLLY_NULLABLE collateral, @@ -113,6 +106,10 @@ class MallocAllocator : public MemoryAllocator { void freeContiguousImpl(ContiguousAllocation& allocation); + void* allocateBytesWithoutRetry(uint64_t bytes, uint16_t alignment) override; + + void* allocateZeroFilledWithoutRetry(uint64_t bytes) override; + /// Increment current usage and check current allocator consistency to make /// sure current usage does not go above 'capacity_'. If it goes above /// 'capacity_', the increment will not be applied. Returns true if within @@ -161,6 +158,6 @@ class MallocAllocator : public MemoryAllocator { /// Tracks malloc'd pointers to detect bad frees. std::unordered_set mallocs_; - Stats stats_; + std::shared_ptr cache_; }; } // namespace facebook::velox::memory diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp index ccdbb4c448d..a4a684e3384 100644 --- a/velox/common/memory/MemoryAllocator.cpp +++ b/velox/common/memory/MemoryAllocator.cpp @@ -160,7 +160,74 @@ MachinePageCount MemoryAllocator::roundUpToSizeClassSize( return *std::lower_bound(sizes.begin(), sizes.end(), pages); } +bool MemoryAllocator::allocateNonContiguous( + MachinePageCount numPages, + Allocation& out, + ReservationCallback reservationCB, + MachinePageCount minSizeClass) { + if (cache() == nullptr) { + return allocateNonContiguousWithoutRetry( + numPages, out, reservationCB, minSizeClass); + } + return cache()->makeSpace(numPages, [&]() { + return allocateNonContiguousWithoutRetry( + numPages, out, reservationCB, minSizeClass); + }); +} + +bool MemoryAllocator::allocateContiguous( + MachinePageCount numPages, + Allocation* collateral, + ContiguousAllocation& allocation, + ReservationCallback reservationCB, + MachinePageCount maxPages) { + if (cache() == nullptr) { + return allocateContiguousWithoutRetry( + numPages, collateral, allocation, reservationCB, maxPages); + } + return cache()->makeSpace(numPages, [&]() { + return allocateContiguousWithoutRetry( + numPages, collateral, allocation, reservationCB, maxPages); + }); +} + +bool MemoryAllocator::growContiguous( + MachinePageCount increment, + ContiguousAllocation& allocation, + ReservationCallback reservationCB) { + if (cache() == nullptr) { + return growContiguousWithoutRetry(increment, allocation, reservationCB); + } + return cache()->makeSpace(increment, [&]() { + return growContiguousWithoutRetry(increment, allocation, reservationCB); + }); +} + +void* MemoryAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { + if (cache() == nullptr) { + return allocateBytesWithoutRetry(bytes, alignment); + } + void* result = nullptr; + cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() { + result = allocateBytesWithoutRetry(bytes, alignment); + return result != nullptr; + }); + return result; +} + void* MemoryAllocator::allocateZeroFilled(uint64_t bytes) { + if (cache() == nullptr) { + return allocateZeroFilledWithoutRetry(bytes); + } + void* result = nullptr; + cache()->makeSpace(AllocationTraits::numPages(bytes), [&]() { + result = allocateZeroFilledWithoutRetry(bytes); + return result != nullptr; + }); + return result; +} + +void* MemoryAllocator::allocateZeroFilledWithoutRetry(uint64_t bytes) { void* result = allocateBytes(bytes); if (result != nullptr) { ::memset(result, 0, bytes); diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h index 6cd0c76d6a6..cae01562025 100644 --- a/velox/common/memory/MemoryAllocator.h +++ b/velox/common/memory/MemoryAllocator.h @@ -139,6 +139,24 @@ struct Stats { int64_t numAdvise{0}; }; +class MemoryAllocator; + +/// A general cache interface using 'MemroyAllocator' to allocate memory, that +/// is also able to free up memory upon request by shrinking itself. +class Cache { + public: + virtual ~Cache() = default; + /// This method should be implemented so that it tries to accommodate the + /// passed in 'allocate' by freeing up space from 'this' if needed. 'numPages' + /// is the number of pages 'allocate' tries to allocate.It should return true + /// if 'allocate' succeeds, and false otherwise. + virtual bool makeSpace( + memory::MachinePageCount numPages, + std::function allocate) = 0; + + virtual MemoryAllocator* allocator() const = 0; +}; + /// This class provides interface for the actual memory allocations from memory /// pool. It allocates runs of machine pages from predefined size classes, and /// supports both contiguous and non-contiguous memory allocations. An @@ -193,6 +211,11 @@ class MemoryAllocator : public std::enable_shared_from_this { /// the kind of the delegated memory allocator underneath. virtual Kind kind() const = 0; + /// Registers a 'Cache' that is used for freeing up space when this allocator + /// is under memory pressure. The allocator of registered 'Cache' needs to be + /// the same as 'this'. + virtual void registerCache(const std::shared_ptr& cache) = 0; + using ReservationCallback = std::function; /// Returns the capacity of the allocator in bytes. @@ -210,18 +233,20 @@ class MemoryAllocator : public std::enable_shared_from_this { /// 'reservationCB' before the actual memory allocation so it needs to release /// the reservation if the actual allocation fails halfway. The function /// returns true if the allocation succeeded. If it returns false, 'out' - /// references no memory and any partially allocated memory is freed. + /// references no memory and any partially allocated memory is freed. The + /// function might retry allocation failure by making space from 'cache()' if + /// registered. But sufficient space is not guaranteed. /// /// NOTE: /// - 'out' is guaranteed to be freed if it's not empty. /// - Allocation is not guaranteed even if collateral 'out' is larger than /// 'numPages', because this method is not atomic. /// - Throws if allocation exceeds capacity. - virtual bool allocateNonContiguous( + bool allocateNonContiguous( MachinePageCount numPages, Allocation& out, ReservationCallback reservationCB = nullptr, - MachinePageCount minSizeClass = 0) = 0; + MachinePageCount minSizeClass = 0); /// Frees non-contiguous 'allocation'. 'allocation' is empty on return. The /// function returns the actual freed bytes. @@ -241,19 +266,18 @@ class MemoryAllocator : public std::enable_shared_from_this { /// cleared. /// /// NOTE: - 'collateral' and passed in 'allocation' are guaranteed - /// to be freed. If 'maxPages' is non-0, 'maxPages' worth of - /// address space is mapped but the utilization in the allocator and - /// pool is incremented by 'numPages'. This allows reserving - /// a large range of addresses for use with huge pages without - /// declaring the whole range as held by the query. The reservation - /// will be increased as and if addresses in the range are used. See - /// growContiguous(). - virtual bool allocateContiguous( + /// to be freed. If 'maxPages' is non-0, 'maxPages' worth of address space is + /// mapped but the utilization in the allocator and pool is incremented by + /// 'numPages'. This allows reserving a large range of addresses for use with + /// huge pages without declaring the whole range as held by the query. The + /// reservation will be increased as and if addresses in the range are used. + /// See growContiguous(). + bool allocateContiguous( MachinePageCount numPages, Allocation* collateral, ContiguousAllocation& allocation, ReservationCallback reservationCB = nullptr, - MachinePageCount maxPages = 0) = 0; + MachinePageCount maxPages = 0); /// Frees contiguous 'allocation'. 'allocation' is empty on return. virtual void freeContiguous(ContiguousAllocation& allocation) = 0; @@ -262,24 +286,26 @@ class MemoryAllocator : public std::enable_shared_from_this { /// 'increment'. false if would exceed capacity, Throws if size /// would exceed maxSize given in allocateContiguous(). Calls reservationCB /// before increasing the utilization and returns false with no effect if this - /// fails. - virtual bool growContiguous( + /// fails. The function might retry allocation failure by making + /// space from 'cache()' if registered. But sufficient space is not guaranteed + bool growContiguous( MachinePageCount increment, ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr) = 0; + ReservationCallback reservationCB = nullptr); /// Allocates contiguous 'bytes' and return the first byte. Returns nullptr if - /// there is no space. + /// there is no space. The function might retry allocation failure by making + /// space from 'cache()' if registered. But sufficient space is not + /// guaranteed. /// /// NOTE: 'alignment' must be power of two and in range of /// [kMinAlignment, kMaxAlignment]. - virtual void* allocateBytes( - uint64_t bytes, - uint16_t alignment = kMinAlignment) = 0; + void* allocateBytes(uint64_t bytes, uint16_t alignment = kMinAlignment); /// Allocates a zero-filled contiguous bytes. Returns nullptr if there is no - /// space - virtual void* allocateZeroFilled(uint64_t bytes); + /// space. The function might retry allocation failure by making space from + /// 'cache()' if registered. But sufficient space is not guaranteed. + void* allocateZeroFilled(uint64_t bytes); /// Frees contiguous memory allocated by allocateBytes, allocateZeroFilled, /// reallocateBytes. @@ -306,7 +332,7 @@ class MemoryAllocator : public std::enable_shared_from_this { virtual MachinePageCount numMapped() const = 0; virtual Stats stats() const { - return Stats(); + return stats_; } virtual std::string toString() const = 0; @@ -356,7 +382,38 @@ class MemoryAllocator : public std::enable_shared_from_this { } protected: - MemoryAllocator() = default; + explicit MemoryAllocator() = default; + + /// The actual memory allocation function implementation without retry + /// attempts by making space from cache. + virtual bool allocateContiguousWithoutRetry( + MachinePageCount numPages, + Allocation* collateral, + ContiguousAllocation& allocation, + ReservationCallback reservationCB = nullptr, + MachinePageCount maxPages = 0) = 0; + + virtual bool allocateNonContiguousWithoutRetry( + MachinePageCount numPages, + Allocation& out, + ReservationCallback reservationCB, + MachinePageCount minSizeClass) = 0; + + virtual void* allocateBytesWithoutRetry( + uint64_t bytes, + uint16_t alignment) = 0; + + virtual void* allocateZeroFilledWithoutRetry(uint64_t bytes); + + virtual bool growContiguousWithoutRetry( + MachinePageCount increment, + ContiguousAllocation& allocation, + ReservationCallback reservationCB = nullptr) = 0; + + // 'Cache' getter. The cache is only responsible for freeing up memory space + // by shrinking itself when there is not enough space upon allocating. The + // free of space is not guaranteed. + virtual Cache* cache() const = 0; // Returns the size class size that corresponds to 'bytes'. static MachinePageCount roundUpToSizeClassSize( @@ -422,6 +479,8 @@ class MemoryAllocator : public std::enable_shared_from_this { InjectedFailure injectedFailure_{InjectedFailure::kNone}; bool isPersistentFailureInjection_{false}; + Stats stats_; + private: static std::mutex initMutex_; // Singleton instance. diff --git a/velox/common/memory/MmapAllocator.cpp b/velox/common/memory/MmapAllocator.cpp index 8da35bc5bd5..3964b2268cb 100644 --- a/velox/common/memory/MmapAllocator.cpp +++ b/velox/common/memory/MmapAllocator.cpp @@ -51,7 +51,7 @@ MmapAllocator::~MmapAllocator() { (numAllocated_ == 0) && (numExternalMapped_ == 0), "{}", toString()); } -bool MmapAllocator::allocateNonContiguous( +bool MmapAllocator::allocateNonContiguousWithoutRetry( MachinePageCount numPages, Allocation& out, ReservationCallback reservationCB, @@ -211,6 +211,20 @@ MachinePageCount MmapAllocator::freeInternal(Allocation& allocation) { return numFreed; } +bool MmapAllocator::allocateContiguousWithoutRetry( + MachinePageCount numPages, + Allocation* collateral, + ContiguousAllocation& allocation, + ReservationCallback reservationCB, + MachinePageCount maxPages) { + bool result; + stats_.recordAllocate(AllocationTraits::pageBytes(numPages), 1, [&]() { + result = allocateContiguousImpl( + numPages, collateral, allocation, reservationCB, maxPages); + }); + return result; +} + bool MmapAllocator::allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, @@ -364,6 +378,11 @@ bool MmapAllocator::allocateContiguousImpl( return true; } +void MmapAllocator::freeContiguous(ContiguousAllocation& allocation) { + stats_.recordFree( + allocation.size(), [&]() { freeContiguousImpl(allocation); }); +} + void MmapAllocator::freeContiguousImpl(ContiguousAllocation& allocation) { if (allocation.empty()) { return; @@ -384,7 +403,7 @@ void MmapAllocator::freeContiguousImpl(ContiguousAllocation& allocation) { allocation.clear(); } -bool MmapAllocator::growContiguous( +bool MmapAllocator::growContiguousWithoutRetry( MachinePageCount increment, ContiguousAllocation& allocation, ReservationCallback reservationCB) { @@ -430,7 +449,9 @@ bool MmapAllocator::growContiguous( return true; } -void* MmapAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { +void* MmapAllocator::allocateBytesWithoutRetry( + uint64_t bytes, + uint16_t alignment) { alignmentCheck(bytes, alignment); if (useMalloc(bytes)) { @@ -448,7 +469,8 @@ void* MmapAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { if (bytes <= AllocationTraits::pageBytes(sizeClassSizes_.back())) { Allocation allocation; const auto numPages = roundUpToSizeClassSize(bytes, sizeClassSizes_); - if (!allocateNonContiguous(numPages, allocation, nullptr, numPages)) { + if (!allocateNonContiguousWithoutRetry( + numPages, allocation, nullptr, numPages)) { return nullptr; } auto run = allocation.runAt(0); @@ -463,7 +485,7 @@ void* MmapAllocator::allocateBytes(uint64_t bytes, uint16_t alignment) { ContiguousAllocation allocation; auto numPages = bits::roundUp(bytes, AllocationTraits::kPageSize) / AllocationTraits::kPageSize; - if (!allocateContiguous(numPages, nullptr, allocation)) { + if (!allocateContiguousWithoutRetry(numPages, nullptr, allocation)) { return nullptr; } diff --git a/velox/common/memory/MmapAllocator.h b/velox/common/memory/MmapAllocator.h index 0ecc872e17c..43863af9f53 100644 --- a/velox/common/memory/MmapAllocator.h +++ b/velox/common/memory/MmapAllocator.h @@ -85,56 +85,29 @@ class MmapAllocator : public MemoryAllocator { return kind_; } - size_t capacity() const override { - return AllocationTraits::pageBytes(capacity_); + void registerCache(const std::shared_ptr& cache) override { + VELOX_CHECK_NULL(cache_); + VELOX_CHECK_NOT_NULL(cache); + VELOX_CHECK(cache->allocator() == this); + cache_ = cache; } - bool allocateNonContiguous( - MachinePageCount numPages, - Allocation& out, - ReservationCallback reservationCB = nullptr, - MachinePageCount minSizeClass = 0) override; - - int64_t freeNonContiguous(Allocation& allocation) override; - - bool allocateContiguous( - MachinePageCount numPages, - Allocation* collateral, - ContiguousAllocation& allocation, - ReservationCallback reservationCB = nullptr, - MachinePageCount maxPages = 0) override { - bool result; - stats_.recordAllocate(numPages * AllocationTraits::kPageSize, 1, [&]() { - result = allocateContiguousImpl( - numPages, collateral, allocation, reservationCB, maxPages); - }); - return result; + Cache* cache() const override { + return cache_.get(); } - void freeContiguous(ContiguousAllocation& allocation) override { - stats_.recordFree( - allocation.size(), [&]() { freeContiguousImpl(allocation); }); + size_t capacity() const override { + return AllocationTraits::pageBytes(capacity_); } - bool growContiguous( + bool growContiguousWithoutRetry( MachinePageCount increment, ContiguousAllocation& allocation, ReservationCallback reservationCB = nullptr) override; - /// Allocates 'bytes' contiguous bytes and returns the pointer to the first - /// byte. If 'bytes' is less than 'maxMallocBytes_', delegates the allocation - /// to malloc. If the size is above that and below the largest size classes' - /// size, allocates one element of the next size classes' size. If 'size' is - /// greater than the largest size classes' size, calls allocateContiguous(). - /// Returns nullptr if there is no space. The amount to allocate is subject to - /// the size limit of 'this'. This function is not virtual but calls the - /// virtual functions allocateNonContiguous and allocateContiguous, which can - /// track sizes and enforce caps etc. If 'alignment' is not kMinAlignment, - /// then 'bytes' must be a multiple of 'alignment'. - /// - /// NOTE: 'alignment' must be power of two and in range of [kMinAlignment, - /// kMaxAlignment]. - void* allocateBytes(uint64_t bytes, uint16_t alignment) override; + void freeContiguous(ContiguousAllocation& allocation) override; + + int64_t freeNonContiguous(Allocation& allocation) override; void freeBytes(void* p, uint64_t bytes) noexcept override; @@ -339,6 +312,19 @@ class MmapAllocator : public MemoryAllocator { uint64_t numAdvisedAway_ = 0; }; + bool allocateNonContiguousWithoutRetry( + MachinePageCount numPages, + Allocation& out, + ReservationCallback reservationCB = nullptr, + MachinePageCount minSizeClass = 0) override; + + bool allocateContiguousWithoutRetry( + MachinePageCount numPages, + Allocation* collateral, + ContiguousAllocation& allocation, + ReservationCallback reservationCB = nullptr, + MachinePageCount maxPages = 0) override; + bool allocateContiguousImpl( MachinePageCount numPages, Allocation* collateral, @@ -348,6 +334,21 @@ class MmapAllocator : public MemoryAllocator { void freeContiguousImpl(ContiguousAllocation& allocation); + // Allocates 'bytes' contiguous bytes and returns the pointer to the first + // byte. If 'bytes' is less than 'maxMallocBytes_', delegates the allocation + // to malloc. If the size is above that and below the largest size classes' + // size, allocates one element of the next size classes' size. If 'size' is + // greater than the largest size classes' size, calls allocateContiguous(). + // Returns nullptr if there is no space. The amount to allocate is subject to + // the size limit of 'this'. This function is not virtual but calls the + // virtual functions allocateNonContiguous and allocateContiguous, which can + // track sizes and enforce caps etc. If 'alignment' is not kMinAlignment, + // then 'bytes' must be a multiple of 'alignment'. + // + // NOTE: 'alignment' must be power of two and in range of [kMinAlignment, + // kMaxAlignment]. + void* allocateBytesWithoutRetry(uint64_t bytes, uint16_t alignment) override; + // Ensures that there are at least 'newMappedNeeded' pages that are // not backing any existing allocation. If capacity_ - numMapped_ < // newMappedNeeded, advises away enough pages backing freed slots in @@ -418,7 +419,7 @@ class MmapAllocator : public MemoryAllocator { std::mutex arenaMutex_; std::unique_ptr managedArenas_; - Stats stats_; + std::shared_ptr cache_; }; } // namespace facebook::velox::memory diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index e2e71c3496c..eabc39a3a95 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -88,18 +88,16 @@ class MemoryPoolTest : public testing::TestWithParam { MmapAllocator::Options opts{8UL << 30}; allocator_ = std::make_shared(opts); if (useCache_) { - cache_ = - std::make_shared(allocator_, kCapacity, nullptr); - MemoryAllocator::setDefaultInstance(cache_.get()); + cache_ = AsyncDataCache::create(allocator_.get()); + MemoryAllocator::setDefaultInstance(allocator_.get()); } else { MemoryAllocator::setDefaultInstance(allocator_.get()); } } else { allocator_ = MemoryAllocator::createDefaultInstance(); if (useCache_) { - cache_ = - std::make_shared(allocator_, kCapacity, nullptr); - MemoryAllocator::setDefaultInstance(cache_.get()); + cache_ = AsyncDataCache::create(allocator_.get()); + MemoryAllocator::setDefaultInstance(allocator_.get()); } else { MemoryAllocator::setDefaultInstance(allocator_.get()); } @@ -111,6 +109,9 @@ class MemoryPoolTest : public testing::TestWithParam { } void TearDown() override { + if (useCache_) { + cache_->prepareShutdown(); + } allocator_->testingClearFailureInjection(); MmapAllocator::setDefaultInstance(nullptr); } diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 63638ddf074..660974cd5d9 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -312,7 +312,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { executor_.get(), std::unordered_map{}, configs, - memory::MemoryAllocator::getInstance(), + cache::AsyncDataCache::getInstance(), std::move(pool)); return queryCtx; } diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index e11d5f60089..b8fa93f191d 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -17,6 +17,7 @@ #include "velox/common/base/AsyncSource.h" #include "velox/common/base/RuntimeMetrics.h" +#include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/ScanTracker.h" #include "velox/common/future/VeloxPromise.h" #include "velox/core/ExpressionEvaluator.h" @@ -223,7 +224,7 @@ class ConnectorQueryCtx { memory::MemoryPool* connectorPool, const Config* connectorConfig, std::unique_ptr expressionEvaluator, - memory::MemoryAllocator* FOLLY_NONNULL allocator, + cache::AsyncDataCache* cache, const std::string& queryId, const std::string& taskId, const std::string& planNodeId, @@ -232,7 +233,7 @@ class ConnectorQueryCtx { connectorPool_(connectorPool), config_(connectorConfig), expressionEvaluator_(std::move(expressionEvaluator)), - allocator_(allocator), + cache_(cache), scanId_(fmt::format("{}.{}", taskId, planNodeId)), queryId_(queryId), taskId_(taskId), @@ -260,10 +261,8 @@ class ConnectorQueryCtx { return expressionEvaluator_.get(); } - // MemoryAllocator for large allocations. Used for caching with - // CachedBufferedImput if this implements cache::AsyncDataCache. - memory::MemoryAllocator* FOLLY_NONNULL allocator() const { - return allocator_; + cache::AsyncDataCache* cache() const { + return cache_; } // This is a combination of task id and the scan's PlanNodeId. This is an id @@ -295,7 +294,7 @@ class ConnectorQueryCtx { memory::MemoryPool* connectorPool_; const Config* FOLLY_NONNULL config_; std::unique_ptr expressionEvaluator_; - memory::MemoryAllocator* FOLLY_NONNULL allocator_; + cache::AsyncDataCache* cache_; const std::string scanId_; const std::string queryId_; const std::string taskId_; diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index 5970b9df80c..f1756255684 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -52,7 +52,7 @@ class HiveConnector : public Connector { columnHandles, &fileHandleFactory_, connectorQueryCtx->expressionEvaluator(), - connectorQueryCtx->allocator(), + connectorQueryCtx->cache(), connectorQueryCtx->scanId(), HiveConfig::isFileColumnNamesReadAsLowerCase( connectorQueryCtx->config()), diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 74b9f56f42d..7f58d125af0 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -358,7 +358,7 @@ HiveDataSource::HiveDataSource( std::shared_ptr>& columnHandles, FileHandleFactory* fileHandleFactory, core::ExpressionEvaluator* expressionEvaluator, - memory::MemoryAllocator* allocator, + cache::AsyncDataCache* cache, const std::string& scanId, bool fileColumnNamesReadAsLowerCase, folly::Executor* executor, @@ -368,7 +368,7 @@ HiveDataSource::HiveDataSource( pool_(&options.getMemoryPool()), outputType_(outputType), expressionEvaluator_(expressionEvaluator), - allocator_(allocator), + cache_(cache), scanId_(scanId), executor_(executor) { // Column handled keyed on the column alias, the name used in the query. @@ -835,12 +835,12 @@ std::unique_ptr HiveDataSource::createBufferedInput( const FileHandle& fileHandle, const dwio::common::ReaderOptions& readerOpts) { - if (auto* asyncCache = dynamic_cast(allocator_)) { + if (cache_) { return std::make_unique( fileHandle.file, dwio::common::MetricsLog::voidLog(), fileHandle.uuid.id(), - asyncCache, + cache_, Connector::getTracker(scanId_, readerOpts.loadQuantum()), fileHandle.groupId.id(), ioStats_, diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index c5db04e897c..1b822eb9746 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -37,7 +37,7 @@ class HiveDataSource : public DataSource { std::shared_ptr>& columnHandles, FileHandleFactory* fileHandleFactory, core::ExpressionEvaluator* expressionEvaluator, - memory::MemoryAllocator* allocator, + cache::AsyncDataCache* cache, const std::string& scanId, bool fileColumnNamesReadAsLowerCase, folly::Executor* executor, @@ -162,7 +162,7 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - memory::MemoryAllocator* const allocator_; + cache::AsyncDataCache* const cache_{nullptr}; const std::string& scanId_; folly::Executor* executor_; }; diff --git a/velox/core/CMakeLists.txt b/velox/core/CMakeLists.txt index 86d171485ed..f4db34c8f38 100644 --- a/velox/core/CMakeLists.txt +++ b/velox/core/CMakeLists.txt @@ -23,6 +23,7 @@ add_library(velox_core Expressions.cpp PlanFragment.cpp PlanNode.cpp target_link_libraries( velox_core + velox_caching velox_config velox_expression_functions velox_type diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index cb06600b2fb..2bfa52f6d9f 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -21,13 +21,13 @@ QueryCtx::QueryCtx( folly::Executor* executor, std::unordered_map queryConfigValues, std::unordered_map> connectorConfigs, - memory::MemoryAllocator* allocator, + cache::AsyncDataCache* cache, std::shared_ptr pool, std::shared_ptr spillExecutor, const std::string& queryId) : queryId_(queryId), connectorConfigs_(connectorConfigs), - allocator_(allocator), + cache_(cache), pool_(std::move(pool)), executor_(executor), queryConfig_{std::move(queryConfigValues)}, @@ -39,12 +39,12 @@ QueryCtx::QueryCtx( folly::Executor::KeepAlive<> executorKeepalive, std::unordered_map queryConfigValues, std::unordered_map> connectorConfigs, - memory::MemoryAllocator* allocator, + cache::AsyncDataCache* cache, std::shared_ptr pool, const std::string& queryId) : queryId_(queryId), connectorConfigs_(connectorConfigs), - allocator_(allocator), + cache_(cache), pool_(std::move(pool)), executorKeepalive_(std::move(executorKeepalive)), queryConfig_{std::move(queryConfigValues)} { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index d4f50874a99..7b0f178b5c7 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -17,8 +17,8 @@ #include #include +#include "velox/common/caching/AsyncDataCache.h" #include "velox/common/memory/Memory.h" -#include "velox/common/memory/MemoryAllocator.h" #include "velox/core/QueryConfig.h" #include "velox/vector/DecodedVector.h" #include "velox/vector/VectorPool.h" @@ -39,8 +39,7 @@ class QueryCtx { std::unordered_map queryConfigValues = {}, std::unordered_map> connectorConfigs = {}, - memory::MemoryAllocator* allocator = - memory::MemoryAllocator::getInstance(), + cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(), std::shared_ptr pool = nullptr, std::shared_ptr spillExecutor = nullptr, const std::string& queryId = ""); @@ -54,8 +53,7 @@ class QueryCtx { std::unordered_map queryConfigValues = {}, std::unordered_map> connectorConfigs = {}, - memory::MemoryAllocator* allocator = - memory::MemoryAllocator::getInstance(), + cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(), std::shared_ptr pool = nullptr, const std::string& queryId = ""); @@ -65,8 +63,8 @@ class QueryCtx { return pool_.get(); } - memory::MemoryAllocator* allocator() const { - return allocator_; + cache::AsyncDataCache* cache() const { + return cache_; } folly::Executor* executor() const { @@ -135,7 +133,7 @@ class QueryCtx { const std::string queryId_; std::unordered_map> connectorConfigs_; - memory::MemoryAllocator* allocator_; + cache::AsyncDataCache* cache_; std::shared_ptr pool_; folly::Executor* executor_; folly::Executor::KeepAlive<> executorKeepalive_; diff --git a/velox/dwio/common/CachedBufferedInput.cpp b/velox/dwio/common/CachedBufferedInput.cpp index 9c20aa58049..f8fbf33f68e 100644 --- a/velox/dwio/common/CachedBufferedInput.cpp +++ b/velox/dwio/common/CachedBufferedInput.cpp @@ -87,8 +87,9 @@ bool CachedBufferedInput::shouldPreload(int32_t numPages) { memory::AllocationTraits::kPageSize; } auto cachePages = cache_->incrementCachedPages(0); - auto maxPages = memory::AllocationTraits::numPages(cache_->capacity()); - auto allocatedPages = cache_->numAllocated(); + auto allocator = cache_->allocator(); + auto maxPages = memory::AllocationTraits::numPages(allocator->capacity()); + auto allocatedPages = allocator->numAllocated(); if (numPages < maxPages - allocatedPages) { // There is free space for the read-ahead. return true; diff --git a/velox/dwio/dwrf/test/CacheInputTest.cpp b/velox/dwio/dwrf/test/CacheInputTest.cpp index a31d8214659..3bb9c588d2c 100644 --- a/velox/dwio/dwrf/test/CacheInputTest.cpp +++ b/velox/dwio/dwrf/test/CacheInputTest.cpp @@ -114,6 +114,9 @@ class CacheTest : public testing::Test { if (ssdCache) { ssdCache->deleteFiles(); } + if (cache_) { + cache_->prepareShutdown(); + } } void initializeCache(uint64_t maxBytes, uint64_t ssdBytes = 0) { @@ -130,10 +133,8 @@ class CacheTest : public testing::Test { } memory::MmapAllocator::Options options; options.capacity = maxBytes; - cache_ = std::make_shared( - std::make_shared(options), - maxBytes, - std::move(ssd)); + allocator_ = std::make_shared(options); + cache_ = AsyncDataCache::create(allocator_.get(), std::move(ssd)); cache_->setVerifyHook(checkEntry); for (auto i = 0; i < kMaxStreams; ++i) { streamIds_.push_back(std::make_unique( @@ -424,6 +425,7 @@ class CacheTest : public testing::Test { folly::F14FastMap> pathToInput_; std::shared_ptr tempDirectory_; cache::FileGroupStats* FOLLY_NULLABLE groupStats_ = nullptr; + std::shared_ptr allocator_; std::shared_ptr cache_; std::shared_ptr ioStats_; std::unique_ptr executor_; @@ -471,7 +473,7 @@ TEST_F(CacheTest, window) { auto cacheInput = dynamic_cast(stream.get()); EXPECT_TRUE(cacheInput != nullptr); auto maxSize = - cache_->sizeClasses().back() * memory::AllocationTraits::kPageSize; + allocator_->sizeClasses().back() * memory::AllocationTraits::kPageSize; const void* buffer; int32_t size; int32_t numRead = 0; diff --git a/velox/examples/CMakeLists.txt b/velox/examples/CMakeLists.txt index 013fa6b5db5..8625d34ce82 100644 --- a/velox/examples/CMakeLists.txt +++ b/velox/examples/CMakeLists.txt @@ -20,12 +20,12 @@ target_link_libraries(velox_example_simple_functions velox_functions_lib add_executable(velox_example_expression_eval ExpressionEval.cpp) target_link_libraries(velox_example_expression_eval velox_type velox_vector - velox_memory velox_expression) + velox_caching velox_memory velox_expression) add_executable(velox_example_opaque_type OpaqueType.cpp) target_link_libraries(velox_example_opaque_type velox_type velox_vector - velox_expression velox_memory) + velox_caching velox_expression velox_memory) # This is disabled temporarily until we figure out why g++ is crashing linking # it on linux builds. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index d0f44c53669..0f3114fbe91 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -54,7 +54,7 @@ OperatorCtx::createConnectorQueryCtx( driverCtx_->task->queryCtx()->getConnectorConfig(connectorId), std::make_unique( execCtx()->queryCtx(), execCtx()->pool()), - driverCtx_->task->queryCtx()->allocator(), + driverCtx_->task->queryCtx()->cache(), driverCtx_->task->queryCtx()->queryId(), taskId(), planNodeId, diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index e6df3e94b56..78164dcdc3a 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2492,8 +2492,7 @@ TEST_F(TableScanTest, addSplitsToFailedTask) { } TEST_F(TableScanTest, errorInLoadLazy) { - auto cache = dynamic_cast( - memory::MemoryAllocator::getInstance()); + auto cache = cache::AsyncDataCache::getInstance(); VELOX_CHECK_NOT_NULL(cache); auto vectors = makeVectors(10, 1'000); auto filePath = TempFilePath::create(); diff --git a/velox/exec/tests/ThreadDebugInfoTest.cpp b/velox/exec/tests/ThreadDebugInfoTest.cpp index eea12376af1..8721f5ee61a 100644 --- a/velox/exec/tests/ThreadDebugInfoTest.cpp +++ b/velox/exec/tests/ThreadDebugInfoTest.cpp @@ -91,7 +91,7 @@ TEST_F(ThreadDebugInfoDeathTest, withinTheCallingThread) { executor_.get(), std::unordered_map{}, std::unordered_map>{}, - memory::MemoryAllocator::getInstance(), + cache::AsyncDataCache::getInstance(), nullptr, nullptr, "TaskCursorQuery_0"); diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index 74c52d72696..0e3146eed0f 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -216,7 +216,7 @@ AssertQueryBuilder::readCursor() { executor_.get(), std::unordered_map{}, std::unordered_map>{}, - memory::MemoryAllocator::getInstance(), + cache::AsyncDataCache::getInstance(), nullptr, nullptr, fmt::format("TaskCursorQuery_{}", cursorQueryId++)); diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 59e0dad5f94..0495a15a367 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -123,7 +123,7 @@ TaskCursor::TaskCursor(const CursorParameters& params) executor_.get(), std::unordered_map{}, std::unordered_map>{}, - memory::MemoryAllocator::getInstance(), + cache::AsyncDataCache::getInstance(), nullptr, nullptr, fmt::format("TaskCursorQuery_{}", cursorQueryId++)); diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp index 8f378b70c36..a3f5e0ff4d0 100644 --- a/velox/exec/tests/utils/OperatorTestBase.cpp +++ b/velox/exec/tests/utils/OperatorTestBase.cpp @@ -33,9 +33,6 @@ using namespace facebook::velox::common::testutil; namespace facebook::velox::exec::test { -// static -std::shared_ptr OperatorTestBase::asyncDataCache_; - OperatorTestBase::OperatorTestBase() { using memory::MemoryAllocator; facebook::velox::exec::ExchangeSource::registerFactory(); @@ -51,29 +48,34 @@ OperatorTestBase::~OperatorTestBase() { memory::MemoryAllocator::setDefaultInstance(nullptr); } +void OperatorTestBase::SetUpTestCase() { + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + TestValue::enable(); +} + void OperatorTestBase::TearDownTestCase() { Task::testingWaitForAllTasksToBeDeleted(); } void OperatorTestBase::SetUp() { - // Sets the process default MemoryAllocator to an async cache of up - // to 4GB backed by a default MemoryAllocator - if (!asyncDataCache_) { - asyncDataCache_ = std::make_shared( - memory::MemoryAllocator::createDefaultInstance(), 4UL << 30); - } - memory::MemoryAllocator::setDefaultInstance(asyncDataCache_.get()); if (!isRegisteredVectorSerde()) { this->registerVectorSerde(); } driverExecutor_ = std::make_unique(3); ioExecutor_ = std::make_unique(3); + allocator_ = memory::MemoryAllocator::createDefaultInstance(); + if (!asyncDataCache_) { + asyncDataCache_ = cache::AsyncDataCache::create(allocator_.get()); + cache::AsyncDataCache::setInstance(asyncDataCache_.get()); + } + memory::MemoryAllocator::setDefaultInstance(allocator_.get()); } -void OperatorTestBase::SetUpTestCase() { - functions::prestosql::registerAllScalarFunctions(); - aggregate::prestosql::registerAllAggregateFunctions(); - TestValue::enable(); +void OperatorTestBase::TearDown() { + if (asyncDataCache_ != nullptr) { + asyncDataCache_->prepareShutdown(); + } } std::shared_ptr OperatorTestBase::assertQuery( diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h index e5a40d3f8f4..7900b94f95a 100644 --- a/velox/exec/tests/utils/OperatorTestBase.h +++ b/velox/exec/tests/utils/OperatorTestBase.h @@ -37,6 +37,8 @@ class OperatorTestBase : public testing::Test, void SetUp() override; + void TearDown() override; + /// Allow base classes to register custom vector serde. /// By default, registers Presto-compatible serde. virtual void registerVectorSerde(); @@ -139,8 +141,11 @@ class OperatorTestBase : public testing::Test, protected: DuckDbQueryRunner duckDbQueryRunner_; - // Used as default MappedMemory. Created on first use. - static std::shared_ptr asyncDataCache_; + // Used as default MemoryAllocator. + std::shared_ptr allocator_; + + // Used as default AsyncDataCache. + std::shared_ptr asyncDataCache_; // Used for driver thread execution. std::unique_ptr driverExecutor_;