From 837e4bac6c2048eba75a5472dcc3af0edc39f6a2 Mon Sep 17 00:00:00 2001 From: Amit Dutta Date: Sat, 7 Feb 2026 20:38:04 -0800 Subject: [PATCH] [presto] Decouple HttpServer and HttpClient from SystemConfig singleton Summary: Remove direct SystemConfig/NodeConfig singleton access from HttpServer.cpp and HttpClient.cpp in the presto_http library. Instead, inject configuration via option structs (HttpServerStartupOptions, HttpClientOptions, JwtOptions) that callers populate from SystemConfig at construction/call sites. This decouples the HTTP transport layer from Presto's configuration system, making the HTTP components independently testable and reusable without requiring a global SystemConfig singleton to be initialized. Key changes: - Extract HttpServerStartupOptions, HttpClientOptions, and JwtOptions structs into their own headers with separate lightweight Buck targets to break the circular dependency between presto_common and presto_http - Add httpServerStartupOptions(), httpClientOptions(), and jwtOptions() methods to SystemConfig that construct and return fully populated option structs from config - Simplify callsites in PrestoServer.cpp, PrestoExchangeSource.cpp, PeriodicServiceInventoryManager.cpp, RestRemoteClient.cpp, and HttpJwtTest.cpp to single method calls - Move SystemConfig access to caller sites (PrestoServer, PrestoExchangeSource, PeriodicServiceInventoryManager, RestRemoteClient) - Move presto_common from exported_deps to deps in http/BUCK - Add json target to presto_http exported_deps (needed by HttpServer.h) - Add presto_common to presto_server_lib exported_deps (needed by PrestoExchangeSource.h) Differential Revision: D92604681 --- .../main/PeriodicServiceInventoryManager.cpp | 6 +- .../presto_cpp/main/PrestoExchangeSource.cpp | 11 +++- .../presto_cpp/main/PrestoExchangeSource.h | 1 + .../presto_cpp/main/PrestoServer.cpp | 65 ++++++++++--------- .../presto_cpp/main/common/Configs.cpp | 44 +++++++++++++ .../presto_cpp/main/common/Configs.h | 9 +++ .../remote/client/RestRemoteClient.cpp | 6 +- .../presto_cpp/main/http/HttpClient.cpp | 44 ++++--------- .../presto_cpp/main/http/HttpClient.h | 24 ++++--- .../presto_cpp/main/http/HttpClientOptions.h | 33 ++++++++++ .../presto_cpp/main/http/HttpServer.cpp | 33 ++++------ .../presto_cpp/main/http/HttpServer.h | 2 + .../main/http/HttpServerStartupOptions.h | 34 ++++++++++ .../presto_cpp/main/http/JwtOptions.h | 28 ++++++++ .../filters/InternalAuthenticationFilter.cpp | 6 +- .../main/http/tests/HttpJwtTest.cpp | 19 +++--- .../presto_cpp/main/http/tests/HttpTest.cpp | 3 +- .../presto_cpp/main/http/tests/HttpTestBase.h | 7 +- .../main/tests/HttpServerWrapper.cpp | 2 +- 19 files changed, 266 insertions(+), 111 deletions(-) create mode 100644 presto-native-execution/presto_cpp/main/http/HttpClientOptions.h create mode 100644 presto-native-execution/presto_cpp/main/http/HttpServerStartupOptions.h create mode 100644 presto-native-execution/presto_cpp/main/http/JwtOptions.h diff --git a/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp index a1f745ccaad07..ebb5519ecec28 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicServiceInventoryManager.cpp @@ -14,6 +14,7 @@ #include "presto_cpp/main/PeriodicServiceInventoryManager.h" #include #include +#include "presto_cpp/main/common/Configs.h" namespace facebook::presto { PeriodicServiceInventoryManager::PeriodicServiceInventoryManager( @@ -80,6 +81,8 @@ void PeriodicServiceInventoryManager::sendRequest() { LOG(INFO) << "Service Inventory changed to " << newAddress.getAddressStr() << ":" << newAddress.getPort(); std::swap(serviceAddress_, newAddress); + auto systemConfig = SystemConfig::instance(); + auto httpClientOptions = systemConfig->httpClientOptions(); client_ = std::make_shared( eventBaseThread_.getEventBase(), nullptr, @@ -91,7 +94,8 @@ void PeriodicServiceInventoryManager::sendRequest() { std::chrono::milliseconds(10'000), std::chrono::milliseconds(0), pool_, - sslContext_); + sslContext_, + std::move(httpClientOptions)); } } catch (const std::exception& ex) { LOG(WARNING) << "Error occurred during updating service address: " diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index 130bc49690292..e43f887d7042c 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -103,6 +103,8 @@ PrestoExchangeSource::PrestoExchangeSource( VELOX_CHECK_NOT_NULL(driverExecutor_); VELOX_CHECK_NOT_NULL(ioEventBase); VELOX_CHECK_NOT_NULL(pool_); + auto systemConfig = SystemConfig::instance(); + auto httpClientOptions = systemConfig->httpClientOptions(); httpClient_ = std::make_shared( ioEventBase, connPool, @@ -111,7 +113,9 @@ PrestoExchangeSource::PrestoExchangeSource( requestTimeoutMs, connectTimeoutMs, immediateBufferTransfer_ ? pool_ : nullptr, - sslContext_); + sslContext_, + std::move(httpClientOptions)); + jwtOptions_ = systemConfig->jwtOptions(); } void PrestoExchangeSource::close() { @@ -185,7 +189,8 @@ void PrestoExchangeSource::doRequest( } else { method = proxygen::HTTPMethod::GET; } - auto requestBuilder = http::RequestBuilder().method(method).url(path); + auto requestBuilder = + http::RequestBuilder().jwtOptions(jwtOptions_).method(method).url(path); velox::common::testutil::TestValue::adjust( "facebook::presto::PrestoExchangeSource::doRequest", this); @@ -468,6 +473,7 @@ void PrestoExchangeSource::acknowledgeResults(int64_t ackSequence) { auto ackPath = fmt::format("{}/{}/acknowledge", basePath_, ackSequence); VLOG(1) << "Sending ack " << ackPath; http::RequestBuilder() + .jwtOptions(jwtOptions_) .method(proxygen::HTTPMethod::GET) .url(ackPath) .send(httpClient_.get()) @@ -512,6 +518,7 @@ void PrestoExchangeSource::abortResults() { void PrestoExchangeSource::doAbortResults(int64_t delayMs) { http::RequestBuilder() + .jwtOptions(jwtOptions_) .method(proxygen::HTTPMethod::DELETE) .url(basePath_) .send(httpClient_.get(), "", delayMs) diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index 394627776780e..529f1793022e7 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -287,6 +287,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { folly::CPUThreadPoolExecutor* const driverExecutor_; std::shared_ptr httpClient_; + http::JwtOptions jwtOptions_; RetryState dataRequestRetryState_; RetryState abortRetryState_; int failedAttempts_; diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index ba0e9f01ed016..fce198fec93c9 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -684,37 +684,42 @@ void PrestoServer::run() { // Start everything. After the return from the following call we are shutting // down. - httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) { - const auto addresses = server->addresses(); - for (auto address : addresses) { - PRESTO_STARTUP_LOG(INFO) << fmt::format( - "Server listening at {}:{} - https {}", - address.address.getIPAddress().str(), - address.address.getPort(), - address.sslConfigs.size() != 0); - // We could be bound to both http and https ports. - // If set, we must use the https port and skip http. - if (httpsPort.has_value() && address.sslConfigs.size() == 0) { - continue; - } - startAnnouncerAndHeartbeatManagerCb( - httpsPort.has_value(), address.address.getPort()); - setTaskUriCb(httpsPort.has_value(), address.address.getPort()); - break; - } + auto startupOptions = systemConfig->httpServerStartupOptions(); + httpServer_->start( + std::move(startupOptions), + getHttpServerFilters(), + [&](proxygen::HTTPServer* server) { + const auto addresses = server->addresses(); + for (auto address : addresses) { + PRESTO_STARTUP_LOG(INFO) << fmt::format( + "Server listening at {}:{} - https {}", + address.address.getIPAddress().str(), + address.address.getPort(), + address.sslConfigs.size() != 0); + // We could be bound to both http and https ports. + // If set, we must use the https port and skip http. + if (httpsPort.has_value() && address.sslConfigs.size() == 0) { + continue; + } + startAnnouncerAndHeartbeatManagerCb( + httpsPort.has_value(), address.address.getPort()); + setTaskUriCb(httpsPort.has_value(), address.address.getPort()); + break; + } - if (coordinatorDiscoverer_ != nullptr) { - VELOX_CHECK_NOT_NULL( - announcer_, - "The announcer is expected to have been created but wasn't."); - const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); - if (heartbeatFrequencyMs > 0) { - VELOX_CHECK_NOT_NULL( - heartbeatManager_, - "The heartbeat manager is expected to have been created but wasn't."); - } - } - }); + if (coordinatorDiscoverer_ != nullptr) { + VELOX_CHECK_NOT_NULL( + announcer_, + "The announcer is expected to have been created but wasn't."); + const auto heartbeatFrequencyMs = + systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + VELOX_CHECK_NOT_NULL( + heartbeatManager_, + "The heartbeat manager is expected to have been created but wasn't."); + } + } + }); if (announcer_ != nullptr) { PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer"; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index e44937470a186..0cd35c7e4816b 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -378,6 +378,25 @@ bool SystemConfig::httpServerEnableGzipCompression() const { return optionalProperty(kHttpServerEnableGzipCompression).value(); } +http::HttpServerStartupOptions SystemConfig::httpServerStartupOptions() const { + http::HttpServerStartupOptions options; + options.idleTimeoutMs = httpServerIdleTimeoutMs(); + options.http2InitialReceiveWindow = httpServerHttp2InitialReceiveWindow(); + options.http2ReceiveStreamWindowSize = + httpServerHttp2ReceiveStreamWindowSize(); + options.http2ReceiveSessionWindowSize = + httpServerHttp2ReceiveSessionWindowSize(); + options.http2MaxConcurrentStreams = httpServerHttp2MaxConcurrentStreams(); + options.enableContentCompression = httpServerEnableContentCompression(); + options.contentCompressionLevel = httpServerContentCompressionLevel(); + options.contentCompressionMinimumSize = + httpServerContentCompressionMinimumSize(); + options.enableZstdCompression = httpServerEnableZstdCompression(); + options.zstdContentCompressionLevel = httpServerZstdContentCompressionLevel(); + options.enableGzipCompression = httpServerEnableGzipCompression(); + return options; +} + std::string SystemConfig::httpsSupportedCiphers() const { return optionalProperty(kHttpsSupportedCiphers).value(); } @@ -963,6 +982,20 @@ bool SystemConfig::httpClientConnectionReuseCounterEnabled() const { .value(); } +http::HttpClientOptions SystemConfig::httpClientOptions() const { + http::HttpClientOptions options; + options.http2Enabled = httpClientHttp2Enabled(); + options.http2MaxStreamsPerConnection = + httpClientHttp2MaxStreamsPerConnection(); + options.http2InitialStreamWindow = httpClientHttp2InitialStreamWindow(); + options.http2StreamWindow = httpClientHttp2StreamWindow(); + options.http2SessionWindow = httpClientHttp2SessionWindow(); + options.maxAllocateBytes = httpMaxAllocateBytes(); + options.connectionReuseCounterEnabled = + httpClientConnectionReuseCounterEnabled(); + return options; +} + std::chrono::duration SystemConfig::exchangeMaxErrorDuration() const { return velox::config::toDuration( optionalProperty(kExchangeMaxErrorDuration).value()); @@ -1027,6 +1060,17 @@ int32_t SystemConfig::internalCommunicationJwtExpirationSeconds() const { .value(); } +http::JwtOptions SystemConfig::jwtOptions() const { + http::JwtOptions options; + options.jwtEnabled = internalCommunicationJwtEnabled(); + if (options.jwtEnabled) { + options.sharedSecret = internalCommunicationSharedSecret(); + options.jwtExpirationSeconds = internalCommunicationJwtExpirationSeconds(); + options.nodeId = NodeConfig::instance()->nodeId(); + } + return options; +} + bool SystemConfig::useLegacyArrayAgg() const { return optionalProperty(kUseLegacyArrayAgg).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index b92fa33c255b5..dc20f46609654 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -17,6 +17,9 @@ #include #include #include +#include "presto_cpp/main/http/HttpClientOptions.h" // @manual +#include "presto_cpp/main/http/HttpServerStartupOptions.h" // @manual +#include "presto_cpp/main/http/JwtOptions.h" // @manual #include "velox/common/config/Config.h" namespace facebook::presto { @@ -936,6 +939,8 @@ class SystemConfig : public ConfigBase { bool httpServerEnableGzipCompression() const; + http::HttpServerStartupOptions httpServerStartupOptions() const; + /// A list of ciphers (comma separated) that are supported by /// server and client. Note Java and folly::SSLContext use different names to /// refer to the same cipher. For e.g. TLS_RSA_WITH_AES_256_GCM_SHA384 in Java @@ -1168,6 +1173,8 @@ class SystemConfig : public ConfigBase { bool httpClientConnectionReuseCounterEnabled() const; + http::HttpClientOptions httpClientOptions() const; + std::chrono::duration exchangeMaxErrorDuration() const; std::chrono::duration exchangeRequestTimeoutMs() const; @@ -1196,6 +1203,8 @@ class SystemConfig : public ConfigBase { int32_t internalCommunicationJwtExpirationSeconds() const; + http::JwtOptions jwtOptions() const; + bool useLegacyArrayAgg() const; bool cacheVeloxTtlEnabled() const; diff --git a/presto-native-execution/presto_cpp/main/functions/remote/client/RestRemoteClient.cpp b/presto-native-execution/presto_cpp/main/functions/remote/client/RestRemoteClient.cpp index b39483fb20d4b..68e2a651eff48 100644 --- a/presto-native-execution/presto_cpp/main/functions/remote/client/RestRemoteClient.cpp +++ b/presto-native-execution/presto_cpp/main/functions/remote/client/RestRemoteClient.cpp @@ -17,6 +17,7 @@ #include #include +#include "presto_cpp/main/common/Configs.h" #include "presto_cpp/main/functions/remote/utils/ContentTypes.h" #include "velox/common/base/Exceptions.h" #include "velox/common/memory/Memory.h" @@ -39,6 +40,8 @@ RestRemoteClient::RestRemoteClient(const std::string& url) : url_(url) { folly::SocketAddress addr(uri.host().c_str(), uri.port(), true); evbThread_ = std::make_unique("rest-client"); + auto systemConfig = SystemConfig::instance(); + auto httpClientOptions = systemConfig->httpClientOptions(); httpClient_ = std::make_shared( evbThread_->getEventBase(), nullptr, @@ -47,7 +50,8 @@ RestRemoteClient::RestRemoteClient(const std::string& url) : url_(url) { requestTimeoutMs, connectTimeoutMs, memPool_, - nullptr); + nullptr, + std::move(httpClientOptions)); } RestRemoteClient::~RestRemoteClient() { diff --git a/presto-native-execution/presto_cpp/main/http/HttpClient.cpp b/presto-native-execution/presto_cpp/main/http/HttpClient.cpp index 0a0f208e010ab..ce547538fc353 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpClient.cpp +++ b/presto-native-execution/presto_cpp/main/http/HttpClient.cpp @@ -20,7 +20,6 @@ #include #include #include -#include "presto_cpp/main/common/Configs.h" #include "presto_cpp/main/common/Counters.h" #include "presto_cpp/main/common/Utils.h" #include "presto_cpp/main/http/HttpClient.h" @@ -37,6 +36,7 @@ HttpClient::HttpClient( std::chrono::milliseconds connectTimeout, std::shared_ptr pool, folly::SSLContextPtr sslContext, + HttpClientOptions options, std::function&& reportOnBodyStatsFunc) : eventBase_(eventBase), connPool_(connPool), @@ -44,20 +44,10 @@ HttpClient::HttpClient( address_(address), transactionTimeout_(transactionTimeout), connectTimeout_(connectTimeout), - http2Enabled_(SystemConfig::instance()->httpClientHttp2Enabled()), - maxConcurrentStreams_( - SystemConfig::instance()->httpClientHttp2MaxStreamsPerConnection()), - http2InitialStreamWindow_( - SystemConfig::instance()->httpClientHttp2InitialStreamWindow()), - http2StreamWindow_( - SystemConfig::instance()->httpClientHttp2StreamWindow()), - http2SessionWindow_( - SystemConfig::instance()->httpClientHttp2SessionWindow()), + options_(std::move(options)), pool_(std::move(pool)), sslContext_(std::move(sslContext)), - reportOnBodyStatsFunc_(std::move(reportOnBodyStatsFunc)), - maxResponseAllocBytes_(SystemConfig::instance()->httpMaxAllocateBytes()) { -} + reportOnBodyStatsFunc_(std::move(reportOnBodyStatsFunc)) {} HttpClient::~HttpClient() { if (sessionPoolHolder_) { @@ -218,7 +208,7 @@ class ResponseHandler : public proxygen::HTTPTransactionHandler { // - seqNo == 0: First request on this connection // - seqNo > 0: Connection is being reused for subsequent requests // Reuse rate = connection_reuse / (connection_first_use + connection_reuse) - if (SystemConfig::instance()->httpClientConnectionReuseCounterEnabled()) { + if (client_->connectionReuseCounterEnabled()) { const uint32_t seqNo = txn_->getSequenceNumber(); if (seqNo > 0) { RECORD_METRIC_VALUE(kCounterHttpClientConnectionReuse); @@ -567,11 +557,11 @@ void HttpClient::sendRequest(std::shared_ptr responseHandler) { sessionPool_, proxygen::WheelTimerInstance(transactionTimeout_, eventBase_), connectTimeout_, - http2Enabled_, - maxConcurrentStreams_, - http2InitialStreamWindow_, - http2StreamWindow_, - http2SessionWindow_, + options_.http2Enabled, + options_.http2MaxStreamsPerConnection, + options_.http2InitialStreamWindow, + options_.http2StreamWindow, + options_.http2SessionWindow, eventBase_, address_, sslContext_); @@ -592,7 +582,7 @@ folly::SemiFuture> HttpClient::sendRequest( request.ensureHostHeader(); auto responseHandler = std::make_shared( request, - maxResponseAllocBytes_, + options_.maxAllocateBytes, body, reportOnBodyStatsFunc_, shared_from_this()); @@ -618,26 +608,20 @@ folly::SemiFuture> HttpClient::sendRequest( void RequestBuilder::addJwtIfConfigured() { #ifdef PRESTO_ENABLE_JWT - if (SystemConfig::instance()->internalCommunicationJwtEnabled()) { + if (jwtOptions_.jwtEnabled) { // If JWT was enabled the secret cannot be empty. auto secretHash = std::vector(SHA256_DIGEST_LENGTH); folly::ssl::OpenSSLHash::sha256( folly::range(secretHash), - folly::ByteRange( - folly::StringPiece( - SystemConfig::instance() - ->internalCommunicationSharedSecret()))); + folly::ByteRange(folly::StringPiece(jwtOptions_.sharedSecret))); const auto time = std::chrono::system_clock::now(); const auto token = jwt::create() - .set_subject(NodeConfig::instance()->nodeId()) + .set_subject(jwtOptions_.nodeId) .set_issued_at(time) .set_expires_at( - time + - std::chrono::seconds{ - SystemConfig::instance() - ->internalCommunicationJwtExpirationSeconds()}) + time + std::chrono::seconds{jwtOptions_.jwtExpirationSeconds}) .sign( jwt::algorithm::hs256{std::string( reinterpret_cast(secretHash.data()), diff --git a/presto-native-execution/presto_cpp/main/http/HttpClient.h b/presto-native-execution/presto_cpp/main/http/HttpClient.h index c6dc84ad0d86a..76ad101f2e465 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpClient.h +++ b/presto-native-execution/presto_cpp/main/http/HttpClient.h @@ -18,7 +18,9 @@ #include #include #include +#include "presto_cpp/main/http/HttpClientOptions.h" // @manual #include "presto_cpp/main/http/HttpConstants.h" +#include "presto_cpp/main/http/JwtOptions.h" // @manual #include "velox/common/base/Exceptions.h" namespace facebook::presto::http { @@ -151,10 +153,6 @@ class HttpClientConnectionPool { class ResponseHandler; -// HttpClient uses proxygen::SessionPool which must be destructed on the -// EventBase thread. Hence, the destructor of HttpClient must run on the -// EventBase thread as well. Consider running HttpClient's destructor -// via EventBase::runOnDestruction. class HttpClient : public std::enable_shared_from_this { public: HttpClient( @@ -166,6 +164,7 @@ class HttpClient : public std::enable_shared_from_this { std::chrono::milliseconds connectTimeout, std::shared_ptr pool, folly::SSLContextPtr sslContext, + HttpClientOptions options = {}, std::function&& reportOnBodyStatsFunc = nullptr); ~HttpClient(); @@ -180,6 +179,10 @@ class HttpClient : public std::enable_shared_from_this { return pool_; } + bool connectionReuseCounterEnabled() const { + return options_.connectionReuseCounterEnabled; + } + static int64_t numConnectionsCreated() { return numConnectionsCreated_; } @@ -200,15 +203,10 @@ class HttpClient : public std::enable_shared_from_this { const folly::SocketAddress address_; const std::chrono::milliseconds transactionTimeout_; const std::chrono::milliseconds connectTimeout_; - const bool http2Enabled_; - const uint32_t maxConcurrentStreams_; - const uint32_t http2InitialStreamWindow_; - const uint32_t http2StreamWindow_; - const uint32_t http2SessionWindow_; + const HttpClientOptions options_; const std::shared_ptr pool_; const folly::SSLContextPtr sslContext_; const std::function reportOnBodyStatsFunc_; - const uint64_t maxResponseAllocBytes_; proxygen::SessionPool* sessionPool_ = nullptr; proxygen::ServerIdleSessionController* idleSessions_ = nullptr; @@ -221,6 +219,11 @@ class RequestBuilder { public: RequestBuilder() {} + RequestBuilder& jwtOptions(JwtOptions options) { + jwtOptions_ = std::move(options); + return *this; + } + RequestBuilder& method(proxygen::HTTPMethod method) { headers_.setMethod(method); return *this; @@ -253,6 +256,7 @@ class RequestBuilder { private: void addJwtIfConfigured(); + JwtOptions jwtOptions_; proxygen::HTTPMessage headers_; }; diff --git a/presto-native-execution/presto_cpp/main/http/HttpClientOptions.h b/presto-native-execution/presto_cpp/main/http/HttpClientOptions.h new file mode 100644 index 0000000000000..4f5b698389057 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/http/HttpClientOptions.h @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace facebook::presto::http { + +struct HttpClientOptions { + // HTTP/2 transport settings (used in constructor) + bool http2Enabled{false}; + uint32_t http2MaxStreamsPerConnection{8}; + uint32_t http2InitialStreamWindow{1 << 23}; + uint32_t http2StreamWindow{1 << 23}; + uint32_t http2SessionWindow{1 << 26}; + uint64_t maxAllocateBytes{65536}; + + // Metrics settings (used in ResponseHandler) + bool connectionReuseCounterEnabled{true}; +}; + +} // namespace facebook::presto::http diff --git a/presto-native-execution/presto_cpp/main/http/HttpServer.cpp b/presto-native-execution/presto_cpp/main/http/HttpServer.cpp index ebeb1e7f69c9b..a987174722799 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpServer.cpp +++ b/presto-native-execution/presto_cpp/main/http/HttpServer.cpp @@ -14,9 +14,9 @@ #include -#include "presto_cpp/main/common/Configs.h" #include "presto_cpp/main/common/Utils.h" #include "presto_cpp/main/http/HttpServer.h" +#include "velox/common/base/Exceptions.h" namespace facebook::presto::http { @@ -279,14 +279,13 @@ HttpServer::endpoints() const { } void HttpServer::start( + HttpServerStartupOptions startupOptions, std::vector> filters, std::function onSuccess, std::function onError) { proxygen::HTTPServerOptions options; - auto systemConfig = SystemConfig::instance(); - options.idleTimeout = - std::chrono::milliseconds(systemConfig->httpServerIdleTimeoutMs()); + options.idleTimeout = std::chrono::milliseconds(startupOptions.idleTimeoutMs); proxygen::RequestHandlerChain handlerFactories; @@ -301,30 +300,24 @@ void HttpServer::start( options.handlerFactories = handlerFactories.build(); // HTTP/2 flow control window sizes (configurable) - options.initialReceiveWindow = - systemConfig->httpServerHttp2InitialReceiveWindow(); - options.receiveStreamWindowSize = - systemConfig->httpServerHttp2ReceiveStreamWindowSize(); + options.initialReceiveWindow = startupOptions.http2InitialReceiveWindow; + options.receiveStreamWindowSize = startupOptions.http2ReceiveStreamWindowSize; options.receiveSessionWindowSize = - systemConfig->httpServerHttp2ReceiveSessionWindowSize(); + startupOptions.http2ReceiveSessionWindowSize; options.maxConcurrentIncomingStreams = - systemConfig->httpServerHttp2MaxConcurrentStreams(); + startupOptions.http2MaxConcurrentStreams; options.h2cEnabled = true; // Enable HTTP/2 responses compression for better performance // Supports both gzip and zstd (zstd preferred when client supports it) - options.enableContentCompression = - systemConfig->httpServerEnableContentCompression(); - options.contentCompressionLevel = - systemConfig->httpServerContentCompressionLevel(); + options.enableContentCompression = startupOptions.enableContentCompression; + options.contentCompressionLevel = startupOptions.contentCompressionLevel; options.contentCompressionMinimumSize = - systemConfig->httpServerContentCompressionMinimumSize(); - options.enableZstdCompression = - systemConfig->httpServerEnableZstdCompression(); + startupOptions.contentCompressionMinimumSize; + options.enableZstdCompression = startupOptions.enableZstdCompression; options.zstdContentCompressionLevel = - systemConfig->httpServerZstdContentCompressionLevel(); - options.enableGzipCompression = - systemConfig->httpServerEnableGzipCompression(); + startupOptions.zstdContentCompressionLevel; + options.enableGzipCompression = startupOptions.enableGzipCompression; // CRITICAL: Add Thrift content-types for Presto task updates // By default, proxygen only compresses text/* and some application/* types diff --git a/presto-native-execution/presto_cpp/main/http/HttpServer.h b/presto-native-execution/presto_cpp/main/http/HttpServer.h index 0d9870810a42b..c8b56dbbb0f2d 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpServer.h +++ b/presto-native-execution/presto_cpp/main/http/HttpServer.h @@ -20,6 +20,7 @@ #include #include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/main/http/HttpConstants.h" +#include "presto_cpp/main/http/HttpServerStartupOptions.h" // @manual namespace facebook::presto::http { @@ -288,6 +289,7 @@ class HttpServer { std::unique_ptr httpsConfig = nullptr); void start( + HttpServerStartupOptions startupOptions = {}, std::vector> filters = {}, std::function onSuccess = nullptr, diff --git a/presto-native-execution/presto_cpp/main/http/HttpServerStartupOptions.h b/presto-native-execution/presto_cpp/main/http/HttpServerStartupOptions.h new file mode 100644 index 0000000000000..55ae878fbe356 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/http/HttpServerStartupOptions.h @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace facebook::presto::http { + +struct HttpServerStartupOptions { + uint32_t idleTimeoutMs{60'000}; + uint32_t http2InitialReceiveWindow{1 << 20}; + uint32_t http2ReceiveStreamWindowSize{1 << 20}; + uint32_t http2ReceiveSessionWindowSize{10 * (1 << 20)}; + uint32_t http2MaxConcurrentStreams{100}; + bool enableContentCompression{false}; + uint32_t contentCompressionLevel{4}; + uint32_t contentCompressionMinimumSize{3584}; + bool enableZstdCompression{false}; + uint32_t zstdContentCompressionLevel{8}; + bool enableGzipCompression{false}; +}; + +} // namespace facebook::presto::http diff --git a/presto-native-execution/presto_cpp/main/http/JwtOptions.h b/presto-native-execution/presto_cpp/main/http/JwtOptions.h new file mode 100644 index 0000000000000..0eb349c12c891 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/http/JwtOptions.h @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace facebook::presto::http { + +struct JwtOptions { + bool jwtEnabled{false}; + std::string sharedSecret; + int32_t jwtExpirationSeconds{300}; + std::string nodeId; +}; + +} // namespace facebook::presto::http diff --git a/presto-native-execution/presto_cpp/main/http/filters/InternalAuthenticationFilter.cpp b/presto-native-execution/presto_cpp/main/http/filters/InternalAuthenticationFilter.cpp index 1e5125abc371b..460e158d754ec 100644 --- a/presto-native-execution/presto_cpp/main/http/filters/InternalAuthenticationFilter.cpp +++ b/presto-native-execution/presto_cpp/main/http/filters/InternalAuthenticationFilter.cpp @@ -154,11 +154,11 @@ void InternalAuthenticationFilter::processAndVerifyJwt( } // Passed the verification, move the message along. Filter::onRequest(std::move(msg)); - } catch (const jwt::error::token_verification_exception& e) { + } catch (const jwt::error::token_verification_exception&) { sendUnauthorizedResponse(); - } catch (const jwt::error::signature_verification_exception& e) { + } catch (const jwt::error::signature_verification_exception&) { sendUnauthorizedResponse(); - } catch (const std::system_error& e) { + } catch (const std::system_error&) { sendGenericErrorResponse(); } #endif // PRESTO_ENABLE_JWT diff --git a/presto-native-execution/presto_cpp/main/http/tests/HttpJwtTest.cpp b/presto-native-execution/presto_cpp/main/http/tests/HttpJwtTest.cpp index 08d3135b593d0..22a4036eba9ba 100644 --- a/presto-native-execution/presto_cpp/main/http/tests/HttpJwtTest.cpp +++ b/presto-native-execution/presto_cpp/main/http/tests/HttpJwtTest.cpp @@ -104,8 +104,10 @@ class HttpJwtTestSuite : public ::testing::TestWithParam { auto [reqPromise, reqFuture] = folly::makePromiseContract(); request->requestPromise = std::move(reqPromise); + auto jwtOpts = systemConfig->jwtOptions(); + auto responseFuture = - sendGet(client.get(), "/async/msg", sendDelayMs, "TestBody"); + sendGet(client.get(), "/async/msg", sendDelayMs, "TestBody", jwtOpts); auto serverConfig = jwtSystemConfig(serverSystemConfigOverride); auto valuesMap = serverConfig->rawConfigsCopy(); @@ -133,7 +135,7 @@ class HttpJwtTestSuite : public ::testing::TestWithParam { TEST_P(HttpJwtTestSuite, basicJwtTest) { const bool useHttps = GetParam(); - auto response = std::move(produceHttpResponse(useHttps)); + auto response = produceHttpResponse(useHttps); EXPECT_EQ(response->headers()->getStatusCode(), http::kHttpOk); } @@ -145,8 +147,7 @@ TEST_P(HttpJwtTestSuite, jwtSecretMismatch) { const bool useHttps = GetParam(); - auto response = - std::move(produceHttpResponse(useHttps, {}, serverConfigOverride)); + auto response = produceHttpResponse(useHttps, {}, serverConfigOverride); EXPECT_EQ(response->headers()->getStatusCode(), http::kHttpUnauthorized); } @@ -162,8 +163,8 @@ TEST_P(HttpJwtTestSuite, jwtExpiredToken) { const bool useHttps = GetParam(); - auto response = std::move( - produceHttpResponse(useHttps, clientConfigOverride, {}, kSendDelay)); + auto response = + produceHttpResponse(useHttps, clientConfigOverride, {}, kSendDelay); EXPECT_EQ(response->headers()->getStatusCode(), http::kHttpUnauthorized); } @@ -176,8 +177,7 @@ TEST_P(HttpJwtTestSuite, jwtServerVerificationDisabled) { const bool useHttps = GetParam(); - auto response = - std::move(produceHttpResponse(useHttps, {}, serverConfigOverride)); + auto response = produceHttpResponse(useHttps, {}, serverConfigOverride); EXPECT_EQ(response->headers()->getStatusCode(), http::kHttpUnauthorized); } @@ -190,8 +190,7 @@ TEST_P(HttpJwtTestSuite, jwtClientMissingJwt) { const bool useHttps = GetParam(); - auto response = - std::move(produceHttpResponse(useHttps, clientConfigOverride)); + auto response = produceHttpResponse(useHttps, clientConfigOverride); EXPECT_EQ(response->headers()->getStatusCode(), http::kHttpUnauthorized); } diff --git a/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp b/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp index c72617f01dfc0..3b25551c58b98 100644 --- a/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp +++ b/presto-native-execution/presto_cpp/main/http/tests/HttpTest.cpp @@ -160,7 +160,8 @@ TEST_P(HttpTestSuite, clientIdleSessions) { std::chrono::seconds(1), std::chrono::milliseconds(0), memoryPool, - useHttps ? makeSslContext() : nullptr); + useHttps ? makeSslContext() : nullptr, + http::HttpClientOptions{}); auto response = sendGet(client.get(), "/ping").get(std::chrono::seconds(3)); ASSERT_EQ(response->headers()->getStatusCode(), http::kHttpOk); } diff --git a/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h b/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h index f67a2ff0451c5..4f70a2cd9d997 100644 --- a/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h +++ b/presto-native-execution/presto_cpp/main/http/tests/HttpTestBase.h @@ -75,7 +75,7 @@ class HttpServerWrapper { promise_ = std::move(promise); serverThread_ = std::make_unique([this]() { server_->start( - std::move(filters_), [&](proxygen::HTTPServer* httpServer) { + {}, std::move(filters_), [&](proxygen::HTTPServer* httpServer) { ASSERT_EQ(httpServer->addresses().size(), 1); promise_.setValue(httpServer->addresses()[0].address); }); @@ -208,6 +208,7 @@ class HttpClientFactory { connectTimeout, pool, useHttps ? makeSslContext() : nullptr, + facebook::presto::http::HttpClientOptions{}, std::move(reportOnBodyStatsFunc)); } @@ -221,8 +222,10 @@ sendGet( facebook::presto::http::HttpClient* client, const std::string& url, const uint64_t sendDelay = 0, - const std::string body = "") { + const std::string body = "", + facebook::presto::http::JwtOptions jwtOptions = {}) { return facebook::presto::http::RequestBuilder() + .jwtOptions(std::move(jwtOptions)) .method(proxygen::HTTPMethod::GET) .url(url) .send(client, body, sendDelay); diff --git a/presto-native-execution/presto_cpp/main/tests/HttpServerWrapper.cpp b/presto-native-execution/presto_cpp/main/tests/HttpServerWrapper.cpp index d1a9c64a29761..48e0d73e5daeb 100644 --- a/presto-native-execution/presto_cpp/main/tests/HttpServerWrapper.cpp +++ b/presto-native-execution/presto_cpp/main/tests/HttpServerWrapper.cpp @@ -21,7 +21,7 @@ folly::SemiFuture HttpServerWrapper::start() { auto [promise, future] = folly::makePromiseContract(); promise_ = std::move(promise); serverThread_ = std::make_unique([this]() { - server_->start({}, [&](proxygen::HTTPServer* httpServer) { + server_->start({}, {}, [&](proxygen::HTTPServer* httpServer) { ASSERT_EQ(httpServer->addresses().size(), 1); promise_.setValue(httpServer->addresses()[0].address); });