Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_PROMETHEUS_REPORTER
"Enables capturing of runtime metrics using prometheus client" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down Expand Up @@ -201,6 +204,10 @@ if(PRESTO_ENABLE_JWT)
add_compile_definitions(PRESTO_ENABLE_JWT)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_compile_definitions(PRESTO_ENABLE_PROMETHEUS_REPORTER)
endif()

if("${MAX_LINK_JOBS}")
set_property(GLOBAL APPEND PROPERTY JOB_POOLS
"presto_link_job_pool=${MAX_LINK_JOBS}")
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,7 @@ set_property(TARGET presto_server PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
# Link the prometheus_reporter library into presto_server_lib only when the
# PRESTO_ENABLE_PROMETHEUS_REPORTER option is turned on; a status message is
# printed at configure time so the choice is visible in the CMake output.
if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
message(STATUS "Linking prometheus metrics reporter")
target_link_libraries(presto_server_lib prometheus_reporter)
endif()
5 changes: 0 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,3 @@ int main(int argc, char* argv[]) {
presto.run();
PRESTO_SHUTDOWN_LOG(INFO) << "Exiting main()";
}

// Registers the folly::Singleton that backs velox's BaseStatsReporter.
// The factory lambda returns a heap-allocated DummyStatsReporter; the raw
// pointer is handed to folly::Singleton, which takes ownership of it.
// NOTE(review): DummyStatsReporter presumably discards every reported value --
// confirm against the velox headers.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::velox::DummyStatsReporter();
});
49 changes: 49 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
#include "velox/serializers/PrestoSerializer.h"

#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
#include "presto_cpp/main/common/prometheus-metrics/PrometheusReporter.h"
// Registers the BaseStatsReporter singleton backed by the Prometheus reporter.
// Every metric is labelled with the node environment ("cluster") and the value
// of the HOSTNAME environment variable ("worker", empty when it is not set).
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
  const std::string clusterLabel =
      facebook::presto::NodeConfig::instance()->nodeEnvironment();
  const char* const envHost = std::getenv("HOSTNAME");
  const std::string workerLabel =
      (envHost == nullptr) ? std::string() : std::string(envHost);
  return new facebook::presto::prometheus::PrometheusReporter(
      ::prometheus::Labels{{"cluster", clusterLabel}, {"worker", workerLabel}});
});
#else
#include "presto_cpp/main/common/StatsReporterImpl.h"
// Registers the folly::Singleton that backs velox's BaseStatsReporter when the
// Prometheus reporter is not compiled in (the #else side of
// PRESTO_ENABLE_PROMETHEUS_REPORTER). The factory lambda returns a
// heap-allocated, default-constructed StatsReporterImpl.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::presto::StatsReporterImpl();
});
#endif

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
#include "presto_cpp/main/RemoteFunctionRegisterer.h"
#endif
Expand Down Expand Up @@ -226,6 +245,10 @@ void PrestoServer::run() {
exit(EXIT_FAILURE);
}

