diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs
index 27cf7292dcdbf..5a8b2fc800897 100644
--- a/src/sinks/new_relic/model.rs
+++ b/src/sinks/new_relic/model.rs
@@ -1,12 +1,18 @@
-use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime};
+use std::{
+    collections::{BTreeMap, HashMap},
+    convert::TryFrom,
+    fmt::Debug,
+    time::SystemTime,
+};
 
 use chrono::{DateTime, Utc};
 use ordered_float::NotNan;
 use serde::{Deserialize, Serialize};
+use vector_common::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
 use vrl::event_path;
 
 use super::NewRelicSinkError;
-use crate::event::{Event, MetricValue, Value};
+use crate::event::{Event, MetricKind, MetricValue, Value};
 
 #[derive(Debug)]
 pub enum NewRelicApiModel {
@@ -22,30 +28,9 @@ type DataStore = HashMap<String, Vec<KeyValData>>;
 pub struct MetricsApiModel(pub Vec<DataStore>);
 
 impl MetricsApiModel {
-    pub fn new(metric_array: Vec<(Value, Value, Value)>) -> Self {
-        let mut metric_data_array = vec![];
-        for (m_name, m_value, m_timestamp) in metric_array {
-            let mut metric_data = KeyValData::new();
-            metric_data.insert("name".to_owned(), m_name);
-            metric_data.insert("value".to_owned(), m_value);
-            match m_timestamp {
-                Value::Timestamp(ts) => {
-                    metric_data.insert("timestamp".to_owned(), Value::from(ts.timestamp()));
-                }
-                Value::Integer(i) => {
-                    metric_data.insert("timestamp".to_owned(), Value::from(i));
-                }
-                _ => {
-                    metric_data.insert(
-                        "timestamp".to_owned(),
-                        Value::from(DateTime::<Utc>::from(SystemTime::now()).timestamp()),
-                    );
-                }
-            }
-            metric_data_array.push(metric_data);
-        }
+    pub fn new(metric_array: Vec<KeyValData>) -> Self {
         let mut metric_store = DataStore::new();
-        metric_store.insert("metrics".to_owned(), metric_data_array);
+        metric_store.insert("metrics".to_owned(), metric_array);
         Self(vec![metric_store])
     }
 }
@@ -54,43 +39,107 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {
     type Error = NewRelicSinkError;
 
     fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
-        let mut metric_array = vec![];
-
-        for buf_event in buf_events {
-            if let Event::Metric(metric) = buf_event {
-                // Future improvement: put metric type. If type = count, NR metric model requires an interval.ms field, that is not provided by the Vector Metric model.
-                match metric.value() {
-                    MetricValue::Gauge { value } => {
-                        metric_array.push((
-                            Value::from(metric.name().to_owned()),
-                            Value::from(
-                                NotNan::new(*value).map_err(|_| {
-                                    NewRelicSinkError::new("NaN value not supported")
-                                })?,
-                            ),
-                            Value::from(metric.timestamp()),
-                        ));
-                    }
-                    MetricValue::Counter { value } => {
-                        metric_array.push((
-                            Value::from(metric.name().to_owned()),
-                            Value::from(
-                                NotNan::new(*value).map_err(|_| {
-                                    NewRelicSinkError::new("NaN value not supported")
-                                })?,
-                            ),
-                            Value::from(metric.timestamp()),
-                        ));
-                    }
-                    _ => {
-                        // Unrecognized metric type
-                    }
-                }
-            }
-        }
+        let mut num_non_metric_events = 0;
+        let mut num_missing_interval = 0;
+        let mut num_nan_value = 0;
+        let mut num_unsupported_metric_type = 0;
+
+        let metric_array: Vec<_> = buf_events
+            .into_iter()
+            .filter_map(|event| {
+                let Some(metric) = event.try_into_metric() else {
+                    num_non_metric_events += 1;
+                    return None;
+                };
+
+                // Generate Value::Object() from BTreeMap
+                let (series, data, _) = metric.into_parts();
+
+                let mut metric_data = KeyValData::new();
+
+                // We only handle gauge and counter metrics
+                // Extract value & type and set type-related attributes
+                let (value, metric_type) = match (data.value, &data.kind) {
+                    (MetricValue::Counter { value }, MetricKind::Incremental) => {
+                        let Some(interval_ms) = data.time.interval_ms else {
+                            // Incremental counter without an interval is worthless, skip this metric
+                            num_missing_interval += 1;
+                            return None;
+                        };
+                        metric_data.insert(
+                            "interval.ms".to_owned(),
+                            Value::from(interval_ms.get() as i64),
+                        );
+                        (value, "count")
+                    }
+                    (MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"),
+                    (MetricValue::Gauge { value }, _) => (value, "gauge"),
+                    _ => {
+                        // Unsupported metric type
+                        num_unsupported_metric_type += 1;
+                        return None;
+                    }
+                };
+
+                // Set name, type, value, timestamp, and attributes
+                metric_data.insert("name".to_owned(), Value::from(series.name.name));
+                metric_data.insert("type".to_owned(), Value::from(metric_type));
+                let Some(value) = NotNan::new(value).ok() else {
+                    num_nan_value += 1;
+                    return None;
+                };
+                metric_data.insert("value".to_owned(), Value::from(value));
+                metric_data.insert(
+                    "timestamp".to_owned(),
+                    Value::from(
+                        data.time
+                            .timestamp
+                            .unwrap_or_else(|| DateTime::<Utc>::from(SystemTime::now()))
+                            .timestamp(),
+                    ),
+                );
+                if let Some(tags) = series.tags {
+                    metric_data.insert(
+                        "attributes".to_owned(),
+                        Value::from(
+                            tags.iter_single()
+                                .map(|(key, value)| (key.to_string(), Value::from(value)))
+                                .collect::<BTreeMap<_, _>>(),
+                        ),
+                    );
+                }
+
+                Some(metric_data)
+            })
+            .collect();
+
+        if num_non_metric_events > 0 {
+            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+                count: num_non_metric_events,
+                reason: "non-metric event"
+            });
+        }
+        if num_unsupported_metric_type > 0 {
+            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+                count: num_unsupported_metric_type,
+                reason: "unsupported metric type"
+            });
+        }
+        if num_nan_value > 0 {
+            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+                count: num_nan_value,
+                reason: "NaN value not supported"
+            });
+        }
+        if num_missing_interval > 0 {
+            emit!(ComponentEventsDropped::<INTENTIONAL> {
+                count: num_missing_interval,
+                reason: "incremental counter missing interval"
+            });
+        }
 
         if !metric_array.is_empty() {
-            Ok(MetricsApiModel::new(metric_array))
+            Ok(Self::new(metric_array))
         } else {
             Err(NewRelicSinkError::new("No valid metrics to generate"))
         }
