Skip to content

Commit 1184a7e

Browse files
committed
feat(metrics): add NATS metrics to prometheus_metrics_fmt
- Add NATS client metrics collection via callbacks in transports/nats.rs
- Execute callbacks before TextEncoder to include NATS metrics in output
- Update prometheus_metrics_fmt to include dynamically added NATS metrics
- Add comprehensive test coverage for NATS client metrics
- Update tests to filter out dynamic NATS metrics for comparison
- Ensure NATS connection state, bytes, and other client metrics are exposed
- Add ComponentSystemStatusNatsMetrics struct for component service stats
- Implement automatic metrics registration in Namespace::component()
- Centralize NATS metric names in prometheus_names module
- Add filtering support for NATS metrics in tests
- Move ComponentSystemStatusNatsMetrics to service.rs module
- Update metric callbacks to use proper runtime prefix hierarchy
1 parent fa4a7f1 commit 1184a7e

File tree

9 files changed

+799
-68
lines changed

9 files changed

+799
-68
lines changed

lib/runtime/examples/Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/runtime/src/component.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
//! TODO: Top-level Overview of Endpoints/Functions
3131
3232
use crate::{
33-
config::HealthStatus, discovery::Lease, metrics::MetricsRegistry, service::ServiceSet,
33+
config::HealthStatus,
34+
discovery::Lease,
35+
metrics::{prometheus_names, MetricsRegistry},
36+
service::ServiceSet,
3437
transports::etcd::EtcdPath,
3538
};
3639

@@ -247,6 +250,7 @@ impl Component {
247250
Ok(out)
248251
}
249252

253+
/// Scrape user defined stats (embedded in data field of ServiceInfo)
250254
pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
251255
let service_name = self.service_name();
252256
let service_client = self.drt().service_client();
@@ -255,6 +259,49 @@ impl Component {
255259
.await
256260
}
257261

