Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions src/sinks/appsignal/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,23 @@ async fn metrics_real_endpoint() {
#[tokio::test]
async fn metrics_shape() {
let events: Vec<_> = (0..5)
.map(|index| {
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
.flat_map(|index| {
vec![
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
)),
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: (index + index) as f64,
},
)),
]
})
.collect();
let api_key = push_api_key();
Expand Down Expand Up @@ -146,11 +155,11 @@ async fn metrics_shape() {
.collect();
assert_eq!(
vec![
("counter_0", "absolute", 0.0),
("counter_1", "absolute", 1.0),
("counter_2", "absolute", 2.0),
("counter_3", "absolute", 3.0),
("counter_4", "absolute", 4.0),
("counter_0", "incremental", 0.0),
("counter_1", "incremental", 1.0),
("counter_2", "incremental", 2.0),
("counter_3", "incremental", 3.0),
("counter_4", "incremental", 4.0),
],
metrics
);
Expand Down Expand Up @@ -231,11 +240,18 @@ async fn error_scenario_real_endpoint() {

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
let events = vec![Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
))];
let events = vec![
Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
)),
Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 2.0 },
)),
];
let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

mod config;
mod encoder;
mod normalizer;
mod request_builder;
mod service;
mod sink;
Expand Down
78 changes: 78 additions & 0 deletions src/sinks/appsignal/normalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use vector_core::event::{Metric, MetricValue};

use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

#[derive(Default)]
pub(crate) struct AppsignalMetricsNormalizer;

impl MetricNormalize for AppsignalMetricsNormalizer {
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
// We only care about making sure that counters are incremental, and that gauges are
// always absolute. Other metric types are currently unsupported.
match &metric.value() {
// We always send counters as incremental and gauges as absolute. Realistically, any
// system sending an incremental gauge update is kind of doing it wrong, but alas.
MetricValue::Counter { .. } => state.make_incremental(metric),
MetricValue::Gauge { .. } => state.make_absolute(metric),
// Otherwise, send it through as-is.
_ => Some(metric),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use crate::event::{Metric, MetricKind, MetricValue};

use super::AppsignalMetricsNormalizer;
use crate::test_util::metrics::{assert_normalize, tests};

#[test]
fn absolute_counter() {
tests::absolute_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
}

#[test]
fn incremental_counter() {
tests::incremental_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
}

#[test]
fn mixed_counter() {
tests::mixed_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
}

#[test]
fn absolute_gauge() {
tests::absolute_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
}

#[test]
fn incremental_gauge() {
tests::incremental_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
}

#[test]
fn mixed_gauge() {
tests::mixed_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
}

#[test]
fn other_metrics() {
let metric = Metric::new(
"set",
MetricKind::Incremental,
MetricValue::Set {
values: BTreeSet::new(),
},
);

assert_normalize(
AppsignalMetricsNormalizer,
vec![metric.clone()],
vec![Some(metric)],
);
}
}
15 changes: 13 additions & 2 deletions src/sinks/appsignal/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::{stream::BoxStream, StreamExt};
use futures_util::future::ready;
use tower::{Service, ServiceBuilder};
use vector_core::{
event::Event,
Expand All @@ -7,12 +8,14 @@ use vector_core::{
};

use crate::{
codecs::Transformer, internal_events::SinkRequestBuildError,
sinks::util::builder::SinkBuilderExt, sinks::util::Compression,
codecs::Transformer,
internal_events::SinkRequestBuildError,
sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
};

use super::{
encoder::AppsignalEncoder,
normalizer::AppsignalMetricsNormalizer,
request_builder::{AppsignalRequest, AppsignalRequestBuilder},
};

Expand All @@ -32,8 +35,16 @@ where
{
pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();

input
.filter_map(move |event| {
ready(if let Event::Metric(metric) = event {
normalizer.normalize(metric).map(Event::Metric)
} else {
Some(event)
})
})
.batched(self.batch_settings.into_byte_size_config())
.request_builder(
None,
Expand Down
Loading