@@ -110,9 +159,17 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
     type Error = NewRelicSinkError;
 
     fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
-        let mut events_array = vec![];
-        for buf_event in buf_events {
-            if let Event::Log(log) = buf_event {
+        let mut num_non_log_events = 0;
+        let mut num_nan_value = 0;
+
+        let events_array: Vec<KeyValData> = buf_events
+            .into_iter()
+            .filter_map(|event| {
+                let Some(log) = event.try_into_log() else {
+                    num_non_log_events += 1;
+                    return None;
+                };
+
                 let mut event_model = KeyValData::new();
                 for (k, v) in log.convert_to_fields() {
                     event_model.insert(k, v.clone());
@@ -133,8 +190,9 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
                                     if let Some(f) = n.as_f64() {
                                         event_model.insert(
                                             k,
-                                            Value::from(NotNan::new(f).map_err(|_| {
-                                                NewRelicSinkError::new("NaN value not supported")
+                                            Value::from(NotNan::new(f).ok().or_else(|| {
+                                                num_nan_value += 1;
+                                                None
                                             })?),
                                         );
                                     } else {
@@ -144,7 +202,9 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
                                 serde_json::Value::Bool(b) => {
                                     event_model.insert(k, Value::from(b));
                                 }
-                                _ => {}
+                                _ => {
+                                    // Note that arrays and nested objects are silently dropped.
+                                }
                             }
                         }
                         event_model.remove("message");
@@ -156,8 +216,21 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
                         .insert("eventType".to_owned(), Value::from("VectorSink".to_owned()));
                 }
 
-                events_array.push(event_model);
-            }
+                Some(event_model)
+            })
+            .collect();
+
+        if num_non_log_events > 0 {
+            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+                count: num_non_log_events,
+                reason: "non-log event"
+            });
+        }
+        if num_nan_value > 0 {
+            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+                count: num_nan_value,
+                reason: "NaN value not supported"
+            });
         }
 
         if !events_array.is_empty() {
@@ -183,9 +256,16 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
     type Error = NewRelicSinkError;
 
     fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
-        let mut logs_array = vec![];
-        for buf_event in buf_events {
-            if let Event::Log(log) = buf_event {
+        let mut num_non_log_events = 0;
+
+        let logs_array: Vec<KeyValData> = buf_events
+            .into_iter()
+            .filter_map(|event| {
+                let Some(log) = event.try_into_log() else {
+                    num_non_log_events += 1;
+                    return None;
+                };
+
                 let mut log_model = KeyValData::new();
                 for (k, v) in log.convert_to_fields() {
                     log_model.insert(k, v.clone());
@@ -196,8 +276,16 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
                         Value::from("log from vector".to_owned()),
                     );
                 }
-                logs_array.push(log_model);
-            }
+
+                Some(log_model)
+            })
+            .collect();
+
+        if num_non_log_events > 0 {
+            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+                count: num_non_log_events,
+                reason: "non-log event"
+            });
         }
 
         if !logs_array.is_empty() {
diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs
index f977b58f0177c..c433e9dd27a78 100644
--- a/src/sinks/new_relic/tests.rs
+++ b/src/sinks/new_relic/tests.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
+use std::{collections::HashMap, convert::TryFrom, num::NonZeroU32, time::SystemTime};
 
 use chrono::{DateTime, Utc};
 use futures::{future::ready, stream};
@@ -211,7 +211,7 @@ fn generate_metric_api_model() {
         MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
     let metrics = model.0[0]
         .get("metrics")
-        .expect("Logs data store not present");
+        .expect("Metric data store not present");
 
     assert_eq!(metrics.len(), 1);
     assert!(metrics[0].get("name").is_some());
@@ -235,7 +235,7 @@ fn generate_metric_api_model() {
         MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
     let metrics = model.0[0]
         .get("metrics")
-        .expect("Logs data store not present");
+        .expect("Metric data store not present");
 
     assert_eq!(metrics.len(), 1);
     assert!(metrics[0].get("name").is_some());
@@ -246,4 +246,31 @@ fn generate_metric_api_model() {
     assert!(metrics[0].get("value").is_some());
     assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
     assert!(metrics[0].get("timestamp").is_some());
+
+    // Incremental counter
+    let m = Metric::new(
+        "my_metric",
+        MetricKind::Incremental,
+        MetricValue::Counter { value: 100.0 },
+    )
+    .with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
+    .with_interval_ms(NonZeroU32::new(1000));
+    let event = Event::Metric(m);
+    let model =
+        MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
+    let metrics = model.0[0]
+        .get("metrics")
+        .expect("Metric data store not present");
+
+    assert_eq!(metrics.len(), 1);
+    assert!(metrics[0].get("name").is_some());
+    assert_eq!(
+        metrics[0].get("name").unwrap().to_string_lossy(),
+        "my_metric".to_owned()
+    );
+    assert!(metrics[0].get("value").is_some());
+    assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
+    assert!(metrics[0].get("timestamp").is_some());
+    assert!(metrics[0].get("interval.ms").is_some());
+    assert_eq!(metrics[0].get("interval.ms").unwrap(), &Value::from(1000));
 }