feat(generic-metrics): Add Base64 decoding to Snuba processors #5761

Merged: 26 commits, Apr 25, 2024

Member

@ayirr7 ayirr7 commented Apr 11, 2024

Adds BASE64 decoding support to generic metrics processors.

TODO:

@ayirr7 ayirr7 changed the title wip: add encoding to processors wip: add base64 decoding to processors Apr 12, 2024
 }

-impl<T> EncodedSeries<T> {
+impl<T: Decodable> EncodedSeries<T> {
     fn into_vec(self) -> Vec<T> {
Member

Long term I think we should make this fallible and return Result<Vec<T>, SomeError>. The decoding has a few possible failure modes, e.g. a bad encoding (base64/zstd) or an invalid payload length (length % T::size != 0).
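
A minimal sketch of what a fallible version could look like, covering only the invalid-length case. The names `DecodeError` and `try_into_vec` are hypothetical stand-ins (the comment only says `SomeError`), and the `Decodable` trait here is a simplified assumption rather than the trait from this PR. Encoding failures (base64/zstd) would need their own error variants.

```rust
/// Hypothetical error type standing in for `SomeError`.
#[derive(Debug, PartialEq)]
enum DecodeError {
    /// The payload length is not a multiple of the element size.
    InvalidLength { len: usize, size: usize },
}

/// Simplified, assumed shape of a decodable element type.
trait Decodable: Sized {
    /// Number of bytes one encoded element occupies.
    const SIZE: usize;
    /// Decodes one element from exactly `SIZE` little-endian bytes.
    fn decode_bytes(bytes: &[u8]) -> Self;
}

impl Decodable for u32 {
    const SIZE: usize = 4;
    fn decode_bytes(bytes: &[u8]) -> Self {
        u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
    }
}

/// Decodes a raw payload into elements, rejecting payloads whose length
/// is not a multiple of `T::SIZE` instead of panicking or truncating.
fn try_into_vec<T: Decodable>(payload: &[u8]) -> Result<Vec<T>, DecodeError> {
    if payload.len() % T::SIZE != 0 {
        return Err(DecodeError::InvalidLength {
            len: payload.len(),
            size: T::SIZE,
        });
    }
    Ok(payload.chunks_exact(T::SIZE).map(T::decode_bytes).collect())
}
```

With this shape, a caller can propagate the error with `?` rather than relying on `unwrap`.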

Member Author

Thanks for the suggestion. I added some error handling. Is this sufficient to cover the failure cases you outlined?

Member Author

ayirr7 commented Apr 12, 2024

[RESOLVED] This is where we got to so far. The final error we ran into:

error[E0207]: the const parameter `COUNT` is not constrained by the impl trait, self type, or predicates
   --> src/processors/generic_metrics.rs:124:6
    |
124 | impl<const COUNT: usize, T: Decodable<COUNT>> EncodedSeries<T> {
    |      ^^^^^^^^^^^^^^^^^^ unconstrained const parameter
    |
    = note: expressions using a const parameter must map each value to a distinct output value
    = note: proving the result of expressions other than the parameter are unique is not supported

For more information about this error, try `rustc --explain E0207`.
error: could not compile `rust_snuba` (lib) due to previous error
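
For context, E0207 fires when a generic parameter of an `impl` appears only inside a trait bound and not in the self type or the implemented trait. One way to sidestep it is to move the const parameter from the `impl` header onto the method. The sketch below assumes a simplified `EncodedSeries<T>` that just holds raw bytes, and it is not necessarily the fix that was applied in this PR.

```rust
use std::marker::PhantomData;

/// Illustrative trait: the element size is a const parameter of the trait.
trait Decodable<const SIZE: usize>: Sized {
    fn decode_bytes(bytes: [u8; SIZE]) -> Self;
}

impl Decodable<4> for u32 {
    fn decode_bytes(bytes: [u8; 4]) -> Self {
        u32::from_le_bytes(bytes)
    }
}

impl Decodable<8> for f64 {
    fn decode_bytes(bytes: [u8; 8]) -> Self {
        f64::from_le_bytes(bytes)
    }
}

/// Simplified stand-in for the PR's `EncodedSeries<T>`.
struct EncodedSeries<T> {
    bytes: Vec<u8>,
    _marker: PhantomData<T>,
}

// Rejected with E0207, because `SIZE` appears only in a bound on `T`:
// impl<const SIZE: usize, T: Decodable<SIZE>> EncodedSeries<T> { ... }

impl<T> EncodedSeries<T> {
    fn new(bytes: Vec<u8>) -> Self {
        EncodedSeries { bytes, _marker: PhantomData }
    }

    /// `SIZE` is a parameter of the method, so every parameter of the
    /// impl header itself (`T`) is still constrained by the self type.
    fn into_vec<const SIZE: usize>(self) -> Vec<T>
    where
        T: Decodable<SIZE>,
    {
        self.bytes
            .chunks_exact(SIZE)
            .map(|chunk| {
                // Copy the chunk into a fixed-size array for decoding.
                let mut buf = [0u8; SIZE];
                buf.copy_from_slice(chunk);
                <T as Decodable<SIZE>>::decode_bytes(buf)
            })
            .collect()
    }
}
```

A call site can then name the size explicitly, e.g. `EncodedSeries::<u32>::new(bytes).into_vec::<4>()`. An associated constant on the trait is another option that avoids the const parameter entirely.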

Member Author

ayirr7 commented Apr 15, 2024

Ok, we fixed that particular compilation issue.

Next, error handling and then local testing.

We need to make sure that the propagated error causes any problematic message to be DLQ'd (sent to the dead-letter queue).

@ayirr7 ayirr7 changed the title wip: add base64 decoding to processors feat(generic-metrics): Add Base64 decoding to Snuba processors Apr 16, 2024
@@ -76,7 +77,7 @@ enum MetricValue {
     #[serde(rename = "c")]
     Counter(f64),
     #[serde(rename = "s", deserialize_with = "encoded_series_compat_deserializer")]
-    Set(EncodedSeries<u64>),
+    Set(EncodedSeries<u32>),
Member Author

We're changing this because Relay actually sends sets as buckets of u32, not u64. ClickHouse expects u64, but we can test this locally to make sure the two are compatible and the data can be written.
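
The compatibility argument rests on the fact that every u32 value is representable as a u64. A tiny sketch of that widening, where the helper name `widen_set` is hypothetical and not taken from this PR (how the values actually reach ClickHouse is not shown in this thread):

```rust
/// Widens decoded `u32` set members to `u64`. Every `u32` value fits in
/// a `u64`, so `u64::from` cannot fail or lose information.
fn widen_set(values: &[u32]) -> Vec<u64> {
    values.iter().map(|&v| u64::from(v)).collect()
}
```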

@ayirr7 ayirr7 marked this pull request as ready for review April 23, 2024 02:08
@ayirr7 ayirr7 requested a review from a team as a code owner April 23, 2024 02:08
@ayirr7 ayirr7 requested a review from a team April 23, 2024 02:08
.map(TryInto::try_into)
.map(Result::unwrap)
.map(T::decode_bytes)
.collect(),
Member

This might be a question of personal preference, but I would find a for-loop more readable in this case:

let decoded = BASE64.decode(data.as_bytes())?;
let mut res = vec![];
for chunk in decoded.chunks_exact(T::SIZE) {
    res.push(T::decode_bytes(chunk.try_into()?));
}
Ok(res)

Member

@john-z-yang john-z-yang Apr 24, 2024

Thanks, I actually found the iterators to be more idiomatic, but I don't have a strong preference. @ayirr7 Thoughts on this?

Member Author

No strong preference, but I found the iterator version to be fine.
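
For comparison, here is a sketch of both styles written so that neither needs `unwrap`. It is specialised to f64 and works on already-decoded bytes, so the function names and signatures are illustrative assumptions rather than the code in this PR. The iterator version relies on `collect` being able to gather an iterator of `Result` values into a single `Result<Vec<_>, _>`.

```rust
use std::array::TryFromSliceError;
use std::convert::{TryFrom, TryInto};

/// Iterator style: each chunk becomes a `Result<f64, _>`, and `collect`
/// stops at the first error.
fn decode_f64_iter(bytes: &[u8]) -> Result<Vec<f64>, TryFromSliceError> {
    bytes
        .chunks_exact(8)
        .map(|chunk| <[u8; 8]>::try_from(chunk).map(f64::from_le_bytes))
        .collect()
}

/// Loop style, following the shape suggested in the review comment.
fn decode_f64_loop(bytes: &[u8]) -> Result<Vec<f64>, TryFromSliceError> {
    let mut res = Vec::with_capacity(bytes.len() / 8);
    for chunk in bytes.chunks_exact(8) {
        res.push(f64::from_le_bytes(chunk.try_into()?));
    }
    Ok(res)
}
```

Note that `chunks_exact` silently skips a trailing partial chunk in both versions, so a separate length check is still needed if a bad length should be reported as an error.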

Comment on lines +100 to +102
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
Self::from_le_bytes(bytes)
}
Member

@john-z-yang john-z-yang Apr 24, 2024

A question slightly unrelated to this PR, but I was hoping @jjbayer or @Dav1dde can answer it. Why does from_le_bytes not take a slice, but instead require ownership of the byte array? It seems like with the try_into on line 136 we are effectively performing a memcpy of the entire bytes array.

Member

The few bytes that make up an integer or a floating point number need to be copied out of the slice at some point anyway. As for try_into itself, I do hope that the compiler optimizes out any unnecessary copies. You can check https://godbolt.org/ to be sure.

There still might be a possible optimization where we reinterpret the entire array of bytes as an array of numbers without copying anything (relying on the assumption that the platform represents numbers in little endian notation). But I wouldn't go there in the first iteration.

Member

There still might be a possible optimization where we reinterpret the entire array of bytes as an array of numbers without copying anything (relying on the assumption that the platform represents numbers in little endian notation). But I wouldn't go there in the first iteration.

Would that be with std::mem::transmute?

Member

Yes, for example

unsafe {
    std::mem::transmute::<&[u8], &[u32]>(&raw_bytes)
}
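
One caveat on the snippet above: a direct transmute between slice references would, as far as I can tell, carry the length over unchanged (counted in bytes rather than in u32 elements) and does nothing to guarantee 4-byte alignment. A more careful sketch of the zero-copy idea uses the standard `slice::align_to`, falling back to a copy when the buffer is misaligned or the platform is not little endian. The function name `u32s_from_le_bytes` is hypothetical.

```rust
use std::borrow::Cow;

/// Interprets `bytes` as little-endian `u32` values. Borrows the input
/// when it can be viewed in place, otherwise copies chunk by chunk.
/// Returns `None` if the length is not a multiple of 4.
fn u32s_from_le_bytes<'a>(bytes: &'a [u8]) -> Option<Cow<'a, [u32]>> {
    if bytes.len() % 4 != 0 {
        return None;
    }
    if cfg!(target_endian = "little") {
        // SAFETY: any four bytes form a valid `u32`, and `align_to` only
        // places correctly aligned elements in the middle slice.
        let (prefix, words, suffix) = unsafe { bytes.align_to::<u32>() };
        if prefix.is_empty() && suffix.is_empty() {
            return Some(Cow::Borrowed(words));
        }
    }
    // Fallback: decode each 4-byte chunk explicitly (works on any platform).
    Some(Cow::Owned(
        bytes
            .chunks_exact(4)
            .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect(),
    ))
}
```

Whether the borrowed path is taken depends on how the input buffer happens to be aligned, so callers should not assume that no copy occurs.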

Member

john-z-yang commented Apr 24, 2024

Tested this locally with Sentry's send_metrics.py.

u32 decode

Sending the metrics

python bin/send_metrics.py --metric-types=s --b64-encode=true
1 / 6
{'name': 's:spans/error@none',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_spans_set_k_3STY9DJU': 'metric_e2e_spans_set_v_3STY9DJU',
          'session.status': 'errored'},
 'timestamp': 1713980352,
 'type': 's',
 'value': {'data': 'YAUAAI8DAACrBgAAqAYAAKICAADhBQAABwYAAKcCAAA=',
           'format': 'base64'}}
Done

2 / 6
{'name': 's:escalating_issues/error@none',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_escalating_issues_set_k_3STY9DJU': 'metric_e2e_escalating_issues_set_v_3STY9DJU',
          'session.status': 'errored'},
 'timestamp': 1713980352,
 'type': 's',
 'value': {'data': 'SgAAABcCAADnBQAAiwQAAJwHAAAUAgAAwwAAAGMBAAA=',
           'format': 'base64'}}
Done

3 / 6
{'name': 's:metric_stats/error@none',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_metric_stats_set_k_3STY9DJU': 'metric_e2e_metric_stats_set_v_3STY9DJU',
          'session.status': 'errored'},
 'timestamp': 1713980352,
 'type': 's',
 'value': {'data': 'tgYAACACAADlAwAAjQUAAJQBAAC0AAAAWQYAAH0CAAA=',
           'format': 'base64'}}
Done

4 / 6
{'name': 's:custom/error@none',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_custom_set_k_3STY9DJU': 'metric_e2e_custom_set_v_3STY9DJU',
          'session.status': 'errored'},
 'timestamp': 1713980352,
 'type': 's',
 'value': {'data': '0gEAAJUGAADlAAAAIwEAADYCAAAQAAAAqAIAANcDAAA=',
           'format': 'base64'}}
Done

5 / 6
{'name': 's:transactions/error@none',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_transactions_set_k_3STY9DJU': 'metric_e2e_transactions_set_v_3STY9DJU',
          'session.status': 'errored'},
 'timestamp': 1713980352,
 'type': 's',
 'value': {'data': 'SgUAANsEAAD6AAAASwQAACwDAACVBwAA8AAAACUBAAA=',
           'format': 'base64'}}
Done

6 / 6
{'name': 's:profiles/error@none',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_profiles_set_k_3STY9DJU': 'metric_e2e_profiles_set_v_3STY9DJU',
          'session.status': 'errored'},
 'timestamp': 1713980352,
 'type': 's',
 'value': {'data': 'LAEAAJIAAABEAgAAgAYAAN4HAACZBAAAXwYAAFICAAA=',
           'format': 'base64'}}
Done

Use the following SQL to verify postgres, there should be 3 strings for each use cases, 18 in total.

    SELECT string,
       organization_id,
       use_case_id,
       date_added,
       last_seen
    FROM sentry_perfstringindexer
    WHERE string ~ 'metric_e2e_.*3STY9DJU';

Use the following SQL to verify clickhouse, there should be 1 metrics for each use cases, 6 in total.
    # --snip--
    SELECT use_case_id,
        org_id,
        project_id,
        metric_id,
        timestamp,
        tags.key,
        tags.raw_value
    FROM generic_metric_sets_raw_local
    WHERE arrayExists(v -> match(v, 'metric_e2e_.*3STY9DJU'), tags.raw_value)
    # --snip--

Clickhouse
SELECT
    set_values,
    use_case_id,
    timestamp,
    tags.key,
    tags.raw_value
FROM generic_metric_sets_raw_local
WHERE arrayExists(v -> match(v, 'metric_e2e_.*3STY9DJU'), tags.raw_value)

Query id: 93baacb5-6383-42f9-9cf5-21947d6b457c

┌─set_values─────────────────────────────┬─use_case_id───────┬───────────timestamp─┬─tags.key────────────────────────────────────────┬─tags.raw_value─────────────────────────────────────────────────────────┐
│ [466,1685,229,291,566,16,680,983]      │ custom            │ 2024-04-24 17:39:12 │ [65593,9223372036854776010,9223372036854776017] │ ['metric_e2e_custom_set_v_3STY9DJU','production','errored']            │
│ [74,535,1511,1163,1948,532,195,355]    │ escalating_issues │ 2024-04-24 17:39:12 │ [65591,9223372036854776010,9223372036854776017] │ ['metric_e2e_escalating_issues_set_v_3STY9DJU','production','errored'] │
│ [1718,544,997,1421,404,180,1625,637]   │ metric_stats      │ 2024-04-24 17:39:12 │ [65592,9223372036854776010,9223372036854776017] │ ['metric_e2e_metric_stats_set_v_3STY9DJU','production','errored']      │
│ [1376,911,1707,1704,674,1505,1543,679] │ spans             │ 2024-04-24 17:39:12 │ [65590,9223372036854776010,9223372036854776017] │ ['metric_e2e_spans_set_v_3STY9DJU','production','errored']             │
└────────────────────────────────────────┴───────────────────┴─────────────────────┴─────────────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────┘
┌─set_values────────────────────────────┬─use_case_id──┬───────────timestamp─┬─tags.key────────────────────────────────────────┬─tags.raw_value────────────────────────────────────────────────────┐
│ [300,146,580,1664,2014,1177,1631,594] │ profiles     │ 2024-04-24 17:39:12 │ [65595,9223372036854776010,9223372036854776017] │ ['metric_e2e_profiles_set_v_3STY9DJU','production','errored']     │
│ [1354,1243,250,1099,812,1941,240,293] │ transactions │ 2024-04-24 17:39:12 │ [65594,9223372036854776010,9223372036854776017] │ ['metric_e2e_transactions_set_v_3STY9DJU','production','errored'] │
└───────────────────────────────────────┴──────────────┴─────────────────────┴─────────────────────────────────────────────────┴───────────────────────────────────────────────────────────────────┘

6 rows in set. Elapsed: 0.017 sec.
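
As a cross-check of the u32 results, the `s:spans/error@none` payload sent above can be decoded by hand: Base64-decode the string, then read each 4-byte group as a little-endian u32. The sketch below uses a small hand-rolled Base64 decoder only to stay dependency-free. The PR itself adds the `data-encoding` crate for this, and the function names here are hypothetical.

```rust
/// Maps one Base64 character (standard alphabet) to its 6-bit value.
fn sextet(b: u8) -> Option<u8> {
    match b {
        b'A'..=b'Z' => Some(b - b'A'),
        b'a'..=b'z' => Some(b - b'a' + 26),
        b'0'..=b'9' => Some(b - b'0' + 52),
        b'+' => Some(62),
        b'/' => Some(63),
        _ => None,
    }
}

/// Decodes padded, standard-alphabet Base64. Returns `None` on bad input.
fn base64_decode(input: &str) -> Option<Vec<u8>> {
    let bytes = input.as_bytes();
    if bytes.len() % 4 != 0 {
        return None;
    }
    let mut out = Vec::with_capacity(bytes.len() / 4 * 3);
    for (i, quad) in bytes.chunks_exact(4).enumerate() {
        let is_last = (i + 1) * 4 == bytes.len();
        // Padding is only allowed at the end of the final group.
        let pad = quad.iter().rev().take_while(|&&b| b == b'=').count();
        if pad > 2 || (pad > 0 && !is_last) {
            return None;
        }
        let mut acc: u32 = 0;
        for &b in &quad[..4 - pad] {
            acc = (acc << 6) | sextet(b)? as u32;
        }
        acc <<= 6 * pad as u32;
        let triple = [(acc >> 16) as u8, (acc >> 8) as u8, acc as u8];
        out.extend_from_slice(&triple[..3 - pad]);
    }
    Some(out)
}

/// Reads the bytes as consecutive little-endian `u32` values.
fn decode_u32_le(bytes: &[u8]) -> Option<Vec<u32>> {
    if bytes.len() % 4 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(4)
            .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect(),
    )
}
```

Decoding `YAUAAI8DAACrBgAAqAYAAKICAADhBQAABwYAAKcCAAA=` this way gives 32 bytes, i.e. eight values: 1376, 911, 1707, 1704, 674, 1505, 1543, 679. These match the `spans` row in the ClickHouse output.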

f64 decode

Sending the metrics

python bin/send_metrics.py --metric-types=d --b64-encode=true
1 / 6
{'name': 'd:metric_stats/duration@second',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_metric_stats_dist_k_DV9FF27E': 'metric_e2e_metric_stats_dist_v_DV9FF27E',
          'session.status': 'healthy'},
 'timestamp': 1713980935,
 'type': 'd',
 'value': {'data': 'MsdscRbK7D8YcNui6B69P0yOdmt6LdU/jt3Wf1Gt3z+ettO0HQ3aP26kr9aUTOA/xo4+QQAV2z9tpixrvZDmPw==',
           'format': 'base64'}}
Done

2 / 6
{'name': 'd:spans/duration@second',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_spans_dist_k_DV9FF27E': 'metric_e2e_spans_dist_v_DV9FF27E',
          'session.status': 'healthy'},
 'timestamp': 1713980935,
 'type': 'd',
 'value': {'data': 'mNUy8ppA3D8SkotH3vHnPy8x5yUwLew/aEYaoDyTwj8sKPuOVu7qP5BS23cmTcs/gFQgX1Ddjz9gkhiyimLpPw==',
           'format': 'base64'}}
Done

3 / 6
{'name': 'd:custom/duration@second',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_custom_dist_k_DV9FF27E': 'metric_e2e_custom_dist_v_DV9FF27E',
          'session.status': 'healthy'},
 'timestamp': 1713980935,
 'type': 'd',
 'value': {'data': 'Bh6i1WtX3j/gcchSgdzdP//Oezkheus/DpF9eSlQ4j+ucAQxDtbgP1WZMRpS3+8/+AfYj/nqtT+qoe1dITbkPw==',
           'format': 'base64'}}
Done

4 / 6
{'name': 'd:escalating_issues/duration@second',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_escalating_issues_dist_k_DV9FF27E': 'metric_e2e_escalating_issues_dist_v_DV9FF27E',
          'session.status': 'healthy'},
 'timestamp': 1713980935,
 'type': 'd',
 'value': {'data': '+FdHYdI7yj8y9EyEWBTQP1/ppFd9ue0/jEnBXMnF2z/gQVQQkb6TP4CHiHKPPLw/y4cPxxd/6T94k6ihOB22Pw==',
           'format': 'base64'}}
Done

5 / 6
{'name': 'd:profiles/duration@second',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_profiles_dist_k_DV9FF27E': 'metric_e2e_profiles_dist_v_DV9FF27E',
          'session.status': 'healthy'},
 'timestamp': 1713980935,
 'type': 'd',
 'value': {'data': 'vMK7UsUG6D8ACtGxQ0LuP1Y+P+kXftw/bljV9dLU2z+MQl8U4czAPxbv50rrF9c/wN2JWZj42j89Bid4AajrPw==',
           'format': 'base64'}}
Done

6 / 6
{'name': 'd:transactions/duration@second',
 'org_id': 1,
 'project_id': 3,
 'retention_days': 90,
 'tags': {'environment': 'production',
          'metric_e2e_transactions_dist_k_DV9FF27E': 'metric_e2e_transactions_dist_v_DV9FF27E',
          'session.status': 'healthy'},
 'timestamp': 1713980935,
 'type': 'd',
 'value': {'data': 'PkqnHPlN4z/JKrIxd+LgP+F1MkUMbu4/OiuDmxRB4j8C6T0Cgd7ZPwPDI6YE2Og/jp9ORcBv1T8IpIHRd/HBPw==',
           'format': 'base64'}}
Done

Use the following SQL to verify postgres, there should be 3 strings for each use cases, 18 in total.

    SELECT string,
       organization_id,
       use_case_id,
       date_added,
       last_seen
    FROM sentry_perfstringindexer
    WHERE string ~ 'metric_e2e_.*DV9FF27E';

Use the following SQL to verify clickhouse, there should be 1 metrics for each use cases, 6 in total.

    # --snip--
    SELECT use_case_id,
        org_id,
        project_id,
        metric_id,
        timestamp,
        tags.key,
        tags.raw_value
    FROM generic_metric_distributions_raw_local
    WHERE arrayExists(v -> match(v, 'metric_e2e_.*DV9FF27E'), tags.raw_value)
    # --snip--

Clickhouse
SELECT
    use_case_id,
    distribution_values,
    use_case_id,
    timestamp,
    tags.key,
    tags.raw_value
FROM generic_metric_distributions_raw_local
WHERE arrayExists(v -> match(v, 'metric_e2e_.*DV9FF27E'), tags.raw_value)

Query id: 951e2f21-c0bc-4ef2-bd2a-ffb50b052c71

┌─use_case_id──┬─distribution_values─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─use_case_id──┬───────────timestamp─┬─tags.key────────────────────────────────────────┬─tags.raw_value─────────────────────────────────────────────────────┐
│ profiles     │ [0.750826512898207,0.9455889199237504,0.44519613054305773,0.43486474997678826,0.1312524175141515,0.3608349067214801,0.42142304176328693,0.8642585131385158] │ profiles     │ 2024-04-24 17:48:55 │ [65604,9223372036854776010,9223372036854776017] │ ['metric_e2e_profiles_dist_v_DV9FF27E','production','healthy']     │
│ transactions │ [0.6032681998861518,0.527644726812775,0.9509335853105619,0.5704443967200412,0.4042055627720772,0.7763694043079287,0.3349457432582249,0.14018151979260574]   │ transactions │ 2024-04-24 17:48:55 │ [65607,9223372036854776010,9223372036854776017] │ ['metric_e2e_transactions_dist_v_DV9FF27E','production','healthy'] │
└──────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────┴─────────────────────┴─────────────────────────────────────────────────┴────────────────────────────────────────────────────────────────────┘
┌─use_case_id───────┬─distribution_values────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─use_case_id───────┬───────────timestamp─┬─tags.key────────────────────────────────────────┬─tags.raw_value──────────────────────────────────────────────────────────┐
│ custom            │ [0.4740857683432015,0.46658356746936924,0.8586584208228202,0.5722854016171668,0.5261298138971335,0.9960108291241264,0.08561668168843528,0.6316077074562163]    │ custom            │ 2024-04-24 17:48:55 │ [65601,9223372036854776010,9223372036854776017] │ ['metric_e2e_custom_dist_v_DV9FF27E','production','healthy']            │
│ escalating_issues │ [0.20495061635909884,0.2512418071744974,0.9288927757072932,0.43394693430050135,0.01928164159932233,0.11029907747481893,0.7967642677215222,0.08638338038197257] │ escalating_issues │ 2024-04-24 17:48:55 │ [65602,9223372036854776010,9223372036854776017] │ ['metric_e2e_escalating_issues_dist_v_DV9FF27E','production','healthy'] │
│ metric_stats      │ [0.8996689048128472,0.11375287986760474,0.33090076918071953,0.49495351299580015,0.40705053957831605,0.509348315533442,0.42315679904480097,0.7051684468508036]  │ metric_stats      │ 2024-04-24 17:48:55 │ [65597,9223372036854776010,9223372036854776017] │ ['metric_e2e_metric_stats_dist_v_DV9FF27E','production','healthy']      │
│ spans             │ [0.441443192026179,0.7482749364249364,0.8805161228461084,0.145118311095348,0.8415940086936238,0.21329193928126644,0.015558841612838714,0.7932790258824518]     │ spans             │ 2024-04-24 17:48:55 │ [65598,9223372036854776010,9223372036854776017] │ ['metric_e2e_spans_dist_v_DV9FF27E','production','healthy']             │
└───────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴───────────────────┴─────────────────────┴─────────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────┘

6 rows in set. Elapsed: 0.009 sec.

.collect())
} else {
Err(anyhow!(
"Decoded Base64 cannot be chunked into {}, but got {}",
Member Author

nit on the error message here. Should it be something like "Decoded Base64 cannot be chunked into {}, got {}"?

Member

Updated

@@ -50,11 +50,13 @@ json-schema-diff = "0.1.7"
serde_path_to_error = "0.1.15"
hyper = "1.2.0"
tokio-stream = "0.1.15"
data-encoding = "2.5.0"


[patch.crates-io]
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka" }

Member Author

@ayirr7 ayirr7 Apr 25, 2024

I believe we want to also bump the Kafka schema in this file

Member Author

did it

@ayirr7 ayirr7 merged commit 07efd32 into master Apr 25, 2024
29 checks passed
@ayirr7 ayirr7 deleted the compression-rust branch April 25, 2024 23:51