Skip to content

Commit 4891aaf

Browse files
committed
[native] Make PrestoServer::driverExecutor_ abstract type
Summary: Make PrestoServer::driverExecutor_ abstract folly::Executor type. This is to provide flexibility to make driver executor customizable, for example, for monitoring. This change also makes other executors unique_ptr because PrestoServer as the application backing server, should hold sole ownership of all application related executors (in fact the current usage does not need any features from shared_ptr). Reviewed By: amitkdutta Differential Revision: D79155996
1 parent 2179ba7 commit 4891aaf

File tree

2 files changed

+32
-24
lines changed

2 files changed

+32
-24
lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,8 @@ void PrestoServer::run() {
488488
});
489489

490490
PRESTO_STARTUP_LOG(INFO) << "Driver CPU executor '"
491-
<< driverExecutor_->getName() << "' has "
492-
<< driverExecutor_->numThreads() << " threads.";
491+
<< driverCpuExecutor_->getName() << "' has "
492+
<< driverCpuExecutor_->numThreads() << " threads.";
493493
if (httpServer_->getExecutor()) {
494494
PRESTO_STARTUP_LOG(INFO)
495495
<< "HTTP Server IO executor '" << httpServer_->getExecutor()->getName()
@@ -519,7 +519,7 @@ void PrestoServer::run() {
519519
auto* memoryAllocator = velox::memory::memoryManager()->allocator();
520520
auto* asyncDataCache = velox::cache::AsyncDataCache::getInstance();
521521
periodicTaskManager_ = std::make_unique<PeriodicTaskManager>(
522-
driverExecutor_.get(),
522+
driverCpuExecutor_,
523523
spillerExecutor_.get(),
524524
httpSrvIoExecutor_.get(),
525525
httpSrvCpuExecutor_.get(),
@@ -648,10 +648,10 @@ void PrestoServer::run() {
648648
unregisterVeloxCudf();
649649

650650
PRESTO_SHUTDOWN_LOG(INFO)
651-
<< "Joining Driver CPU Executor '" << driverExecutor_->getName()
652-
<< "': threads: " << driverExecutor_->numActiveThreads() << "/"
653-
<< driverExecutor_->numThreads()
654-
<< ", task queue: " << driverExecutor_->getTaskQueueSize();
651+
<< "Joining Driver CPU Executor '" << driverCpuExecutor_->getName()
652+
<< "': threads: " << driverCpuExecutor_->numActiveThreads() << "/"
653+
<< driverCpuExecutor_->numThreads()
654+
<< ", task queue: " << driverCpuExecutor_->getTaskQueueSize();
655655
// Schedule release of SessionPools held by HttpClients before the exchange
656656
// HTTP IO executor threads are joined.
657657
driverExecutor_.reset();
@@ -747,7 +747,7 @@ void PrestoServer::yieldTasks() {
747747
return;
748748
}
749749
static std::atomic<int32_t> numYields = 0;
750-
const auto numQueued = driverExecutor_->getTaskQueueSize();
750+
const auto numQueued = driverCpuExecutor_->getTaskQueueSize();
751751
if (numQueued > 0) {
752752
numYields += taskManager_->yieldTasks(numQueued, timeslice);
753753
}
@@ -796,8 +796,10 @@ void PrestoServer::initializeThreadPools() {
796796
threadFactory = std::make_shared<folly::NamedThreadFactory>("Driver");
797797
}
798798

799-
driverExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
799+
auto driverExecutor = std::make_unique<folly::CPUThreadPoolExecutor>(
800800
numDriverCpuThreads, threadFactory);
801+
driverCpuExecutor_ = driverExecutor.get();
802+
driverExecutor_ = std::move(driverExecutor);
801803

802804
const auto numIoThreads = std::max<size_t>(
803805
systemConfig->httpServerNumIoThreadsHwMultiplier() * hwConcurrency, 1);
@@ -806,13 +808,13 @@ void PrestoServer::initializeThreadPools() {
806808

807809
const auto numCpuThreads = std::max<size_t>(
808810
systemConfig->httpServerNumCpuThreadsHwMultiplier() * hwConcurrency, 1);
809-
httpSrvCpuExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
811+
httpSrvCpuExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
810812
numCpuThreads, std::make_shared<folly::NamedThreadFactory>("HTTPSrvCpu"));
811813

812814
const auto numSpillerCpuThreads = std::max<size_t>(
813815
systemConfig->spillerNumCpuThreadsHwMultiplier() * hwConcurrency, 0);
814816
if (numSpillerCpuThreads > 0) {
815-
spillerExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
817+
spillerExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
816818
numSpillerCpuThreads,
817819
std::make_shared<folly::NamedThreadFactory>("Spiller"));
818820
}
@@ -821,7 +823,7 @@ void PrestoServer::initializeThreadPools() {
821823
systemConfig->exchangeHttpClientNumIoThreadsHwMultiplier() *
822824
std::thread::hardware_concurrency(),
823825
1);
824-
exchangeHttpIoExecutor_ = std::make_shared<folly::IOThreadPoolExecutor>(
826+
exchangeHttpIoExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(
825827
numExchangeHttpClientIoThreads,
826828
std::make_shared<folly::NamedThreadFactory>("ExchangeIO"));
827829

@@ -841,7 +843,7 @@ void PrestoServer::initializeThreadPools() {
841843
std::thread::hardware_concurrency(),
842844
1);
843845

844-
exchangeHttpCpuExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
846+
exchangeHttpCpuExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
845847
numExchangeHttpClientCpuThreads,
846848
std::make_shared<folly::NamedThreadFactory>("ExchangeCPU"));
847849

@@ -1035,7 +1037,7 @@ size_t PrestoServer::numDriverThreads() const {
10351037
VELOX_CHECK(
10361038
driverExecutor_ != nullptr,
10371039
"Driver executor is expected to be not null, but it is null!");
1038-
return driverExecutor_->numThreads();
1040+
return driverCpuExecutor_->numThreads();
10391041
}
10401042

10411043
void PrestoServer::detachWorker() {
@@ -1441,7 +1443,7 @@ void PrestoServer::enableWorkerStatsReporting() {
14411443

14421444
void PrestoServer::initVeloxPlanValidator() {
14431445
VELOX_CHECK_NULL(planValidator_);
1444-
planValidator_ = std::make_shared<VeloxPlanValidator>();
1446+
planValidator_ = std::make_unique<VeloxPlanValidator>();
14451447
}
14461448

14471449
VeloxPlanValidator* PrestoServer::getVeloxPlanValidator() {

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ class PrestoServer {
198198

199199
void registerStatsCounters();
200200

201-
protected:
202201
void updateAnnouncerDetails();
203202

204203
void addServerPeriodicTasks();
@@ -242,24 +241,31 @@ class PrestoServer {
242241
std::unique_ptr<folly::IOThreadPoolExecutor> connectorIoExecutor_;
243242

244243
// Executor for exchange data over http.
245-
std::shared_ptr<folly::IOThreadPoolExecutor> exchangeHttpIoExecutor_;
244+
std::unique_ptr<folly::IOThreadPoolExecutor> exchangeHttpIoExecutor_;
246245

247246
// Executor for exchange request processing.
248-
std::shared_ptr<folly::CPUThreadPoolExecutor> exchangeHttpCpuExecutor_;
247+
std::unique_ptr<folly::CPUThreadPoolExecutor> exchangeHttpCpuExecutor_;
249248

250249
// Executor for HTTP request dispatching
251250
std::shared_ptr<folly::IOThreadPoolExecutor> httpSrvIoExecutor_;
252251

253252
// Executor for HTTP request processing after dispatching
254-
std::shared_ptr<folly::CPUThreadPoolExecutor> httpSrvCpuExecutor_;
255-
256-
// Executor for query engine driver executions.
257-
std::shared_ptr<folly::CPUThreadPoolExecutor> driverExecutor_;
253+
std::unique_ptr<folly::CPUThreadPoolExecutor> httpSrvCpuExecutor_;
254+
255+
// Executor for query engine driver executions. The underlying thread pool
256+
// executor is a folly::CPUThreadPoolExecutor. The executor is stored as
257+
// abstract type to provide flexibility of thread pool monitoring. The
258+
// underlying folly::CPUThreadPoolExecutor can be obtained through
259+
// 'driverCpuExecutor()' method.
260+
std::unique_ptr<folly::Executor> driverExecutor_;
261+
// Raw pointer pointing to the underlying folly::CPUThreadPoolExecutor of
262+
// 'driverExecutor_'.
263+
folly::CPUThreadPoolExecutor* driverCpuExecutor_;
258264

259265
// Executor for spilling.
260-
std::shared_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;
266+
std::unique_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;
261267

262-
std::shared_ptr<VeloxPlanValidator> planValidator_;
268+
std::unique_ptr<VeloxPlanValidator> planValidator_;
263269

264270
std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;
265271

0 commit comments

Comments
 (0)