feat(generic-metrics): Add Base64 decoding to Snuba processors #5761

Merged · 26 commits · Apr 25, 2024

Changes from 18 commits
7 changes: 7 additions & 0 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions rust_snuba/Cargo.toml
@@ -50,11 +50,13 @@ json-schema-diff = "0.1.7"
serde_path_to_error = "0.1.15"
hyper = "1.2.0"
tokio-stream = "0.1.15"
+data-encoding = "2.5.0"


[patch.crates-io]
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka" }

@ayirr7 (Member, Author) · Apr 25, 2024:

I believe we want to also bump the Kafka schema in this file.

@ayirr7 (Member, Author):

did it


[dev-dependencies]
criterion = "0.5.1"
httpmock = "0.7.0"
246 changes: 237 additions & 9 deletions rust_snuba/src/processors/generic_metrics.rs
@@ -1,5 +1,5 @@
use adler::Adler32;
-use anyhow::{Context, Error};
+use anyhow::{anyhow, Context, Error};
use chrono::DateTime;
use serde::{
    de::value::{MapAccessDeserializer, SeqAccessDeserializer},
@@ -15,6 +15,7 @@ use crate::{

use rust_arroyo::backends::kafka::types::{Headers, KafkaPayload};
use rust_arroyo::{counter, timer};
+static BASE64: data_encoding::Encoding = data_encoding::BASE64;

use super::utils::enforce_retention;

@@ -76,7 +77,7 @@ enum MetricValue {
    #[serde(rename = "c")]
    Counter(f64),
    #[serde(rename = "s", deserialize_with = "encoded_series_compat_deserializer")]
-    Set(EncodedSeries<u64>),
+    Set(EncodedSeries<u32>),
@ayirr7 (Member, Author):

We're changing this because Relay actually sends the sets as buckets of u32, not u64. ClickHouse expects u64, but we can test this out locally to make sure it's compatible and that we can still write data.
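Editor's note, a minimal illustration of that point (not code from the PR): every u32 widens losslessly into the u64 range, so decoding the payload as u32 cannot drop any value a u64 ClickHouse column could store.

// Editor's sketch (hypothetical helper): u32 -> u64 widening is lossless,
// so set values decoded as u32 stay representable in a u64 column.
fn widen(set_values: Vec<u32>) -> Vec<u64> {
    set_values.into_iter().map(u64::from).collect()
}

// widen(vec![1, 7, u32::MAX]) == vec![1, 7, 4_294_967_295]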

#[serde(rename = "d", deserialize_with = "encoded_series_compat_deserializer")]
Distribution(EncodedSeries<f64>),
#[serde(rename = "g")]
Expand All @@ -89,16 +90,61 @@ enum MetricValue {
},
}

trait Decodable<const SIZE: usize>: Copy {
    const SIZE: usize = SIZE;

    fn decode_bytes(bytes: [u8; SIZE]) -> Self;
}

impl Decodable<4> for u32 {
    fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
        Self::from_le_bytes(bytes)
    }
Comment on lines +100 to +102
@john-z-yang (Member) · Apr 24, 2024:

Slightly unrelated question to this PR, but I was hoping @jjbayer or @Dav1dde could answer: why does from_le_bytes not take a slice, but instead require full ownership of the [u8]? It seems like with the try_into on line 136 we are effectively performing a memcpy of the entire bytes array.

Member:

The few bytes that make up an integer or a floating-point number need to be copied out of the slice at some point anyway. As for try_into itself, I do hope that the compiler optimizes out any unnecessary copies. You can check https://godbolt.org/ to be sure.

There still might be a possible optimization where we reinterpret the entire array of bytes as an array of numbers without copying anything (relying on the assumption that the platform represents numbers in little-endian notation). But I wouldn't go there in the first iteration.

Member:

> There still might be a possible optimization where we reinterpret the entire array of bytes as an array of numbers without copying anything (relying on the assumption that the platform represents numbers in little-endian notation). But I wouldn't go there in the first iteration.

Would that be with std::mem::transmute?

Member:

Yes, for example:

unsafe {
    std::mem::transmute::<&[u8], &[u32]>(&raw_bytes)
}

}

impl Decodable<8> for u64 {
    fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
        Self::from_le_bytes(bytes)
    }
}

impl Decodable<8> for f64 {
    fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
        Self::from_le_bytes(bytes)
    }
}
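Editor's note on the zero-copy idea from the thread above: std::slice::align_to is a safer starting point than a raw transmute, because it accounts for alignment and recomputes the element count. A minimal sketch, not part of this PR, still assuming little-endian data:

// Editor's sketch (hypothetical): reinterpret bytes as u32s without copying.
// `align_to` splits off any misaligned prefix/suffix; the reinterpretation is
// only usable when both are empty, i.e. the slice is aligned and an exact
// multiple of four bytes.
fn as_u32_slice(raw_bytes: &[u8]) -> Option<&[u32]> {
    let (prefix, body, suffix) = unsafe { raw_bytes.align_to::<u32>() };
    (prefix.is_empty() && suffix.is_empty()).then_some(body)
}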

#[derive(Debug, Deserialize)]
#[serde(tag = "format", rename_all = "lowercase")]
enum EncodedSeries<T> {
    Array { data: Vec<T> },
    Base64 { data: String },
}

impl<T> EncodedSeries<T> {
-    fn into_vec(self) -> Vec<T> {
+    fn try_into_vec<const SIZE: usize>(self) -> Result<Vec<T>, anyhow::Error>
+    where
+        T: Decodable<SIZE>,
+    {
        match self {
-            EncodedSeries::Array { data } => data,
+            EncodedSeries::Array { data } => Ok(data),
            EncodedSeries::Base64 { data, .. } => {
                let decoded_bytes = BASE64.decode(data.as_bytes())?;
                if decoded_bytes.len() % T::SIZE == 0 {
                    Ok(decoded_bytes
                        .chunks_exact(T::SIZE)
                        .map(TryInto::try_into)
                        .map(Result::unwrap) // OK to unwrap, `chunks_exact` always yields slices of the right length
                        .map(T::decode_bytes)
                        .collect())
                } else {
                    Err(anyhow!(format!(
                        "Decoded Base64 cannot be chunked into {}, but got {}",
@ayirr7 (Member, Author):

Nit on the error message here: should it be something like "Decoded Base64 cannot be chunked into {}, got {}"?

Member:

Updated

                        T::SIZE,
                        decoded_bytes.len()
                    )))
                }
            }
        }
    }
}
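To make the wire format concrete, here is a standalone round-trip of the distribution payload used in the tests below — an editor's sketch, not PR code. It uses the data-encoding crate added to Cargo.toml above; the encoding side is an assumption, since this PR only implements decoding:

use data_encoding::BASE64;

fn main() {
    // Pack three f64s as little-endian bytes, the layout try_into_vec expects.
    let values = [3f64, 1f64, 2f64];
    let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
    let encoded = BASE64.encode(&bytes);
    assert_eq!(encoded, "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA");

    // Decode back: 24 bytes chunk exactly into three 8-byte f64s.
    let decoded = BASE64.decode(encoded.as_bytes()).unwrap();
    let roundtrip: Vec<f64> = decoded
        .chunks_exact(8)
        .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
        .collect();
    assert_eq!(roundtrip, values);
}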
@@ -363,19 +409,21 @@ pub fn process_counter_message(
struct SetsRawRow {
    #[serde(flatten)]
    common_fields: CommonMetricFields,
-    set_values: Vec<u64>,
+    set_values: Vec<u32>,
}

impl Parse for SetsRawRow {
    fn parse(
        from: FromGenericMetricsMessage,
        config: &ProcessorConfig,
    ) -> anyhow::Result<Option<SetsRawRow>> {
-        let set_values = match from.value {
-            MetricValue::Set(values) => values.into_vec(),
+        let maybe_set = match from.value {
+            MetricValue::Set(values) => values.try_into_vec(),
            _ => return Ok(Option::None),
        };

+        let set_values = maybe_set?;
john-z-yang marked this conversation as resolved.

        timer!(
            "generic_metrics.messages.sets_value_len",
            set_values.len() as u64
@@ -452,11 +500,13 @@ impl Parse for DistributionsRawRow {
        from: FromGenericMetricsMessage,
        config: &ProcessorConfig,
    ) -> anyhow::Result<Option<DistributionsRawRow>> {
-        let distribution_values = match from.value {
-            MetricValue::Distribution(value) => value.into_vec(),
+        let maybe_dist = match from.value {
+            MetricValue::Distribution(value) => value.try_into_vec(),
            _ => return Ok(Option::None),
        };

+        let distribution_values = maybe_dist?;

        timer!(
            "generic_metrics.messages.dists_value_len",
            distribution_values.len() as u64
@@ -757,6 +807,184 @@ mod tests {
"aggregation_option": "ten_second"
}"#;

const DUMMY_BASE64_ENCODED_DISTRIBUTION_MESSAGE: &str = r#"{
"version": 2,
"use_case_id": "spans",
"org_id": 1,
"project_id": 3,
"metric_id": 65563,
"timestamp": 1704614940,
"sentry_received_timestamp": 1704614940,
"tags": {"9223372036854776010":"production","9223372036854776017":"healthy","65690":"metric_e2e_spans_dist_v_VUW93LMS"},
"retention_days": 90,
"mapping_meta":{"d":{"65560":"d:spans/duration@second"},"h":{"9223372036854776017":"session.status","9223372036854776010":"environment"},"f":{"65691":"metric_e2e_spans_dist_k_VUW93LMS"}},
"type": "d",
"value": {"format": "base64", "data": "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA"}
}"#;

const DUMMY_BASE64_ENCODED_SET_MESSAGE: &str = r#"{
"version": 2,
"use_case_id": "spans",
"org_id": 1,
"project_id": 3,
"metric_id": 65563,
"timestamp": 1704614940,
"sentry_received_timestamp": 1704614940,
"tags": {"9223372036854776010":"production","9223372036854776017":"healthy","65690":"metric_e2e_spans_dist_v_VUW93LMS"},
"retention_days": 90,
"mapping_meta":{"d":{"65560":"s:spans/duration@second"},"h":{"9223372036854776017":"session.status","9223372036854776010":"environment"},"f":{"65691":"metric_e2e_spans_set_k_VUW93LMS"}},
"type": "s",
"value": {"format": "base64", "data": "AQAAAAcAAAA="}
}"#;

#[test]
fn test_base64_decode_f64() {
assert!(
EncodedSeries::<f64>::Base64 {
data: "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA".to_string(),
}
.try_into_vec()
.ok()
.unwrap()
== vec![3f64, 1f64, 2f64]
)
}

#[test]
fn test_distribution_processor_with_v2_distribution_message() {
let result: Result<InsertBatch, Error> = test_processor_with_payload(
&(process_distribution_message
as fn(
rust_arroyo::backends::kafka::types::KafkaPayload,
crate::types::KafkaMessageMetadata,
&crate::ProcessorConfig,
)
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>),
DUMMY_BASE64_ENCODED_DISTRIBUTION_MESSAGE,
);
let expected_row = DistributionsRawRow {
common_fields: CommonMetricFields {
use_case_id: "spans".to_string(),
org_id: 1,
project_id: 3,
metric_id: 65563,
timestamp: 1704614940,
retention_days: 90,
tags_key: vec![65690, 9223372036854776010, 9223372036854776017],
tags_indexed_value: vec![0; 3],
tags_raw_value: vec![
"metric_e2e_spans_dist_v_VUW93LMS".to_string(),
"production".to_string(),
"healthy".to_string(),
],
metric_type: "distribution".to_string(),
materialization_version: 2,
timeseries_id: 1436359714,
granularities: vec![
GRANULARITY_ONE_MINUTE,
GRANULARITY_ONE_HOUR,
GRANULARITY_ONE_DAY,
],
decasecond_retention_days: None,
min_retention_days: Some(90),
hr_retention_days: None,
day_retention_days: None,
record_meta: Some(1),
},
distribution_values: vec![3f64, 1f64, 2f64],
enable_histogram: None,
};
assert_eq!(
result.unwrap(),
InsertBatch {
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 675)])
})
}
);
}

#[test]
fn test_base64_decode_32() {
assert!(
EncodedSeries::<u32>::Base64 {
data: "AQAAAAcAAAA=".to_string(),
}
.try_into_vec()
.ok()
.unwrap()
== vec![1u32, 7u32]
)
}

#[test]
fn test_base64_decode_32_invalid() {
assert!(EncodedSeries::<u32>::Base64 {
data: "AQAAAAcAAA=".to_string(),
}
.try_into_vec()
.is_err())
}
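Editor's note on why this input fails: the string is eleven characters, not a multiple of four, so the strict padded BASE64 codec should reject it at the decode step, and the `?` in try_into_vec surfaces that error before the chunk-size check even runs.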

#[test]
fn test_set_processor_with_v2_set_message() {
let result = test_processor_with_payload(
&(process_set_message
as fn(
rust_arroyo::backends::kafka::types::KafkaPayload,
crate::types::KafkaMessageMetadata,
&crate::ProcessorConfig,
)
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>),
DUMMY_BASE64_ENCODED_SET_MESSAGE,
);
let expected_row = SetsRawRow {
common_fields: CommonMetricFields {
use_case_id: "spans".to_string(),
org_id: 1,
project_id: 3,
metric_id: 65563,
timestamp: 1704614940,
retention_days: 90,
tags_key: vec![65690, 9223372036854776010, 9223372036854776017],
tags_indexed_value: vec![0; 3],
tags_raw_value: vec![
"metric_e2e_spans_dist_v_VUW93LMS".to_string(),
"production".to_string(),
"healthy".to_string(),
],
metric_type: "set".to_string(),
materialization_version: 2,
timeseries_id: 1436359714,
granularities: vec![
GRANULARITY_ONE_MINUTE,
GRANULARITY_ONE_HOUR,
GRANULARITY_ONE_DAY,
],
decasecond_retention_days: None,
min_retention_days: Some(90),
hr_retention_days: None,
day_retention_days: None,
record_meta: Some(1),
},
set_values: vec![1u32, 7u32],
};
assert_eq!(
result.unwrap(),
InsertBatch {
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 654)])
})
}
);
}

#[test]
fn test_shouldnt_killswitch() {
let fake_config = Ok(Some("[custom]".to_string()));