diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp index 37fb00689784d..a7449ece1a30d 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp @@ -189,12 +189,9 @@ std::shared_ptr QueryContextManager::findOrCreateQueryCtx( static std::atomic_uint64_t poolId{0}; auto pool = memory::MemoryManager::getInstance()->addRootPool( fmt::format("{}_{}", queryId, poolId++), - queryConfig.queryMaxMemoryPerNode(), - !SystemConfig::instance()->memoryArbitratorKind().empty() - ? memory::MemoryReclaimer::create() - : nullptr); + queryConfig.queryMaxMemoryPerNode()); - auto queryCtx = std::make_shared( + auto queryCtx = core::QueryCtx::create( driverExecutor_, std::move(queryConfig), connectorConfigs, diff --git a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp index 600f59121b6cc..b87562df03ab5 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp @@ -56,7 +56,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { const std::string& taskId, core::PlanNodePtr planNode, int destination) { - auto queryCtx = std::make_shared(executor_.get()); + auto queryCtx = core::QueryCtx::create(executor_.get()); core::PlanFragment planFragment{planNode}; return exec::Task::create( taskId, diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp index 22be460ba66c7..5d9703b1a87e2 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp @@ -337,8 +337,8 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { const std::string& taskId, core::PlanNodePtr planNode, int destination) { - auto queryCtx = std::make_shared( - executor_.get(), core::QueryConfig({})); + auto queryCtx = + core::QueryCtx::create(executor_.get(), core::QueryConfig({})); core::PlanFragment planFragment{planNode}; return exec::Task::create( taskId, @@ -711,8 +711,8 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { {core::QueryConfig::kPreferredOutputBatchRows, std::to_string(outputRowLimit)}}; - auto queryCtx = std::make_shared( - executor_.get(), core::QueryConfig(properties)); + auto queryCtx = + core::QueryCtx::create(executor_.get(), core::QueryConfig(properties)); auto params = exec::test::CursorParameters(); params.planNode = plan; params.queryCtx = queryCtx; diff --git a/presto-native-execution/presto_cpp/main/tests/QueryContextCacheTest.cpp b/presto-native-execution/presto_cpp/main/tests/QueryContextCacheTest.cpp index 216402924f49e..2d0821e8568ce 100644 --- a/presto-native-execution/presto_cpp/main/tests/QueryContextCacheTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/QueryContextCacheTest.cpp @@ -54,7 +54,7 @@ TEST_F(QueryContextCacheTest, basic) { for (int i = 0; i < 16; ++i) { auto queryId = fmt::format("query-{}", i); - auto queryCtx = std::make_shared( + auto queryCtx = core::QueryCtx::create( (folly::Executor*)nullptr, core::QueryConfig({})); queryCtxs[queryId] = queryCtx; queryContextCache.insert(queryId, queryCtx); @@ -84,7 +84,7 @@ TEST_F(QueryContextCacheTest, eviction) { for (int i = 0; i < 8; ++i) { auto queryId = fmt::format("query-{}", i); - auto queryCtx = std::make_shared( + auto queryCtx = core::QueryCtx::create( (folly::Executor*)nullptr, core::QueryConfig({})); queryCtxs[queryId] = queryCtx; queryContextCache.insert(queryId, queryCtx); @@ -104,7 +104,7 @@ TEST_F(QueryContextCacheTest, eviction) { // Insert 4 more query ctxs for (int i = 8; i < 12; ++i) { auto queryId = fmt::format("query-{}", i); - auto queryCtx = std::make_shared( + auto queryCtx = core::QueryCtx::create( (folly::Executor*)nullptr, core::QueryConfig({})); queryCtxs[queryId] = queryCtx; queryContextCache.insert(queryId, queryCtx); @@ -118,7 +118,7 @@ TEST_F(QueryContextCacheTest, eviction) { // Ensure that cache expands if all the queries in cache are alive. for (int i = 12; i < 20; ++i) { auto queryId = fmt::format("query-{}", i); - auto queryCtx = std::make_shared( + auto queryCtx = core::QueryCtx::create( (folly::Executor*)nullptr, core::QueryConfig({})); queryCtxs[queryId] = queryCtx; queryContextCache.insert(queryId, queryCtx); diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index 4c4e5bf547fce..7774b6e55fc04 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -1343,7 +1343,7 @@ TEST_F(TaskManagerTest, testCumulativeMemory) { .values(batches) .partitionedOutput({}, 1) .planFragment(); - auto queryCtx = std::make_shared(driverExecutor_.get()); + auto queryCtx = core::QueryCtx::create(driverExecutor_.get()); const protocol::TaskId taskId = "scan.0.0.1.0"; auto veloxTask = Task::create( taskId, diff --git a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp index ccf5cacc852e5..f0b351f42cebc 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp @@ -63,7 +63,7 @@ std::shared_ptr assertToVeloxQueryPlan( protocol::PlanFragment prestoPlan = json::parse(fragment); auto pool = memory::deprecatedAddDefaultLeafMemoryPool(); - auto queryCtx = std::make_shared(); + auto queryCtx = core::QueryCtx::create(); VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool.get()); return converter .toVeloxQueryPlan( @@ -80,7 +80,7 @@ std::shared_ptr assertToBatchVeloxQueryPlan( protocol::PlanFragment prestoPlan = json::parse(fragment); auto pool = memory::deprecatedAddDefaultLeafMemoryPool(); - auto queryCtx = std::make_shared(); + auto queryCtx = core::QueryCtx::create(); VeloxBatchQueryPlanConverter converter( shuffleName, std::move(serializedShuffleWriteInfo), diff --git a/presto-native-execution/presto_cpp/main/types/tests/ValuesPipeTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/ValuesPipeTest.cpp index ed28adfdb70c2..89b6766ecc8f7 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/ValuesPipeTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/ValuesPipeTest.cpp @@ -63,7 +63,7 @@ TEST_F(TestValues, valuesRowVector) { testJsonRoundtrip(j, p); auto pool = memory::deprecatedAddDefaultLeafMemoryPool(); - auto queryCtx = std::make_shared(); + auto queryCtx = core::QueryCtx::create(); VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool.get()); auto values = std::dynamic_pointer_cast( converter.toVeloxQueryPlan( @@ -103,7 +103,7 @@ TEST_F(TestValues, valuesPlan) { testJsonRoundtrip(j, p); auto pool = memory::deprecatedAddDefaultLeafMemoryPool(); - auto queryCtx = std::make_shared(); + auto queryCtx = core::QueryCtx::create(); VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool.get()); auto values = converter.toVeloxQueryPlan( std::dynamic_pointer_cast(p->root)->source, diff --git a/presto-native-execution/velox b/presto-native-execution/velox index e2c0014b219f3..adb47bc1cff86 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit e2c0014b219f30cc007a665b5722ae8d218e391a +Subproject commit adb47bc1cff86f315cb15894a205f4e3d6b64b6d