Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,29 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
const auto receiveThrift =
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
const auto contentEncoding = headers.getSingleOrEmpty("Content-Encoding");
const auto isCompressed =
!contentEncoding.empty() && contentEncoding != "identity";

return new http::CallbackRequestHandler(
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
[this,
taskId,
summarize,
createOrUpdateFunc,
sendThrift,
receiveThrift,
contentEncoding,
isCompressed](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(
httpSrvCpuExecutor_,
[this,
requestBody = util::extractMessageBody(body),
requestBody = isCompressed
? util::decompressMessageBody(body, contentEncoding)
: util::extractMessageBody(body),
taskId,
summarize,
createOrUpdateFunc,
Expand Down
44 changes: 41 additions & 3 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,17 @@ SystemConfig::SystemConfig() {
NONE_PROP(kHttpServerHttpsPort),
BOOL_PROP(kHttpServerHttpsEnabled, false),
BOOL_PROP(kHttpServerHttp2Enabled, true),
NUM_PROP(kHttpServerIdleTimeoutMs, 60'000),
NUM_PROP(kHttpServerHttp2InitialReceiveWindow, 1 << 20),
NUM_PROP(kHttpServerHttp2ReceiveStreamWindowSize, 1 << 20),
NUM_PROP(kHttpServerHttp2ReceiveSessionWindowSize, 10 * (1 << 20)),
NUM_PROP(kHttpServerIdleTimeoutMs, 60'000),
NUM_PROP(kHttpServerHttp2MaxConcurrentStreams, 100),
NUM_PROP(kHttpServerContentCompressionLevel, 4),
NUM_PROP(kHttpServerContentCompressionMinimumSize, 3584),
BOOL_PROP(kHttpServerEnableContentCompression, false),
BOOL_PROP(kHttpServerEnableZstdCompression, false),
NUM_PROP(kHttpServerZstdContentCompressionLevel, 8),
BOOL_PROP(kHttpServerEnableGzipCompression, false),
STR_PROP(
kHttpsSupportedCiphers,
"ECDHE-ECDSA-AES256-GCM-SHA384,AES256-GCM-SHA384"),
Expand Down Expand Up @@ -309,6 +316,10 @@ bool SystemConfig::httpServerHttp2Enabled() const {
return optionalProperty<bool>(kHttpServerHttp2Enabled).value();
}

uint32_t SystemConfig::httpServerIdleTimeoutMs() const {
return optionalProperty<uint32_t>(kHttpServerIdleTimeoutMs).value();
}

uint32_t SystemConfig::httpServerHttp2InitialReceiveWindow() const {
return optionalProperty<uint32_t>(kHttpServerHttp2InitialReceiveWindow)
.value();
Expand All @@ -324,8 +335,35 @@ uint32_t SystemConfig::httpServerHttp2ReceiveSessionWindowSize() const {
.value();
}

uint32_t SystemConfig::httpServerIdleTimeoutMs() const {
return optionalProperty<uint32_t>(kHttpServerIdleTimeoutMs).value();
uint32_t SystemConfig::httpServerHttp2MaxConcurrentStreams() const {
return optionalProperty<uint32_t>(kHttpServerHttp2MaxConcurrentStreams)
.value();
}

uint32_t SystemConfig::httpServerContentCompressionLevel() const {
return optionalProperty<uint32_t>(kHttpServerContentCompressionLevel).value();
}

uint32_t SystemConfig::httpServerContentCompressionMinimumSize() const {
return optionalProperty<uint32_t>(kHttpServerContentCompressionMinimumSize)
.value();
}

bool SystemConfig::httpServerEnableContentCompression() const {
return optionalProperty<bool>(kHttpServerEnableContentCompression).value();
}

bool SystemConfig::httpServerEnableZstdCompression() const {
return optionalProperty<bool>(kHttpServerEnableZstdCompression).value();
}

uint32_t SystemConfig::httpServerZstdContentCompressionLevel() const {
return optionalProperty<uint32_t>(kHttpServerZstdContentCompressionLevel)
.value();
}

bool SystemConfig::httpServerEnableGzipCompression() const {
return optionalProperty<bool>(kHttpServerEnableGzipCompression).value();
}

std::string SystemConfig::httpsSupportedCiphers() const {
Expand Down
45 changes: 39 additions & 6 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ class SystemConfig : public ConfigBase {
"http-server.https.enabled"};
static constexpr std::string_view kHttpServerHttp2Enabled{
"http-server.http2.enabled"};
/// HTTP/2 server idle timeout in milliseconds (default 60000ms).
static constexpr std::string_view kHttpServerIdleTimeoutMs{
"http-server.http2.idle-timeout-ms"};
/// HTTP/2 initial receive window size in bytes (default 1MB).
static constexpr std::string_view kHttpServerHttp2InitialReceiveWindow{
"http-server.http2.initial-receive-window"};
Expand All @@ -218,11 +221,27 @@ class SystemConfig : public ConfigBase {
/// HTTP/2 receive session window size in bytes (default 10MB).
static constexpr std::string_view kHttpServerHttp2ReceiveSessionWindowSize{
"http-server.http2.receive-session-window-size"};

/// HTTP server idle timeout in milliseconds
static constexpr std::string_view kHttpServerIdleTimeoutMs{
"http-server.idle-timeout-ms"};

/// HTTP/2 maximum concurrent streams per connection (default 100).
static constexpr std::string_view kHttpServerHttp2MaxConcurrentStreams{
"http-server.http2.max-concurrent-streams"};
/// HTTP/2 content compression level (1-9, default 4 for speed).
static constexpr std::string_view kHttpServerContentCompressionLevel{
"http-server.http2.content-compression-level"};
/// HTTP/2 content compression minimum size in bytes (default 3584).
static constexpr std::string_view kHttpServerContentCompressionMinimumSize{
"http-server.http2.content-compression-minimum-size"};
/// Enable content compression (master switch, default true).
static constexpr std::string_view kHttpServerEnableContentCompression{
"http-server.http2.enable-content-compression"};
/// Enable zstd compression (default false).
static constexpr std::string_view kHttpServerEnableZstdCompression{
"http-server.http2.enable-zstd-compression"};
/// Zstd compression level (-5 to 22, default 8).
static constexpr std::string_view kHttpServerZstdContentCompressionLevel{
"http-server.http2.zstd-content-compression-level"};
/// Enable gzip compression (default true).
static constexpr std::string_view kHttpServerEnableGzipCompression{
"http-server.http2.enable-gzip-compression"};
/// List of comma separated ciphers the client can use.
///
/// NOTE: the client needs to have at least one cipher shared with server
Expand Down Expand Up @@ -841,13 +860,27 @@ class SystemConfig : public ConfigBase {

bool httpServerHttp2Enabled() const;

uint32_t httpServerIdleTimeoutMs() const;

uint32_t httpServerHttp2InitialReceiveWindow() const;

uint32_t httpServerHttp2ReceiveStreamWindowSize() const;

uint32_t httpServerHttp2ReceiveSessionWindowSize() const;

uint32_t httpServerIdleTimeoutMs() const;
uint32_t httpServerHttp2MaxConcurrentStreams() const;

uint32_t httpServerContentCompressionLevel() const;

uint32_t httpServerContentCompressionMinimumSize() const;

bool httpServerEnableContentCompression() const;

bool httpServerEnableZstdCompression() const;

uint32_t httpServerZstdContentCompressionLevel() const;

bool httpServerEnableGzipCompression() const;

/// A list of ciphers (comma separated) that are supported by
/// server and client. Note Java and folly::SSLContext use different names to
Expand Down
47 changes: 47 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

#include "presto_cpp/main/common/Utils.h"
#include <fmt/format.h>
#include <folly/compression/Compression.h>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>
#include <sys/resource.h>
#include "velox/common/base/Exceptions.h"
#include "velox/common/process/ThreadDebugInfo.h"

namespace facebook::presto::util {
Expand Down Expand Up @@ -89,4 +92,48 @@ std::string extractMessageBody(
}
return ret;
}

std::string decompressMessageBody(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to return string here? Or IOBuf?

const std::vector<std::unique_ptr<folly::IOBuf>>& body,
const std::string& contentEncoding) {
try {
// Combine all IOBufs into a single chain
std::unique_ptr<folly::IOBuf> combined;
for (const auto& buf : body) {
if (!combined) {
combined = buf->clone();
} else {
combined->appendToChain(buf->clone());
}
}

// Determine compression codec type; Support only ZSTD for now
folly::compression::CodecType codecType;
if (contentEncoding == "zstd") {
codecType = folly::compression::CodecType::ZSTD;
} else {
VELOX_USER_FAIL("Unsupported Content-Encoding: {}", contentEncoding);
}

// Decompress the data
auto codec = folly::compression::getCodec(
codecType); // getCodec never return nullptr
auto decompressed = codec->uncompress(combined.get());

size_t decompressedSize = decompressed->computeChainDataLength();

// Convert decompressed IOBuf to string
std::string ret;
ret.resize(decompressedSize);
folly::io::Cursor cursor(decompressed.get());
cursor.pull(ret.data(), decompressedSize);

return ret;
} catch (const std::exception& e) {
VELOX_USER_FAIL(
"Failed to decompress request body with {}: {}",
contentEncoding,
e.what());
}
}
} // namespace facebook::presto::util
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ void installSignalHandler();
std::string extractMessageBody(
const std::vector<std::unique_ptr<folly::IOBuf>>& body);

/// Decompress message body based on Content-Encoding
/// Throws exception if decompression fails
std::string decompressMessageBody(
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
const std::string& contentEncoding);

inline std::string addDefaultNamespacePrefix(
const std::string& prestoDefaultNamespacePrefix,
const std::string& functionName) {
Expand Down
33 changes: 29 additions & 4 deletions presto-native-execution/presto_cpp/main/http/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,10 @@ void HttpServer::start(
std::function<void(proxygen::HTTPServer* /*server*/)> onSuccess,
std::function<void(std::exception_ptr)> onError) {
proxygen::HTTPServerOptions options;
options.idleTimeout = std::chrono::milliseconds(
SystemConfig::instance()->httpServerIdleTimeoutMs());
options.enableContentCompression = false;

auto systemConfig = SystemConfig::instance();
options.idleTimeout =
std::chrono::milliseconds(systemConfig->httpServerIdleTimeoutMs());

proxygen::RequestHandlerChain handlerFactories;

Expand All @@ -290,15 +291,39 @@ void HttpServer::start(
options.handlerFactories = handlerFactories.build();

// HTTP/2 flow control window sizes (configurable)
auto systemConfig = SystemConfig::instance();
options.initialReceiveWindow =
systemConfig->httpServerHttp2InitialReceiveWindow();
options.receiveStreamWindowSize =
systemConfig->httpServerHttp2ReceiveStreamWindowSize();
options.receiveSessionWindowSize =
systemConfig->httpServerHttp2ReceiveSessionWindowSize();
options.maxConcurrentIncomingStreams =
systemConfig->httpServerHttp2MaxConcurrentStreams();
options.h2cEnabled = true;

// Enable HTTP/2 responses compression for better performance
// Supports both gzip and zstd (zstd preferred when client supports it)
options.enableContentCompression =
systemConfig->httpServerEnableContentCompression();
options.contentCompressionLevel =
systemConfig->httpServerContentCompressionLevel();
options.contentCompressionMinimumSize =
systemConfig->httpServerContentCompressionMinimumSize();
options.enableZstdCompression =
systemConfig->httpServerEnableZstdCompression();
options.zstdContentCompressionLevel =
systemConfig->httpServerZstdContentCompressionLevel();
options.enableGzipCompression =
systemConfig->httpServerEnableGzipCompression();

// CRITICAL: Add Thrift content-types for Presto task updates
// By default, proxygen only compresses text/* and some application/* types
// We need to explicitly add all Thrift variants used by Presto
options.contentCompressionTypes.insert(
"application/x-thrift"); // Standard Thrift
options.contentCompressionTypes.insert(
"application/x-thrift+binary"); // Thrift binary protocol

server_ = std::make_unique<proxygen::HTTPServer>(std::move(options));

std::vector<proxygen::HTTPServer::IPConfig> ipConfigs;
Expand Down
Loading