Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "presto_cpp/main/PeriodicServiceInventoryManager.h"
#include <folly/futures/Retrying.h>
#include <velox/common/memory/Memory.h>
#include "presto_cpp/main/common/Configs.h"

namespace facebook::presto {
PeriodicServiceInventoryManager::PeriodicServiceInventoryManager(
Expand Down Expand Up @@ -80,6 +81,8 @@ void PeriodicServiceInventoryManager::sendRequest() {
LOG(INFO) << "Service Inventory changed to " << newAddress.getAddressStr()
<< ":" << newAddress.getPort();
std::swap(serviceAddress_, newAddress);
auto systemConfig = SystemConfig::instance();
auto httpClientOptions = systemConfig->httpClientOptions();
client_ = std::make_shared<http::HttpClient>(
eventBaseThread_.getEventBase(),
nullptr,
Expand All @@ -91,7 +94,8 @@ void PeriodicServiceInventoryManager::sendRequest() {
std::chrono::milliseconds(10'000),
std::chrono::milliseconds(0),
pool_,
sslContext_);
sslContext_,
std::move(httpClientOptions));
}
} catch (const std::exception& ex) {
LOG(WARNING) << "Error occurred during updating service address: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ PrestoExchangeSource::PrestoExchangeSource(
VELOX_CHECK_NOT_NULL(driverExecutor_);
VELOX_CHECK_NOT_NULL(ioEventBase);
VELOX_CHECK_NOT_NULL(pool_);
auto systemConfig = SystemConfig::instance();
Comment thread
aditi-pandit marked this conversation as resolved.
auto httpClientOptions = systemConfig->httpClientOptions();
httpClient_ = std::make_shared<http::HttpClient>(
ioEventBase,
connPool,
Expand All @@ -111,7 +113,9 @@ PrestoExchangeSource::PrestoExchangeSource(
requestTimeoutMs,
connectTimeoutMs,
immediateBufferTransfer_ ? pool_ : nullptr,
sslContext_);
sslContext_,
std::move(httpClientOptions));
jwtOptions_ = systemConfig->jwtOptions();
}

void PrestoExchangeSource::close() {
Expand Down Expand Up @@ -185,7 +189,8 @@ void PrestoExchangeSource::doRequest(
} else {
method = proxygen::HTTPMethod::GET;
}
auto requestBuilder = http::RequestBuilder().method(method).url(path);
auto requestBuilder =
http::RequestBuilder().jwtOptions(jwtOptions_).method(method).url(path);

velox::common::testutil::TestValue::adjust(
"facebook::presto::PrestoExchangeSource::doRequest", this);
Expand Down Expand Up @@ -468,6 +473,7 @@ void PrestoExchangeSource::acknowledgeResults(int64_t ackSequence) {
auto ackPath = fmt::format("{}/{}/acknowledge", basePath_, ackSequence);
VLOG(1) << "Sending ack " << ackPath;
http::RequestBuilder()
.jwtOptions(jwtOptions_)
.method(proxygen::HTTPMethod::GET)
.url(ackPath)
.send(httpClient_.get())
Expand Down Expand Up @@ -512,6 +518,7 @@ void PrestoExchangeSource::abortResults() {

void PrestoExchangeSource::doAbortResults(int64_t delayMs) {
http::RequestBuilder()
.jwtOptions(jwtOptions_)
.method(proxygen::HTTPMethod::DELETE)
.url(basePath_)
.send(httpClient_.get(), "", delayMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
folly::CPUThreadPoolExecutor* const driverExecutor_;

std::shared_ptr<http::HttpClient> httpClient_;
http::JwtOptions jwtOptions_;
RetryState dataRequestRetryState_;
RetryState abortRetryState_;
int failedAttempts_;
Expand Down
65 changes: 35 additions & 30 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,37 +684,42 @@ void PrestoServer::run() {

// Start everything. After the return from the following call we are shutting
// down.
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
const auto addresses = server->addresses();
for (auto address : addresses) {
PRESTO_STARTUP_LOG(INFO) << fmt::format(
"Server listening at {}:{} - https {}",
address.address.getIPAddress().str(),
address.address.getPort(),
address.sslConfigs.size() != 0);
// We could be bound to both http and https ports.
// If set, we must use the https port and skip http.
if (httpsPort.has_value() && address.sslConfigs.size() == 0) {
continue;
}
startAnnouncerAndHeartbeatManagerCb(
httpsPort.has_value(), address.address.getPort());
setTaskUriCb(httpsPort.has_value(), address.address.getPort());
break;
}
auto startupOptions = systemConfig->httpServerStartupOptions();
httpServer_->start(
std::move(startupOptions),
getHttpServerFilters(),
[&](proxygen::HTTPServer* server) {
const auto addresses = server->addresses();
for (auto address : addresses) {
PRESTO_STARTUP_LOG(INFO) << fmt::format(
"Server listening at {}:{} - https {}",
address.address.getIPAddress().str(),
address.address.getPort(),
address.sslConfigs.size() != 0);
// We could be bound to both http and https ports.
// If set, we must use the https port and skip http.
if (httpsPort.has_value() && address.sslConfigs.size() == 0) {
continue;
}
startAnnouncerAndHeartbeatManagerCb(
httpsPort.has_value(), address.address.getPort());
setTaskUriCb(httpsPort.has_value(), address.address.getPort());
break;
}

if (coordinatorDiscoverer_ != nullptr) {
VELOX_CHECK_NOT_NULL(
announcer_,
"The announcer is expected to have been created but wasn't.");
const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
VELOX_CHECK_NOT_NULL(
heartbeatManager_,
"The heartbeat manager is expected to have been created but wasn't.");
}
}
});
if (coordinatorDiscoverer_ != nullptr) {
VELOX_CHECK_NOT_NULL(
announcer_,
"The announcer is expected to have been created but wasn't.");
const auto heartbeatFrequencyMs =
systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
VELOX_CHECK_NOT_NULL(
heartbeatManager_,
"The heartbeat manager is expected to have been created but wasn't.");
}
}
});

if (announcer_ != nullptr) {
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer";
Expand Down
44 changes: 44 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,25 @@ bool SystemConfig::httpServerEnableGzipCompression() const {
return optionalProperty<bool>(kHttpServerEnableGzipCompression).value();
}

http::HttpServerStartupOptions SystemConfig::httpServerStartupOptions() const {
http::HttpServerStartupOptions options;
options.idleTimeoutMs = httpServerIdleTimeoutMs();
options.http2InitialReceiveWindow = httpServerHttp2InitialReceiveWindow();
options.http2ReceiveStreamWindowSize =
httpServerHttp2ReceiveStreamWindowSize();
options.http2ReceiveSessionWindowSize =
httpServerHttp2ReceiveSessionWindowSize();
options.http2MaxConcurrentStreams = httpServerHttp2MaxConcurrentStreams();
options.enableContentCompression = httpServerEnableContentCompression();
options.contentCompressionLevel = httpServerContentCompressionLevel();
options.contentCompressionMinimumSize =
httpServerContentCompressionMinimumSize();
options.enableZstdCompression = httpServerEnableZstdCompression();
options.zstdContentCompressionLevel = httpServerZstdContentCompressionLevel();
options.enableGzipCompression = httpServerEnableGzipCompression();
return options;
}

std::string SystemConfig::httpsSupportedCiphers() const {
return optionalProperty(kHttpsSupportedCiphers).value();
}
Expand Down Expand Up @@ -963,6 +982,20 @@ bool SystemConfig::httpClientConnectionReuseCounterEnabled() const {
.value();
}

http::HttpClientOptions SystemConfig::httpClientOptions() const {
http::HttpClientOptions options;
options.http2Enabled = httpClientHttp2Enabled();
options.http2MaxStreamsPerConnection =
httpClientHttp2MaxStreamsPerConnection();
options.http2InitialStreamWindow = httpClientHttp2InitialStreamWindow();
options.http2StreamWindow = httpClientHttp2StreamWindow();
options.http2SessionWindow = httpClientHttp2SessionWindow();
options.maxAllocateBytes = httpMaxAllocateBytes();
options.connectionReuseCounterEnabled =
httpClientConnectionReuseCounterEnabled();
return options;
}

std::chrono::duration<double> SystemConfig::exchangeMaxErrorDuration() const {
return velox::config::toDuration(
optionalProperty(kExchangeMaxErrorDuration).value());
Expand Down Expand Up @@ -1027,6 +1060,17 @@ int32_t SystemConfig::internalCommunicationJwtExpirationSeconds() const {
.value();
}

http::JwtOptions SystemConfig::jwtOptions() const {
http::JwtOptions options;
options.jwtEnabled = internalCommunicationJwtEnabled();
if (options.jwtEnabled) {
options.sharedSecret = internalCommunicationSharedSecret();
options.jwtExpirationSeconds = internalCommunicationJwtExpirationSeconds();
options.nodeId = NodeConfig::instance()->nodeId();
}
return options;
}

bool SystemConfig::useLegacyArrayAgg() const {
return optionalProperty<bool>(kUseLegacyArrayAgg).value();
}
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include <memory>
#include <string>
#include <unordered_map>
#include "presto_cpp/main/http/HttpClientOptions.h" // @manual
#include "presto_cpp/main/http/HttpServerStartupOptions.h" // @manual
#include "presto_cpp/main/http/JwtOptions.h" // @manual
#include "velox/common/config/Config.h"

namespace facebook::presto {
Expand Down Expand Up @@ -936,6 +939,8 @@ class SystemConfig : public ConfigBase {

bool httpServerEnableGzipCompression() const;

http::HttpServerStartupOptions httpServerStartupOptions() const;

/// A list of ciphers (comma separated) that are supported by
/// server and client. Note Java and folly::SSLContext use different names to
/// refer to the same cipher. For e.g. TLS_RSA_WITH_AES_256_GCM_SHA384 in Java
Expand Down Expand Up @@ -1168,6 +1173,8 @@ class SystemConfig : public ConfigBase {

bool httpClientConnectionReuseCounterEnabled() const;

http::HttpClientOptions httpClientOptions() const;

std::chrono::duration<double> exchangeMaxErrorDuration() const;

std::chrono::duration<double> exchangeRequestTimeoutMs() const;
Expand Down Expand Up @@ -1196,6 +1203,8 @@ class SystemConfig : public ConfigBase {

int32_t internalCommunicationJwtExpirationSeconds() const;

http::JwtOptions jwtOptions() const;

bool useLegacyArrayAgg() const;

bool cacheVeloxTtlEnabled() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <folly/Uri.h>
#include <proxygen/lib/http/HTTPMessage.h>

#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/functions/remote/utils/ContentTypes.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/memory/Memory.h"
Expand All @@ -39,6 +40,8 @@ RestRemoteClient::RestRemoteClient(const std::string& url) : url_(url) {
folly::SocketAddress addr(uri.host().c_str(), uri.port(), true);

evbThread_ = std::make_unique<folly::ScopedEventBaseThread>("rest-client");
auto systemConfig = SystemConfig::instance();
auto httpClientOptions = systemConfig->httpClientOptions();
httpClient_ = std::make_shared<http::HttpClient>(
evbThread_->getEventBase(),
nullptr,
Expand All @@ -47,7 +50,8 @@ RestRemoteClient::RestRemoteClient(const std::string& url) : url_(url) {
requestTimeoutMs,
connectTimeoutMs,
memPool_,
nullptr);
nullptr,
std::move(httpClientOptions));
}

RestRemoteClient::~RestRemoteClient() {
Expand Down
44 changes: 14 additions & 30 deletions presto-native-execution/presto_cpp/main/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <folly/synchronization/Latch.h>
#include <proxygen/lib/http/codec/CodecProtocol.h>
#include <velox/common/base/Exceptions.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpClient.h"
Expand All @@ -37,27 +36,18 @@ HttpClient::HttpClient(
std::chrono::milliseconds connectTimeout,
std::shared_ptr<velox::memory::MemoryPool> pool,
folly::SSLContextPtr sslContext,
HttpClientOptions options,
std::function<void(int)>&& reportOnBodyStatsFunc)
: eventBase_(eventBase),
connPool_(connPool),
endpoint_(endpoint),
address_(address),
transactionTimeout_(transactionTimeout),
connectTimeout_(connectTimeout),
http2Enabled_(SystemConfig::instance()->httpClientHttp2Enabled()),
maxConcurrentStreams_(
SystemConfig::instance()->httpClientHttp2MaxStreamsPerConnection()),
http2InitialStreamWindow_(
SystemConfig::instance()->httpClientHttp2InitialStreamWindow()),
http2StreamWindow_(
SystemConfig::instance()->httpClientHttp2StreamWindow()),
http2SessionWindow_(
SystemConfig::instance()->httpClientHttp2SessionWindow()),
options_(std::move(options)),
pool_(std::move(pool)),
sslContext_(std::move(sslContext)),
reportOnBodyStatsFunc_(std::move(reportOnBodyStatsFunc)),
maxResponseAllocBytes_(SystemConfig::instance()->httpMaxAllocateBytes()) {
}
reportOnBodyStatsFunc_(std::move(reportOnBodyStatsFunc)) {}

HttpClient::~HttpClient() {
if (sessionPoolHolder_) {
Expand Down Expand Up @@ -218,7 +208,7 @@ class ResponseHandler : public proxygen::HTTPTransactionHandler {
// - seqNo == 0: First request on this connection
// - seqNo > 0: Connection is being reused for subsequent requests
// Reuse rate = connection_reuse / (connection_first_use + connection_reuse)
if (SystemConfig::instance()->httpClientConnectionReuseCounterEnabled()) {
if (client_->connectionReuseCounterEnabled()) {
const uint32_t seqNo = txn_->getSequenceNumber();
if (seqNo > 0) {
RECORD_METRIC_VALUE(kCounterHttpClientConnectionReuse);
Expand Down Expand Up @@ -567,11 +557,11 @@ void HttpClient::sendRequest(std::shared_ptr<ResponseHandler> responseHandler) {
sessionPool_,
proxygen::WheelTimerInstance(transactionTimeout_, eventBase_),
connectTimeout_,
http2Enabled_,
maxConcurrentStreams_,
http2InitialStreamWindow_,
http2StreamWindow_,
http2SessionWindow_,
options_.http2Enabled,
options_.http2MaxStreamsPerConnection,
options_.http2InitialStreamWindow,
options_.http2StreamWindow,
options_.http2SessionWindow,
eventBase_,
address_,
sslContext_);
Expand All @@ -592,7 +582,7 @@ folly::SemiFuture<std::unique_ptr<HttpResponse>> HttpClient::sendRequest(
request.ensureHostHeader();
auto responseHandler = std::make_shared<ResponseHandler>(
request,
maxResponseAllocBytes_,
options_.maxAllocateBytes,
body,
reportOnBodyStatsFunc_,
shared_from_this());
Expand All @@ -618,26 +608,20 @@ folly::SemiFuture<std::unique_ptr<HttpResponse>> HttpClient::sendRequest(

void RequestBuilder::addJwtIfConfigured() {
#ifdef PRESTO_ENABLE_JWT
if (SystemConfig::instance()->internalCommunicationJwtEnabled()) {
if (jwtOptions_.jwtEnabled) {
// If JWT was enabled the secret cannot be empty.
auto secretHash = std::vector<uint8_t>(SHA256_DIGEST_LENGTH);
folly::ssl::OpenSSLHash::sha256(
folly::range(secretHash),
folly::ByteRange(
folly::StringPiece(
SystemConfig::instance()
->internalCommunicationSharedSecret())));
folly::ByteRange(folly::StringPiece(jwtOptions_.sharedSecret)));

const auto time = std::chrono::system_clock::now();
const auto token =
jwt::create<jwt::traits::nlohmann_json>()
.set_subject(NodeConfig::instance()->nodeId())
.set_subject(jwtOptions_.nodeId)
.set_issued_at(time)
.set_expires_at(
time +
std::chrono::seconds{
SystemConfig::instance()
->internalCommunicationJwtExpirationSeconds()})
time + std::chrono::seconds{jwtOptions_.jwtExpirationSeconds})
.sign(
jwt::algorithm::hs256{std::string(
reinterpret_cast<char*>(secretHash.data()),
Expand Down
Loading
Loading