Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ PeriodicServiceInventoryManager::PeriodicServiceInventoryManager(

void PeriodicServiceInventoryManager::start() {
eventBaseThread_.start(id_);
sessionPool_ = std::make_unique<proxygen::SessionPool>(nullptr, 10);
stopped_ = false;
auto* eventBase = eventBaseThread_.getEventBase();
eventBase->runOnDestruction([this] { sessionPool_.reset(); });
eventBase->schedule([this]() { return sendRequest(); });
}

void PeriodicServiceInventoryManager::stop() {
stopped_ = true;
client_.reset();
eventBaseThread_.stop();
}

Expand Down Expand Up @@ -75,7 +74,11 @@ void PeriodicServiceInventoryManager::sendRequest() {
std::swap(serviceAddress_, newAddress);
client_ = std::make_shared<http::HttpClient>(
eventBaseThread_.getEventBase(),
sessionPool_.get(),
nullptr,
proxygen::Endpoint(
serviceAddress_.getAddressStr(),
serviceAddress_.getPort(),
sslContext_ != nullptr),
serviceAddress_,
std::chrono::milliseconds(10'000),
std::chrono::milliseconds(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class PeriodicServiceInventoryManager {
const double backOffjitterParam_{0.1};

folly::EventBaseThread eventBaseThread_;
std::unique_ptr<proxygen::SessionPool> sessionPool_;
folly::SocketAddress serviceAddress_;
std::shared_ptr<http::HttpClient> client_;
std::atomic_bool stopped_{true};
Expand Down
35 changes: 27 additions & 8 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/http/HttpClient.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
#include "velox/common/base/PeriodicStatsReporter.h"
#include "velox/common/base/StatsReporter.h"
Expand Down Expand Up @@ -84,8 +85,9 @@ static constexpr size_t kTaskPeriodCleanOldTasks{60'000'000}; // 60 seconds.
static constexpr size_t kConnectorPeriodGlobalCounters{
60'000'000}; // 60 seconds.
static constexpr size_t kOsPeriodGlobalCounters{2'000'000}; // 2 seconds
// Every 1 minute we print endpoint latency counters.
static constexpr size_t kHttpEndpointLatencyPeriodGlobalCounters{
static constexpr size_t kHttpServerPeriodGlobalCounters{
60'000'000}; // 60 seconds.
static constexpr size_t kHttpClientPeriodGlobalCounters{
60'000'000}; // 60 seconds.

PeriodicTaskManager::PeriodicTaskManager(
Expand Down Expand Up @@ -137,9 +139,11 @@ void PeriodicTaskManager::start() {
addOperatingSystemStatsUpdateTask();

if (SystemConfig::instance()->enableHttpEndpointLatencyFilter()) {
addHttpEndpointLatencyStatsTask();
addHttpServerStatsTask();
}

addHttpClientStatsTask();

if (server_ && server_->hasCoordinatorDiscoverer()) {
numDriverThreads_ = server_->numDriverThreads();
addWatchdogTask();
Expand Down Expand Up @@ -406,7 +410,7 @@ void PeriodicTaskManager::addOperatingSystemStatsUpdateTask() {
"os_counters");
}

void PeriodicTaskManager::printHttpEndpointLatencyStats() {
void PeriodicTaskManager::printHttpServerStats() {
const auto latencyMetrics =
http::filters::HttpEndpointLatencyFilter::retrieveLatencies();
std::ostringstream oss;
Expand All @@ -418,11 +422,26 @@ void PeriodicTaskManager::printHttpEndpointLatencyStats() {
LOG(INFO) << oss.str();
}

void PeriodicTaskManager::addHttpEndpointLatencyStatsTask() {
void PeriodicTaskManager::addHttpServerStatsTask() {
addTask(
[this]() { printHttpServerStats(); },
kHttpServerPeriodGlobalCounters,
"http_server_stats");
}

void PeriodicTaskManager::updateHttpClientStats() {
const auto numConnectionsCreated = http::HttpClient::numConnectionsCreated();
RECORD_METRIC_VALUE(
kCounterHttpClientNumConnectionsCreated,
numConnectionsCreated - lastHttpClientNumConnectionsCreated_);
lastHttpClientNumConnectionsCreated_ = numConnectionsCreated;
}

void PeriodicTaskManager::addHttpClientStatsTask() {
addTask(
[this]() { printHttpEndpointLatencyStats(); },
kHttpEndpointLatencyPeriodGlobalCounters,
"http_endpoint_counters");
[this] { updateHttpClientStats(); },
kHttpClientPeriodGlobalCounters,
"http_client_stats");
}

void PeriodicTaskManager::addWatchdogTask() {
Expand Down
10 changes: 7 additions & 3 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ class PeriodicTaskManager {
void addOperatingSystemStatsUpdateTask();
void updateOperatingSystemStats();

// Adds task that periodically prints http endpoint latency metrics.
void addHttpEndpointLatencyStatsTask();
void printHttpEndpointLatencyStats();
void addHttpServerStatsTask();
void printHttpServerStats();

void addHttpClientStatsTask();
void updateHttpClientStats();

void addWatchdogTask();

Expand All @@ -141,6 +143,8 @@ class PeriodicTaskManager {
int64_t lastVoluntaryContextSwitches_{0};
int64_t lastForcedContextSwitches_{0};

int64_t lastHttpClientNumConnectionsCreated_{0};

// NOTE: declare last since the threads access other members of `this`.
folly::FunctionScheduler oneTimeRunner_;
folly::ThreadedRepeatingFunctionRunner repeatedRunner_;
Expand Down
66 changes: 10 additions & 56 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ PrestoExchangeSource::PrestoExchangeSource(
memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
http::HttpClientConnectionPool* connPool,
const proxygen::Endpoint& endpoint,
folly::SSLContextPtr sslContext)
: ExchangeSource(extractTaskId(baseUri.path()), destination, queue, pool),
basePath_(baseUri.path()),
Expand All @@ -102,7 +103,8 @@ PrestoExchangeSource::PrestoExchangeSource(
VELOX_CHECK_NOT_NULL(pool_);
httpClient_ = std::make_shared<http::HttpClient>(
ioEventBase,
sessionPool,
connPool,
endpoint,
address,
requestTimeoutMs,
connectTimeoutMs,
Expand Down Expand Up @@ -516,53 +518,6 @@ std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::getSelfPtr() {
return std::dynamic_pointer_cast<PrestoExchangeSource>(shared_from_this());
}

const ConnectionPool& ConnectionPools::get(
const proxygen::Endpoint& endpoint,
folly::IOThreadPoolExecutor* ioExecutor) {
return *pools_.withULockPtr([&](auto ulock) -> const ConnectionPool* {
auto it = ulock->find(endpoint);
if (it != ulock->end()) {
return it->second.get();
}
auto wlock = ulock.moveFromUpgradeToWrite();
auto& pool = (*wlock)[endpoint];
if (!pool) {
pool = std::make_unique<ConnectionPool>();
pool->eventBase = ioExecutor->getEventBase();
pool->sessionPool = std::make_unique<proxygen::SessionPool>(nullptr, 10);
// Creation of the timer is not thread safe, so we do it here instead of
// in the constructor of HttpClient.
pool->eventBase->timer();
}
return pool.get();
});
}

void ConnectionPools::destroy() {
pools_.withWLock([](auto& pools) {
for (auto& [_, pool] : pools) {
pool->eventBase->runInEventBaseThread(
[sessionPool = std::move(pool->sessionPool)] {});
}
pools.clear();
});
}

namespace {

std::pair<folly::EventBase*, proxygen::SessionPool*> getSessionPool(
ConnectionPools* connectionPools,
folly::IOThreadPoolExecutor* ioExecutor,
const proxygen::Endpoint& ep) {
if (!connectionPools) {
return {ioExecutor->getEventBase(), nullptr};
}
auto& connPool = connectionPools->get(ep, ioExecutor);
return {connPool.eventBase, connPool.sessionPool.get()};
}

} // namespace

// static
std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::create(
const std::string& url,
Expand All @@ -571,37 +526,36 @@ std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::create(
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools* connectionPools,
http::HttpClientConnectionPool* connPool,
folly::SSLContextPtr sslContext) {
folly::Uri uri(url);
auto* eventBase = ioExecutor->getEventBase();
if (uri.scheme() == "http") {
VELOX_CHECK_NULL(sslContext);
proxygen::Endpoint ep(uri.host(), uri.port(), false);
auto [eventBase, sessionPool] =
getSessionPool(connectionPools, ioExecutor, ep);
return std::make_shared<PrestoExchangeSource>(
uri,
destination,
queue,
memoryPool,
cpuExecutor,
eventBase,
sessionPool,
connPool,
ep,
sslContext);
}
if (uri.scheme() == "https") {
VELOX_CHECK_NOT_NULL(sslContext);
proxygen::Endpoint ep(uri.host(), uri.port(), true);
auto [eventBase, sessionPool] =
getSessionPool(connectionPools, ioExecutor, ep);
return std::make_shared<PrestoExchangeSource>(
uri,
destination,
queue,
memoryPool,
cpuExecutor,
eventBase,
sessionPool,
connPool,
ep,
std::move(sslContext));
}
return nullptr;
Expand Down
37 changes: 3 additions & 34 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,6 @@

namespace facebook::presto {

// HTTP connection pool for a specific endpoint with its associated event base.
// All the operations on the SessionPool must be performed on the corresponding
// EventBase.
struct ConnectionPool {
folly::EventBase* eventBase;
std::unique_ptr<proxygen::SessionPool> sessionPool;
};

// Connection pools used by HTTP client in PrestoExchangeSource. It should be
// held living longer than all the PrestoExchangeSources and will be passed when
// we creating the exchange sources.
class ConnectionPools {
public:
~ConnectionPools() {
destroy();
}

const ConnectionPool& get(
const proxygen::Endpoint& endpoint,
folly::IOThreadPoolExecutor* ioExecutor);

void destroy();

private:
folly::Synchronized<folly::F14FastMap<
proxygen::Endpoint,
std::unique_ptr<ConnectionPool>,
proxygen::EndpointHash,
proxygen::EndpointEqual>>
pools_;
};

class PrestoExchangeSource : public velox::exec::ExchangeSource {
public:
class RetryState {
Expand Down Expand Up @@ -107,7 +75,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
velox::memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
http::HttpClientConnectionPool* connPool,
const proxygen::Endpoint& endpoint,
folly::SSLContextPtr sslContext);

/// Returns 'true' is there is no request in progress, this source is not at
Expand Down Expand Up @@ -145,7 +114,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools* connectionPools,
http::HttpClientConnectionPool* connPool,
folly::SSLContextPtr sslContext);

/// Completes the future returned by 'request()' if it hasn't completed
Expand Down
13 changes: 8 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ void PrestoServer::run() {

if (systemConfig->exchangeEnableConnectionPool()) {
PRESTO_STARTUP_LOG(INFO) << "Enable exchange Http Client connection pool.";
exchangeSourceConnectionPools_ = std::make_unique<ConnectionPools>();
exchangeSourceConnectionPool_ =
std::make_unique<http::HttpClientConnectionPool>();
}

facebook::velox::exec::ExchangeSource::registerFactory(
Expand All @@ -401,7 +402,7 @@ void PrestoServer::run() {
pool,
driverExecutor_.get(),
exchangeHttpExecutor_.get(),
exchangeSourceConnectionPools_.get(),
exchangeSourceConnectionPool_.get(),
sslContext_);
});

Expand Down Expand Up @@ -540,7 +541,9 @@ void PrestoServer::run() {
<< "': threads: " << driverExecutor_->numActiveThreads() << "/"
<< driverExecutor_->numThreads()
<< ", task queue: " << driverExecutor_->getTaskQueueSize();
driverExecutor_->join();
// Schedule release of SessionPools held by HttpClients before the exchange
// HTTP executor threads are joined.
driverExecutor_.reset();

if (connectorIoExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
Expand All @@ -550,9 +553,9 @@ void PrestoServer::run() {
connectorIoExecutor_->join();
}

if (exchangeSourceConnectionPools_) {
if (exchangeSourceConnectionPool_) {
PRESTO_SHUTDOWN_LOG(INFO) << "Releasing exchange HTTP connection pools";
exchangeSourceConnectionPools_->destroy();
exchangeSourceConnectionPool_->destroy();
}

if (httpSrvCpuExecutor_ != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class PrestoServer {
// Executor for spilling.
std::shared_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;

std::unique_ptr<ConnectionPools> exchangeSourceConnectionPools_;
std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;

// If not null, the instance of AsyncDataCache used for in-memory file cache.
std::shared_ptr<velox::cache::AsyncDataCache> cache_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ SystemConfig::SystemConfig() {
STR_PROP(kExchangeMaxErrorDuration, "3m"),
STR_PROP(kExchangeRequestTimeout, "10s"),
STR_PROP(kExchangeConnectTimeout, "20s"),
BOOL_PROP(kExchangeEnableConnectionPool, false),
BOOL_PROP(kExchangeEnableConnectionPool, true),
BOOL_PROP(kExchangeImmediateBufferTransfer, true),
NUM_PROP(kTaskRunTimeSliceMicros, 50'000),
BOOL_PROP(kIncludeNodeInSpillPath, false),
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void registerPrestoMetrics() {
95,
99,
100);
DEFINE_METRIC(
kCounterHttpClientNumConnectionsCreated, facebook::velox::StatType::SUM);
DEFINE_HISTOGRAM_METRIC(
kCounterPrestoExchangeSerializedPageSize,
10000,
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ constexpr folly::StringPiece kCounterHttpClientPrestoExchangeNumOnBody{
/// PrestoExchangeSource.
constexpr folly::StringPiece kCounterHttpClientPrestoExchangeOnBodyBytes{
"presto_cpp.http.client.presto_exchange_source.on_body_bytes"};
constexpr folly::StringPiece kCounterHttpClientNumConnectionsCreated{
"presto_cpp.http.client.num_connections_created"};
/// SerializedPage size in bytes from PrestoExchangeSource.
constexpr folly::StringPiece kCounterPrestoExchangeSerializedPageSize{
"presto_cpp.presto_exchange_source.serialized_page_size"};
Expand Down
Loading