Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 28 additions & 33 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ void PrestoServer::run() {
connectors.emplace_back(velox::connector::getConnector(connectorId));
}

auto memoryAllocator = velox::memory::MemoryAllocator::getInstance();
auto asyncDataCache = cache::AsyncDataCache::getInstance();
auto* memoryAllocator = velox::memory::memoryManager()->allocator();
auto* asyncDataCache = cache::AsyncDataCache::getInstance();
periodicTaskManager_ = std::make_unique<PeriodicTaskManager>(
driverExecutor_.get(),
httpServer_->getExecutor(),
Expand Down Expand Up @@ -607,16 +607,34 @@ void PrestoServer::initializeVeloxMemory() {
const uint64_t memoryGb = systemConfig->systemMemoryGb();
PRESTO_STARTUP_LOG(INFO) << "Starting with node memory " << memoryGb << "GB";

const int64_t memoryBytes = memoryGb << 30;
// Set up velox memory manager.
memory::MemoryManagerOptions options;
options.allocatorCapacity = memoryGb << 30;
if (systemConfig->useMmapAllocator()) {
memory::MmapAllocator::Options options;
options.capacity = memoryBytes;
options.useMmapAllocator = true;
options.useMmapArena = systemConfig->useMmapArena();
options.mmapArenaCapacityRatio = systemConfig->mmapArenaCapacityRatio();
allocator_ = std::make_shared<memory::MmapAllocator>(options);
} else {
allocator_ = memory::MemoryAllocator::createDefaultInstance();
}
options.checkUsageLeak = systemConfig->enableMemoryLeakCheck();
options.trackDefaultUsage =
systemConfig->enableSystemMemoryPoolUsageTracking();
if (!systemConfig->memoryArbitratorKind().empty()) {
options.arbitratorKind = systemConfig->memoryArbitratorKind();
const uint64_t queryMemoryGb = systemConfig->queryMemoryGb();
VELOX_USER_CHECK_LE(
queryMemoryGb,
memoryGb,
"Query memory capacity must not be larger than system memory capacity");
options.arbitratorCapacity = queryMemoryGb << 30;
options.memoryPoolInitCapacity = systemConfig->memoryPoolInitCapacity();
options.memoryPoolTransferCapacity =
systemConfig->memoryPoolTransferCapacity();
options.arbitrationStateCheckCb = velox::exec::memoryArbitrationStateCheck;
}
memory::initializeMemoryManager(options);
PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: "
<< memory::memoryManager()->toString();

if (systemConfig->asyncDataCacheEnabled()) {
std::unique_ptr<cache::SsdCache> ssd;
const auto asyncCacheSsdGb = systemConfig->asyncCacheSsdGb();
Expand All @@ -643,7 +661,8 @@ void PrestoServer::initializeVeloxMemory() {
}
std::string cacheStr =
ssd == nullptr ? "AsyncDataCache" : "AsyncDataCache with SSD";
cache_ = cache::AsyncDataCache::create(allocator_.get(), std::move(ssd));
cache_ = cache::AsyncDataCache::create(
memory::memoryManager()->allocator(), std::move(ssd));
cache::AsyncDataCache::setInstance(cache_.get());
PRESTO_STARTUP_LOG(INFO) << cacheStr << " has been setup";

Expand All @@ -664,30 +683,6 @@ void PrestoServer::initializeVeloxMemory() {
0,
"Async data cache cannot be disabled if ssd cache is enabled");
}

memory::MemoryAllocator::setDefaultInstance(allocator_.get());
// Set up velox memory manager.
memory::MemoryManagerOptions options;
options.capacity = memoryBytes;
options.checkUsageLeak = systemConfig->enableMemoryLeakCheck();
options.trackDefaultUsage =
systemConfig->enableSystemMemoryPoolUsageTracking();
if (!systemConfig->memoryArbitratorKind().empty()) {
options.arbitratorKind = systemConfig->memoryArbitratorKind();
const uint64_t queryMemoryGb = systemConfig->queryMemoryGb();
VELOX_USER_CHECK_LE(
queryMemoryGb,
memoryGb,
"Query memory capacity must not be larger than system memory capacity");
options.queryMemoryCapacity = queryMemoryGb << 30;
options.memoryPoolInitCapacity = systemConfig->memoryPoolInitCapacity();
options.memoryPoolTransferCapacity =
systemConfig->memoryPoolTransferCapacity();
options.arbitrationStateCheckCb = velox::exec::memoryArbitrationStateCheck;
}
memory::initializeMemoryManager(options);
PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: "
<< memory::memoryManager()->toString();
}

void PrestoServer::stop() {
Expand Down
3 changes: 0 additions & 3 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ class PrestoServer {

std::unique_ptr<ConnectionPools> exchangeSourceConnectionPools_;

// Instance of MemoryAllocator used for all query memory allocations.
std::shared_ptr<velox::memory::MemoryAllocator> allocator_;

// If not null, the instance of AsyncDataCache used for in-memory file cache.
std::shared_ptr<velox::cache::AsyncDataCache> cache_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ TEST_F(JsonSignatureParserTest, complexTypes) {
"map(varchar, double)",
"row(varbinary, double, tinyint)",
"array(array(bigint))",
"map(array(map(bigint, array(boolean))))"
"map(array(map(bigint, array(boolean))), row(integer, real))"
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,20 @@ struct Params {

class PrestoExchangeSourceTest : public ::testing::TestWithParam<Params> {
public:
static void SetUpTestCase() {
MemoryManagerOptions options;
options.allocatorCapacity = 1L << 30;
options.useMmapAllocator = true;
MemoryManager::testingSetInstance(options);
}

void SetUp() override {
pool_ = memory::deprecatedAddDefaultLeafMemoryPool();

memory::MmapAllocator::Options options;
options.capacity = 1L << 30;
allocator_ = std::make_unique<memory::MmapAllocator>(options);
exchangeCpuExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
GetParam().exchangeCpuThreadPoolSize);
exchangeIoExecutor_ = std::make_shared<folly::IOThreadPoolExecutor>(
GetParam().exchangeIoThreadPoolSize);
memory::MemoryAllocator::setDefaultInstance(allocator_.get());
TestValue::enable();

filesystems::registerLocalFileSystem();
Expand All @@ -417,7 +420,6 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam<Params> {
}

void TearDown() override {
memory::MemoryAllocator::setDefaultInstance(nullptr);
TestValue::disable();
}

Expand Down Expand Up @@ -455,7 +457,6 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam<Params> {
}

std::shared_ptr<memory::MemoryPool> pool_;
std::unique_ptr<memory::MemoryAllocator> allocator_;
std::shared_ptr<folly::CPUThreadPoolExecutor> exchangeCpuExecutor_;
std::shared_ptr<folly::IOThreadPoolExecutor> exchangeIoExecutor_;
ConnectionPools connectionPools_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class Cursor {
}

memory::MemoryPool* pool_;
memory::MemoryAllocator* allocator_ = memory::MemoryAllocator::getInstance();
TaskManager* taskManager_;
const protocol::TaskId taskId_;
RowTypePtr rowType_;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 173 files