262+
/// Register Prometheus metrics for this component's service stats
263+
pub fn register_metrics(&self) -> Result<crate::service::ComponentSystemStatusNatsMetrics> {
264+
let component_metrics =
265+
crate::service::ComponentSystemStatusNatsMetrics::from_component(self)?;
266+
267+
// Create a callback that scrapes stats and updates metrics when called
268+
let metrics_clone = component_metrics.clone();
269+
let component_clone = self.clone();
270+
let prefix = self.prefix();
271+
let service_name = self.service_name();
272+
let prefix_for_closure = prefix.clone();
273+
self.drt().add_metrics_callback(&prefix, move |_runtime| {
274+
println!(
275+
"[DEBUG]CALLING metrics callback for component: {}, prefix:{}",
276+
service_name, prefix_for_closure
277+
);
278+
// Use tokio::runtime::Handle to run async code in the callback
279+
let handle = tokio::runtime::Handle::try_current();
280+
if let Ok(handle) = handle {
281+
let metrics_ref = metrics_clone.clone();
282+
let comp_ref = component_clone.clone();
283+
handle.spawn(async move {
284+
let timeout = std::time::Duration::from_millis(500);
285+
match comp_ref.scrape_stats(timeout).await {
286+
Ok(service_set) => {
287+
metrics_ref.update_from_service_set(&service_set);
288+
}
289+
Err(err) => {
290+
tracing::warn!(
291+
"Failed to scrape stats for component '{}': {}",
292+
comp_ref.service_name(),
293+
err
294+
);
295+
}
296+
}
297+
});
298+
}
299+
Ok("".to_string())
300+
});
301+
302+
Ok(component_metrics)
303+
}
304+
258305
/// TODO
259306
///
260307
/// This method will scrape the stats for all available services
@@ -488,11 +535,13 @@ impl Namespace {
488535

489536
/// Create a [`Component`] in the namespace whose endpoints can be discovered with etcd
490537
pub fn component(&self, name: impl Into<String>) -> Result<Component> {
491-
Ok(ComponentBuilder::from_runtime(self.runtime.clone())
538+
let component = ComponentBuilder::from_runtime(self.runtime.clone())
492539
.name(name)
493540
.namespace(self.clone())
494541
.is_static(self.is_static)
495-
.build()?)
542+
.build()?;
543+
component.register_metrics()?; // register a callback to scrape stats and update metrics
544+
Ok(component)
496545
}
497546

498547
/// Create a [`Namespace`] in the parent namespace

lib/runtime/src/distributed.rs

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,21 @@ impl DistributedRuntime {
106106
component_registry: component::Registry::new(),
107107
is_static,
108108
instance_sources: Arc::new(Mutex::new(HashMap::new())),
109-
prometheus_registries_by_prefix: Arc::new(std::sync::Mutex::new(HashMap::<
109+
metrics_registry_by_prefix: Arc::new(std::sync::Mutex::new(HashMap::<
110110
String,
111-
prometheus::Registry,
111+
crate::MetricsRegistryEntry,
112112
>::new())),
113113
system_health,
114114
};
115115

116+
// Register NATS client metrics after creation
117+
if let Err(e) = distributed_runtime
118+
.nats_client()
119+
.register_metrics(&distributed_runtime)
120+
{
121+
tracing::warn!("Failed to register NATS client metrics: {}", e);
122+
}
123+
116124
// Start metrics server if enabled
117125
if let Some(cancel_token) = cancel_token {
118126
let host = config.system_host.clone();
@@ -226,11 +234,6 @@ impl DistributedRuntime {
226234
self.nats_client.clone()
227235
}
228236

229-
/// Get metrics server information if available
230-
pub fn metrics_server_info(&self) -> Option<Arc<crate::metrics_server::MetricsServerInfo>> {
231-
self.metrics_server.get().cloned()
232-
}
233-
234237
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
235238
pub fn etcd_client(&self) -> Option<etcd::Client> {
236239
self.etcd_client.clone()
@@ -243,6 +246,97 @@ impl DistributedRuntime {
243246
pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
244247
self.instance_sources.clone()
245248
}
249+
250+
/// Get metrics server information if available
251+
pub fn metrics_server_info(&self) -> Option<Arc<crate::metrics_server::MetricsServerInfo>> {
252+
self.metrics_server.get().cloned()
253+
}
254+
255+
/// Add a Prometheus metric to a specific prefix's registry
256+
pub fn add_prometheus_metric(
257+
&self,
258+
prefix: &str,
259+
prometheus_metric: Box<dyn prometheus::core::Collector>,
260+
) -> anyhow::Result<()> {
261+
let mut registries = self.metrics_registry_by_prefix.lock().unwrap();
262+
let entry = registries.entry(prefix.to_string()).or_default();
263+
entry.prometheus_registry.register(prometheus_metric)?;
264+
Ok(())
265+
}
266+
267+
/// Add a callback function to a metrics registry for the given prefix
268+
pub fn add_metrics_callback<F>(&self, prefix: &str, callback: F)
269+
where
270+
F: Fn(&dyn crate::metrics::MetricsRegistry) -> anyhow::Result<String>
271+
+ Send
272+
+ Sync
273+
+ 'static,
274+
{
275+
let mut registries = self.metrics_registry_by_prefix.lock().unwrap();
276+
registries
277+
.entry(prefix.to_string())
278+
.or_default()
279+
.add_callback(self as &dyn crate::metrics::MetricsRegistry, callback);
280+
}
281+
282+
/// Execute all callbacks for a given prefix and return their results
283+
pub fn execute_metrics_callbacks(&self, prefix: &str) -> Vec<anyhow::Result<String>> {
284+
let registries = self.metrics_registry_by_prefix.lock().unwrap();
285+
if let Some(entry) = registries.get(prefix) {
286+
entry.execute_callbacks(self as &dyn crate::metrics::MetricsRegistry)
287+
} else {
288+
Vec::new()
289+
}
290+
}
291+
292+
/// Get all registered prefixes. Private because it is only used for testing.
293+
fn get_metrics_prefixes(&self) -> Vec<String> {
294+
let registries = self.metrics_registry_by_prefix.lock().unwrap();
295+
registries.keys().cloned().collect()
296+
}
297+
}
298+
299+
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the per-prefix registry helpers: adding callbacks, executing them,
    /// listing prefixes, and registering a Prometheus collector.
    #[test]
    fn test_metrics_registry_methods() {
        let tokio_rt = tokio::runtime::Runtime::new().unwrap();
        let drt = tokio_rt.block_on(async {
            let runtime = Runtime::single_threaded().unwrap();
            let built = DistributedRuntime::from_settings_without_discovery(runtime).await;
            built.unwrap()
        });

        // Two callbacks under one prefix.
        drt.add_metrics_callback("test_prefix", |_| Ok("test_callback".to_string()));
        drt.add_metrics_callback("test_prefix", |_| Ok("test_callback2".to_string()));

        // Both run, in registration order.
        let outputs = drt.execute_metrics_callbacks("test_prefix");
        assert_eq!(outputs.len(), 2);
        assert_eq!(outputs[0].as_ref().unwrap(), "test_callback");
        assert_eq!(outputs[1].as_ref().unwrap(), "test_callback2");

        // The prefix is now listed.
        let known = drt.get_metrics_prefixes();
        assert!(known.contains(&"test_prefix".to_string()));

        // An unknown prefix produces no results.
        let none = drt.execute_metrics_callbacks("non_existent");
        assert_eq!(none.len(), 0);

        // Registering a Prometheus collector under the same prefix succeeds.
        let counter = prometheus::Counter::new("test_counter", "A test counter").unwrap();
        let registered = drt.add_prometheus_metric("test_prefix", Box::new(counter.clone()));
        registered.unwrap();

        // The prefix is still listed after the collector was added.
        let known = drt.get_metrics_prefixes();
        assert!(known.contains(&"test_prefix".to_string()));
    }
}
247341

248342
#[derive(Dissolve)]

lib/runtime/src/lib.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,63 @@ impl SystemHealth {
147147
}
148148
}
149149

150+
/// Structure to hold Prometheus registries and associated callbacks for a given prefix
151+
pub struct MetricsRegistryEntry {
152+
/// The Prometheus registry for this prefix
153+
pub prometheus_registry: prometheus::Registry,
154+
/// List of function callbacks that receive a reference to any MetricsRegistry
155+
pub runtime_callbacks: Vec<
156+
Box<dyn Fn(&dyn crate::metrics::MetricsRegistry) -> anyhow::Result<String> + Send + Sync>,
157+
>,
158+
}
159+
160+
impl MetricsRegistryEntry {
161+
/// Create a new metrics registry entry with an empty registry and no callbacks
162+
pub fn new() -> Self {
163+
Self {
164+
prometheus_registry: prometheus::Registry::new(),
165+
runtime_callbacks: Vec::new(),
166+
}
167+
}
168+
169+
/// Add a callback function that receives a reference to any MetricsRegistry
170+
pub fn add_callback<F>(&mut self, _runtime: &dyn crate::metrics::MetricsRegistry, callback: F)
171+
where
172+
F: Fn(&dyn crate::metrics::MetricsRegistry) -> anyhow::Result<String>
173+
+ Send
174+
+ Sync
175+
+ 'static,
176+
{
177+
self.runtime_callbacks.push(Box::new(callback));
178+
}
179+
180+
/// Execute all runtime callbacks and return their results
181+
pub fn execute_callbacks(
182+
&self,
183+
runtime: &dyn crate::metrics::MetricsRegistry,
184+
) -> Vec<anyhow::Result<String>> {
185+
self.runtime_callbacks
186+
.iter()
187+
.map(|callback| callback(runtime))
188+
.collect()
189+
}
190+
}
191+
192+
impl Default for MetricsRegistryEntry {
193+
fn default() -> Self {
194+
Self::new()
195+
}
196+
}
197+
198+
impl Clone for MetricsRegistryEntry {
199+
fn clone(&self) -> Self {
200+
Self {
201+
prometheus_registry: self.prometheus_registry.clone(),
202+
runtime_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
203+
}
204+
}
205+
}
206+
150207
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
151208
/// communication protocols and transports.
152209
#[derive(Clone)]
@@ -176,6 +233,6 @@ pub struct DistributedRuntime {
176233
// Health Status
177234
system_health: Arc<std::sync::Mutex<SystemHealth>>,
178235

179-
// This map associates metric prefixes with their corresponding Prometheus registries.
180-
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
236+
// This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
237+
metrics_registry_by_prefix: Arc<std::sync::Mutex<HashMap<String, MetricsRegistryEntry>>>,
181238
}

0 commit comments

Comments
 (0)