Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 62 additions & 51 deletions src/sinks/new_relic/model.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime};
use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
time::SystemTime,
};

use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use serde::{Deserialize, Serialize};

use super::NewRelicSinkError;
use crate::event::{Event, MetricValue, Value};
use crate::event::{Event, MetricKind, MetricValue, Value};

#[derive(Debug)]
pub enum NewRelicApiModel {
Expand All @@ -21,30 +26,9 @@ type DataStore = HashMap<String, Vec<KeyValData>>;
pub struct MetricsApiModel(pub Vec<DataStore>);

impl MetricsApiModel {
pub fn new(metric_array: Vec<(Value, Value, Value)>) -> Self {
let mut metric_data_array = vec![];
for (m_name, m_value, m_timestamp) in metric_array {
let mut metric_data = KeyValData::new();
metric_data.insert("name".to_owned(), m_name);
metric_data.insert("value".to_owned(), m_value);
match m_timestamp {
Value::Timestamp(ts) => {
metric_data.insert("timestamp".to_owned(), Value::from(ts.timestamp()));
}
Value::Integer(i) => {
metric_data.insert("timestamp".to_owned(), Value::from(i));
}
_ => {
metric_data.insert(
"timestamp".to_owned(),
Value::from(DateTime::<Utc>::from(SystemTime::now()).timestamp()),
);
}
}
metric_data_array.push(metric_data);
}
pub fn new(metric_array: Vec<KeyValData>) -> Self {
let mut metric_store = DataStore::new();
metric_store.insert("metrics".to_owned(), metric_data_array);
metric_store.insert("metrics".to_owned(), metric_array);
Self(vec![metric_store])
}
}
Expand All @@ -57,39 +41,66 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {

for buf_event in buf_events {
if let Event::Metric(metric) = buf_event {
// Future improvement: put metric type. If type = count, NR metric model requires an interval.ms field, that is not provided by the Vector Metric model.
match metric.value() {
MetricValue::Gauge { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
}
MetricValue::Counter { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
// Generate Value::Object() from BTreeMap<String, String>
let (series, data, _) = metric.into_parts();
let attr = series.tags.map(|tags| {
Value::from(
tags.into_iter()
.map(|(key, value)| (key, Value::from(value)))
.collect::<BTreeMap<_, _>>(),
)
});

// We only handle gauge and counter metrics
if let MetricValue::Gauge { value } | MetricValue::Counter { value } = data.value {
let mut metric_data = KeyValData::new();
// Set name, value, and timestamp
metric_data.insert("name".to_owned(), Value::from(series.name.name));
metric_data.insert(
"value".to_owned(),
Value::from(
NotNan::new(value)
.map_err(|_| NewRelicSinkError::new("NaN value not supported"))?,
),
);
metric_data.insert(
"timestamp".to_owned(),
Value::from(
data.time
.timestamp
.unwrap_or_else(|| DateTime::<Utc>::from(SystemTime::now()))
.timestamp(),
),
);
if let Some(attr) = attr {
metric_data.insert("attributes".to_owned(), attr);
}
_ => {
// Unrecognized metric type
// Set type and type related attributes
if let (MetricValue::Counter { .. }, MetricKind::Incremental) =
(data.value, data.kind)
{
if let Some(interval_ms) = data.time.interval_ms {
metric_data.insert(
"interval.ms".to_owned(),
Value::from(interval_ms.get() as i64),
);
} else {
// Incremental counter without an interval is worthless, skip this metric
continue;
}
metric_data.insert("type".to_owned(), Value::from("count"));
} else {
// Anything that's not an incremental counter is considered a gauge, that is gauge and absolute counters metrics.
metric_data.insert("type".to_owned(), Value::from("gauge"));
}

metric_array.push(metric_data);
}
}
}

if !metric_array.is_empty() {
Ok(MetricsApiModel::new(metric_array))
Ok(Self::new(metric_array))
} else {
Err(NewRelicSinkError::new("No valid metrics to generate"))
}
Expand Down
33 changes: 30 additions & 3 deletions src/sinks/new_relic/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
use std::{collections::HashMap, convert::TryFrom, num::NonZeroU32, time::SystemTime};

use chrono::{DateTime, Utc};
use futures::{future::ready, stream};
Expand Down Expand Up @@ -176,7 +176,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -200,7 +200,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -211,4 +211,31 @@ fn generate_metric_api_model() {
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());

// Incremental counter
let m = Metric::new(
"my_metric",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
.with_interval_ms(NonZeroU32::new(1000));
let event = Event::Metric(m);
let model =
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
assert_eq!(
metrics[0].get("name").unwrap().to_string_lossy(),
"my_metric".to_owned()
);
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());
assert!(metrics[0].get("interval.ms").is_some());
assert_eq!(metrics[0].get("interval.ms").unwrap(), &Value::from(1000));
}