From 77690d6ec710ddaef434117713f0415545e6d45a Mon Sep 17 00:00:00 2001 From: "jay.narale" Date: Thu, 20 Feb 2025 17:07:24 +0000 Subject: [PATCH] [native] Support specification of node pool type in announcer --- .../presto_cpp/main/Announcer.cpp | 4 ++++ presto-native-execution/presto_cpp/main/Announcer.h | 1 + .../presto_cpp/main/PrestoServer.cpp | 2 ++ .../presto_cpp/main/PrestoServer.h | 1 + .../presto_cpp/main/common/Configs.cpp | 12 ++++++++++++ .../presto_cpp/main/common/Configs.h | 5 +++++ .../presto_cpp/main/tests/AnnouncerTest.cpp | 1 + 7 files changed, 26 insertions(+) diff --git a/presto-native-execution/presto_cpp/main/Announcer.cpp b/presto-native-execution/presto_cpp/main/Announcer.cpp index a218c68d8c845..0c9433ac335e0 100644 --- a/presto-native-execution/presto_cpp/main/Announcer.cpp +++ b/presto-native-execution/presto_cpp/main/Announcer.cpp @@ -30,6 +30,7 @@ std::string announcementBody( const std::string& nodeVersion, const std::string& environment, const std::string& nodeLocation, + const std::string& nodePoolType, const bool sidecar, const std::vector& connectorIds) { std::string id = @@ -49,6 +50,7 @@ std::string announcementBody( {"coordinator", false}, {"sidecar", sidecar}, {"connectorIds", folly::join(',', connectorIds)}, + {"pool_type", nodePoolType}, {uriScheme, fmt::format("{}://{}:{}", uriScheme, address, port)}}}}}}}; return body.dump(); @@ -81,6 +83,7 @@ Announcer::Announcer( const std::string& environment, const std::string& nodeId, const std::string& nodeLocation, + const std::string& nodePoolType, const bool sidecar, const std::vector& connectorIds, const uint64_t maxFrequencyMs, @@ -99,6 +102,7 @@ Announcer::Announcer( nodeVersion, environment, nodeLocation, + nodePoolType, sidecar, connectorIds)), announcementRequest_( diff --git a/presto-native-execution/presto_cpp/main/Announcer.h b/presto-native-execution/presto_cpp/main/Announcer.h index 2b35f6e69de14..0fca926fde254 100644 --- a/presto-native-execution/presto_cpp/main/Announcer.h +++ b/presto-native-execution/presto_cpp/main/Announcer.h @@ -31,6 +31,7 @@ class Announcer : public PeriodicServiceInventoryManager { const std::string& environment, const std::string& nodeId, const std::string& nodeLocation, + const std::string& nodePoolType, const bool sidecar, const std::vector& connectorIds, const uint64_t maxFrequencyMs_, diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 0a422c66e16bd..edbc11aadda1d 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -241,6 +241,7 @@ void PrestoServer::run() { address_ = fmt::format("[{}]", address_); } nodeLocation_ = nodeConfig->nodeLocation(); + nodePoolType_ = systemConfig->poolType(); prestoBuiltinFunctionPrefix_ = systemConfig->prestoDefaultNamespacePrefix(); } catch (const velox::VeloxUserError& e) { PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what(); @@ -576,6 +577,7 @@ void PrestoServer::run() { environment_, nodeId_, nodeLocation_, + nodePoolType_, systemConfig->prestoNativeSidecar(), catalogNames, systemConfig->announcementMaxFrequencyMs(), diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index 9fa1301c1bf1d..cf346777d5b4f 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -290,6 +290,7 @@ class PrestoServer { std::string nodeId_; std::string address_; std::string nodeLocation_; + std::string nodePoolType_; folly::SSLContextPtr sslContext_; std::string prestoBuiltinFunctionPrefix_; }; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 2ef3d8253ab3d..303a368e89eda 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -240,6 +240,7 @@ SystemConfig::SystemConfig() { BOOL_PROP(kEnableRuntimeMetricsCollection, false), BOOL_PROP(kPlanValidatorFailOnNestedLoopJoin, false), STR_PROP(kPrestoDefaultNamespacePrefix, "presto.default"), + STR_PROP(kPoolType, "DEFAULT"), }; } @@ -290,6 +291,17 @@ std::string SystemConfig::prestoVersion() const { return requiredProperty(std::string(kPrestoVersion)); } +std::string SystemConfig::poolType() const { + static const std::unordered_set kPoolTypes = {"LEAF", "INTERMEDIATE", "DEFAULT"}; + static constexpr std::string_view kPoolTypeDefault = "DEFAULT"; + auto value = optionalProperty(kPoolType).value_or(std::string(kPoolTypeDefault)); + VELOX_USER_CHECK( + kPoolTypes.count(value), + "{} must be one of 'LEAF', 'INTERMEDIATE', or 'DEFAULT'", + kPoolType); + return value; +} + bool SystemConfig::mutableConfig() const { return optionalProperty(kMutableConfig).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 5969006e62575..0a053a9409238 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -659,6 +659,9 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kPrestoDefaultNamespacePrefix{ "presto.default-namespace"}; + // Specifies the type of worker pool + static constexpr std::string_view kPoolType{"pool-type"}; + SystemConfig(); virtual ~SystemConfig() = default; @@ -898,6 +901,8 @@ class SystemConfig : public ConfigBase { bool prestoNativeSidecar() const; std::string prestoDefaultNamespacePrefix() const; + + std::string poolType() const; }; /// Provides access to node properties defined in node.properties file. diff --git a/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp b/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp index 4b5881645f055..8f2640d1e8d75 100644 --- a/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/AnnouncerTest.cpp @@ -173,6 +173,7 @@ TEST_P(AnnouncerTestSuite, basic) { "testing", "test-node", "test-node-location", + "DEFAULT", true, {"hive", "tpch"}, 500 /*milliseconds*/,