Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 31 additions & 30 deletions src/collector/otlp/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::collector::otlp::pb::collector::profiles::v1development::{
};
use crate::collector::otlp::pb::common::v1::any_value::Value;
use crate::collector::otlp::pb::common::v1::KeyValue;
use crate::collector::otlp::pb::profiles::v1development::{Location, Profile, Sample};
use crate::collector::otlp::pb::profiles::v1development::{ProfilesDictionary, Sample, ValueType};
use crate::collector::Stats;
use crate::storage::*;
use chrono::Utc;
Expand Down Expand Up @@ -51,13 +51,26 @@ impl pb_collector::profiles_service_server::ProfilesService for ProfilesService
self.stats.log_request(&request);
let r = request.into_inner();

let dict = match r.dictionary.as_ref() {
Some(dictionary) => dictionary,
None => return Err(Status::invalid_argument("ProfilesDictionary is required")),
};
let loc_mapping = ingest_locations(dict)?;

for resource_profile in r.resource_profiles {
for scope_profile in resource_profile.scope_profiles {
for profile in scope_profile.profiles {
let loc_mapping = ingest_locations(&profile, &profile.location_table)?;
if profile.sample_type.len() != 1 {
tracing::warn!(
"unexpected length '{}' for profile.sample_type",
profile.sample_type.len()
);
continue;
}

let st = &profile.sample_type[0];
for sample in &profile.sample {
process_sample(&profile, &loc_mapping, sample)?;
process_sample(dict, &loc_mapping, &st, sample)?;
}
}
}
Expand Down Expand Up @@ -138,9 +151,10 @@ fn get_attr<'tab>(
)));
}

fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result<Vec<Frame>, Status> {
let stab = &profile.string_table;
let atab = &profile.attribute_table;
fn ingest_locations(dic: &ProfilesDictionary) -> Result<Vec<Frame>, Status> {
let stab = &dic.string_table;
let atab = &dic.attribute_table;
let locs = &dic.location_table;
let mut batch = DB.stack_frames.batched_insert();
let mut mappings = Vec::with_capacity(locs.len());

Expand Down Expand Up @@ -179,10 +193,7 @@ fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result<Vec<Frame>,
continue;
}

let Some(mapping) = profile
.mapping_table
.get(loc.mapping_index.unwrap() as usize)
else {
let Some(mapping) = &dic.mapping_table.get(loc.mapping_index.unwrap() as usize) else {
return Err(Status::invalid_argument("mapping index is out of bounds"));
};

Expand Down Expand Up @@ -238,7 +249,7 @@ fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result<Vec<Frame>,
};

if line.function_index != 0 {
let Some(fn_ref) = profile.function_table.get(line.function_index as usize) else {
let Some(fn_ref) = &dic.function_table.get(line.function_index as usize) else {
return Err(Status::invalid_argument("invalid function index"));
};

Expand All @@ -265,24 +276,21 @@ fn ingest_locations(profile: &Profile, locs: &[Location]) -> Result<Vec<Frame>,
}

fn process_sample(
profile: &Profile,
dict: &ProfilesDictionary,
loc_mapping: &Vec<Frame>,
sample_type: &ValueType,
sample: &Sample,
) -> Result<(), Status> {
let loc_start = sample.locations_start_index as usize;
let loc_len = sample.locations_length as usize;
let loc_rng = loc_start..loc_start.saturating_add(loc_len);
let Some(loc_indices) = profile.location_indices.get(loc_rng.clone()) else {
return Err(Status::invalid_argument("location range is out of bounds"));
};

// Collect frame list.
let mut frame_list = Vec::with_capacity(loc_rng.len().min(128));
for loc_index in loc_indices {
let Some(frame) = loc_mapping.get(*loc_index as usize) else {
for loc_index in loc_rng {
let Some(frame) = loc_mapping.get(loc_index as usize) else {
return Err(Status::invalid_argument("location index is out of bounds"));
};

frame_list.push(*frame);
}

Expand All @@ -302,7 +310,7 @@ fn process_sample(
};

let comm = get_attr(
&profile.attribute_table,
&dict.attribute_table,
sample.attribute_indices.to_vec(),
"thread.name",
);
Expand All @@ -323,22 +331,15 @@ fn process_sample(
id: DB.generate_id(),
};

if profile.sample_type.len() != 1 {
tracing::warn!(
"unexpected length '{}' for profile.sample_type",
profile.sample_type.len()
);
continue;
}
let stt_idx = profile.sample_type[0].type_strindex;
let stu_idx = profile.sample_type[0].unit_strindex;
let stt_idx = sample_type.type_strindex;
let stu_idx = sample_type.unit_strindex;
let sample_type_type = get_str(
&profile.string_table,
&dict.string_table,
stt_idx.try_into().unwrap(),
"sample_type.type",
)?;
let sample_type_unit = get_str(
&profile.string_table,
&dict.string_table,
stu_idx.try_into().unwrap(),
"sample_type.unit",
)?;
Expand Down