From 00506f1e04bf5f30a7206f4c6d7dabcea4e9cc7e Mon Sep 17 00:00:00 2001
From: Amit Dutta
Date: Fri, 16 Jun 2023 17:41:04 -0700
Subject: [PATCH] [native] Randomize retry time with jitter after worker announcement failure.

---
 .../presto_cpp/main/Announcer.cpp             | 38 ++++++++++++++++---
 .../presto_cpp/main/Announcer.h               | 16 ++++++--
 .../presto_cpp/main/PrestoServer.cpp          |  3 +-
 .../presto_cpp/main/common/Configs.cpp        | 10 +++++
 .../presto_cpp/main/common/Configs.h          | 10 +++++
 .../presto_cpp/main/tests/AnnouncerTest.cpp   |  1 +
 6 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/presto-native-execution/presto_cpp/main/Announcer.cpp b/presto-native-execution/presto_cpp/main/Announcer.cpp
index d7a716e7c026f..185eedcd85e90 100644
--- a/presto-native-execution/presto_cpp/main/Announcer.cpp
+++ b/presto-native-execution/presto_cpp/main/Announcer.cpp
@@ -16,6 +16,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include "presto_cpp/external/json/json.hpp"
 #include "presto_cpp/main/http/HttpClient.h"
@@ -80,11 +82,13 @@ Announcer::Announcer(
     const std::string& nodeId,
     const std::string& nodeLocation,
     const std::vector& connectorIds,
-    uint64_t frequencyMs,
+    const uint64_t minFrequencyMs,
+    const uint64_t maxFrequencyMs,
     const std::string& clientCertAndKeyPath,
     const std::string& ciphers)
     : coordinatorDiscoverer_(coordinatorDiscoverer),
