diff --git a/presto-native-execution/presto_cpp/main/TaskResource.cpp b/presto-native-execution/presto_cpp/main/TaskResource.cpp index 3fc4a2e42b4ff..67ba67ebf325d 100644 --- a/presto-native-execution/presto_cpp/main/TaskResource.cpp +++ b/presto-native-execution/presto_cpp/main/TaskResource.cpp @@ -224,9 +224,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl( headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE); const auto receiveThrift = contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos; + const auto contentEncoding = headers.getSingleOrEmpty("Content-Encoding"); + const auto isCompressed = + !contentEncoding.empty() && contentEncoding != "identity"; return new http::CallbackRequestHandler( - [this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift]( + [this, + taskId, + summarize, + createOrUpdateFunc, + sendThrift, + receiveThrift, + contentEncoding, + isCompressed]( proxygen::HTTPMessage* /*message*/, const std::vector>& body, proxygen::ResponseHandler* downstream, @@ -234,7 +244,9 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl( folly::via( httpSrvCpuExecutor_, [this, - requestBody = util::extractMessageBody(body), + requestBody = isCompressed + ? util::decompressMessageBody(body, contentEncoding) + : util::extractMessageBody(body), taskId, summarize, createOrUpdateFunc, diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index bf0da5cc5f18a..80df7602f3f05 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -150,10 +150,17 @@ SystemConfig::SystemConfig() { NONE_PROP(kHttpServerHttpsPort), BOOL_PROP(kHttpServerHttpsEnabled, false), BOOL_PROP(kHttpServerHttp2Enabled, true), + NUM_PROP(kHttpServerIdleTimeoutMs, 60'000), NUM_PROP(kHttpServerHttp2InitialReceiveWindow, 1 << 20), NUM_PROP(kHttpServerHttp2ReceiveStreamWindowSize, 1 << 20), NUM_PROP(kHttpServerHttp2ReceiveSessionWindowSize, 10 * (1 << 20)), - NUM_PROP(kHttpServerIdleTimeoutMs, 60'000), + NUM_PROP(kHttpServerHttp2MaxConcurrentStreams, 100), + NUM_PROP(kHttpServerContentCompressionLevel, 4), + NUM_PROP(kHttpServerContentCompressionMinimumSize, 3584), + BOOL_PROP(kHttpServerEnableContentCompression, false), + BOOL_PROP(kHttpServerEnableZstdCompression, false), + NUM_PROP(kHttpServerZstdContentCompressionLevel, 8), + BOOL_PROP(kHttpServerEnableGzipCompression, false), STR_PROP( kHttpsSupportedCiphers, "ECDHE-ECDSA-AES256-GCM-SHA384,AES256-GCM-SHA384"), @@ -309,6 +316,10 @@ bool SystemConfig::httpServerHttp2Enabled() const { return optionalProperty(kHttpServerHttp2Enabled).value(); } +uint32_t SystemConfig::httpServerIdleTimeoutMs() const { + return optionalProperty(kHttpServerIdleTimeoutMs).value(); +} + uint32_t SystemConfig::httpServerHttp2InitialReceiveWindow() const { return optionalProperty(kHttpServerHttp2InitialReceiveWindow) .value(); @@ -324,8 +335,35 @@ uint32_t SystemConfig::httpServerHttp2ReceiveSessionWindowSize() const { .value(); } -uint32_t SystemConfig::httpServerIdleTimeoutMs() const { - return optionalProperty(kHttpServerIdleTimeoutMs).value(); +uint32_t SystemConfig::httpServerHttp2MaxConcurrentStreams() const { + return optionalProperty(kHttpServerHttp2MaxConcurrentStreams) + .value(); +} + +uint32_t SystemConfig::httpServerContentCompressionLevel() const { + return optionalProperty(kHttpServerContentCompressionLevel).value(); +} + +uint32_t SystemConfig::httpServerContentCompressionMinimumSize() const { + return optionalProperty(kHttpServerContentCompressionMinimumSize) + .value(); +} + +bool SystemConfig::httpServerEnableContentCompression() const { + return optionalProperty(kHttpServerEnableContentCompression).value(); +} + +bool SystemConfig::httpServerEnableZstdCompression() const { + return optionalProperty(kHttpServerEnableZstdCompression).value(); +} + +uint32_t SystemConfig::httpServerZstdContentCompressionLevel() const { + return optionalProperty(kHttpServerZstdContentCompressionLevel) + .value(); +} + +bool SystemConfig::httpServerEnableGzipCompression() const { + return optionalProperty(kHttpServerEnableGzipCompression).value(); } std::string SystemConfig::httpsSupportedCiphers() const { diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index f6708cbac733b..b3419a69e67e6 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -209,6 +209,9 @@ class SystemConfig : public ConfigBase { "http-server.https.enabled"}; static constexpr std::string_view kHttpServerHttp2Enabled{ "http-server.http2.enabled"}; + /// HTTP/2 server idle timeout in milliseconds (default 60000ms). + static constexpr std::string_view kHttpServerIdleTimeoutMs{ + "http-server.http2.idle-timeout-ms"}; /// HTTP/2 initial receive window size in bytes (default 1MB). static constexpr std::string_view kHttpServerHttp2InitialReceiveWindow{ "http-server.http2.initial-receive-window"}; @@ -218,11 +221,27 @@ class SystemConfig : public ConfigBase { /// HTTP/2 receive session window size in bytes (default 10MB). static constexpr std::string_view kHttpServerHttp2ReceiveSessionWindowSize{ "http-server.http2.receive-session-window-size"}; - - /// HTTP server idle timeout in milliseconds - static constexpr std::string_view kHttpServerIdleTimeoutMs{ - "http-server.idle-timeout-ms"}; - + /// HTTP/2 maximum concurrent streams per connection (default 100). + static constexpr std::string_view kHttpServerHttp2MaxConcurrentStreams{ + "http-server.http2.max-concurrent-streams"}; + /// HTTP/2 content compression level (1-9, default 4 for speed). + static constexpr std::string_view kHttpServerContentCompressionLevel{ + "http-server.http2.content-compression-level"}; + /// HTTP/2 content compression minimum size in bytes (default 3584). + static constexpr std::string_view kHttpServerContentCompressionMinimumSize{ + "http-server.http2.content-compression-minimum-size"}; + /// Enable content compression (master switch, default true). + static constexpr std::string_view kHttpServerEnableContentCompression{ + "http-server.http2.enable-content-compression"}; + /// Enable zstd compression (default false). + static constexpr std::string_view kHttpServerEnableZstdCompression{ + "http-server.http2.enable-zstd-compression"}; + /// Zstd compression level (-5 to 22, default 8). + static constexpr std::string_view kHttpServerZstdContentCompressionLevel{ + "http-server.http2.zstd-content-compression-level"}; + /// Enable gzip compression (default true). + static constexpr std::string_view kHttpServerEnableGzipCompression{ + "http-server.http2.enable-gzip-compression"}; /// List of comma separated ciphers the client can use. /// /// NOTE: the client needs to have at least one cipher shared with server @@ -841,13 +860,27 @@ class SystemConfig : public ConfigBase { bool httpServerHttp2Enabled() const; + uint32_t httpServerIdleTimeoutMs() const; + uint32_t httpServerHttp2InitialReceiveWindow() const; uint32_t httpServerHttp2ReceiveStreamWindowSize() const; uint32_t httpServerHttp2ReceiveSessionWindowSize() const; - uint32_t httpServerIdleTimeoutMs() const; + uint32_t httpServerHttp2MaxConcurrentStreams() const; + + uint32_t httpServerContentCompressionLevel() const; + + uint32_t httpServerContentCompressionMinimumSize() const; + + bool httpServerEnableContentCompression() const; + + bool httpServerEnableZstdCompression() const; + + uint32_t httpServerZstdContentCompressionLevel() const; + + bool httpServerEnableGzipCompression() const; /// A list of ciphers (comma separated) that are supported by /// server and client. Note Java and folly::SSLContext use different names to diff --git a/presto-native-execution/presto_cpp/main/common/Utils.cpp b/presto-native-execution/presto_cpp/main/common/Utils.cpp index 96fb762e38b89..43f6c24edbb85 100644 --- a/presto-native-execution/presto_cpp/main/common/Utils.cpp +++ b/presto-native-execution/presto_cpp/main/common/Utils.cpp @@ -14,8 +14,11 @@ #include "presto_cpp/main/common/Utils.h" #include +#include #include +#include #include +#include "velox/common/base/Exceptions.h" #include "velox/common/process/ThreadDebugInfo.h" namespace facebook::presto::util { @@ -89,4 +92,48 @@ std::string extractMessageBody( } return ret; } + +std::string decompressMessageBody( + const std::vector>& body, + const std::string& contentEncoding) { + try { + // Combine all IOBufs into a single chain + std::unique_ptr combined; + for (const auto& buf : body) { + if (!combined) { + combined = buf->clone(); + } else { + combined->appendToChain(buf->clone()); + } + } + + // Determine compression codec type; Support only ZSTD for now + folly::compression::CodecType codecType; + if (contentEncoding == "zstd") { + codecType = folly::compression::CodecType::ZSTD; + } else { + VELOX_USER_FAIL("Unsupported Content-Encoding: {}", contentEncoding); + } + + // Decompress the data + auto codec = folly::compression::getCodec( + codecType); // getCodec never return nullptr + auto decompressed = codec->uncompress(combined.get()); + + size_t decompressedSize = decompressed->computeChainDataLength(); + + // Convert decompressed IOBuf to string + std::string ret; + ret.resize(decompressedSize); + folly::io::Cursor cursor(decompressed.get()); + cursor.pull(ret.data(), decompressedSize); + + return ret; + } catch (const std::exception& e) { + VELOX_USER_FAIL( + "Failed to decompress request body with {}: {}", + contentEncoding, + e.what()); + } +} } // namespace facebook::presto::util diff --git a/presto-native-execution/presto_cpp/main/common/Utils.h b/presto-native-execution/presto_cpp/main/common/Utils.h index 1cc6f8cf9c0ae..bd0800e668f8a 100644 --- a/presto-native-execution/presto_cpp/main/common/Utils.h +++ b/presto-native-execution/presto_cpp/main/common/Utils.h @@ -49,6 +49,12 @@ void installSignalHandler(); std::string extractMessageBody( const std::vector>& body); +/// Decompress message body based on Content-Encoding +/// Throws exception if decompression fails +std::string decompressMessageBody( + const std::vector>& body, + const std::string& contentEncoding); + inline std::string addDefaultNamespacePrefix( const std::string& prestoDefaultNamespacePrefix, const std::string& functionName) { diff --git a/presto-native-execution/presto_cpp/main/http/HttpServer.cpp b/presto-native-execution/presto_cpp/main/http/HttpServer.cpp index cc5e77f493f2c..e6b7428df7e9e 100644 --- a/presto-native-execution/presto_cpp/main/http/HttpServer.cpp +++ b/presto-native-execution/presto_cpp/main/http/HttpServer.cpp @@ -273,9 +273,10 @@ void HttpServer::start( std::function onSuccess, std::function onError) { proxygen::HTTPServerOptions options; - options.idleTimeout = std::chrono::milliseconds( - SystemConfig::instance()->httpServerIdleTimeoutMs()); - options.enableContentCompression = false; + + auto systemConfig = SystemConfig::instance(); + options.idleTimeout = + std::chrono::milliseconds(systemConfig->httpServerIdleTimeoutMs()); proxygen::RequestHandlerChain handlerFactories; @@ -290,15 +291,39 @@ void HttpServer::start( options.handlerFactories = handlerFactories.build(); // HTTP/2 flow control window sizes (configurable) - auto systemConfig = SystemConfig::instance(); options.initialReceiveWindow = systemConfig->httpServerHttp2InitialReceiveWindow(); options.receiveStreamWindowSize = systemConfig->httpServerHttp2ReceiveStreamWindowSize(); options.receiveSessionWindowSize = systemConfig->httpServerHttp2ReceiveSessionWindowSize(); + options.maxConcurrentIncomingStreams = + systemConfig->httpServerHttp2MaxConcurrentStreams(); options.h2cEnabled = true; + // Enable HTTP/2 responses compression for better performance + // Supports both gzip and zstd (zstd preferred when client supports it) + options.enableContentCompression = + systemConfig->httpServerEnableContentCompression(); + options.contentCompressionLevel = + systemConfig->httpServerContentCompressionLevel(); + options.contentCompressionMinimumSize = + systemConfig->httpServerContentCompressionMinimumSize(); + options.enableZstdCompression = + systemConfig->httpServerEnableZstdCompression(); + options.zstdContentCompressionLevel = + systemConfig->httpServerZstdContentCompressionLevel(); + options.enableGzipCompression = + systemConfig->httpServerEnableGzipCompression(); + + // CRITICAL: Add Thrift content-types for Presto task updates + // By default, proxygen only compresses text/* and some application/* types + // We need to explicitly add all Thrift variants used by Presto + options.contentCompressionTypes.insert( + "application/x-thrift"); // Standard Thrift + options.contentCompressionTypes.insert( + "application/x-thrift+binary"); // Thrift binary protocol + server_ = std::make_unique(std::move(options)); std::vector ipConfigs;