-
-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(generic-metrics): Add Base64 decoding to Snuba processors #5761
Changes from all commits
dabfdb1
5db24a3
07aba47
8259cc6
29592de
13a7274
9cdaee7
29f427c
840c04b
0f91eb9
8487feb
2714b52
25f01e3
5579123
adf931b
f9f4de3
ae769a3
8219def
9ed5861
1ae0dce
753708e
890f7d3
8bad4d3
27cda52
1c86215
4483381
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
use adler::Adler32; | ||
use anyhow::{Context, Error}; | ||
use anyhow::{anyhow, Context, Error}; | ||
use chrono::DateTime; | ||
use serde::{ | ||
de::value::{MapAccessDeserializer, SeqAccessDeserializer}, | ||
|
@@ -15,6 +15,7 @@ use crate::{ | |
|
||
use rust_arroyo::backends::kafka::types::{Headers, KafkaPayload}; | ||
use rust_arroyo::{counter, timer}; | ||
static BASE64: data_encoding::Encoding = data_encoding::BASE64; | ||
|
||
use super::utils::enforce_retention; | ||
|
||
|
@@ -76,7 +77,7 @@ enum MetricValue { | |
#[serde(rename = "c")] | ||
Counter(f64), | ||
#[serde(rename = "s", deserialize_with = "encoded_series_compat_deserializer")] | ||
Set(EncodedSeries<u64>), | ||
Set(EncodedSeries<u32>), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We're changing this because Relay actually sends sets as buckets of u32, not u64. ClickHouse expects u64, but we can test this locally to make sure it's compatible and that we can write data. |
||
#[serde(rename = "d", deserialize_with = "encoded_series_compat_deserializer")] | ||
Distribution(EncodedSeries<f64>), | ||
#[serde(rename = "g")] | ||
|
@@ -89,16 +90,61 @@ enum MetricValue { | |
}, | ||
} | ||
|
||
/// A `Copy` value that can be rebuilt from a fixed-size byte array. | ||
/// | ||
/// The const generic `SIZE` is the number of bytes one encoded value occupies. | ||
trait Decodable<const SIZE: usize>: Copy { | ||
/// Byte width of one value, mirrored from the const generic so callers can write `T::SIZE`. | ||
const SIZE: usize = SIZE; | ||
|
||
/// Builds a value from exactly `SIZE` bytes; the byte order is chosen by each implementation. | ||
fn decode_bytes(bytes: [u8; SIZE]) -> Self; | ||
} | ||
|
||
impl Decodable<4> for u32 { | ||
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self { | ||
Self::from_le_bytes(bytes) | ||
} | ||
Comment on lines
+100
to
+102
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The few bytes that make up an integer or a floating-point number need to be copied out of the slice at some point anyway. As for [inline code reference lost in extraction]. There still might be a possible optimization where we reinterpret the entire array of bytes as an array of numbers without copying anything (relying on the assumption that the platform represents numbers in little-endian byte order). But I wouldn't go there in the first iteration. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Would that be with [inline code reference lost in extraction]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, for example unsafe {
std::mem::transmute::<&[u8], &[u32]>(&raw_bytes)
} |
||
} | ||
|
||
/// Decodes a `u64` from 8 bytes interpreted in little-endian order. | ||
impl Decodable<8> for u64 { | ||
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self { | ||
Self::from_le_bytes(bytes) | ||
} | ||
} | ||
|
||
/// Decodes an `f64` from 8 bytes interpreted in little-endian order. | ||
impl Decodable<8> for f64 { | ||
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self { | ||
Self::from_le_bytes(bytes) | ||
} | ||
} | ||
|
||
/// Wire representation of a series of metric values. | ||
/// | ||
/// The serde `format` tag selects the variant, using lowercase names (`"array"` or `"base64"`). | ||
#[derive(Debug, Deserialize)] | ||
#[serde(tag = "format", rename_all = "lowercase")] | ||
enum EncodedSeries<T> { | ||
/// Values listed directly as a sequence of `T`. | ||
Array { data: Vec<T> }, | ||
/// Values carried as a Base64 string; decoding into `T` happens in `try_into_vec`. | ||
Base64 { data: String }, | ||
} | ||
|
||
impl<T> EncodedSeries<T> { | ||
fn into_vec(self) -> Vec<T> { | ||
/// Converts the series into a vector of values. | ||
/// | ||
/// `Array` data is returned unchanged. `Base64` data is decoded and split into | ||
/// `T::SIZE`-byte chunks, each turned into a `T` by `Decodable::decode_bytes`. | ||
/// | ||
/// # Errors | ||
/// | ||
/// Fails when the string cannot be Base64-decoded, or when the decoded byte count is | ||
/// not a multiple of `T::SIZE`. | ||
fn try_into_vec<const SIZE: usize>(self) -> Result<Vec<T>, anyhow::Error> | ||
where | ||
T: Decodable<SIZE>, | ||
{ | ||
match self { | ||
EncodedSeries::Array { data } => data, | ||
EncodedSeries::Array { data } => Ok(data), | ||
EncodedSeries::Base64 { data, .. } => { | ||
let decoded_bytes = BASE64.decode(data.as_bytes())?; | ||
// Reject trailing bytes up front: `chunks_exact` would otherwise silently skip the remainder. | ||
if decoded_bytes.len() % T::SIZE == 0 { | ||
Ok(decoded_bytes | ||
.chunks_exact(T::SIZE) | ||
.map(TryInto::try_into) | ||
.map(Result::unwrap) // OK to unwrap, `chunks_exact` always yields slices of the right length | ||
.map(T::decode_bytes) | ||
.collect()) | ||
} else { | ||
Err(anyhow!( | ||
"Decoded Base64 cannot be chunked into {}, got {}", | ||
T::SIZE, | ||
decoded_bytes.len() | ||
)) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -363,7 +409,7 @@ pub fn process_counter_message( | |
struct SetsRawRow { | ||
#[serde(flatten)] | ||
common_fields: CommonMetricFields, | ||
set_values: Vec<u64>, | ||
set_values: Vec<u32>, | ||
} | ||
|
||
impl Parse for SetsRawRow { | ||
|
@@ -372,7 +418,7 @@ impl Parse for SetsRawRow { | |
config: &ProcessorConfig, | ||
) -> anyhow::Result<Option<SetsRawRow>> { | ||
let set_values = match from.value { | ||
MetricValue::Set(values) => values.into_vec(), | ||
MetricValue::Set(values) => values.try_into_vec()?, | ||
_ => return Ok(Option::None), | ||
}; | ||
|
||
|
@@ -452,11 +498,13 @@ impl Parse for DistributionsRawRow { | |
from: FromGenericMetricsMessage, | ||
config: &ProcessorConfig, | ||
) -> anyhow::Result<Option<DistributionsRawRow>> { | ||
let distribution_values = match from.value { | ||
MetricValue::Distribution(value) => value.into_vec(), | ||
let maybe_dist = match from.value { | ||
MetricValue::Distribution(value) => value.try_into_vec(), | ||
_ => return Ok(Option::None), | ||
}; | ||
|
||
let distribution_values = maybe_dist?; | ||
|
||
timer!( | ||
"generic_metrics.messages.dists_value_len", | ||
distribution_values.len() as u64 | ||
|
@@ -757,6 +805,184 @@ mod tests { | |
"aggregation_option": "ten_second" | ||
}"#; | ||
|
||
// Version-2 distribution message fixture with a Base64-encoded `value`. | ||
// The tests in this module expect that value to decode to `[3.0, 1.0, 2.0]`. | ||
const DUMMY_BASE64_ENCODED_DISTRIBUTION_MESSAGE: &str = r#"{ | ||
"version": 2, | ||
"use_case_id": "spans", | ||
"org_id": 1, | ||
"project_id": 3, | ||
"metric_id": 65563, | ||
"timestamp": 1704614940, | ||
"sentry_received_timestamp": 1704614940, | ||
"tags": {"9223372036854776010":"production","9223372036854776017":"healthy","65690":"metric_e2e_spans_dist_v_VUW93LMS"}, | ||
"retention_days": 90, | ||
"mapping_meta":{"d":{"65560":"d:spans/duration@second"},"h":{"9223372036854776017":"session.status","9223372036854776010":"environment"},"f":{"65691":"metric_e2e_spans_dist_k_VUW93LMS"}}, | ||
"type": "d", | ||
"value": {"format": "base64", "data": "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA"} | ||
}"#; | ||
|
||
// Version-2 set message fixture with a Base64-encoded `value`. | ||
// The tests in this module expect that value to decode to `[1u32, 7u32]`. | ||
const DUMMY_BASE64_ENCODED_SET_MESSAGE: &str = r#"{ | ||
"version": 2, | ||
"use_case_id": "spans", | ||
"org_id": 1, | ||
"project_id": 3, | ||
"metric_id": 65563, | ||
"timestamp": 1704614940, | ||
"sentry_received_timestamp": 1704614940, | ||
"tags": {"9223372036854776010":"production","9223372036854776017":"healthy","65690":"metric_e2e_spans_dist_v_VUW93LMS"}, | ||
"retention_days": 90, | ||
"mapping_meta":{"d":{"65560":"s:spans/duration@second"},"h":{"9223372036854776017":"session.status","9223372036854776010":"environment"},"f":{"65691":"metric_e2e_spans_set_k_VUW93LMS"}}, | ||
"type": "s", | ||
"value": {"format": "base64", "data": "AQAAAAcAAAA="} | ||
}"#; | ||
|
||
// A Base64 payload holding three `f64` values decodes to `[3.0, 1.0, 2.0]`. | ||
#[test] | ||
fn test_base64_decode_f64() { | ||
assert!( | ||
EncodedSeries::<f64>::Base64 { | ||
data: "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA".to_string(), | ||
} | ||
.try_into_vec() | ||
.ok() | ||
.unwrap() | ||
== vec![3f64, 1f64, 2f64] | ||
) | ||
} | ||
|
||
// Runs the Base64 distribution fixture through `process_distribution_message` and | ||
// compares the resulting `InsertBatch` with a hand-built expected row. | ||
#[test] | ||
fn test_distribution_processor_with_v2_distribution_message() { | ||
let result: Result<InsertBatch, Error> = test_processor_with_payload( | ||
&(process_distribution_message | ||
as fn( | ||
rust_arroyo::backends::kafka::types::KafkaPayload, | ||
crate::types::KafkaMessageMetadata, | ||
&crate::ProcessorConfig, | ||
) | ||
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>), | ||
DUMMY_BASE64_ENCODED_DISTRIBUTION_MESSAGE, | ||
); | ||
let expected_row = DistributionsRawRow { | ||
common_fields: CommonMetricFields { | ||
use_case_id: "spans".to_string(), | ||
org_id: 1, | ||
project_id: 3, | ||
metric_id: 65563, | ||
timestamp: 1704614940, | ||
retention_days: 90, | ||
tags_key: vec![65690, 9223372036854776010, 9223372036854776017], | ||
tags_indexed_value: vec![0; 3], | ||
tags_raw_value: vec![ | ||
"metric_e2e_spans_dist_v_VUW93LMS".to_string(), | ||
"production".to_string(), | ||
"healthy".to_string(), | ||
], | ||
metric_type: "distribution".to_string(), | ||
materialization_version: 2, | ||
timeseries_id: 1436359714, | ||
granularities: vec![ | ||
GRANULARITY_ONE_MINUTE, | ||
GRANULARITY_ONE_HOUR, | ||
GRANULARITY_ONE_DAY, | ||
], | ||
decasecond_retention_days: None, | ||
min_retention_days: Some(90), | ||
hr_retention_days: None, | ||
day_retention_days: None, | ||
record_meta: Some(1), | ||
}, | ||
distribution_values: vec![3f64, 1f64, 2f64], | ||
enable_histogram: None, | ||
}; | ||
assert_eq!( | ||
result.unwrap(), | ||
InsertBatch { | ||
rows: RowData::from_rows([expected_row]).unwrap(), | ||
origin_timestamp: None, | ||
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0), | ||
cogs_data: Some(CogsData { | ||
// NOTE(review): 675 is presumably the payload size in bytes — confirm. | ||
data: BTreeMap::from([("genericmetrics_spans".to_string(), 675)]) | ||
}) | ||
} | ||
); | ||
} | ||
|
||
// A Base64 payload holding two `u32` values decodes to `[1, 7]`. | ||
#[test] | ||
fn test_base64_decode_32() { | ||
assert!( | ||
EncodedSeries::<u32>::Base64 { | ||
data: "AQAAAAcAAAA=".to_string(), | ||
} | ||
.try_into_vec() | ||
.ok() | ||
.unwrap() | ||
== vec![1u32, 7u32] | ||
) | ||
} | ||
|
||
// A payload one character shorter than the valid one in `test_base64_decode_32` must yield an error. | ||
// NOTE(review): presumably rejected by the Base64 decoder itself rather than by the | ||
// chunk-size check in `try_into_vec` — confirm which error path this covers. | ||
#[test] | ||
fn test_base64_decode_32_invalid() { | ||
assert!(EncodedSeries::<u32>::Base64 { | ||
data: "AQAAAAcAAA=".to_string(), | ||
} | ||
.try_into_vec() | ||
.is_err()) | ||
} | ||
|
||
// Runs the Base64 set fixture through `process_set_message` and compares the | ||
// resulting `InsertBatch` with a hand-built expected row. | ||
#[test] | ||
fn test_set_processor_with_v2_set_message() { | ||
let result = test_processor_with_payload( | ||
&(process_set_message | ||
as fn( | ||
rust_arroyo::backends::kafka::types::KafkaPayload, | ||
crate::types::KafkaMessageMetadata, | ||
&crate::ProcessorConfig, | ||
) | ||
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>), | ||
DUMMY_BASE64_ENCODED_SET_MESSAGE, | ||
); | ||
let expected_row = SetsRawRow { | ||
common_fields: CommonMetricFields { | ||
use_case_id: "spans".to_string(), | ||
org_id: 1, | ||
project_id: 3, | ||
metric_id: 65563, | ||
timestamp: 1704614940, | ||
retention_days: 90, | ||
tags_key: vec![65690, 9223372036854776010, 9223372036854776017], | ||
tags_indexed_value: vec![0; 3], | ||
tags_raw_value: vec![ | ||
"metric_e2e_spans_dist_v_VUW93LMS".to_string(), | ||
"production".to_string(), | ||
"healthy".to_string(), | ||
], | ||
metric_type: "set".to_string(), | ||
materialization_version: 2, | ||
timeseries_id: 1436359714, | ||
granularities: vec![ | ||
GRANULARITY_ONE_MINUTE, | ||
GRANULARITY_ONE_HOUR, | ||
GRANULARITY_ONE_DAY, | ||
], | ||
decasecond_retention_days: None, | ||
min_retention_days: Some(90), | ||
hr_retention_days: None, | ||
day_retention_days: None, | ||
record_meta: Some(1), | ||
}, | ||
set_values: vec![1u32, 7u32], | ||
}; | ||
assert_eq!( | ||
result.unwrap(), | ||
InsertBatch { | ||
rows: RowData::from_rows([expected_row]).unwrap(), | ||
origin_timestamp: None, | ||
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0), | ||
cogs_data: Some(CogsData { | ||
// NOTE(review): 654 is presumably the payload size in bytes — confirm. | ||
data: BTreeMap::from([("genericmetrics_spans".to_string(), 654)]) | ||
}) | ||
} | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_shouldnt_killswitch() { | ||
let fake_config = Ok(Some("[custom]".to_string())); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
--- | ||
source: src/processors/mod.rs | ||
description: "{\n \"version\": 2,\n \"use_case_id\": \"spans\",\n \"org_id\": 1,\n \"project_id\": 3,\n \"metric_id\": 65563,\n \"timestamp\": 1704614940,\n \"sentry_received_timestamp\": 1704614940,\n \"tags\": {\n \"9223372036854776010\": \"production\",\n \"9223372036854776017\": \"healthy\",\n \"65690\": \"metric_e2e_spans_dist_v_VUW93LMS\"\n },\n \"retention_days\": 90,\n \"mapping_meta\": {\n \"d\": {\n \"65560\": \"d:spans/duration@second\"\n },\n \"h\": {\n \"9223372036854776017\": \"session.status\",\n \"9223372036854776010\": \"environment\"\n },\n \"f\": {\n \"65691\": \"metric_e2e_spans_dist_k_VUW93LMS\"\n }\n },\n \"type\": \"d\",\n \"value\": {\n \"format\": \"base64\",\n \"data\": \"AAAAAAAACEAAAAAAAADwPwAAAAAAAABA\"\n }\n}\n" | ||
expression: snapshot_payload | ||
--- | ||
[] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
--- | ||
source: src/processors/mod.rs | ||
description: "{\n \"version\": 2,\n \"use_case_id\": \"spans\",\n \"org_id\": 1,\n \"project_id\": 3,\n \"metric_id\": 65562,\n \"timestamp\": 1704614940,\n \"sentry_received_timestamp\": 1704614940,\n \"tags\": {\n \"9223372036854776010\": \"production\",\n \"9223372036854776017\": \"errored\",\n \"65690\": \"metric_e2e_spans_set_v_VUW93LMS\"\n },\n \"retention_days\": 90,\n \"mapping_meta\": {\n \"h\": {\n \"9223372036854776017\": \"session.status\",\n \"9223372036854776010\": \"environment\"\n },\n \"f\": {\n \"65690\": \"metric_e2e_spans_set_k_VUW93LMS\"\n },\n \"d\": {\n \"65562\": \"s:spans/error@none\"\n }\n },\n \"type\": \"s\",\n \"value\": {\n \"format\": \"base64\",\n \"data\": \"AQAAAAcAAAA=\"\n }\n}\n" | ||
expression: snapshot_payload | ||
--- | ||
[] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we want to also bump the Kafka schema in this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did it