diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 63de22e121899..f2d0912c37f5d 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -443,8 +443,8 @@ void PrestoServer::run() { connectors.emplace_back(velox::connector::getConnector(connectorId)); } - auto memoryAllocator = velox::memory::MemoryAllocator::getInstance(); - auto asyncDataCache = cache::AsyncDataCache::getInstance(); + auto* memoryAllocator = velox::memory::memoryManager()->allocator(); + auto* asyncDataCache = cache::AsyncDataCache::getInstance(); periodicTaskManager_ = std::make_unique( driverExecutor_.get(), httpServer_->getExecutor(), @@ -607,16 +607,34 @@ void PrestoServer::initializeVeloxMemory() { const uint64_t memoryGb = systemConfig->systemMemoryGb(); PRESTO_STARTUP_LOG(INFO) << "Starting with node memory " << memoryGb << "GB"; - const int64_t memoryBytes = memoryGb << 30; + // Set up velox memory manager. + memory::MemoryManagerOptions options; + options.allocatorCapacity = memoryGb << 30; if (systemConfig->useMmapAllocator()) { - memory::MmapAllocator::Options options; - options.capacity = memoryBytes; + options.useMmapAllocator = true; options.useMmapArena = systemConfig->useMmapArena(); options.mmapArenaCapacityRatio = systemConfig->mmapArenaCapacityRatio(); - allocator_ = std::make_shared(options); - } else { - allocator_ = memory::MemoryAllocator::createDefaultInstance(); } + options.checkUsageLeak = systemConfig->enableMemoryLeakCheck(); + options.trackDefaultUsage = + systemConfig->enableSystemMemoryPoolUsageTracking(); + if (!systemConfig->memoryArbitratorKind().empty()) { + options.arbitratorKind = systemConfig->memoryArbitratorKind(); + const uint64_t queryMemoryGb = systemConfig->queryMemoryGb(); + VELOX_USER_CHECK_LE( + queryMemoryGb, + memoryGb, + "Query memory capacity must not be larger than system memory capacity"); + options.arbitratorCapacity = queryMemoryGb << 30; + options.memoryPoolInitCapacity = systemConfig->memoryPoolInitCapacity(); + options.memoryPoolTransferCapacity = + systemConfig->memoryPoolTransferCapacity(); + options.arbitrationStateCheckCb = velox::exec::memoryArbitrationStateCheck; + } + memory::initializeMemoryManager(options); + PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: " + << memory::memoryManager()->toString(); + if (systemConfig->asyncDataCacheEnabled()) { std::unique_ptr ssd; const auto asyncCacheSsdGb = systemConfig->asyncCacheSsdGb(); @@ -643,7 +661,8 @@ void PrestoServer::initializeVeloxMemory() { } std::string cacheStr = ssd == nullptr ? "AsyncDataCache" : "AsyncDataCache with SSD"; - cache_ = cache::AsyncDataCache::create(allocator_.get(), std::move(ssd)); + cache_ = cache::AsyncDataCache::create( + memory::memoryManager()->allocator(), std::move(ssd)); cache::AsyncDataCache::setInstance(cache_.get()); PRESTO_STARTUP_LOG(INFO) << cacheStr << " has been setup"; @@ -664,30 +683,6 @@ void PrestoServer::initializeVeloxMemory() { 0, "Async data cache cannot be disabled if ssd cache is enabled"); } - - memory::MemoryAllocator::setDefaultInstance(allocator_.get()); - // Set up velox memory manager. - memory::MemoryManagerOptions options; - options.capacity = memoryBytes; - options.checkUsageLeak = systemConfig->enableMemoryLeakCheck(); - options.trackDefaultUsage = - systemConfig->enableSystemMemoryPoolUsageTracking(); - if (!systemConfig->memoryArbitratorKind().empty()) { - options.arbitratorKind = systemConfig->memoryArbitratorKind(); - const uint64_t queryMemoryGb = systemConfig->queryMemoryGb(); - VELOX_USER_CHECK_LE( - queryMemoryGb, - memoryGb, - "Query memory capacity must not be larger than system memory capacity"); - options.queryMemoryCapacity = queryMemoryGb << 30; - options.memoryPoolInitCapacity = systemConfig->memoryPoolInitCapacity(); - options.memoryPoolTransferCapacity = - systemConfig->memoryPoolTransferCapacity(); - options.arbitrationStateCheckCb = velox::exec::memoryArbitrationStateCheck; - } - memory::initializeMemoryManager(options); - PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: " - << memory::memoryManager()->toString(); } void PrestoServer::stop() { diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index b738199a0ca41..c716674534dcc 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -193,9 +193,6 @@ class PrestoServer { std::unique_ptr exchangeSourceConnectionPools_; - // Instance of MemoryAllocator used for all query memory allocations. - std::shared_ptr allocator_; - // If not null, the instance of AsyncDataCache used for in-memory file cache. std::shared_ptr cache_; diff --git a/presto-native-execution/presto_cpp/main/tests/JsonSignatureParserTest.cpp b/presto-native-execution/presto_cpp/main/tests/JsonSignatureParserTest.cpp index 2974c41a387d6..2f7779309c3af 100644 --- a/presto-native-execution/presto_cpp/main/tests/JsonSignatureParserTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/JsonSignatureParserTest.cpp @@ -121,7 +121,7 @@ TEST_F(JsonSignatureParserTest, complexTypes) { "map(varchar, double)", "row(varbinary, double, tinyint)", "array(array(bigint))", - "map(array(map(bigint, array(boolean))))" + "map(array(map(bigint, array(boolean))), row(integer, real))" ] } ] diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index 9abc4a0218459..1fd4c2c60078f 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -390,17 +390,20 @@ struct Params { class PrestoExchangeSourceTest : public ::testing::TestWithParam { public: + static void SetUpTestCase() { + MemoryManagerOptions options; + options.allocatorCapacity = 1L << 30; + options.useMmapAllocator = true; + MemoryManager::testingSetInstance(options); + } + void SetUp() override { pool_ = memory::deprecatedAddDefaultLeafMemoryPool(); - memory::MmapAllocator::Options options; - options.capacity = 1L << 30; - allocator_ = std::make_unique(options); exchangeCpuExecutor_ = std::make_shared( GetParam().exchangeCpuThreadPoolSize); exchangeIoExecutor_ = std::make_shared( GetParam().exchangeIoThreadPoolSize); - memory::MemoryAllocator::setDefaultInstance(allocator_.get()); TestValue::enable(); filesystems::registerLocalFileSystem(); @@ -417,7 +420,6 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam { } void TearDown() override { - memory::MemoryAllocator::setDefaultInstance(nullptr); TestValue::disable(); } @@ -455,7 +457,6 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam { } std::shared_ptr pool_; - std::unique_ptr allocator_; std::shared_ptr exchangeCpuExecutor_; std::shared_ptr exchangeIoExecutor_; ConnectionPools connectionPools_; diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index 07ae7719c70c4..182d7431d5223 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -127,7 +127,6 @@ class Cursor { } memory::MemoryPool* pool_; - memory::MemoryAllocator* allocator_ = memory::MemoryAllocator::getInstance(); TaskManager* taskManager_; const protocol::TaskId taskId_; RowTypePtr rowType_; diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 5e0fd098e285b..523e561d3da99 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 5e0fd098e285beafea5890dd1c0991b0dc5f3378 +Subproject commit 523e561d3da99a78a33c4e22106ba7d1cf83c8a2