Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion lib/codecs/src/encoding/format/native_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl Encoder<Event> for NativeJsonSerializer {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use vector_core::event::{LogEvent, Value};
use vector_core::buckets;
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, Value};
use vrl::btreemap;

use super::*;
Expand Down Expand Up @@ -84,4 +85,25 @@ mod tests {

assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
}

#[test]
fn serialize_aggregated_histogram() {
let histogram_event = Event::from(Metric::new(
"histogram",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
count: 1,
sum: 1.0,
buckets: buckets!(f64::NEG_INFINITY => 0 ,2.0 => 1, f64::INFINITY => 0),
},
));

let mut serializer = NativeJsonSerializer::new();
let mut bytes = BytesMut::new();
serializer
.encode(histogram_event.clone(), &mut bytes)
.unwrap();
let json = serializer.to_json_value(histogram_event).unwrap();
assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
}
}
86 changes: 86 additions & 0 deletions lib/codecs/tests/native_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use bytes::BytesMut;
use codecs::decoding::format::Deserializer;
use codecs::encoding::format::Serializer;
use codecs::{NativeJsonDeserializerConfig, NativeJsonSerializerConfig};
use vector_core::buckets;
use vector_core::config::LogNamespace;
use vector_core::event::{Event, Metric};
use vector_core::event::{MetricKind, MetricValue};

fn assert_roundtrip(
input_event: Event,
serializer: &mut dyn Serializer<Error = vector_common::Error>,
deserializer: &dyn Deserializer,
expected_json_value: serde_json::Value,
) {
let mut bytes_mut = BytesMut::new();
serializer
.encode(input_event.clone(), &mut bytes_mut)
.unwrap();
let bytes = bytes_mut.freeze();
let events = deserializer.parse(bytes, LogNamespace::Vector).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0], input_event);

let json_value = serde_json::to_value(input_event.as_metric()).unwrap();
assert_eq!(json_value, expected_json_value);
}

#[test]
fn histogram_metric_roundtrip() {
let histogram_event = Event::from(Metric::new(
"histogram",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
count: 1,
sum: 1.0,
buckets: buckets!(
f64::NEG_INFINITY => 10 ,
f64::MIN => 10, 1.5 => 10,
f64::MAX => 10,
f64::INFINITY => 10),
},
));

let expected_json_value = serde_json::from_str(
r#"
{
"aggregated_histogram": {
"buckets": [
{
"count": 10,
"upper_limit": "-inf"
},
{
"count": 10,
"upper_limit": -1.7976931348623157e308
},
{
"count": 10,
"upper_limit": 1.5
},
{
"count": 10,
"upper_limit": 1.7976931348623157e308
},
{
"count": 10,
"upper_limit": "inf"
}
],
"count": 1,
"sum": 1.0
},
"kind": "absolute",
"name": "histogram"
}"#,
)
.unwrap();

assert_roundtrip(
histogram_event,
&mut NativeJsonSerializerConfig.build(),
&NativeJsonDeserializerConfig::default().build(),
expected_json_value,
)
}
61 changes: 58 additions & 3 deletions lib/vector-core/src/event/metric/value.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use core::fmt;
use std::collections::BTreeSet;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

use vector_common::byte_size_of::ByteSizeOf;
use vector_config::configurable_component;

use super::{samples_to_buckets, write_list, write_word};
use crate::{float_eq, metrics::AgentDDSketch};

use super::{samples_to_buckets, write_list, write_word};

const INFINITY: &str = "inf";
const NEG_INFINITY: &str = "-inf";
const NAN: &str = "NaN";

/// Metric value.
#[configurable_component]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -597,14 +604,62 @@ impl ByteSizeOf for Sample {
}
}

/// Custom serialization function which converts special `f64` values to strings.
/// Non-special values are serialized as numbers.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn serialize_f64<S>(value: &f64, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if value.is_infinite() {
serializer.serialize_str(if *value > 0.0 { INFINITY } else { NEG_INFINITY })
} else if value.is_nan() {
serializer.serialize_str(NAN)
} else {
serializer.serialize_f64(*value)
}
}

/// Custom deserialization function for handling special f64 values.
fn deserialize_f64<'de, D>(deserializer: D) -> Result<f64, D::Error>
where
D: Deserializer<'de>,
{
struct UpperLimitVisitor;

impl<'de> de::Visitor<'de> for UpperLimitVisitor {
type Value = f64;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a number or a special string value")
}

fn visit_f64<E: de::Error>(self, value: f64) -> Result<Self::Value, E> {
Ok(value)
}

fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
match value {
NAN => Ok(f64::NAN),
INFINITY => Ok(f64::INFINITY),
NEG_INFINITY => Ok(f64::NEG_INFINITY),
_ => Err(E::custom("unsupported string value")),
}
}
}

deserializer.deserialize_any(UpperLimitVisitor)
}

/// A histogram bucket.
///
/// Histogram buckets represent the `count` of observations where the value of the observations does
/// not exceed the specified `upper_limit`.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[configurable_component(no_deser, no_ser)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// The upper limit of values in the bucket.
#[serde(serialize_with = "serialize_f64", deserialize_with = "deserialize_f64")]
pub upper_limit: f64,

/// The number of values tracked in this bucket.
Expand Down