-      frequencyMs_(frequencyMs),
+      minFrequencyMs_(minFrequencyMs),
+      maxFrequencyMs_(maxFrequencyMs),
       announcementBody_(announcementBody(
           address,
           useHttps,
@@ -142,34 +146,58 @@ void Announcer::makeAnnouncement() {
 
   client_->sendRequest(announcementRequest_, announcementBody_)
       .via(eventBaseThread_.getEventBase())
-      .thenValue([](auto response) {
+      .thenValue([this](auto response) {
         auto message = response->headers();
         if (message->getStatusCode() != http::kHttpAccepted) {
+          ++failedAttempts_;
           LOG(WARNING) << "Announcement failed: HTTP "
                        << message->getStatusCode() << " - "
                        << response->dumpBodyChain();
         } else if (response->hasError()) {
+          ++failedAttempts_;
           LOG(ERROR) << "Announcement failed: " << response->error();
         } else {
+          failedAttempts_ = 0;
           LOG(INFO) << "Announcement succeeded: " << message->getStatusCode();
         }
       })
       .thenError(
           folly::tag_t{},
-          [](const std::exception& e) {
+          [this](const std::exception& e) {
+            ++failedAttempts_;
             LOG(WARNING) << "Announcement failed: " << e.what();
           })
       .thenTry([this](auto /*unused*/) { scheduleNext(); });
 }
 
+uint64_t Announcer::getAnnouncementDelay() const {
+  if (failedAttempts_ > 0) {
+    // After a failed announcement, back off exponentially (with jitter) from
+    // 'minFrequencyMs_', capping the delay at 'maxFrequencyMs_'.
+    auto rng = folly::ThreadLocalPRNG();
+    return folly::futures::detail::retryingJitteredExponentialBackoffDur(
+               failedAttempts_,
+               std::chrono::milliseconds(minFrequencyMs_),
+               std::chrono::milliseconds(maxFrequencyMs_),
+               backOffjitterParam_,
+               rng)
+        .count();
+  }
+
+  // Spread out worker pings; jitter is capped at the period to avoid wrap.
+  const uint64_t jitterMs = maxFrequencyMs_ < 1000 ? maxFrequencyMs_ : 1000;
+  return maxFrequencyMs_ - jitterMs + folly::Random::rand32(2 * jitterMs);
+}
+
 void Announcer::scheduleNext() {
   if (stopped_) {
     return;
   }
+
   eventBaseThread_.getEventBase()->scheduleAt(
       [this]() { return makeAnnouncement(); },
       std::chrono::steady_clock::now() +
-          std::chrono::milliseconds(frequencyMs_));
+          std::chrono::milliseconds(getAnnouncementDelay()));
 }
 
 } // namespace facebook::presto
diff --git a/presto-native-execution/presto_cpp/main/Announcer.h b/presto-native-execution/presto_cpp/main/Announcer.h
index 5a1a41ab01a5b..462b51130ab1e 100644
--- a/presto-native-execution/presto_cpp/main/Announcer.h
+++ b/presto-native-execution/presto_cpp/main/Announcer.h
@@ -31,7 +31,8 @@ class Announcer {
       const std::string& nodeId,
       const std::string& nodeLocation,
       const std::vector& connectorIds,
-      uint64_t frequencyMs,
+      const uint64_t minFrequencyMs,
+      const uint64_t maxFrequencyMs,
       const std::string& clientCertAndKeyPath = "",
       const std::string& ciphers = "");
 
@@ -44,19 +45,26 @@ class Announcer {
  private:
   void makeAnnouncement();
 
+  uint64_t getAnnouncementDelay() const;
+
   void scheduleNext();
 
   const std::shared_ptr coordinatorDiscoverer_;
-  const uint64_t frequencyMs_;
+  const uint64_t minFrequencyMs_;
+  const uint64_t maxFrequencyMs_;
   const std::string announcementBody_;
   const proxygen::HTTPMessage announcementRequest_;
   const std::shared_ptr pool_;
+  const std::string clientCertAndKeyPath_;
+  const std::string ciphers_;
+  /// Jitter value for the backoff delay used after an announcement failure.
+  const double backOffjitterParam_{0.1};
+
   folly::SocketAddress address_;
   std::shared_ptr client_;
   std::atomic_bool stopped_{true};
   folly::EventBaseThread eventBaseThread_;
-  const std::string clientCertAndKeyPath_;
-  const std::string ciphers_;
+  uint64_t failedAttempts_{0};
 };
 
 } // namespace facebook::presto
diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp
index 339b2dee38eda..c0d8c9b4d9a36 100644
--- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp
+++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp
@@ -227,7 +227,8 @@ void PrestoServer::run() {
       nodeId_,
       nodeLocation_,
       catalogNames,
-      30'000 /*milliseconds*/,
+      systemConfig->announcementMinFrequencyMs(),
+      systemConfig->announcementMaxFrequencyMs(),
       clientCertAndKeyPath,
       ciphers);
   announcer_->start();
diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp
index f3ac32de4781a..bcf834013dc4c 100644
--- a/presto-native-execution/presto_cpp/main/common/Configs.cpp
+++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp
@@ -240,6 +240,8 @@ SystemConfig::SystemConfig() {
           STR_PROP(kEnableMemoryLeakCheck, "true"),
           NONE_PROP(kRemoteFunctionServerThriftPort),
           STR_PROP(kSkipRuntimeStatsInRunningTaskInfo, "true"),
+          NUM_PROP(kAnnouncementMinFrequencyMs, 100), // 100ms
+          NUM_PROP(kAnnouncementMaxFrequencyMs, 35'000), // 35s
       };
 }
 
@@ -424,6 +426,14 @@ bool SystemConfig::skipRuntimeStatsInRunningTaskInfo() const {
   return optionalProperty(kSkipRuntimeStatsInRunningTaskInfo).value();
 }
 
+uint64_t SystemConfig::announcementMinFrequencyMs() const {
+  return optionalProperty(kAnnouncementMinFrequencyMs).value();
+}
+
+uint64_t SystemConfig::announcementMaxFrequencyMs() const {
+  return optionalProperty(kAnnouncementMaxFrequencyMs).value();
+}
+
 NodeConfig::NodeConfig() {
   registeredProps_ =
       std::unordered_map>{
diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h
index 0590c3b670e79..6c450046b2ed2 100644
--- a/presto-native-execution/presto_cpp/main/common/Configs.h
+++ b/presto-native-execution/presto_cpp/main/common/Configs.h
@@ -235,6 +235,12 @@ class SystemConfig : public ConfigBase {
   static constexpr std::string_view kSkipRuntimeStatsInRunningTaskInfo{
       "skip-runtime-stats-in-running-task-info"};
 
+  static constexpr std::string_view kAnnouncementMinFrequencyMs{
+      "announcement-min-frequency-ms"};
+
+  static constexpr std::string_view kAnnouncementMaxFrequencyMs{
+      "announcement-max-frequency-ms"};
+
   SystemConfig();
 
   static SystemConfig* instance();
@@ -339,6 +345,10 @@ class SystemConfig : public ConfigBase {
   bool enableMemoryLeakCheck() const;
 
   bool skipRuntimeStatsInRunningTaskInfo() const;
+
+  uint64_t announcementMinFrequencyMs() const;
+
+  uint64_t announcementMaxFrequencyMs() const;
 };
 
 /// Provides access to node properties defined in node.properties file.
diff --git a/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp b/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp
index 72121087299d6..83a612009a539 100644
--- a/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp
+++ b/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp
@@ -166,6 +166,7 @@ TEST_P(AnnouncerTestSuite, basic) {
       "test-node-location",
       {"hive", "tpch"},
       100 /*milliseconds*/,
+      500 /*milliseconds*/,
       keyPath,
       ciphers);
 