Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions presto-native-execution/presto_cpp/main/Announcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <folly/Random.h>
#include <folly/futures/Retrying.h>
#include <velox/common/memory/Memory.h>
#include "presto_cpp/external/json/json.hpp"
#include "presto_cpp/main/http/HttpClient.h"
Expand Down Expand Up @@ -80,11 +82,13 @@ Announcer::Announcer(
const std::string& nodeId,
const std::string& nodeLocation,
const std::vector<std::string>& connectorIds,
uint64_t frequencyMs,
const uint64_t minFrequencyMs,
const uint64_t maxFrequencyMs,
const std::string& clientCertAndKeyPath,
const std::string& ciphers)
: coordinatorDiscoverer_(coordinatorDiscoverer),
frequencyMs_(frequencyMs),
minFrequencyMs_(minFrequencyMs),
maxFrequencyMs_(maxFrequencyMs),
announcementBody_(announcementBody(
address,
useHttps,
Expand Down Expand Up @@ -142,34 +146,58 @@ void Announcer::makeAnnouncement() {

client_->sendRequest(announcementRequest_, announcementBody_)
.via(eventBaseThread_.getEventBase())
.thenValue([](auto response) {
.thenValue([this](auto response) {
auto message = response->headers();
if (message->getStatusCode() != http::kHttpAccepted) {
++failedAttempts_;
LOG(WARNING) << "Announcement failed: HTTP "
<< message->getStatusCode() << " - "
<< response->dumpBodyChain();
} else if (response->hasError()) {
++failedAttempts_;
LOG(ERROR) << "Announcement failed: " << response->error();
} else {
failedAttempts_ = 0;
LOG(INFO) << "Announcement succeeded: " << message->getStatusCode();
}
})
.thenError(
folly::tag_t<std::exception>{},
[](const std::exception& e) {
[this](const std::exception& e) {
++failedAttempts_;
LOG(WARNING) << "Announcement failed: " << e.what();
})
.thenTry([this](auto /*unused*/) { scheduleNext(); });
}

uint64_t Announcer::getAnnouncementDelay() const {
if (failedAttempts_ > 0) {
// For announcement failure cases, execute exponential back off to ping
// coordinator with max back off time cap at 'maxFrequencyMs_'.
auto rng = folly::ThreadLocalPRNG();
return folly::futures::detail::retryingJitteredExponentialBackoffDur(
failedAttempts_,
std::chrono::milliseconds(minFrequencyMs_),
std::chrono::milliseconds(maxFrequencyMs_),
backOffjitterParam_,
rng)
.count();
}

// Adds some jitter for successful cases so that all workers does not ping
// coordinator at the same time
return maxFrequencyMs_ + folly::Random::rand32(2000) - 1000;
}

void Announcer::scheduleNext() {
if (stopped_) {
return;
}

eventBaseThread_.getEventBase()->scheduleAt(
[this]() { return makeAnnouncement(); },
std::chrono::steady_clock::now() +
std::chrono::milliseconds(frequencyMs_));
std::chrono::milliseconds(getAnnouncementDelay()));
}

} // namespace facebook::presto
16 changes: 12 additions & 4 deletions presto-native-execution/presto_cpp/main/Announcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class Announcer {
const std::string& nodeId,
const std::string& nodeLocation,
const std::vector<std::string>& connectorIds,
uint64_t frequencyMs,
const uint64_t minFrequencyMs,
const uint64_t maxFrequencyMs_,
const std::string& clientCertAndKeyPath = "",
const std::string& ciphers = "");

Expand All @@ -44,19 +45,26 @@ class Announcer {
private:
void makeAnnouncement();

uint64_t getAnnouncementDelay() const;

void scheduleNext();

const std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
const uint64_t frequencyMs_;
const uint64_t minFrequencyMs_;
const uint64_t maxFrequencyMs_;
const std::string announcementBody_;
const proxygen::HTTPMessage announcementRequest_;
const std::shared_ptr<velox::memory::MemoryPool> pool_;
const std::string clientCertAndKeyPath_;
const std::string ciphers_;
/// jitter value for backoff delay time in case of announcment failure
const double backOffjitterParam_{0.1};

folly::SocketAddress address_;
std::shared_ptr<http::HttpClient> client_;
std::atomic_bool stopped_{true};
folly::EventBaseThread eventBaseThread_;
const std::string clientCertAndKeyPath_;
const std::string ciphers_;
uint64_t failedAttempts_{0};
};

} // namespace facebook::presto
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ void PrestoServer::run() {
nodeId_,
nodeLocation_,
catalogNames,
30'000 /*milliseconds*/,
systemConfig->announcementMinFrequencyMs(),
systemConfig->announcementMaxFrequencyMs(),
clientCertAndKeyPath,
ciphers);
announcer_->start();
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ SystemConfig::SystemConfig() {
STR_PROP(kEnableMemoryLeakCheck, "true"),
NONE_PROP(kRemoteFunctionServerThriftPort),
STR_PROP(kSkipRuntimeStatsInRunningTaskInfo, "true"),
NUM_PROP(kAnnouncementMinFrequencyMs, 100), // 100ms
NUM_PROP(kAnnouncementMaxFrequencyMs, 35'000), // 35s
};
}

Expand Down Expand Up @@ -424,6 +426,14 @@ bool SystemConfig::skipRuntimeStatsInRunningTaskInfo() const {
return optionalProperty<bool>(kSkipRuntimeStatsInRunningTaskInfo).value();
}

uint64_t SystemConfig::announcementMinFrequencyMs() const {
return optionalProperty<uint64_t>(kAnnouncementMinFrequencyMs).value();
}

uint64_t SystemConfig::announcementMaxFrequencyMs() const {
return optionalProperty<uint64_t>(kAnnouncementMaxFrequencyMs).value();
}

NodeConfig::NodeConfig() {
registeredProps_ =
std::unordered_map<std::string, folly::Optional<std::string>>{
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kSkipRuntimeStatsInRunningTaskInfo{
"skip-runtime-stats-in-running-task-info"};

static constexpr std::string_view kAnnouncementMinFrequencyMs{
"announcement-min-frequency-ms"};

static constexpr std::string_view kAnnouncementMaxFrequencyMs{
"announcement-max-frequency-ms"};

SystemConfig();

static SystemConfig* instance();
Expand Down Expand Up @@ -339,6 +345,10 @@ class SystemConfig : public ConfigBase {
bool enableMemoryLeakCheck() const;

bool skipRuntimeStatsInRunningTaskInfo() const;

uint64_t announcementMinFrequencyMs() const;

uint64_t announcementMaxFrequencyMs() const;
};

/// Provides access to node properties defined in node.properties file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ TEST_P(AnnouncerTestSuite, basic) {
"test-node-location",
{"hive", "tpch"},
100 /*milliseconds*/,
500 /*milliseconds*/,
keyPath,
ciphers);

Expand Down