if (systemConfig->enableRuntimeMetricsCollection()) {
// This flag must be set to register the counters.
facebook::velox::BaseStatsReporter::registered = true;
}
registerStatsCounters();
registerFileSinks();
registerFileSystems();
Expand Down Expand Up @@ -319,6 +342,14 @@ void PrestoServer::run() {
proxygen::ResponseHandler* downstream) {
server->reportServerInfo(downstream);
});
httpServer_->registerGet(
"/v1/info/health/metrics",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
server->reportHealthMetrics(downstream);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? What if we just define an exposer that creates another HTTP server on /metrics or /v1/metrics? We could also define the number of threads to be used by the new HTTP server.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is exactly what we are trying to avoid. We didn't find a strong need to start a new server sharing space with our worker containers. Avoiding it reduces the overhead of maintaining and launching another process. Why do you want multiple threads to be spawned by the exposer? Are you expecting high traffic?
Correct me if I'm wrong, but the Prometheus server is configured to call the scrape endpoint periodically, so we can expect one HTTP request every X seconds. If you are expecting huge traffic, then it is not recommended to share space with the worker instance.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think isolation is another point: we want to avoid degrading the Presto process as much as we can.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amitkdutta : What approach did Meta take for this ? Does your metric collection use any endpoint in Presto worker process itself, or did you start another server ?

});
httpServer_->registerGet(
"/v1/info/state",
[server = this](
Expand Down Expand Up @@ -1093,6 +1124,24 @@ void PrestoServer::reportServerInfo(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(serverInfo));
}

/// Serves the metrics scrape endpoint: replies with the runtime metrics
/// collected by the BaseStatsReporter singleton, serialized as text.
///
/// With PRESTO_ENABLE_PROMETHEUS_REPORTER the PrometheusReporter produces the
/// payload itself. Otherwise the in-memory StatsReporterImpl is serialized
/// with a PrometheusSerializer labelled with the node environment ("cluster")
/// and the HOSTNAME environment variable ("worker", empty when unset).
///
/// An error response is sent instead when the singleton is unavailable (for
/// example during shutdown) or is not of the expected concrete type.
///
/// @param downstream Response handler used to send the reply.
void PrestoServer::reportHealthMetrics(proxygen::ResponseHandler* downstream) {
  // try_get() returns an empty pointer once the singleton has been destroyed,
  // and the downcast fails if a different reporter type was registered. Both
  // cases must be handled before dereferencing.
  const auto baseReporter =
      folly::Singleton<facebook::velox::BaseStatsReporter>::try_get();
#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
  auto reporter = std::dynamic_pointer_cast<
      facebook::presto::prometheus::PrometheusReporter>(baseReporter);
#else
  auto reporter = std::dynamic_pointer_cast<StatsReporterImpl>(baseReporter);
#endif
  if (reporter == nullptr) {
    http::sendErrorResponse(downstream, "Stats reporter is not available.");
    return;
  }

#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
  http::sendOkResponse(downstream, reporter->getSerializedMetrics());
#else
  // The labels are only needed on this path; the Prometheus reporter receives
  // its labels when the singleton is created.
  auto nodeConfig = facebook::presto::NodeConfig::instance();
  const std::string cluster = nodeConfig->nodeEnvironment();
  const char* hostName = std::getenv("HOSTNAME");
  const std::string worker = !hostName ? "" : hostName;
  prometheus::PrometheusSerializer serializer(
      prometheus::Labels{{"cluster", cluster}, {"worker", worker}});
  http::sendOkResponse(downstream, reporter->getMetrics(serializer));
#endif
}
// Sends an OK response whose body is the JSON form of the value returned by
// fetchNodeStatus().
void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void reportHealthMetrics(proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# limitations under the License.

add_library(presto_exception Exception.cpp)
# presto_common must be declared exactly once; declaring the same target twice
# is a CMake configure error. StatsReporterImpl.cpp provides the in-memory
# stats reporter.
add_library(presto_common Counters.cpp Utils.cpp ConfigReader.cpp Configs.cpp
                          StatsReporterImpl.cpp)

target_link_libraries(presto_exception velox_exception)
set_property(TARGET presto_exception PROPERTY JOB_POOL_LINK
Expand All @@ -23,3 +24,7 @@ set_property(TARGET presto_common PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_subdirectory(prometheus-metrics)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ std::chrono::duration<double> SystemConfig::cacheVeloxTtlCheckInterval() const {
}

/// Returns whether runtime metrics collection is enabled.
///
/// Falls back to false when the property is not set, instead of calling
/// value() on an empty optional.
bool SystemConfig::enableRuntimeMetricsCollection() const {
  return optionalProperty<bool>(kEnableRuntimeMetricsCollection)
      .value_or(false);
}

NodeConfig::NodeConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "StatsReporterImpl.h"

namespace facebook::presto {

/// Registers a stat identified by a string piece.
///
/// A StringPiece is a (pointer, length) view and is not guaranteed to be
/// NUL-terminated, so it is copied into an owning string before being passed
/// to the C-string overload. Passing key.start() directly could read past the
/// end of the view.
///
/// @param key The key to identify the stat.
/// @param statType How the stat is aggregated.
void StatsReporterImpl::registerMetricExportType(
    folly::StringPiece key,
    facebook::velox::StatType statType) const {
  registerMetricExportType(key.str().c_str(), statType);
}

/// Registers a stat under the given key and seeds its value with zero.
/// Registering a key that already exists leaves the stored type and value
/// untouched, because emplace() does not overwrite existing entries.
///
/// @param key The key to identify the stat.
/// @param statType How the stat is aggregated.
void StatsReporterImpl::registerMetricExportType(
    const char* key,
    facebook::velox::StatType statType) const {
  std::lock_guard<std::mutex> guard(mutex_);
  const std::string statName(key);
  registeredStats_.emplace(statName, statType);
  metricsMap_.emplace(statName, 0);
}

void StatsReporterImpl::addMetricValue(const char* key, size_t value) const {
std::lock_guard<std::mutex> lock(mutex_);
auto it = registeredStats_.find(key);
if (it == registeredStats_.end()) {
VLOG(1) << "addMetricValue() for unregistered stat " << key;
return;
}
if (it->second == facebook::velox::StatType::COUNT) {
// increment the counter.
metricsMap_[key] += value;
return;
}
// Gauge type metric value must be reset.
metricsMap_[key] = value;
}

// std::string overload: forwards to the const char* overload, which does the
// locking and the actual update. c_str() is NUL-terminated, so the forwarded
// pointer is a valid C string.
void StatsReporterImpl::addMetricValue(const std::string& key, size_t value)
const {
addMetricValue(key.c_str(), value);
}

/// StringPiece overload of addMetricValue().
///
/// A StringPiece is a (pointer, length) view and is not guaranteed to be
/// NUL-terminated, so it is copied into an owning string before being passed
/// to the C-string overload. Passing key.start() directly could read past the
/// end of the view.
///
/// @param key The key identifying the stat.
/// @param value The amount to add (COUNT) or the new reading (gauge).
void StatsReporterImpl::addMetricValue(folly::StringPiece key, size_t value)
    const {
  addMetricValue(key.str().c_str(), value);
}

/// Serializes all metrics collected so far with the supplied serializer.
/// The reporter's mutex is held while the serializer reads the stat-type and
/// value maps.
///
/// @param serializer Strategy that turns the two maps into a string.
/// @return The serialized metrics.
const std::string StatsReporterImpl::getMetrics(
    const MetricsSerializer& serializer) {
  std::lock_guard<std::mutex> guard(mutex_);
  std::string serialized = serializer.serialize(registeredStats_, metricsMap_);
  return serialized;
}
} // namespace facebook::presto
173 changes: 173 additions & 0 deletions presto-native-execution/presto_cpp/main/common/StatsReporterImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <folly/dynamic.h>
#include <fstream>
#include <iostream>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Counters.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/StatsReporter.h"

namespace facebook::presto {

/// Strategy interface that turns the collected metrics into a string in some
/// output format.
class MetricsSerializer {
 public:
  /// Virtual destructor so implementations can be destroyed safely through a
  /// pointer or reference to this base class.
  virtual ~MetricsSerializer() = default;

  /// Serializes the given metrics.
  /// @param metricStatTypes Mapping from metric name to its stat type.
  /// @param metricValues Mapping from metric name to its current value.
  /// @return The serialized representation of the metrics.
  virtual std::string serialize(
      const std::unordered_map<std::string, facebook::velox::StatType>&
          metricStatTypes,
      const std::unordered_map<std::string, size_t>& metricValues) const = 0;
};

namespace prometheus {
using Labels = std::unordered_map<std::string, std::string>;

/// Serializes metrics into the Prometheus text exposition format. Each metric
/// produces three lines:
///
///   # HELP <name>
///   # TYPE <name> <counter|gauge>
///   <name>{<label>="<value>",...} <value>
///
/// Dots in metric names are replaced with underscores. COUNT stats are
/// exported as "counter"; every other stat type, and any metric without a
/// registered type, is exported as "gauge".
class PrometheusSerializer : public MetricsSerializer {
 public:
  /// @param labels Labels attached to every serialized metric.
  explicit PrometheusSerializer(const Labels& labels) : labels_(labels) {}

  /// @param metricStatTypes Mapping from metric name to its stat type.
  /// @param metricValues Mapping from metric name to its current value.
  /// @return The metrics in Prometheus text format, one block per metric.
  std::string serialize(
      const std::unordered_map<std::string, facebook::velox::StatType>&
          metricStatTypes,
      const std::unordered_map<std::string, size_t>& metricValues)
      const override {
    // The label set is the same for every metric, so render it only once.
    std::stringstream labelStream;
    labelStream << "{";
    bool firstLabel = true;
    for (const auto& label : labels_) {
      if (!firstLabel) {
        // Comma separated labels.
        labelStream << ",";
      }
      labelStream << label.first << "=\"" << label.second << "\"";
      firstLabel = false;
    }
    labelStream << "}";
    const std::string labelText = labelStream.str();

    std::stringstream ss;
    for (const auto& metric : metricValues) {
      auto metricName = metric.first;
      std::replace(metricName.begin(), metricName.end(), '.', '_');

      // A value without a registered type must not dereference end(); treat
      // it as a gauge.
      const auto statTypeIt = metricStatTypes.find(metric.first);
      const bool isCounter = statTypeIt != metricStatTypes.end() &&
          statTypeIt->second == facebook::velox::StatType::COUNT;

      // '\n' rather than std::endl: no need to flush an in-memory stream.
      ss << "# HELP " << metricName << '\n';
      ss << "# TYPE " << metricName << " " << (isCounter ? "counter" : "gauge")
         << '\n';
      ss << metricName << labelText << " " << metric.second << '\n';
    }
    return ss.str();
  }

 private:
  // A map of labels assigned to each metric which helps in filtering at client
  // end.
  const Labels labels_;
};
} // namespace prometheus.

/// An implementation of BaseStatsReporter which gathers runtime metrics and
/// maintains them in-memory. Users can call
/// StatsReporterImpl::getMetrics(MetricsSerializer) to get the metrics as a
/// custom-formatted string.
class StatsReporterImpl : public facebook::velox::BaseStatsReporter {
public:
StatsReporterImpl(
const std::string cluster = "",
const std::string worker = "") {
if (cluster.empty()) {
auto nodeConfig = facebook::presto::NodeConfig::instance();
cluster_ = nodeConfig->nodeEnvironment();
} else {
cluster_ = cluster;
}
char* hostName = std::getenv("HOSTNAME");
workerPod_ = !hostName ? worker : hostName;
}

/// Register a stat of the given stat type.
/// @param key The key to identify the stat.
/// @param statType How the stat is aggregated.
void registerMetricExportType(
const char* key,
facebook::velox::StatType statType) const override;

void registerMetricExportType(
folly::StringPiece key,
facebook::velox::StatType statType) const override;

// Histogram registration is intentionally a no-op in this reporter: both
// overloads have empty bodies and ignore all of their arguments, so no
// histogram state is kept.
void registerHistogramMetricExportType(
const char* /*key*/,
int64_t /* bucketWidth */,
int64_t /* min */,
int64_t /* max */,
const std::vector<int32_t>& /* pcts */) const override {}

void registerHistogramMetricExportType(
folly::StringPiece /* key */,
int64_t /* bucketWidth */,
int64_t /* min */,
int64_t /* max */,
const std::vector<int32_t>& /* pcts */) const override {}

void addMetricValue(const std::string& key, size_t value = 1) const override;

void addMetricValue(const char* key, size_t value = 1) const override;

void addMetricValue(folly::StringPiece key, size_t value = 1) const override;

// Histogram values are intentionally dropped by this reporter: all three
// overloads have empty bodies and ignore the key and the value.
void addHistogramMetricValue(const std::string& key, size_t value)
const override {}

void addHistogramMetricValue(const char* key, size_t value) const override {}

void addHistogramMetricValue(folly::StringPiece key, size_t value)
const override {}

/// Returns the stat type registered for the given metric.
///
/// Looking up an unknown metric returns a value-initialized StatType and
/// leaves the registry unchanged. The lookup uses find() because operator[]
/// would insert a default entry for every unknown name that is queried.
///
/// @param metricName The key the stat was registered under.
/// @return The registered stat type, or a value-initialized StatType when the
/// metric is not registered.
const facebook::velox::StatType getRegisteredStatType(
    const std::string& metricName) {
  std::lock_guard<std::mutex> lock(mutex_);
  const auto it = registeredStats_.find(metricName);
  return it == registeredStats_.end() ? facebook::velox::StatType{}
                                      : it->second;
}

/*
* Serializes the metrics collected so far in the format suitable for
* back filling Prometheus server.
*
* Given a metric name and a set of labels, time series are frequently
* identified using this notation:
*
* <metric name>{<label name>=<label value>, ...}
*
* For example, a time series with the metric name num_tasks_aborted
* and the labels cluster="<cluster_id>" and worker="worker-id"
* could be written like this:
* # HELP num_tasks_aborted
* # TYPE num_tasks_aborted gauge*
* num_tasks_aborted{cluster="<cluster_id>", worker="worker-id"} value
* timestamp
*
* Above info is from:
* https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
*/
const std::string getMetrics(const MetricsSerializer& serializer);

private:
/// Mapping of registered stats key to StatType.
mutable std::unordered_map<std::string, facebook::velox::StatType>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use mutable folly::ConcurrentHashMap

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We normally use folly::Synchronized<> to do this. Let me revisit this after finishing the RFC. I wanted to unblock any testing you wanted to do, so I focused on getting a working version. I need to clean up the tests as well.

registeredStats_;
/// A mapping from stat key to its current value: the running total for COUNT
/// stats, the most recently reported value for all other stats.
mutable std::unordered_map<std::string, size_t> metricsMap_;
// Mutex to control access to registeredStats_ and metricsMap_ members.
mutable std::mutex mutex_;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not need mutex

std::string cluster_;
std::string workerPod_;
}; // class StatsReporterImpl
} // namespace facebook::presto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Prometheus-backed stats reporter. Requires the prometheus-cpp package.
find_package(prometheus-cpp CONFIG REQUIRED)

add_library(prometheus_reporter PrometheusReporter.cpp PrometheusReporter.h)
target_link_libraries(prometheus_reporter presto_common prometheus-cpp::core)
set_property(TARGET prometheus_reporter PROPERTY JOB_POOL_LINK
                                                 presto_link_job_pool)

# Build the unit test only when tests are enabled, so a production build with
# PRESTO_ENABLE_TESTING=OFF does not need gtest or the Velox test libraries.
# add_test() registers the test so that ctest runs it.
if(PRESTO_ENABLE_TESTING)
  add_executable(prometheus_reporter_test PrometheusReporterTest.cpp)
  add_test(prometheus_reporter_test prometheus_reporter_test)
  target_link_libraries(
    prometheus_reporter_test presto_server_lib velox_exec_test_lib
    prometheus_reporter gtest gtest_main)
endif()
Loading