From a5cc9ae1986cf0bfb176ba3b5041181b58316cb6 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Mon, 2 Jun 2025 16:00:27 +0200 Subject: [PATCH 1/2] git: update opentelemetry-proto to v1.7.0 Signed-off-by: Florian Lehner --- opentelemetry-proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-proto b/opentelemetry-proto index 0adf6aa..8654ab7 160000 --- a/opentelemetry-proto +++ b/opentelemetry-proto @@ -1 +1 @@ -Subproject commit 0adf6aac004578b28267394514b2e55ee9cc012f +Subproject commit 8654ab7a5a43ca25fe8046e59dcd6935c3f76de0 From 3dc265e0fa2ec8a4ad03be8eebe83fa2ffb7f643 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Mon, 2 Jun 2025 16:43:23 +0200 Subject: [PATCH 2/2] OTel Profiling Proto: update to v1.7.0 Signed-off-by: Florian Lehner --- src/collector/otlp/service.rs | 61 ++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/src/collector/otlp/service.rs b/src/collector/otlp/service.rs index 063081b..06049c7 100644 --- a/src/collector/otlp/service.rs +++ b/src/collector/otlp/service.rs @@ -21,7 +21,7 @@ use crate::collector::otlp::pb::collector::profiles::v1development::{ }; use crate::collector::otlp::pb::common::v1::any_value::Value; use crate::collector::otlp::pb::common::v1::KeyValue; -use crate::collector::otlp::pb::profiles::v1development::{Location, Profile, Sample}; +use crate::collector::otlp::pb::profiles::v1development::{ProfilesDictionary, Sample, ValueType}; use crate::collector::Stats; use crate::storage::*; use chrono::Utc; @@ -51,13 +51,26 @@ impl pb_collector::profiles_service_server::ProfilesService for ProfilesService self.stats.log_request(&request); let r = request.into_inner(); + let dict = match r.dictionary.as_ref() { + Some(dictionary) => dictionary, + None => return Err(Status::invalid_argument("ProfilesDictionary is required")), + }; + let loc_mapping = ingest_locations(dict)?; + for resource_profile in r.resource_profiles { for scope_profile in resource_profile.scope_profiles { for profile in scope_profile.profiles { - let loc_mapping = ingest_locations(&profile, &profile.location_table)?; + if profile.sample_type.len() != 1 { + tracing::warn!( + "unexpected length '{}' for profile.sample_type", + profile.sample_type.len() + ); + continue; + } + let st = &profile.sample_type[0]; for sample in &profile.sample { - process_sample(&profile, &loc_mapping, sample)?; + process_sample(dict, &loc_mapping, &st, sample)?; } } } @@ -138,9 +151,10 @@ fn get_attr<'tab>( ))); } -fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result, Status> { - let stab = &profile.string_table; - let atab = &profile.attribute_table; +fn ingest_locations(dic: &ProfilesDictionary) -> Result, Status> { + let stab = &dic.string_table; + let atab = &dic.attribute_table; + let locs = &dic.location_table; let mut batch = DB.stack_frames.batched_insert(); let mut mappings = Vec::with_capacity(locs.len()); @@ -179,10 +193,7 @@ fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result, continue; } - let Some(mapping) = profile - .mapping_table - .get(loc.mapping_index.unwrap() as usize) - else { + let Some(mapping) = &dic.mapping_table.get(loc.mapping_index.unwrap() as usize) else { return Err(Status::invalid_argument("mapping index is out of bounds")); }; @@ -238,7 +249,7 @@ fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result, }; if line.function_index != 0 { - let Some(fn_ref) = profile.function_table.get(line.function_index as usize) else { + let Some(fn_ref) = &dic.function_table.get(line.function_index as usize) else { return Err(Status::invalid_argument("invalid function index")); }; @@ -265,24 +276,21 @@ fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result, } fn process_sample( - profile: &Profile, + dict: &ProfilesDictionary, loc_mapping: &Vec, + sample_type: &ValueType, sample: &Sample, ) -> Result<(), Status> { let loc_start = sample.locations_start_index as usize; let loc_len = sample.locations_length as usize; let loc_rng = loc_start..loc_start.saturating_add(loc_len); - let Some(loc_indices) = profile.location_indices.get(loc_rng.clone()) else { - return Err(Status::invalid_argument("location range is out of bounds")); - }; // Collect frame list. let mut frame_list = Vec::with_capacity(loc_rng.len().min(128)); - for loc_index in loc_indices { - let Some(frame) = loc_mapping.get(*loc_index as usize) else { + for loc_index in loc_rng { + let Some(frame) = loc_mapping.get(loc_index as usize) else { return Err(Status::invalid_argument("location index is out of bounds")); }; - frame_list.push(*frame); } @@ -302,7 +310,7 @@ fn process_sample( }; let comm = get_attr( - &profile.attribute_table, + &dict.attribute_table, sample.attribute_indices.to_vec(), "thread.name", ); @@ -323,22 +331,15 @@ fn process_sample( id: DB.generate_id(), }; - if profile.sample_type.len() != 1 { - tracing::warn!( - "unexpected length '{}' for profile.sample_type", - profile.sample_type.len() - ); - continue; - } - let stt_idx = profile.sample_type[0].type_strindex; - let stu_idx = profile.sample_type[0].unit_strindex; + let stt_idx = sample_type.type_strindex; + let stu_idx = sample_type.unit_strindex; let sample_type_type = get_str( - &profile.string_table, + &dict.string_table, stt_idx.try_into().unwrap(), "sample_type.type", )?; let sample_type_unit = get_str( - &profile.string_table, + &dict.string_table, stu_idx.try_into().unwrap(), "sample_type.unit", )?;