Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,20 +313,21 @@ impl ProcessingGroup {
))
}

if project_info.has_feature(Feature::SpanV2ExperimentalProcessing) {
let span_v2_items = envelope.take_items_by(|item| {
matches!(
item.integration(),
Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
) || ItemContainer::<SpanV2>::is_container(item)
});
let span_v2_items = envelope.take_items_by(|item| {
let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
let is_supported_integration = matches!(
item.integration(),
Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
);

if !span_v2_items.is_empty() {
grouped_envelopes.push((
ProcessingGroup::SpanV2,
Envelope::from_parts(headers.clone(), span_v2_items),
))
}
ItemContainer::<SpanV2>::is_container(item) || (exp_feature && is_supported_integration)
});

if !span_v2_items.is_empty() {
grouped_envelopes.push((
ProcessingGroup::SpanV2,
Envelope::from_parts(headers.clone(), span_v2_items),
))
}

// Extract spans.
Expand Down Expand Up @@ -2122,7 +2123,6 @@ impl EnvelopeProcessorService {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
let mut extracted_metrics = ProcessingExtractedMetrics::new();

span::expand_v2_spans(managed_envelope)?;
span::filter(managed_envelope, ctx.config, ctx.project_info);
span::convert_otel_traces_data(managed_envelope);

Expand Down
71 changes: 2 additions & 69 deletions relay-server/src/services/processor/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
use prost::Message;
use relay_dynamic_config::Feature;
use relay_event_normalization::span::tag_extraction;
use relay_event_schema::protocol::{Event, Span, SpanV2};
use relay_event_schema::protocol::{Event, Span};
use relay_protocol::Annotated;
use relay_quotas::DataCategory;
use relay_spans::otel_trace::TracesData;

use crate::envelope::{ContentType, Item, ItemContainer, ItemType};
use crate::envelope::{ContentType, Item, ItemType};
use crate::integrations::{Integration, OtelFormat, SpansIntegration};
use crate::managed::{ItemAction, TypedEnvelope};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{SpanGroup, should_filter};
use crate::statsd::RelayTimers;

#[cfg(feature = "processing")]
mod processing;
Expand All @@ -22,8 +21,6 @@ use crate::services::projects::project::ProjectInfo;
pub use processing::*;
use relay_config::Config;

use super::ProcessingError;

pub fn filter(
managed_envelope: &mut TypedEnvelope<SpanGroup>,
config: &Config,
Expand All @@ -48,70 +45,6 @@ pub fn filter(
});
}

/// Expands V2 spans to V1 spans.
///
/// This expands one item (contanining multiple V2 spans) into several
/// (containing one V1 span each).
pub fn expand_v2_spans(
managed_envelope: &mut TypedEnvelope<SpanGroup>,
) -> Result<(), ProcessingError> {
let span_v2_items = managed_envelope
.envelope_mut()
.take_items_by(ItemContainer::<SpanV2>::is_container);

// V2 spans must always be sent as an `ItemContainer`, currently it is not allowed to
// send multiple containers for V2 spans.
//
// This restriction may be lifted in the future, this is why this validation only happens
// when processing is enabled, allowing it to be changed easily in the future.
//
// This limit mostly exists to incentivise SDKs to batch multiple spans into a single container,
// technically it can be removed without issues.
if span_v2_items.len() > 1 {
return Err(ProcessingError::DuplicateItem(ItemType::Span));
}

if span_v2_items.is_empty() {
return Ok(());
}

let now = std::time::Instant::now();

for span_v2_item in span_v2_items {
let spans_v2 = match ItemContainer::parse(&span_v2_item) {
Ok(spans_v2) => spans_v2,
Err(err) => {
relay_log::debug!("failed to parse V2 spans: {err}");
track_invalid(
managed_envelope,
DiscardReason::InvalidSpan,
span_v2_item.item_count().unwrap_or(1) as usize,
);
continue;
}
};

for span_v2 in spans_v2.into_items() {
let span_v1 = span_v2.value.map_value(relay_spans::span_v2_to_span_v1);
match span_v1.to_json() {
Ok(payload) => {
let mut new_item = Item::new(ItemType::Span);
new_item.set_payload(ContentType::Json, payload);
managed_envelope.envelope_mut().add_item(new_item);
}
Err(err) => {
relay_log::debug!("failed to serialize span: {}", err);
track_invalid(managed_envelope, DiscardReason::Internal, 1);
}
}
}
}

relay_statsd::metric!(timer(RelayTimers::SpanV2Expansion) = now.elapsed());

Ok(())
}

pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
let envelope = managed_envelope.envelope_mut();

Expand Down
3 changes: 0 additions & 3 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,6 @@ pub enum RelayTimers {
BodyReadDuration,
/// Timing in milliseconds to count spans in a serialized transaction payload.
CheckNestedSpans,
/// The time in milliseconds it takes to expand a Span V2 container into Spans V1.
SpanV2Expansion,
/// The time it needs to create a signature. Includes both the signature used for
/// trusted relays and for register challenges.
SignatureCreationDuration,
Expand Down Expand Up @@ -649,7 +647,6 @@ impl TimerMetric for RelayTimers {
RelayTimers::BufferEnvelopeDecompression => "buffer.envelopes_decompression",
RelayTimers::BodyReadDuration => "requests.body_read.duration",
RelayTimers::CheckNestedSpans => "envelope.check_nested_spans",
RelayTimers::SpanV2Expansion => "envelope.span_v2_expansion",
RelayTimers::SignatureCreationDuration => "signature.create.duration",
}
}
Expand Down
Loading
Loading