Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,17 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)


option(PRESTO_ENABLE_PROMETHEUS_REPORTER
"Enables capturing of runtime metrics using prometheus client" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_compile_definitions(PRESTO_ENABLE_PROMETHEUS_REPORTER)
endif()

if(PRESTO_ENABLE_S3)
set(VELOX_ENABLE_S3
ON
Expand Down Expand Up @@ -137,6 +145,10 @@ find_library(ZSTD zstd)
find_package(ZLIB)
find_library(SNAPPY snappy)

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
find_package(prometheus-cpp CONFIG REQUIRED)
endif()

find_package(folly CONFIG REQUIRED)
set(FOLLY_WITH_DEPENDENCIES
${FOLLY_LIBRARIES}
Expand All @@ -149,6 +161,13 @@ set(FOLLY_WITH_DEPENDENCIES
${ZSTD}
${ZLIB_LIBRARIES})


if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
list(APPEND FOLLY_WITH_DEPENDENCIES
prometheus-cpp::core
prometheus-cpp::pull)
endif()

find_package(BZip2 MODULE)
if(BZIP2_FOUND)
list(APPEND FOLLY_WITH_DEPENDENCIES ${BZIP2_LIBRARIES})
Expand All @@ -168,6 +187,7 @@ find_library(WANGLE wangle)

find_library(RE2 re2)


find_package(fizz CONFIG)
find_package(wangle CONFIG)
find_package(FBThrift)
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ PRESTO_ENABLE_HDFS ?= "OFF"
PRESTO_ENABLE_REMOTE_FUNCTIONS ?= "OFF"
PRESTO_ENABLE_JWT ?= "OFF"
EXTRA_CMAKE_FLAGS ?= ""
PRESTO_ENABLE_PROMETHEUS_REPORTER ?= "OFF"
# Check if PROMETHEUS_REPORTER is set to ON or OFF and set CMAKE_FLAGS accordingly



CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DPRESTO_ENABLE_PROMETHEUS_REPORTER=$(PRESTO_ENABLE_PROMETHEUS_REPORTER)
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
CMAKE_FLAGS += -DCMAKE_PREFIX_PATH=$(CMAKE_PREFIX_PATH)
CMAKE_FLAGS += -DCMAKE_BUILD_TYPE=$(BUILD_TYPE)
Expand All @@ -54,6 +59,8 @@ CMAKE_FLAGS += -DCMAKE_CXX_COMPILER_LAUNCHER=ccache
endif
endif



all: release #: Build the release version

clean: #: Delete all build artifacts
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@
# limitations under the License.
add_subdirectory(main)
add_subdirectory(presto_protocol)

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_subdirectory(metrics)
endif()

4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ target_link_libraries(
${GFLAGS_LIBRARIES}
pthread)

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
target_link_libraries(presto_server_lib presto_metrics)
endif()

# Enabling Parquet causes build errors with missing symbols on MacOS. This is
# likely due to a conflict between Arrow Thrift from velox_hive_connector and
# FBThrift libraries. The build issue is fixed by linking velox_hive_connector
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include <glog/logging.h>
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/common/Utils.h"
#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
#include "presto_cpp/metrics/PrometheusStatsReporter.h"
#endif
#include "velox/common/base/StatsReporter.h"

DEFINE_string(etc_dir, ".", "etc directory for presto configuration");
Expand All @@ -31,7 +34,14 @@ int main(int argc, char* argv[]) {
PRESTO_SHUTDOWN_LOG(INFO) << "Exiting main()";
}

#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
// Initialize singleton for the reporter
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::presto::PrometheusStatsReporter();
});
#else
// Initialize singleton for the reporter.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::velox::DummyStatsReporter();
});
#endif
8 changes: 8 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ PrestoServer::PrestoServer(const std::string& configDirectoryPath)

PrestoServer::~PrestoServer() {}

void registerPrometheusMetrics() {
facebook::velox::BaseStatsReporter::registered = true;
}

void PrestoServer::run() {
auto systemConfig = SystemConfig::instance();
auto nodeConfig = NodeConfig::instance();
Expand Down Expand Up @@ -226,6 +230,10 @@ void PrestoServer::run() {
PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what();
exit(EXIT_FAILURE);
}
#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
// This flag must be set to register the counters.
registerPrometheusMetrics();
#endif

registerStatsCounters();
registerFileSinks();
Expand Down
11 changes: 11 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ NodeConfig::NodeConfig() {
NONE_PROP(kNodeIp),
NONE_PROP(kNodeInternalAddress),
NONE_PROP(kNodeLocation),
NONE_PROP(KNodeMetricPort),
};
}

Expand All @@ -635,6 +636,16 @@ std::string NodeConfig::nodeId() const {
return requiredProperty(kNodeId);
}

std::string NodeConfig::nodeMetricPort(std::string& defaultPort) const {
auto resultOpt = optionalProperty(KNodeMetricPort);
if (resultOpt.has_value()) {
return resultOpt.value();
} else {
return defaultPort;
}
return requiredProperty(KNodeMetricPort);
}

