diff --git a/presto-native-execution/presto_cpp/main/Announcer.cpp b/presto-native-execution/presto_cpp/main/Announcer.cpp index d7a716e7c026f..b24e402f2e17f 100644 --- a/presto-native-execution/presto_cpp/main/Announcer.cpp +++ b/presto-native-execution/presto_cpp/main/Announcer.cpp @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include "presto_cpp/external/json/json.hpp" #include "presto_cpp/main/http/HttpClient.h" @@ -80,11 +82,13 @@ Announcer::Announcer( const std::string& nodeId, const std::string& nodeLocation, const std::vector& connectorIds, - uint64_t frequencyMs, + const uint64_t minFrequencyMs, + const uint64_t maxFrequencyMs, const std::string& clientCertAndKeyPath, const std::string& ciphers) : coordinatorDiscoverer_(coordinatorDiscoverer), - frequencyMs_(frequencyMs), + minFrequencyMs_(minFrequencyMs), + maxFrequencyMs_(maxFrequencyMs), announcementBody_(announcementBody( address, useHttps, @@ -142,26 +146,49 @@ void Announcer::makeAnnouncement() { client_->sendRequest(announcementRequest_, announcementBody_) .via(eventBaseThread_.getEventBase()) - .thenValue([](auto response) { + .thenValue([this](auto response) { auto message = response->headers(); if (message->getStatusCode() != http::kHttpAccepted) { + ++failedAttempts_; LOG(WARNING) << "Announcement failed: HTTP " << message->getStatusCode() << " - " << response->dumpBodyChain(); } else if (response->hasError()) { + ++failedAttempts_; LOG(ERROR) << "Announcement failed: " << response->error(); } else { + failedAttempts_ = 0; LOG(INFO) << "Announcement succeeded: " << message->getStatusCode(); } }) .thenError( folly::tag_t{}, - [](const std::exception& e) { + [this](const std::exception& e) { + ++failedAttempts_; LOG(WARNING) << "Announcement failed: " << e.what(); }) .thenTry([this](auto /*unused*/) { scheduleNext(); }); } +uint64_t Announcer::getAnnouncementDelay() const { + if (failedAttempts_ > 0) { + // For announcement failure cases, execute exponential back off 
to ping + // coordinator with max back off time cap at 'maxFrequencyMs_'. + auto rng = folly::ThreadLocalPRNG(); + return folly::futures::detail::retryingJitteredExponentialBackoffDur( + failedAttempts_, + std::chrono::milliseconds(minFrequencyMs_), + std::chrono::milliseconds(maxFrequencyMs_), + backOffjitterParam_, + rng) + .count(); + } + + // Jitter successful pings so workers spread out; clamp avoids unsigned wrap. + return maxFrequencyMs_ + folly::Random::rand32(2000) - + (maxFrequencyMs_ < 1000 ? maxFrequencyMs_ : 1000); +} + void Announcer::scheduleNext() { if (stopped_) { return; @@ -169,7 +196,7 @@ void Announcer::scheduleNext() { eventBaseThread_.getEventBase()->scheduleAt( [this]() { return makeAnnouncement(); }, std::chrono::steady_clock::now() + - std::chrono::milliseconds(frequencyMs_)); + std::chrono::milliseconds(getAnnouncementDelay())); } } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/Announcer.h b/presto-native-execution/presto_cpp/main/Announcer.h index 5a1a41ab01a5b..167bb48ff292a 100644 --- a/presto-native-execution/presto_cpp/main/Announcer.h +++ b/presto-native-execution/presto_cpp/main/Announcer.h @@ -31,7 +31,8 @@ class Announcer { const std::string& nodeId, const std::string& nodeLocation, const std::vector& connectorIds, - uint64_t frequencyMs, + const uint64_t minFrequencyMs, + const uint64_t maxFrequencyMs, const std::string& clientCertAndKeyPath = "", const std::string& ciphers = ""); @@ -44,19 +45,26 @@ class Announcer { private: void makeAnnouncement(); + uint64_t getAnnouncementDelay() const; + void scheduleNext(); const std::shared_ptr coordinatorDiscoverer_; - const uint64_t frequencyMs_; + const uint64_t minFrequencyMs_; + const uint64_t maxFrequencyMs_; const std::string announcementBody_; const proxygen::HTTPMessage announcementRequest_; const std::shared_ptr pool_; - folly::SocketAddress address_; - std::shared_ptr client_; - std::atomic_bool stopped_{true}; folly::EventBaseThread eventBaseThread_; 
const std::string clientCertAndKeyPath_; const std::string ciphers_; + /// Jitter factor applied to the backoff delay after an announcement failure. + const double backOffjitterParam_{0.1}; + + folly::SocketAddress address_; + std::shared_ptr client_; + std::atomic_bool stopped_{true}; + uint64_t failedAttempts_{0}; }; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 64ddcc935ac66..999d70c786715 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -225,7 +225,8 @@ void PrestoServer::run() { nodeId_, nodeLocation_, catalogNames, - 30'000 /*milliseconds*/, + systemConfig->announcementMinFrequencyMs(), + systemConfig->announcementMaxFrequencyMs(), clientCertAndKeyPath, ciphers); announcer_->start(); diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index a161da0a3b4cc..1c5157a60ec05 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -193,6 +193,12 @@ void ConfigBase::checkRegisteredProperties( } } +static constexpr std::string_view kAnnouncementMinFrequencyMs{ + "announcement-min-frequency-ms"}; + +static constexpr std::string_view kAnnouncementMaxFrequencyMs{ + "announcement-max-frequency-ms"}; + SystemConfig::SystemConfig() { registeredProps_ = std::unordered_map>{ @@ -242,6 +248,8 @@ SystemConfig::SystemConfig() { STR_PROP(kSkipRuntimeStatsInRunningTaskInfo, "true"), STR_PROP(kLogZombieTaskInfo, "false"), NUM_PROP(kLogNumZombieTasks, 20), + NUM_PROP(kAnnouncementMinFrequencyMs, 25'000), // 25s + NUM_PROP(kAnnouncementMaxFrequencyMs, 30'000), // 30s }; } @@ -456,6 +464,14 @@ uint32_t SystemConfig::logNumZombieTasks() const { return optionalProperty(kLogNumZombieTasks).value(); } +uint64_t 
SystemConfig::announcementMinFrequencyMs() const { + return optionalProperty(kAnnouncementMinFrequencyMs).value(); +} + +uint64_t SystemConfig::announcementMaxFrequencyMs() const { + return optionalProperty(kAnnouncementMaxFrequencyMs).value(); +} + NodeConfig::NodeConfig() { registeredProps_ = std::unordered_map>{ diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index d2ee881648f4b..c2c7368d0b250 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -259,6 +259,12 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kLogZombieTaskInfo{"log-zombie-task-info"}; static constexpr std::string_view kLogNumZombieTasks{"log-num-zombie-tasks"}; + static constexpr std::string_view kAnnouncementMinFrequencyMs{ + "announcement-min-frequency-ms"}; + + static constexpr std::string_view kAnnouncementMaxFrequencyMs{ + "announcement-max-frequency-ms"}; + SystemConfig(); static SystemConfig* instance(); @@ -375,6 +381,10 @@ class SystemConfig : public ConfigBase { bool logZombieTaskInfo() const; uint32_t logNumZombieTasks() const; + + uint64_t announcementMinFrequencyMs() const; + + uint64_t announcementMaxFrequencyMs() const; }; /// Provides access to node properties defined in node.properties file. diff --git a/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp b/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp index 72121087299d6..83a612009a539 100644 --- a/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp @@ -166,6 +166,7 @@ TEST_P(AnnouncerTestSuite, basic) { "test-node-location", {"hive", "tpch"}, 100 /*milliseconds*/, + 500 /*milliseconds*/, keyPath, ciphers);