diff --git a/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp index 70597d99cabe0..550ce311a9793 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp @@ -34,15 +34,14 @@ PeriodicServiceInventoryManager::PeriodicServiceInventoryManager( void PeriodicServiceInventoryManager::start() { eventBaseThread_.start(id_); - sessionPool_ = std::make_unique(nullptr, 10); stopped_ = false; auto* eventBase = eventBaseThread_.getEventBase(); - eventBase->runOnDestruction([this] { sessionPool_.reset(); }); eventBase->schedule([this]() { return sendRequest(); }); } void PeriodicServiceInventoryManager::stop() { stopped_ = true; + client_.reset(); eventBaseThread_.stop(); } @@ -75,7 +74,11 @@ void PeriodicServiceInventoryManager::sendRequest() { std::swap(serviceAddress_, newAddress); client_ = std::make_shared( eventBaseThread_.getEventBase(), - sessionPool_.get(), + nullptr, + proxygen::Endpoint( + serviceAddress_.getAddressStr(), + serviceAddress_.getPort(), + sslContext_ != nullptr), serviceAddress_, std::chrono::milliseconds(10'000), std::chrono::milliseconds(0), diff --git a/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.h b/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.h index 771c1e2955b9c..77f7c106ed0a9 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.h +++ b/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.h @@ -72,7 +72,6 @@ class PeriodicServiceInventoryManager { const double backOffjitterParam_{0.1}; folly::EventBaseThread eventBaseThread_; - std::unique_ptr sessionPool_; folly::SocketAddress serviceAddress_; std::shared_ptr client_; std::atomic_bool stopped_{true}; diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp index 0957f62a7ca4b..f3f0a35103501 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp @@ -18,6 +18,7 @@ #include "presto_cpp/main/PrestoExchangeSource.h" #include "presto_cpp/main/PrestoServer.h" #include "presto_cpp/main/common/Counters.h" +#include "presto_cpp/main/http/HttpClient.h" #include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h" #include "velox/common/base/PeriodicStatsReporter.h" #include "velox/common/base/StatsReporter.h" @@ -84,8 +85,9 @@ static constexpr size_t kTaskPeriodCleanOldTasks{60'000'000}; // 60 seconds. static constexpr size_t kConnectorPeriodGlobalCounters{ 60'000'000}; // 60 seconds. static constexpr size_t kOsPeriodGlobalCounters{2'000'000}; // 2 seconds -// Every 1 minute we print endpoint latency counters. -static constexpr size_t kHttpEndpointLatencyPeriodGlobalCounters{ +static constexpr size_t kHttpServerPeriodGlobalCounters{ + 60'000'000}; // 60 seconds. +static constexpr size_t kHttpClientPeriodGlobalCounters{ 60'000'000}; // 60 seconds. PeriodicTaskManager::PeriodicTaskManager( @@ -137,9 +139,11 @@ void PeriodicTaskManager::start() { addOperatingSystemStatsUpdateTask(); if (SystemConfig::instance()->enableHttpEndpointLatencyFilter()) { - addHttpEndpointLatencyStatsTask(); + addHttpServerStatsTask(); } + addHttpClientStatsTask(); + if (server_ && server_->hasCoordinatorDiscoverer()) { numDriverThreads_ = server_->numDriverThreads(); addWatchdogTask(); @@ -406,7 +410,7 @@ void PeriodicTaskManager::addOperatingSystemStatsUpdateTask() { "os_counters"); } -void PeriodicTaskManager::printHttpEndpointLatencyStats() { +void PeriodicTaskManager::printHttpServerStats() { const auto latencyMetrics = http::filters::HttpEndpointLatencyFilter::retrieveLatencies(); std::ostringstream oss; @@ -418,11 +422,26 @@ void PeriodicTaskManager::printHttpEndpointLatencyStats() { LOG(INFO) << oss.str(); } -void PeriodicTaskManager::addHttpEndpointLatencyStatsTask() { +void PeriodicTaskManager::addHttpServerStatsTask() { + addTask( + [this]() { printHttpServerStats(); }, + kHttpServerPeriodGlobalCounters, + "http_server_stats"); +} + +void PeriodicTaskManager::updateHttpClientStats() { + const auto numConnectionsCreated = http::HttpClient::numConnectionsCreated(); + RECORD_METRIC_VALUE( + kCounterHttpClientNumConnectionsCreated, + numConnectionsCreated - lastHttpClientNumConnectionsCreated_); + lastHttpClientNumConnectionsCreated_ = numConnectionsCreated; +} + +void PeriodicTaskManager::addHttpClientStatsTask() { addTask( - [this]() { printHttpEndpointLatencyStats(); }, - kHttpEndpointLatencyPeriodGlobalCounters, - "http_endpoint_counters"); + [this] { updateHttpClientStats(); }, + kHttpClientPeriodGlobalCounters, + "http_client_stats"); } void PeriodicTaskManager::addWatchdogTask() { diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h index 29af2c3c56da5..f5108d79bfb7b 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h @@ -112,9 +112,11 @@ class PeriodicTaskManager { void addOperatingSystemStatsUpdateTask(); void updateOperatingSystemStats(); - // Adds task that periodically prints http endpoint latency metrics. - void addHttpEndpointLatencyStatsTask(); - void printHttpEndpointLatencyStats(); + void addHttpServerStatsTask(); + void printHttpServerStats(); + + void addHttpClientStatsTask(); + void updateHttpClientStats(); void addWatchdogTask(); @@ -141,6 +143,8 @@ class PeriodicTaskManager { int64_t lastVoluntaryContextSwitches_{0}; int64_t lastForcedContextSwitches_{0}; + int64_t lastHttpClientNumConnectionsCreated_{0}; + // NOTE: declare last since the threads access other members of `this`. folly::FunctionScheduler oneTimeRunner_; folly::ThreadedRepeatingFunctionRunner repeatedRunner_; diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index e510cc89f2767..5004802fdf059 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -75,7 +75,8 @@ PrestoExchangeSource::PrestoExchangeSource( memory::MemoryPool* pool, folly::CPUThreadPoolExecutor* driverExecutor, folly::EventBase* ioEventBase, - proxygen::SessionPool* sessionPool, + http::HttpClientConnectionPool* connPool, + const proxygen::Endpoint& endpoint, folly::SSLContextPtr sslContext) : ExchangeSource(extractTaskId(baseUri.path()), destination, queue, pool), basePath_(baseUri.path()), @@ -102,7 +103,8 @@ PrestoExchangeSource::PrestoExchangeSource( VELOX_CHECK_NOT_NULL(pool_); httpClient_ = std::make_shared( ioEventBase, - sessionPool, + connPool, + endpoint, address, requestTimeoutMs, connectTimeoutMs, @@ -516,53 +518,6 @@ std::shared_ptr PrestoExchangeSource::getSelfPtr() { return std::dynamic_pointer_cast(shared_from_this()); } -const ConnectionPool& ConnectionPools::get( - const proxygen::Endpoint& endpoint, - folly::IOThreadPoolExecutor* ioExecutor) { - return *pools_.withULockPtr([&](auto ulock) -> const ConnectionPool* { - auto it = ulock->find(endpoint); - if (it != ulock->end()) { - return it->second.get(); - } - auto wlock = ulock.moveFromUpgradeToWrite(); - auto& pool = (*wlock)[endpoint]; - if (!pool) { - pool = std::make_unique(); - pool->eventBase = ioExecutor->getEventBase(); - pool->sessionPool = std::make_unique(nullptr, 10); - // Creation of the timer is not thread safe, so we do it here instead of - // in the constructor of HttpClient. - pool->eventBase->timer(); - } - return pool.get(); - }); -} - -void ConnectionPools::destroy() { - pools_.withWLock([](auto& pools) { - for (auto& [_, pool] : pools) { - pool->eventBase->runInEventBaseThread( - [sessionPool = std::move(pool->sessionPool)] {}); - } - pools.clear(); - }); -} - -namespace { - -std::pair getSessionPool( - ConnectionPools* connectionPools, - folly::IOThreadPoolExecutor* ioExecutor, - const proxygen::Endpoint& ep) { - if (!connectionPools) { - return {ioExecutor->getEventBase(), nullptr}; - } - auto& connPool = connectionPools->get(ep, ioExecutor); - return {connPool.eventBase, connPool.sessionPool.get()}; -} - -} // namespace - // static std::shared_ptr PrestoExchangeSource::create( const std::string& url, @@ -571,14 +526,13 @@ std::shared_ptr PrestoExchangeSource::create( velox::memory::MemoryPool* memoryPool, folly::CPUThreadPoolExecutor* cpuExecutor, folly::IOThreadPoolExecutor* ioExecutor, - ConnectionPools* connectionPools, + http::HttpClientConnectionPool* connPool, folly::SSLContextPtr sslContext) { folly::Uri uri(url); + auto* eventBase = ioExecutor->getEventBase(); if (uri.scheme() == "http") { VELOX_CHECK_NULL(sslContext); proxygen::Endpoint ep(uri.host(), uri.port(), false); - auto [eventBase, sessionPool] = - getSessionPool(connectionPools, ioExecutor, ep); return std::make_shared( uri, destination, @@ -586,14 +540,13 @@ std::shared_ptr PrestoExchangeSource::create( memoryPool, cpuExecutor, eventBase, - sessionPool, + connPool, + ep, sslContext); } if (uri.scheme() == "https") { VELOX_CHECK_NOT_NULL(sslContext); proxygen::Endpoint ep(uri.host(), uri.port(), true); - auto [eventBase, sessionPool] = - getSessionPool(connectionPools, ioExecutor, ep); return std::make_shared( uri, destination, @@ -601,7 +554,8 @@ std::shared_ptr PrestoExchangeSource::create( memoryPool, cpuExecutor, eventBase, - sessionPool, + connPool, + ep, std::move(sslContext)); } return nullptr; diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index c7b5b7dd98315..cd207f7cdd116 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -24,38 +24,6 @@ namespace facebook::presto { -// HTTP connection pool for a specific endpoint with its associated event base. -// All the operations on the SessionPool must be performed on the corresponding -// EventBase. -struct ConnectionPool { - folly::EventBase* eventBase; - std::unique_ptr sessionPool; -}; - -// Connection pools used by HTTP client in PrestoExchangeSource. It should be -// held living longer than all the PrestoExchangeSources and will be passed when -// we creating the exchange sources. -class ConnectionPools { - public: - ~ConnectionPools() { - destroy(); - } - - const ConnectionPool& get( - const proxygen::Endpoint& endpoint, - folly::IOThreadPoolExecutor* ioExecutor); - - void destroy(); - - private: - folly::Synchronized, - proxygen::EndpointHash, - proxygen::EndpointEqual>> - pools_; -}; - class PrestoExchangeSource : public velox::exec::ExchangeSource { public: class RetryState { @@ -107,7 +75,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { velox::memory::MemoryPool* pool, folly::CPUThreadPoolExecutor* driverExecutor, folly::EventBase* ioEventBase, - proxygen::SessionPool* sessionPool, + http::HttpClientConnectionPool* connPool, + const proxygen::Endpoint& endpoint, folly::SSLContextPtr sslContext); /// Returns 'true' is there is no request in progress, this source is not at @@ -145,7 +114,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { velox::memory::MemoryPool* memoryPool, folly::CPUThreadPoolExecutor* cpuExecutor, folly::IOThreadPoolExecutor* ioExecutor, - ConnectionPools* connectionPools, + http::HttpClientConnectionPool* connPool, folly::SSLContextPtr sslContext); /// Completes the future returned by 'request()' if it hasn't completed diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 2c58d7c86cdbe..6fbdc208b5a69 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -385,7 +385,8 @@ void PrestoServer::run() { if (systemConfig->exchangeEnableConnectionPool()) { PRESTO_STARTUP_LOG(INFO) << "Enable exchange Http Client connection pool."; - exchangeSourceConnectionPools_ = std::make_unique(); + exchangeSourceConnectionPool_ = + std::make_unique(); } facebook::velox::exec::ExchangeSource::registerFactory( @@ -401,7 +402,7 @@ void PrestoServer::run() { pool, driverExecutor_.get(), exchangeHttpExecutor_.get(), - exchangeSourceConnectionPools_.get(), + exchangeSourceConnectionPool_.get(), sslContext_); }); @@ -540,7 +541,9 @@ void PrestoServer::run() { << "': threads: " << driverExecutor_->numActiveThreads() << "/" << driverExecutor_->numThreads() << ", task queue: " << driverExecutor_->getTaskQueueSize(); - driverExecutor_->join(); + // Schedule release of SessionPools held by HttpClients before the exchange + // HTTP executor threads are joined. + driverExecutor_.reset(); if (connectorIoExecutor_) { PRESTO_SHUTDOWN_LOG(INFO) @@ -550,9 +553,9 @@ void PrestoServer::run() { connectorIoExecutor_->join(); } - if (exchangeSourceConnectionPools_) { + if (exchangeSourceConnectionPool_) { PRESTO_SHUTDOWN_LOG(INFO) << "Releasing exchange HTTP connection pools"; - exchangeSourceConnectionPools_->destroy(); + exchangeSourceConnectionPool_->destroy(); } if (httpSrvCpuExecutor_ != nullptr) { diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index 414452b0f38f8..0172254bfcfdb 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -229,7 +229,7 @@ class PrestoServer { // Executor for spilling. std::shared_ptr spillerExecutor_; - std::unique_ptr exchangeSourceConnectionPools_; + std::unique_ptr exchangeSourceConnectionPool_; // If not null, the instance of AsyncDataCache used for in-memory file cache. std::shared_ptr cache_; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 3daa4f12169ea..fda02b9f8cd27 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -213,7 +213,7 @@ SystemConfig::SystemConfig() { STR_PROP(kExchangeMaxErrorDuration, "3m"), STR_PROP(kExchangeRequestTimeout, "10s"), STR_PROP(kExchangeConnectTimeout, "20s"), - BOOL_PROP(kExchangeEnableConnectionPool, false), + BOOL_PROP(kExchangeEnableConnectionPool, true), BOOL_PROP(kExchangeImmediateBufferTransfer, true), NUM_PROP(kTaskRunTimeSliceMicros, 50'000), BOOL_PROP(kIncludeNodeInSpillPath, false), diff --git a/presto-native-execution/presto_cpp/main/common/Counters.cpp b/presto-native-execution/presto_cpp/main/common/Counters.cpp index 05a3c3fa092ae..e87f96ea17003 100644 --- a/presto-native-execution/presto_cpp/main/common/Counters.cpp +++ b/presto-native-execution/presto_cpp/main/common/Counters.cpp @@ -43,6 +43,8 @@ void registerPrestoMetrics() { 95, 99, 100); + DEFINE_METRIC( + kCounterHttpClientNumConnectionsCreated, facebook::velox::StatType::SUM); DEFINE_HISTOGRAM_METRIC( kCounterPrestoExchangeSerializedPageSize, 10000, diff --git a/presto-native-execution/presto_cpp/main/common/Counters.h b/presto-native-execution/presto_cpp/main/common/Counters.h index ad91addefb95f..d5035e9635c3a 100644 --- a/presto-native-execution/presto_cpp/main/common/Counters.h +++ b/presto-native-execution/presto_cpp/main/common/Counters.h @@ -46,6 +46,8 @@ constexpr folly::StringPiece kCounterHttpClientPrestoExchangeNumOnBody{ /// PrestoExchangeSource. constexpr folly::StringPiece kCounterHttpClientPrestoExchangeOnBodyBytes{ "presto_cpp.http.client.presto_exchange_source.on_body_bytes"}; +constexpr folly::StringPiece kCounterHttpClientNumConnectionsCreated{ + "presto_cpp.http.client.num_connections_created"}; /// SerializedPage size in bytes from PrestoExchangeSource. constexpr folly::StringPiece kCounterPrestoExchangeSerializedPageSize{ "presto_cpp.presto_exchange_source.serialized_page_size"}; diff --git a/presto-native-execution/presto_cpp/main/http/HttpClient.cpp b/presto-native-execution/presto_cpp/main/http/HttpClient.cpp index 89ed8bfebb728..8ff3e69f2b147 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpClient.cpp +++ b/presto-native-execution/presto_cpp/main/http/HttpClient.cpp @@ -16,14 +16,18 @@ #include // @manual #include //@manual #endif // PRESTO_ENABLE_JWT +#include +#include #include #include "presto_cpp/main/common/Configs.h" #include "presto_cpp/main/http/HttpClient.h" namespace facebook::presto::http { + HttpClient::HttpClient( folly::EventBase* eventBase, - proxygen::SessionPool* sessionPool, + HttpClientConnectionPool* connPool, + const proxygen::Endpoint& endpoint, const folly::SocketAddress& address, std::chrono::milliseconds transactionTimeout, std::chrono::milliseconds connectTimeout, @@ -31,18 +35,15 @@ HttpClient::HttpClient( folly::SSLContextPtr sslContext, std::function&& reportOnBodyStatsFunc) : eventBase_(eventBase), + connPool_(connPool), + endpoint_(endpoint), address_(address), - transactionTimer_(transactionTimeout, eventBase), + transactionTimeout_(transactionTimeout), connectTimeout_(connectTimeout), pool_(std::move(pool)), sslContext_(sslContext), reportOnBodyStatsFunc_(std::move(reportOnBodyStatsFunc)), - maxResponseAllocBytes_(SystemConfig::instance()->httpMaxAllocateBytes()), - sessionPool_(sessionPool) { - if (!sessionPool_) { - sessionPoolHolder_ = std::make_unique(); - sessionPool_ = sessionPoolHolder_.get(); - } + maxResponseAllocBytes_(SystemConfig::instance()->httpMaxAllocateBytes()) { } HttpClient::~HttpClient() { @@ -352,44 +353,191 @@ class ConnectionHandler : public proxygen::HTTPConnector::Callback { std::unique_ptr connector_; }; -folly::SemiFuture> HttpClient::sendRequest( - const proxygen::HTTPMessage& request, - const std::string& body, - int64_t delayMs) { - auto responseHandler = std::make_shared( - request, - maxResponseAllocBytes_, - body, - reportOnBodyStatsFunc_, - shared_from_this()); - auto future = responseHandler->initialize(responseHandler); +namespace { +// Same value as +// https://github.com/prestodb/presto/blob/831d5947b909fee0d5c0091a3246ddc5b31b2731/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java#L547 +constexpr int kMaxConnectionsPerServer = 250; +} // namespace + +std::pair +HttpClientConnectionPool::getSessionPoolImpl(SessionPools& endpointPools) { + auto* evb = folly::EventBaseManager::get()->getExistingEventBase(); + VELOX_CHECK_NOT_NULL(evb); + { + auto rlock = endpointPools.byEventBase.rlock(); + auto it = rlock->find(evb); + if (it != rlock->end()) { + return {it->second.get(), &endpointPools.idleSessions}; + } + } + // NOTE: local event base is unique so only 1 thread can access the same key + // entry. + auto wlock = endpointPools.byEventBase.wlock(); + auto& pool = (*wlock)[evb]; + VELOX_CHECK_NULL(pool); + pool = std::make_unique( + nullptr, + kMaxConnectionsPerServer, + std::chrono::seconds(30), + std::chrono::milliseconds(0), + nullptr, + &endpointPools.idleSessions); + return {pool.get(), &endpointPools.idleSessions}; +} + +std::pair +HttpClientConnectionPool::getSessionPool(const proxygen::Endpoint& endpoint) { + { + auto rlock = pools_.rlock(); + auto it = rlock->find(endpoint); + if (it != rlock->end()) { + return getSessionPoolImpl(*it->second); + } + } + auto wlock = pools_.wlock(); + auto it = wlock->find(endpoint); + if (it != wlock->end()) { + auto rlock = wlock.moveFromWriteToRead(); + return getSessionPoolImpl(*it->second); + } + auto& endpointPools = (*wlock)[endpoint]; + VELOX_CHECK_NULL(endpointPools); + endpointPools = std::make_unique(); + endpointPools->idleSessions.setMaxIdleCount(kMaxConnectionsPerServer); + auto rlock = wlock.moveFromWriteToRead(); + return getSessionPoolImpl(*endpointPools); +} + +void HttpClientConnectionPool::destroy() { + pools_.withWLock([](auto& pools) { + for (auto& [_, endpointPools] : pools) { + endpointPools->idleSessions.markForDeath(); + endpointPools->byEventBase.withWLock([](auto& byEventBase) { + folly::Latch latch(byEventBase.size()); + for (auto& [evb, sessionPool] : byEventBase) { + evb->runInEventBaseThread( + [&, sessionPool = std::move(sessionPool)]() mutable { + sessionPool.reset(); + latch.count_down(); + }); + } + latch.wait(); + }); + } + pools.clear(); + }); +} + +void HttpClient::initSessionPool() { + eventBase_->dcheckIsInEventBaseThread(); + if (sessionPool_) { + return; + } + if (connPool_) { + std::tie(sessionPool_, idleSessions_) = + connPool_->getSessionPool(endpoint_); + return; + } + sessionPoolHolder_ = std::make_unique(); + sessionPool_ = sessionPoolHolder_.get(); + idleSessions_ = nullptr; +} - auto send = [this, responseHandler]() { - auto txn = sessionPool_->getTransaction(responseHandler.get()); +folly::SemiFuture HttpClient::createTransaction( + proxygen::HTTPTransactionHandler* handler) { + eventBase_->dcheckIsInEventBaseThread(); + if (auto* txn = sessionPool_->getTransaction(handler)) { + VLOG(3) << "Reuse same thread connection to " << address_.describe(); + return folly::makeSemiFuture(txn); + } + if (!idleSessions_) { + return folly::makeSemiFuture(nullptr); + } + auto idleSessionFuture = idleSessions_->getIdleSession(); + auto getFromIdleSession = + [self = shared_from_this(), this, handler]( + proxygen::HTTPSessionBase* session) -> proxygen::HTTPTransaction* { + if (!session) { + return nullptr; + } + VLOG(3) << "Reuse idle connection from different thread to " + << address_.describe(); + auto* evb = session->getEventBase(); + // The event base from idle session should not be the current event base, + // otherwise we should have already got it from the local session pool. + VELOX_CHECK(!evb || !evb->isInEventBaseThread()); + session->attachThreadLocals( + eventBase_, + sslContext_, + proxygen::WheelTimerInstance(transactionTimeout_, eventBase_), + nullptr, + [](auto*) {}, + nullptr, + nullptr); + sessionPool_->putSession(session); + return sessionPool_->getTransaction(handler); + }; + if (idleSessionFuture.isReady()) { + return getFromIdleSession(idleSessionFuture.value()); + } + return idleSessionFuture.via(eventBase_) + .thenValue(std::move(getFromIdleSession)) + .semi(); +} + +void HttpClient::sendRequest(std::shared_ptr responseHandler) { + initSessionPool(); + auto txnFuture = createTransaction(responseHandler.get()); + auto doSend = [this, responseHandler = std::move(responseHandler)]( + proxygen::HTTPTransaction* txn) { if (txn) { responseHandler->sendRequest(txn); return; } - + VLOG(2) << "Create new connection to " << address_.describe(); + ++numConnectionsCreated_; + // NOTE: the connection handler object will be self-deleted when the + // connection succeeds or fails, auto connectionHandler = new ConnectionHandler( responseHandler, sessionPool_, - transactionTimer_, + proxygen::WheelTimerInstance(transactionTimeout_, eventBase_), connectTimeout_, eventBase_, address_, sslContext_); - connectionHandler->connect(); }; + if (txnFuture.isReady()) { + doSend(txnFuture.value()); + return; + } + std::move(txnFuture).via(eventBase_).thenValue(std::move(doSend)); +} + +folly::SemiFuture> HttpClient::sendRequest( + const proxygen::HTTPMessage& request, + const std::string& body, + int64_t delayMs) { + auto responseHandler = std::make_shared( + request, + maxResponseAllocBytes_, + body, + reportOnBodyStatsFunc_, + shared_from_this()); + auto future = responseHandler->initialize(responseHandler); + + auto sendCb = [this, responseHandler]() mutable { + sendRequest(std::move(responseHandler)); + }; if (delayMs > 0) { // schedule() is expected to be run in the event base thread eventBase_->runInEventBaseThread([=]() { - eventBase_->schedule(send, std::chrono::milliseconds(delayMs)); + eventBase_->schedule(sendCb, std::chrono::milliseconds(delayMs)); }); } else { - eventBase_->runInEventBaseThreadAlwaysEnqueue(send); + eventBase_->runInEventBaseThreadAlwaysEnqueue(sendCb); } return future; diff --git a/presto-native-execution/presto_cpp/main/http/HttpClient.h b/presto-native-execution/presto_cpp/main/http/HttpClient.h index 4d643e7d8552c..0d9bc983e5f5b 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpClient.h +++ b/presto-native-execution/presto_cpp/main/http/HttpClient.h @@ -14,6 +14,7 @@ #pragma once #include #include +#include #include #include #include @@ -107,6 +108,49 @@ class HttpResponse { size_t bodyChainBytes_{0}; }; +/// Connection pool shared by all the http clients. It is held by presto server +/// and should outlive all the http clients, and destroyed before we join the +/// threads backing the event bases. +class HttpClientConnectionPool { + public: + ~HttpClientConnectionPool() { + destroy(); + } + + /// Returns the session pool for a given endpoint and local event base. + /// + /// NOTE: this must be called from a thread context with local event base set. + std::pair + getSessionPool(const proxygen::Endpoint& endpoint); + + void destroy(); + + private: + // Session pools for a specific endpoint, grouped by their associated event + // bases. All the operations on the SessionPool must be performed on the + // corresponding EventBase. + struct SessionPools { + proxygen::ServerIdleSessionController idleSessions; + folly::Synchronized>> + byEventBase; + }; + + std::pair + getSessionPoolImpl(SessionPools& endpointPools); + + // The map from http end point to the corresponding session pools. + folly::Synchronized, + proxygen::EndpointHash, + proxygen::EndpointEqual>> + pools_; +}; + +class ResponseHandler; + // HttpClient uses proxygen::SessionPool which must be destructed on the // EventBase thread. Hence, the destructor of HttpClient must run on the // EventBase thread as well. Consider running HttpClient's destructor @@ -115,7 +159,8 @@ class HttpClient : public std::enable_shared_from_this { public: HttpClient( folly::EventBase* eventBase, - proxygen::SessionPool* sessionPool, + HttpClientConnectionPool* connPool, + const proxygen::Endpoint& endpoint, const folly::SocketAddress& address, std::chrono::milliseconds transactionTimeout, std::chrono::milliseconds connectTimeout, @@ -135,17 +180,35 @@ class HttpClient : public std::enable_shared_from_this { return pool_; } + static int64_t numConnectionsCreated() { + return numConnectionsCreated_; + } + private: + void initSessionPool(); + + folly::SemiFuture createTransaction( + proxygen::HTTPTransactionHandler* handler); + + void sendRequest(std::shared_ptr responseHandler); + + static inline std::atomic_int64_t numConnectionsCreated_ = 0; + folly::EventBase* const eventBase_; + HttpClientConnectionPool* const connPool_; + const proxygen::Endpoint endpoint_; const folly::SocketAddress address_; - const proxygen::WheelTimerInstance transactionTimer_; + const std::chrono::milliseconds transactionTimeout_; const std::chrono::milliseconds connectTimeout_; const std::shared_ptr pool_; const folly::SSLContextPtr sslContext_; const std::function reportOnBodyStatsFunc_; const uint64_t maxResponseAllocBytes_; - proxygen::SessionPool* sessionPool_; - // Create if sessionPool_ is not received from the contructor. + + proxygen::SessionPool* sessionPool_ = nullptr; + proxygen::ServerIdleSessionController* idleSessions_ = nullptr; + + // Create only if connPool_ is null (disabled). std::unique_ptr sessionPoolHolder_; }; diff --git a/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp b/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp index 2f4316a1fc719..83db0b0774f1d 100644 --- a/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp +++ b/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp @@ -135,6 +135,39 @@ TEST_P(HttpTestSuite, basic) { ASSERT_EQ(socketException->getType(), folly::AsyncSocketException::NOT_OPEN); } +TEST_P(HttpTestSuite, clientIdleSessions) { + auto memoryPool = + memory::MemoryManager::getInstance()->addLeafPool("clientIdleSessions"); + const bool useHttps = GetParam(); + auto server = getServer(useHttps); + server->registerGet("/ping", ping); + HttpServerWrapper wrapper(std::move(server)); + auto address = wrapper.start().get(); + constexpr int kNumThreads = 3; + folly::IOThreadPoolExecutor threadPool(kNumThreads); + http::HttpClientConnectionPool connPool; + auto lastNumConnectionsCreated = http::HttpClient::numConnectionsCreated(); + for (int i = 0; i < kNumThreads; ++i) { + auto client = std::make_shared( + threadPool.getEventBase(), + &connPool, + proxygen::Endpoint( + address.getAddressStr(), address.getPort(), useHttps), + address, + std::chrono::seconds(1), + std::chrono::milliseconds(0), + memoryPool, + useHttps ? makeSslContext() : nullptr); + auto response = sendGet(client.get(), "/ping").get(std::chrono::seconds(3)); + ASSERT_EQ(response->headers()->getStatusCode(), http::kHttpOk); + } + ASSERT_EQ( + http::HttpClient::numConnectionsCreated() - lastNumConnectionsCreated, 1); + connPool.destroy(); + threadPool.join(); + wrapper.stop(); +} + TEST_P(HttpTestSuite, httpResponseAllocationFailure) { const int64_t memoryCapBytes = 1 << 10; auto rootPool = diff --git a/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h b/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h index b873ce75643a8..2907130a596b6 100644 --- a/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h +++ b/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h @@ -29,8 +29,6 @@ using namespace facebook::presto; using namespace facebook::velox; using namespace facebook::velox::memory; -namespace { - std::string getCertsPath(const std::string& fileName) { std::string currentPath = fs::current_path().c_str(); if (boost::algorithm::ends_with(currentPath, "fbcode")) { @@ -56,6 +54,15 @@ std::string getCertsPath(const std::string& fileName) { return currentPath + "/certs/" + fileName; } +inline folly::SSLContextPtr makeSslContext() { + const std::string keyPath = getCertsPath("client_ca.pem"); + const std::string ciphers = "AES128-SHA,AES128-SHA256,AES256-GCM-SHA384"; + auto sslContext = std::make_shared(); + sslContext->loadCertKeyPairFromFiles(keyPath.c_str(), keyPath.c_str()); + sslContext->setCiphersOrThrow(ciphers); + return sslContext; +} + class HttpServerWrapper { public: explicit HttpServerWrapper(std::unique_ptr server) @@ -185,7 +192,6 @@ class HttpClientFactory { } ~HttpClientFactory() { - eventBase_->runInEventBaseThread([pools = std::move(sessionPools_)] {}); eventBase_->terminateLoopSoon(); eventBaseThread_->join(); } @@ -197,40 +203,22 @@ class HttpClientFactory { bool useHttps, std::shared_ptr pool, std::function&& reportOnBodyStatsFunc = nullptr) { - sessionPools_.push_back( - std::make_unique(nullptr, 10)); - if (useHttps) { - const std::string keyPath = getCertsPath("client_ca.pem"); - const std::string ciphers = "AES128-SHA,AES128-SHA256,AES256-GCM-SHA384"; - auto sslContext = std::make_shared(); - sslContext->loadCertKeyPairFromFiles(keyPath.c_str(), keyPath.c_str()); - sslContext->setCiphersOrThrow(ciphers); - return std::make_shared( - eventBase_.get(), - sessionPools_.back().get(), - address, - transactionTimeout, - connectTimeout, - pool, - std::move(sslContext), - std::move(reportOnBodyStatsFunc)); - } else { - return std::make_shared( - eventBase_.get(), - sessionPools_.back().get(), - address, - transactionTimeout, - connectTimeout, - pool, - nullptr, - std::move(reportOnBodyStatsFunc)); - } + return std::make_shared( + eventBase_.get(), + nullptr, + proxygen::Endpoint( + address.getAddressStr(), address.getPort(), useHttps), + address, + transactionTimeout, + connectTimeout, + pool, + useHttps ? makeSslContext() : nullptr, + std::move(reportOnBodyStatsFunc)); } private: std::unique_ptr eventBase_; std::unique_ptr eventBaseThread_; - std::vector> sessionPools_; }; folly::SemiFuture> sendGet( @@ -341,4 +329,3 @@ http::EndpointRequestHandlerFactory asyncMsg( }); }; } -} // namespace diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index 8bb1047e8d088..3b50188e75231 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -439,7 +439,7 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam { pool != nullptr ? pool : pool_.get(), exchangeCpuExecutor_.get(), exchangeIoExecutor_.get(), - &connectionPools_, + &connectionPool_, useHttps ? sslContext_ : nullptr); } @@ -456,7 +456,7 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam { std::shared_ptr pool_; std::shared_ptr exchangeCpuExecutor_; std::shared_ptr exchangeIoExecutor_; - ConnectionPools connectionPools_; + http::HttpClientConnectionPool connectionPool_; folly::SSLContextPtr sslContext_; }; diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index d1266ccbfd624..bddc73fca8b4c 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -189,7 +189,7 @@ class TaskManagerTest : public testing::Test { exec::ExchangeSource::registerFactory( [cpuExecutor = exchangeCpuExecutor_, ioExecutor = exchangeIoExecutor_, - connectionPools = connectionPools_]( + connPool = connPool_]( const std::string& taskId, int destination, std::shared_ptr queue, @@ -201,7 +201,7 @@ class TaskManagerTest : public testing::Test { pool, cpuExecutor.get(), ioExecutor.get(), - connectionPools.get(), + connPool.get(), nullptr); }); if (!isRegisteredVectorSerde()) { @@ -644,8 +644,8 @@ class TaskManagerTest : public testing::Test { 8, std::make_shared("HTTPSrvIO")); long splitSequenceId_{0}; - std::shared_ptr connectionPools_ = - std::make_shared(); + std::shared_ptr connPool_ = + std::make_shared(); }; // Runs "select * from t where c0 % 5 = 0" query.