From 5abaf33f7a2bb0cb72a27f53403d430d0821c1e0 Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Thu, 30 Oct 2025 14:24:05 -0700 Subject: [PATCH] feat(native): Add flow control configuration for http2 client --- .../presto_cpp/main/common/Configs.cpp | 11 +++++++++ .../presto_cpp/main/common/Configs.h | 13 ++++++++++ .../presto_cpp/main/http/HttpClient.cpp | 24 +++++++++++++++++++ .../presto_cpp/main/http/HttpClient.h | 3 +++ 4 files changed, 51 insertions(+) diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 450d4b34253bd..fb949cfb78a83 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -234,6 +234,8 @@ SystemConfig::SystemConfig() { NUM_PROP(kAnnouncementMaxFrequencyMs, 30'000), // 30s NUM_PROP(kHeartbeatFrequencyMs, 0), BOOL_PROP(kHttpClientHttp2Enabled, false), + NUM_PROP(kHttpClientHttp2MaxStreamsPerConnection, 1), + NUM_PROP(kHttpClientHttp2FlowControlWindow, 1 << 24 /*16MB*/), STR_PROP(kExchangeMaxErrorDuration, "3m"), STR_PROP(kExchangeRequestTimeout, "20s"), STR_PROP(kExchangeConnectTimeout, "20s"), @@ -860,6 +862,15 @@ bool SystemConfig::httpClientHttp2Enabled() const { return optionalProperty(kHttpClientHttp2Enabled).value(); } +uint32_t SystemConfig::httpClientHttp2MaxStreamsPerConnection() const { + return optionalProperty(kHttpClientHttp2MaxStreamsPerConnection) + .value(); +} + +uint32_t SystemConfig::httpClientHttp2FlowControlWindow() const { + return optionalProperty(kHttpClientHttp2FlowControlWindow).value(); +} + std::chrono::duration SystemConfig::exchangeMaxErrorDuration() const { return velox::config::toDuration( optionalProperty(kExchangeMaxErrorDuration).value()); diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index a70b091962e14..bdfc39bf4190c 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -657,6 +657,15 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kHttpClientHttp2Enabled{ "http-client.http2-enabled"}; + /// Maximum concurrent streams per HTTP/2 connection + static constexpr std::string_view kHttpClientHttp2MaxStreamsPerConnection{ + "http-client.http2.max-streams-per-connection"}; + + /// HTTP/2 flow control window size in bytes. + /// Controls the initial receive window, stream window, and session window. + static constexpr std::string_view kHttpClientHttp2FlowControlWindow{ + "http-client.http2.flow-control-window"}; + static constexpr std::string_view kExchangeMaxErrorDuration{ "exchange.max-error-duration"}; @@ -1042,6 +1051,10 @@ class SystemConfig : public ConfigBase { bool httpClientHttp2Enabled() const; + uint32_t httpClientHttp2MaxStreamsPerConnection() const; + + uint32_t httpClientHttp2FlowControlWindow() const; + std::chrono::duration exchangeMaxErrorDuration() const; std::chrono::duration exchangeRequestTimeoutMs() const; diff --git a/presto-native-execution/presto_cpp/main/http/HttpClient.cpp b/presto-native-execution/presto_cpp/main/http/HttpClient.cpp index c9cc5a7d688c2..a511838e086f0 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpClient.cpp +++ b/presto-native-execution/presto_cpp/main/http/HttpClient.cpp @@ -44,6 +44,11 @@ HttpClient::HttpClient( address_(address), transactionTimeout_(transactionTimeout), connectTimeout_(connectTimeout), + http2Enabled_(SystemConfig::instance()->httpClientHttp2Enabled()), + maxConcurrentStreams_( + SystemConfig::instance()->httpClientHttp2MaxStreamsPerConnection()), + http2FlowControlWindow_( + SystemConfig::instance()->httpClientHttp2FlowControlWindow()), pool_(std::move(pool)), sslContext_(sslContext), reportOnBodyStatsFunc_(std::move(reportOnBodyStatsFunc)), @@ -303,6 +308,9 @@ class ConnectionHandler : public proxygen::HTTPConnector::Callback { proxygen::SessionPool* sessionPool, proxygen::WheelTimerInstance transactionTimeout, std::chrono::milliseconds connectTimeout, + bool http2Enabled, + uint32_t maxConcurrentStreams, + uint32_t http2FlowControlWindow, folly::EventBase* eventBase, const folly::SocketAddress& address, folly::SSLContextPtr sslContext) @@ -310,6 +318,9 @@ class ConnectionHandler : public proxygen::HTTPConnector::Callback { sessionPool_(sessionPool), transactionTimer_(transactionTimeout), connectTimeout_(connectTimeout), + http2Enabled_(http2Enabled), + maxConcurrentStreams_(maxConcurrentStreams), + http2FlowControlWindow_(http2FlowControlWindow), eventBase_(eventBase), address_(address), sslContext_(std::move(sslContext)) {} @@ -330,6 +341,13 @@ class ConnectionHandler : public proxygen::HTTPConnector::Callback { } void connectSuccess(proxygen::HTTPUpstreamSession* session) override { + if (http2Enabled_) { + session->setFlowControl( + http2FlowControlWindow_, + http2FlowControlWindow_, + http2FlowControlWindow_); + session->setMaxConcurrentOutgoingStreams(maxConcurrentStreams_); + } auto txn = session->newTransaction(responseHandler_.get()); if (txn) { responseHandler_->sendRequest(txn); @@ -352,6 +370,9 @@ class ConnectionHandler : public proxygen::HTTPConnector::Callback { // The connect timeout used to timeout the duration from starting connection // to connect success const std::chrono::milliseconds connectTimeout_; + const bool http2Enabled_; + const uint32_t maxConcurrentStreams_; + const uint32_t http2FlowControlWindow_; folly::EventBase* const eventBase_; const folly::SocketAddress address_; const folly::SSLContextPtr sslContext_; @@ -518,6 +539,9 @@ void HttpClient::sendRequest(std::shared_ptr responseHandler) { sessionPool_, proxygen::WheelTimerInstance(transactionTimeout_, eventBase_), connectTimeout_, + http2Enabled_, + maxConcurrentStreams_, + http2FlowControlWindow_, eventBase_, address_, sslContext_); diff --git a/presto-native-execution/presto_cpp/main/http/HttpClient.h b/presto-native-execution/presto_cpp/main/http/HttpClient.h index d01adeec7db9c..d96a0ba0c356e 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpClient.h +++ b/presto-native-execution/presto_cpp/main/http/HttpClient.h @@ -200,6 +200,9 @@ class HttpClient : public std::enable_shared_from_this { const folly::SocketAddress address_; const std::chrono::milliseconds transactionTimeout_; const std::chrono::milliseconds connectTimeout_; + const bool http2Enabled_; + const uint32_t maxConcurrentStreams_; + const uint32_t http2FlowControlWindow_; const std::shared_ptr pool_; const folly::SSLContextPtr sslContext_; const std::function reportOnBodyStatsFunc_;