diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index c49337a13e3e4..f647a93359e4f 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -1,11 +1,16 @@ -use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime}; +use std::{ + collections::{BTreeMap, HashMap}, + convert::TryFrom, + fmt::Debug, + time::SystemTime, +}; use chrono::{DateTime, Utc}; use ordered_float::NotNan; use serde::{Deserialize, Serialize}; use super::NewRelicSinkError; -use crate::event::{Event, MetricValue, Value}; +use crate::event::{Event, MetricKind, MetricValue, Value}; #[derive(Debug)] pub enum NewRelicApiModel { @@ -21,30 +26,9 @@ type DataStore = HashMap>; pub struct MetricsApiModel(pub Vec); impl MetricsApiModel { - pub fn new(metric_array: Vec<(Value, Value, Value)>) -> Self { - let mut metric_data_array = vec![]; - for (m_name, m_value, m_timestamp) in metric_array { - let mut metric_data = KeyValData::new(); - metric_data.insert("name".to_owned(), m_name); - metric_data.insert("value".to_owned(), m_value); - match m_timestamp { - Value::Timestamp(ts) => { - metric_data.insert("timestamp".to_owned(), Value::from(ts.timestamp())); - } - Value::Integer(i) => { - metric_data.insert("timestamp".to_owned(), Value::from(i)); - } - _ => { - metric_data.insert( - "timestamp".to_owned(), - Value::from(DateTime::::from(SystemTime::now()).timestamp()), - ); - } - } - metric_data_array.push(metric_data); - } + pub fn new(metric_array: Vec) -> Self { let mut metric_store = DataStore::new(); - metric_store.insert("metrics".to_owned(), metric_data_array); + metric_store.insert("metrics".to_owned(), metric_array); Self(vec![metric_store]) } } @@ -57,39 +41,66 @@ impl TryFrom> for MetricsApiModel { for buf_event in buf_events { if let Event::Metric(metric) = buf_event { - // Future improvement: put metric type. If type = count, NR metric model requires an interval.ms field, that is not provided by the Vector Metric model. - match metric.value() { - MetricValue::Gauge { value } => { - metric_array.push(( - Value::from(metric.name().to_owned()), - Value::from( - NotNan::new(*value).map_err(|_| { - NewRelicSinkError::new("NaN value not supported") - })?, - ), - Value::from(metric.timestamp()), - )); - } - MetricValue::Counter { value } => { - metric_array.push(( - Value::from(metric.name().to_owned()), - Value::from( - NotNan::new(*value).map_err(|_| { - NewRelicSinkError::new("NaN value not supported") - })?, - ), - Value::from(metric.timestamp()), - )); + // Generate Value::Object() from BTreeMap + let (series, data, _) = metric.into_parts(); + let attr = series.tags.map(|tags| { + Value::from( + tags.into_iter() + .map(|(key, value)| (key, Value::from(value))) + .collect::>(), + ) + }); + + // We only handle gauge and counter metrics + if let MetricValue::Gauge { value } | MetricValue::Counter { value } = data.value { + let mut metric_data = KeyValData::new(); + // Set name, value, and timestamp + metric_data.insert("name".to_owned(), Value::from(series.name.name)); + metric_data.insert( + "value".to_owned(), + Value::from( + NotNan::new(value) + .map_err(|_| NewRelicSinkError::new("NaN value not supported"))?, + ), + ); + metric_data.insert( + "timestamp".to_owned(), + Value::from( + data.time + .timestamp + .unwrap_or_else(|| DateTime::::from(SystemTime::now())) + .timestamp(), + ), + ); + if let Some(attr) = attr { + metric_data.insert("attributes".to_owned(), attr); } - _ => { - // Unrecognized metric type + // Set type and type related attributes + if let (MetricValue::Counter { .. }, MetricKind::Incremental) = + (data.value, data.kind) + { + if let Some(interval_ms) = data.time.interval_ms { + metric_data.insert( + "interval.ms".to_owned(), + Value::from(interval_ms.get() as i64), + ); + } else { + // Incremental counter without an interval is worthless, skip this metric + continue; + } + metric_data.insert("type".to_owned(), Value::from("count")); + } else { + // Anything that's not an incremental counter is considered a gauge, that is gauge and absolute counters metrics. + metric_data.insert("type".to_owned(), Value::from("gauge")); } + + metric_array.push(metric_data); } } } if !metric_array.is_empty() { - Ok(MetricsApiModel::new(metric_array)) + Ok(Self::new(metric_array)) } else { Err(NewRelicSinkError::new("No valid metrics to generate")) } diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs index 8b35dadd16cfd..a99609b3a2327 100644 --- a/src/sinks/new_relic/tests.rs +++ b/src/sinks/new_relic/tests.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, convert::TryFrom, time::SystemTime}; +use std::{collections::HashMap, convert::TryFrom, num::NonZeroU32, time::SystemTime}; use chrono::{DateTime, Utc}; use futures::{future::ready, stream}; @@ -176,7 +176,7 @@ fn generate_metric_api_model() { MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); let metrics = model.0[0] .get("metrics") - .expect("Logs data store not present"); + .expect("Metric data store not present"); assert_eq!(metrics.len(), 1); assert!(metrics[0].get("name").is_some()); @@ -200,7 +200,7 @@ fn generate_metric_api_model() { MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); let metrics = model.0[0] .get("metrics") - .expect("Logs data store not present"); + .expect("Metric data store not present"); assert_eq!(metrics.len(), 1); assert!(metrics[0].get("name").is_some()); @@ -211,4 +211,31 @@ fn generate_metric_api_model() { assert!(metrics[0].get("value").is_some()); assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0)); assert!(metrics[0].get("timestamp").is_some()); + + // Incremental counter + let m = Metric::new( + "my_metric", + MetricKind::Incremental, + MetricValue::Counter { value: 100.0 }, + ) + .with_timestamp(Some(DateTime::::from(SystemTime::now()))) + .with_interval_ms(NonZeroU32::new(1000)); + let event = Event::Metric(m); + let model = + MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); + let metrics = model.0[0] + .get("metrics") + .expect("Metric data store not present"); + + assert_eq!(metrics.len(), 1); + assert!(metrics[0].get("name").is_some()); + assert_eq!( + metrics[0].get("name").unwrap().to_string_lossy(), + "my_metric".to_owned() + ); + assert!(metrics[0].get("value").is_some()); + assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0)); + assert!(metrics[0].get("timestamp").is_some()); + assert!(metrics[0].get("interval.ms").is_some()); + assert_eq!(metrics[0].get("interval.ms").unwrap(), &Value::from(1000)); }