std::string NodeConfig::nodeLocation() const {
return requiredProperty(kNodeLocation);
}
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ class NodeConfig : public ConfigBase {
static constexpr std::string_view kNodeInternalAddress{
"node.internal-address"};
static constexpr std::string_view kNodeLocation{"node.location"};
static constexpr std::string_view KNodeMetricPort{"node.prometheus_port"};

NodeConfig();

Expand All @@ -703,6 +704,8 @@ class NodeConfig : public ConfigBase {

std::string nodeId() const;

std::string nodeMetricPort(std::string& defaultPort) const;

std::string nodeInternalAddress(
const std::function<std::string()>& defaultIp = nullptr) const;

Expand Down
19 changes: 19 additions & 0 deletions presto-native-execution/presto_cpp/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(presto_metrics OBJECT PrometheusStatsReporter.cpp)

target_link_libraries(presto_metrics presto_common velox_config velox_core velox_exception prometheus-cpp::core prometheus-cpp::pull)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "PrometheusStatsReporter.h"
#include "velox/common/base/StatsReporter.h"
#include "prometheus/client_metric.h"
#include "prometheus/counter.h"
#include "prometheus/summary.h"
#include "prometheus/gauge.h"


namespace facebook::presto {

void PrometheusStatsReporter::registerMetricExportType(folly::StringPiece key, facebook::velox::StatType statType) const {
registerMetricExportType(key.start(), statType);
}


void PrometheusStatsReporter::registerMetricExportType(const char* key, velox::StatType statType) const {
std::string keyStr = sanitizeMetricName(key);
statTypeMap.insert({keyStr, statType});
LOG(INFO) << "Registered metric: " << keyStr << " with type: " << statTypeToString(statType);

switch (statType) {
case velox::StatType::AVG:{
registerSummaryMetricForAverageStatType(keyStr);
break;
}
case velox::StatType::SUM: {
registerGaugeMetric(keyStr);
break;
}
case velox::StatType::COUNT: {
registerCounterMetric(keyStr);
break;
}
case velox::StatType::RATE:
// ToDo: RATE Type is not yet used anywhere
default:
LOG(WARNING) << "Unsupported stat type for key: " << keyStr;
break;
}
}



void PrometheusStatsReporter::addMetricValue(const char* key, size_t value)
const {
addMetricValue(std::string(key), value);
}

void PrometheusStatsReporter::addMetricValue(folly::StringPiece key, size_t value)
const {
addMetricValue(key.str(), value);
}

void PrometheusStatsReporter::addMetricValue(const std::string& key, size_t value) const {
std::string keyStr = sanitizeMetricName(key);
auto statTypeIt = statTypeMap.find(keyStr);
if (statTypeIt == statTypeMap.end()) {
LOG(WARNING) << "Metric not registered: " << keyStr;
return;
}

switch (statTypeIt->second) {
case velox::StatType::AVG: {
updateSummaryMetric(keyStr, value);
break;
}
case velox::StatType::SUM: {
updateGaugeMetric(keyStr, value);
break;
}
case velox::StatType::COUNT: {
updateCounterMetric(keyStr, value);
break;
}
case velox::StatType::RATE:
// ToDo: RATE Type is not yet used anywhere
default:
LOG(WARNING) << "Unsupported stat type for key: " << keyStr;
break;
}
}

void PrometheusStatsReporter::registerSummaryMetricForAverageStatType(const std::string& keyStr) const {
auto& summaryFamily = prometheus::BuildSummary()
.Name(keyStr)
.Help("Summary for " + keyStr)
.Register(*registry);
auto& summary = summaryFamily.Add({{"cluster", cluster_}, {"node", node_}, {"host", host_}},
prometheus::Summary::Quantiles{{0.5, 0.05}});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.5 quantile is not the average, instead it is the median value. This is as good as reporting instantaneous value.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
It has sum and count so avg can be computed as well

summaryMap.insert({keyStr, &summary});
}

void PrometheusStatsReporter::registerGaugeMetric(const std::string& keyStr) const {
auto& gaugeFamily = prometheus::BuildGauge()
.Name(keyStr)
.Help("Gauge for " + keyStr)
.Register(*registry);
auto& gauge = gaugeFamily.Add({{"cluster", cluster_}, {"node", node_}, {"host", host_}});
gaugeMap.insert({keyStr, &gauge});
}

void PrometheusStatsReporter::registerCounterMetric(const std::string& keyStr) const {
auto& counterFamily = prometheus::BuildCounter()
.Name(keyStr)
.Help("Counter for " + keyStr)
.Register(*registry);
auto& counter = counterFamily.Add({{"cluster", cluster_}, {"node", node_}, {"host", host_}});
counterMap.insert({keyStr, &counter});
}

void PrometheusStatsReporter::updateSummaryMetric(const std::string& keyStr, size_t value) const {
auto summaryIt = summaryMap.find(keyStr);
if (summaryIt != summaryMap.end()) {
summaryIt->second->Observe(static_cast<double>(value));
} else {
LOG(WARNING) << "Summary metric not found: " << keyStr;
}
}

void PrometheusStatsReporter::updateGaugeMetric(const std::string& keyStr, size_t value) const {
auto gaugeIt = gaugeMap.find(keyStr);
if (gaugeIt != gaugeMap.end()) {
gaugeIt->second->Increment(static_cast<double>(value));
} else {
LOG(WARNING) << "Gauge metric not found: " << keyStr;
}
}

void PrometheusStatsReporter::updateCounterMetric(const std::string& keyStr, size_t value) const {
auto counterIt = counterMap.find(keyStr);
if (counterIt != counterMap.end()) {
counterIt->second->Increment(static_cast<double>(value));
} else {
LOG(WARNING) << "Counter metric not found: " << keyStr;
}
}

std::string PrometheusStatsReporter::sanitizeMetricName(
const std::string& name) {
std::string sanitized = name;
std::replace(sanitized.begin(), sanitized.end(), '.', '_');
return sanitized;
}
std::string PrometheusStatsReporter::statTypeToString(
velox::StatType statType) const {
switch (statType) {
case velox::StatType::AVG: return "AVG";
case velox::StatType::SUM: return "SUM";
case velox::StatType::RATE: return "RATE";
case velox::StatType::COUNT: return "COUNT";
default: return "Unknown StatType";
}
}


} // namespace facebook::presto
Loading