diff --git a/.changesets/fix_bryn_datadog_exporter_span_kind.md b/.changesets/fix_bryn_datadog_exporter_span_kind.md new file mode 100644 index 0000000000..942c5421e1 --- /dev/null +++ b/.changesets/fix_bryn_datadog_exporter_span_kind.md @@ -0,0 +1,6 @@ +### Datadog `span.kind` now populated ([PR #5609](https://github.com/apollographql/router/pull/5609)) + +Datadog traces use `span.kind` to differentiate between different types of spans. +This change ensures that the `span.kind` is correctly populated using the Open Telemetry span kind which has a 1-2-1 mapping to those set out in [dd-trace](https://github.com/DataDog/dd-trace-go/blob/main/ddtrace/ext/span_kind.go). + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/5609 diff --git a/.changesets/fix_bryn_datadog_exporter_span_metrics.md b/.changesets/fix_bryn_datadog_exporter_span_metrics.md new file mode 100644 index 0000000000..c8e01f96ae --- /dev/null +++ b/.changesets/fix_bryn_datadog_exporter_span_metrics.md @@ -0,0 +1,32 @@ +### Datadog span metrics are now supported ([PR #5609](https://github.com/apollographql/router/pull/5609)) + +When using the APM view in Datadog, span metrics will be displayed for any span that was a top level span or has the `_dd.measured` flag set. + +Apollo Router now sets the `_dd.measured` flag by default for the following spans: + +* `request` +* `router` +* `supergraph` +* `subgraph` +* `subgraph_request` +* `http_request` +* `query_planning` +* `execution` +* `query_parsing` + +You can override this behaviour to enable or disable span metrics for any span by setting the `span_metrics` configuration in the Datadog exporter configuration. + +```yaml +telemetry: + exporters: + tracing: + datadog: + enabled: true + span_metrics: + # Disable span metrics for supergraph + supergraph: false + # Enable span metrics for my_custom_span + my_custom_span: true +``` + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/5609 diff --git a/Cargo.lock b/Cargo.lock index 5d39d4f1e7..5e239aff85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -472,6 +472,7 @@ name = "apollo-router" version = "1.51.0" dependencies = [ "access-json", + "ahash", "anyhow", "apollo-compiler", "apollo-federation", @@ -526,6 +527,7 @@ dependencies = [ "indexmap 2.2.6", "insta", "itertools 0.12.1", + "itoa", "jsonpath-rust", "jsonpath_lib", "jsonschema", @@ -572,12 +574,14 @@ dependencies = [ "regex", "reqwest", "rhai", + "rmp", "router-bridge", "rstack", "rust-embed", "rustls", "rustls-native-certs", "rustls-pemfile", + "ryu", "schemars", "semver 1.0.23", "serde", diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 47459016d6..ba4daf69c9 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -156,7 +156,11 @@ opentelemetry_sdk = { version = "0.20.0", default-features = false, features = [ ] } opentelemetry_api = "0.20.0" opentelemetry-aws = "0.8.0" -opentelemetry-datadog = { version = "0.8.0", features = ["reqwest-client"] } +# START TEMP DATADOG Temporarily remove until we upgrade otel to the latest version +# This means including the rmp library +# opentelemetry-datadog = { version = "0.8.0", features = ["reqwest-client"] } +rmp = "0.8" +# END TEMP DATADOG opentelemetry-http = "0.9.0" opentelemetry-jaeger = { version = "0.19.0", features = [ "collector_client", @@ -268,6 +272,9 @@ time = { version = "0.3.36", features = ["serde"] } similar = { version = "2.5.0", features = ["inline"] } console = "0.15.8" bytesize = { version = "1.3.0", features = ["serde"] } +ahash = "0.8.11" +itoa = "1.0.9" +ryu = "1.0.15" [target.'cfg(macos)'.dependencies] uname = "0.1.1" @@ -305,6 +312,7 @@ opentelemetry-proto = { version = "0.5.0", features = [ "gen-tonic-messages", "with-serde", ] } +opentelemetry-datadog = { version = "0.8.0", features = ["reqwest-client"] } p256 = "0.13.2" rand_core = "0.6.4" reqwest = { version = "0.11.27", default-features = false, features = [ diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 99be2e3c39..1f66e536ba 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -1468,6 +1468,24 @@ expression: "&schema" "default": {}, "description": "Custom mapping to be used as the resource field in spans, defaults to: router -> http.route supergraph -> graphql.operation.name query_planning -> graphql.operation.name subgraph -> subgraph.name subgraph_request -> subgraph.name http_request -> http.route", "type": "object" + }, + "span_metrics": { + "additionalProperties": { + "type": "boolean" + }, + "default": { + "execution": true, + "http_request": true, + "parse_query": true, + "query_planning": true, + "request": true, + "router": true, + "subgraph": true, + "subgraph_request": true, + "supergraph": true + }, + "description": "Which spans will be eligible for span stats to be collected for viewing in the APM view. Defaults to true for `request`, `router`, `query_parsing`, `supergraph`, `execution`, `query_planning`, `subgraph`, `subgraph_request` and `http_request`.", + "type": "object" } }, "required": [ diff --git a/apollo-router/src/plugins/telemetry/consts.rs b/apollo-router/src/plugins/telemetry/consts.rs index e1d84c937b..c82d7b202b 100644 --- a/apollo-router/src/plugins/telemetry/consts.rs +++ b/apollo-router/src/plugins/telemetry/consts.rs @@ -18,8 +18,9 @@ pub(crate) const REQUEST_SPAN_NAME: &str = "request"; pub(crate) const QUERY_PLANNING_SPAN_NAME: &str = "query_planning"; pub(crate) const HTTP_REQUEST_SPAN_NAME: &str = "http_request"; pub(crate) const SUBGRAPH_REQUEST_SPAN_NAME: &str = "subgraph_request"; +pub(crate) const QUERY_PARSING_SPAN_NAME: &str = "parse_query"; -pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 8] = [ +pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 9] = [ REQUEST_SPAN_NAME, ROUTER_SPAN_NAME, SUPERGRAPH_SPAN_NAME, @@ -28,4 +29,5 @@ pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 8] = [ HTTP_REQUEST_SPAN_NAME, QUERY_PLANNING_SPAN_NAME, EXECUTION_SPAN_NAME, + QUERY_PARSING_SPAN_NAME, ]; diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 858a7ea9a3..af5f78a0de 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -870,7 +870,7 @@ impl Telemetry { propagators.push(Box::::default()); } if propagation.datadog || tracing.datadog.enabled() { - propagators.push(Box::::default()); + propagators.push(Box::::default()); } if propagation.aws_xray { propagators.push(Box::::default()); diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog.rs b/apollo-router/src/plugins/telemetry/tracing/datadog.rs index 345c54dae9..5b652b9671 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog.rs @@ -1,13 +1,23 @@ //! Configuration for datadog tracing. -use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use ahash::HashMap; +use ahash::HashMapExt; +use futures::future::BoxFuture; use http::Uri; use opentelemetry::sdk; use opentelemetry::sdk::trace::BatchSpanProcessor; use opentelemetry::sdk::trace::Builder; use opentelemetry::Value; +use opentelemetry_api::trace::SpanContext; +use opentelemetry_api::trace::SpanKind; use opentelemetry_api::Key; +use opentelemetry_api::KeyValue; +use opentelemetry_sdk::export::trace::ExportResult; +use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::export::trace::SpanExporter; use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use opentelemetry_semantic_conventions::resource::SERVICE_VERSION; use schemars::JsonSchema; @@ -27,6 +37,9 @@ use crate::plugins::telemetry::consts::SUBGRAPH_REQUEST_SPAN_NAME; use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME; use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME; use crate::plugins::telemetry::endpoint::UriEndpoint; +use crate::plugins::telemetry::tracing::datadog_exporter; +use crate::plugins::telemetry::tracing::datadog_exporter::propagator::TRACE_STATE_MEASURE; +use crate::plugins::telemetry::tracing::datadog_exporter::propagator::TRACE_STATE_TRUE_VALUE; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; @@ -79,6 +92,19 @@ pub(crate) struct Config { /// http_request -> http.route #[serde(default)] resource_mapping: HashMap, + + /// Which spans will be eligible for span stats to be collected for viewing in the APM view. + /// Defaults to true for `request`, `router`, `query_parsing`, `supergraph`, `execution`, `query_planning`, `subgraph`, `subgraph_request` and `http_request`. + #[serde(default = "default_span_metrics")] + span_metrics: HashMap, +} + +fn default_span_metrics() -> HashMap { + let mut map = HashMap::with_capacity(BUILT_IN_SPAN_NAMES.len()); + for name in BUILT_IN_SPAN_NAMES { + map.insert(name.to_string(), true); + } + map } fn default_true() -> bool { @@ -111,7 +137,7 @@ impl TracingConfigurator for Config { let fixed_span_names = self.fixed_span_names; - let exporter = opentelemetry_datadog::new_pipeline() + let exporter = datadog_exporter::new_pipeline() .with( &self.endpoint.to_uri(&Uri::from_static(DEFAULT_ENDPOINT)), |builder, e| builder.with_agent_endpoint(e.to_string().trim_end_matches('/')), @@ -170,13 +196,92 @@ impl TracingConfigurator for Config { .expect("cargo version is set as a resource default;qed") .to_string(), ) + .with_http_client(reqwest::Client::builder().build()?) .with_trace_config(common) .build_exporter()?; + + // Use the default span metrics and override with the ones from the config + let mut span_metrics = default_span_metrics(); + span_metrics.extend(self.span_metrics.clone()); + Ok(builder.with_span_processor( - BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) - .with_batch_config(self.batch_processor.clone().into()) - .build() - .filtered(), + BatchSpanProcessor::builder( + ExporterWrapper { + delegate: exporter, + span_metrics, + }, + opentelemetry::runtime::Tokio, + ) + .with_batch_config(self.batch_processor.clone().into()) + .build() + .filtered(), )) } } + +struct ExporterWrapper { + delegate: datadog_exporter::DatadogExporter, + span_metrics: HashMap, +} + +impl Debug for ExporterWrapper { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.delegate.fmt(f) + } +} + +impl SpanExporter for ExporterWrapper { + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + // Here we do some special processing of the spans before passing them to the delegate + // In particular we default the span.kind to the span kind, and also override the trace measure status if we need to. + for span in &mut batch { + // If the span metrics are enabled for this span, set the trace state to measuring. + // We do all this dancing to avoid allocating. + let original_span_name = span + .attributes + .get(&Key::from_static_str(OTEL_ORIGINAL_NAME)) + .map(|v| v.as_str()); + let final_span_name = if let Some(span_name) = &original_span_name { + span_name.as_ref() + } else { + span.name.as_ref() + }; + + // Unfortunately trace state is immutable, so we have to create a new one + if let Some(true) = self.span_metrics.get(final_span_name) { + let new_trace_state = span + .span_context + .trace_state() + .insert(TRACE_STATE_MEASURE, TRACE_STATE_TRUE_VALUE) + .expect("valid trace state"); + span.span_context = SpanContext::new( + span.span_context.trace_id(), + span.span_context.span_id(), + span.span_context.trace_flags(), + span.span_context.is_remote(), + new_trace_state, + ) + } + + // Set the span kind https://github.com/DataDog/dd-trace-go/blob/main/ddtrace/ext/span_kind.go + let span_kind = match &span.span_kind { + SpanKind::Client => "client", + SpanKind::Server => "server", + SpanKind::Producer => "producer", + SpanKind::Consumer => "consumer", + SpanKind::Internal => "internal", + }; + span.attributes + .insert(KeyValue::new("span.kind", span_kind)); + + // Note we do NOT set span.type as it isn't a good fit for otel. + } + self.delegate.export(batch) + } + fn shutdown(&mut self) { + self.delegate.shutdown() + } + fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> { + self.delegate.force_flush() + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/README.md b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/README.md new file mode 100644 index 0000000000..eeb009b68e --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/README.md @@ -0,0 +1,5 @@ +This is temporary interning of the datadog exporter until we update otel. +The newest version of the exporter does support setting span metrics, but we +can't upgrade until we upgrade Otel. + +Once otel is upgraded, we can remove this code and use the exporter directly. \ No newline at end of file diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/intern.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/intern.rs new file mode 100644 index 0000000000..fd1f69375f --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/intern.rs @@ -0,0 +1,517 @@ +use std::cell::RefCell; +use std::hash::BuildHasherDefault; +use std::hash::Hash; + +use indexmap::set::IndexSet; +use opentelemetry::StringValue; +use opentelemetry::Value; +use rmp::encode::RmpWrite; +use rmp::encode::ValueWriteError; + +type InternHasher = ahash::AHasher; + +#[derive(PartialEq)] +pub(crate) enum InternValue<'a> { + RegularString(&'a str), + OpenTelemetryValue(&'a Value), +} + +impl<'a> Hash for InternValue<'a> { + fn hash(&self, state: &mut H) { + match &self { + InternValue::RegularString(s) => s.hash(state), + InternValue::OpenTelemetryValue(v) => match v { + Value::Bool(x) => x.hash(state), + Value::I64(x) => x.hash(state), + Value::String(x) => x.hash(state), + Value::F64(x) => x.to_bits().hash(state), + Value::Array(a) => match a { + opentelemetry::Array::Bool(x) => x.hash(state), + opentelemetry::Array::I64(x) => x.hash(state), + opentelemetry::Array::F64(floats) => { + for f in floats { + f.to_bits().hash(state); + } + } + opentelemetry::Array::String(x) => x.hash(state), + }, + }, + } + } +} + +impl<'a> Eq for InternValue<'a> {} + +const BOOLEAN_TRUE: &str = "true"; +const BOOLEAN_FALSE: &str = "false"; +const LEFT_SQUARE_BRACKET: u8 = b'['; +const RIGHT_SQUARE_BRACKET: u8 = b']'; +const COMMA: u8 = b','; +const DOUBLE_QUOTE: u8 = b'"'; +const EMPTY_ARRAY: &str = "[]"; + +trait WriteAsLiteral { + fn write_to(&self, buffer: &mut Vec); +} + +impl WriteAsLiteral for bool { + fn write_to(&self, buffer: &mut Vec) { + buffer.extend_from_slice(if *self { BOOLEAN_TRUE } else { BOOLEAN_FALSE }.as_bytes()); + } +} + +impl WriteAsLiteral for i64 { + fn write_to(&self, buffer: &mut Vec) { + buffer.extend_from_slice(itoa::Buffer::new().format(*self).as_bytes()); + } +} + +impl WriteAsLiteral for f64 { + fn write_to(&self, buffer: &mut Vec) { + buffer.extend_from_slice(ryu::Buffer::new().format(*self).as_bytes()); + } +} + +impl WriteAsLiteral for StringValue { + fn write_to(&self, buffer: &mut Vec) { + buffer.push(DOUBLE_QUOTE); + buffer.extend_from_slice(self.as_str().as_bytes()); + buffer.push(DOUBLE_QUOTE); + } +} + +impl<'a> InternValue<'a> { + pub(crate) fn write_as_str( + &self, + payload: &mut W, + reusable_buffer: &mut Vec, + ) -> Result<(), ValueWriteError> { + match self { + InternValue::RegularString(x) => rmp::encode::write_str(payload, x), + InternValue::OpenTelemetryValue(v) => match v { + Value::Bool(x) => { + rmp::encode::write_str(payload, if *x { BOOLEAN_TRUE } else { BOOLEAN_FALSE }) + } + Value::I64(x) => rmp::encode::write_str(payload, itoa::Buffer::new().format(*x)), + Value::F64(x) => rmp::encode::write_str(payload, ryu::Buffer::new().format(*x)), + Value::String(x) => rmp::encode::write_str(payload, x.as_ref()), + Value::Array(array) => match array { + opentelemetry::Array::Bool(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + opentelemetry::Array::I64(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + opentelemetry::Array::F64(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + opentelemetry::Array::String(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + }, + }, + } + } + + fn write_empty_array(payload: &mut W) -> Result<(), ValueWriteError> { + rmp::encode::write_str(payload, EMPTY_ARRAY) + } + + fn write_buffer_as_string( + payload: &mut W, + reusable_buffer: &[u8], + ) -> Result<(), ValueWriteError> { + rmp::encode::write_str_len(payload, reusable_buffer.len() as u32)?; + payload + .write_bytes(reusable_buffer) + .map_err(ValueWriteError::InvalidDataWrite) + } + + fn write_generic_array( + payload: &mut W, + reusable_buffer: &mut Vec, + array: &[T], + ) -> Result<(), ValueWriteError> { + if array.is_empty() { + return Self::write_empty_array(payload); + } + + reusable_buffer.clear(); + reusable_buffer.push(LEFT_SQUARE_BRACKET); + + array[0].write_to(reusable_buffer); + + for value in array[1..].iter() { + reusable_buffer.push(COMMA); + value.write_to(reusable_buffer); + } + + reusable_buffer.push(RIGHT_SQUARE_BRACKET); + + Self::write_buffer_as_string(payload, reusable_buffer) + } +} + +pub(crate) struct StringInterner<'a> { + data: IndexSet, BuildHasherDefault>, +} + +impl<'a> StringInterner<'a> { + pub(crate) fn new() -> StringInterner<'a> { + StringInterner { + data: IndexSet::with_capacity_and_hasher(128, BuildHasherDefault::default()), + } + } + + pub(crate) fn intern(&mut self, data: &'a str) -> u32 { + if let Some(idx) = self.data.get_index_of(&InternValue::RegularString(data)) { + return idx as u32; + } + self.data.insert_full(InternValue::RegularString(data)).0 as u32 + } + + pub(crate) fn intern_value(&mut self, data: &'a Value) -> u32 { + if let Some(idx) = self + .data + .get_index_of(&InternValue::OpenTelemetryValue(data)) + { + return idx as u32; + } + self.data + .insert_full(InternValue::OpenTelemetryValue(data)) + .0 as u32 + } + + pub(crate) fn write_dictionary( + &self, + payload: &mut W, + ) -> Result<(), ValueWriteError> { + thread_local! { + static BUFFER: RefCell> = RefCell::new(Vec::with_capacity(4096)); + } + + BUFFER.with(|cell| { + let reusable_buffer = &mut cell.borrow_mut(); + rmp::encode::write_array_len(payload, self.data.len() as u32)?; + for data in self.data.iter() { + data.write_as_str(payload, reusable_buffer)?; + } + + Ok(()) + }) + } +} + +#[cfg(test)] +mod tests { + use opentelemetry::Array; + + use super::*; + + #[test] + fn test_intern() { + let a = "a".to_string(); + let b = "b"; + let c = "c"; + + let mut intern = StringInterner::new(); + let a_idx = intern.intern(a.as_str()); + let b_idx = intern.intern(b); + let c_idx = intern.intern(c); + let d_idx = intern.intern(a.as_str()); + let e_idx = intern.intern(c); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + } + + #[test] + fn test_intern_bool() { + let a = Value::Bool(true); + let b = Value::Bool(false); + let c = "c"; + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + } + + #[test] + fn test_intern_i64() { + let a = Value::I64(1234567890); + let b = Value::I64(-1234567890); + let c = "c"; + let d = Value::I64(1234567890); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, a_idx); + } + + #[test] + fn test_intern_f64() { + let a = Value::F64(123456.7890); + let b = Value::F64(-1234567.890); + let c = "c"; + let d = Value::F64(-1234567.890); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(b_idx, f_idx); + } + + #[test] + fn test_intern_array_of_booleans() { + let a = Value::Array(Array::Bool(vec![true, false])); + let b = Value::Array(Array::Bool(vec![false, true])); + let c = "c"; + let d = Value::Array(Array::Bool(vec![])); + let f = Value::Array(Array::Bool(vec![false, true])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_intern_array_of_i64() { + let a = Value::Array(Array::I64(vec![123, -123])); + let b = Value::Array(Array::I64(vec![-123, 123])); + let c = "c"; + let d = Value::Array(Array::I64(vec![])); + let f = Value::Array(Array::I64(vec![-123, 123])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_intern_array_of_f64() { + let f1 = 123.0f64; + let f2 = 0f64; + + let a = Value::Array(Array::F64(vec![f1, f2])); + let b = Value::Array(Array::F64(vec![f2, f1])); + let c = "c"; + let d = Value::Array(Array::F64(vec![])); + let f = Value::Array(Array::F64(vec![f2, f1])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_intern_array_of_string() { + let s1 = "a"; + let s2 = "b"; + + let a = Value::Array(Array::String(vec![ + StringValue::from(s1), + StringValue::from(s2), + ])); + let b = Value::Array(Array::String(vec![ + StringValue::from(s2), + StringValue::from(s1), + ])); + let c = "c"; + let d = Value::Array(Array::String(vec![])); + let f = Value::Array(Array::String(vec![ + StringValue::from(s2), + StringValue::from(s1), + ])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_write_boolean_literal() { + let mut buffer: Vec = vec![]; + + true.write_to(&mut buffer); + + assert_eq!(&buffer[..], b"true"); + + buffer.clear(); + + false.write_to(&mut buffer); + + assert_eq!(&buffer[..], b"false"); + } + + #[test] + fn test_write_i64_literal() { + let mut buffer: Vec = vec![]; + + 1234567890i64.write_to(&mut buffer); + + assert_eq!(&buffer[..], b"1234567890"); + + buffer.clear(); + + (-1234567890i64).write_to(&mut buffer); + + assert_eq!(&buffer[..], b"-1234567890"); + } + + #[test] + fn test_write_f64_literal() { + let mut buffer: Vec = vec![]; + + let f1 = 12345.678f64; + let f2 = -12345.678f64; + + f1.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("{}", f1).as_bytes()); + + buffer.clear(); + + f2.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("{}", f2).as_bytes()); + } + + #[test] + fn test_write_string_literal() { + let mut buffer: Vec = vec![]; + + let s1 = StringValue::from("abc"); + let s2 = StringValue::from(""); + + s1.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("\"{}\"", s1).as_bytes()); + + buffer.clear(); + + s2.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("\"{}\"", s2).as_bytes()); + } + + fn test_encoding_intern_value(value: InternValue<'_>) { + let mut expected: Vec = vec![]; + let mut actual: Vec = vec![]; + + let mut buffer = vec![]; + + value.write_as_str(&mut actual, &mut buffer).unwrap(); + + let InternValue::OpenTelemetryValue(value) = value else { + return; + }; + + rmp::encode::write_str(&mut expected, value.as_str().as_ref()).unwrap(); + + assert_eq!(expected, actual); + } + + #[test] + fn test_encode_boolean() { + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::Bool(true))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::Bool(false))); + } + + #[test] + fn test_encode_i64() { + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::I64(123))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::I64(0))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::I64(-123))); + } + + #[test] + fn test_encode_f64() { + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::F64(123.456f64))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::F64(-123.456f64))); + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/mod.rs new file mode 100644 index 0000000000..ae4a37ba07 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/mod.rs @@ -0,0 +1,562 @@ +mod intern; +mod model; + +use std::borrow::Cow; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; +use std::time::Duration; + +use futures::future::BoxFuture; +use http::Method; +use http::Request; +use http::Uri; +pub use model::ApiVersion; +pub use model::Error; +pub use model::FieldMappingFn; +use opentelemetry::global; +use opentelemetry::sdk; +use opentelemetry::trace::TraceError; +use opentelemetry::KeyValue; +use opentelemetry_api::trace::TracerProvider; +use opentelemetry_http::HttpClient; +use opentelemetry_http::ResponseExt; +use opentelemetry_sdk::export::trace::ExportResult; +use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::export::trace::SpanExporter; +use opentelemetry_sdk::resource::ResourceDetector; +use opentelemetry_sdk::resource::SdkProvidedResourceDetector; +use opentelemetry_sdk::runtime::RuntimeChannel; +use opentelemetry_sdk::trace::BatchMessage; +use opentelemetry_sdk::trace::Config; +use opentelemetry_sdk::trace::Tracer; +use opentelemetry_sdk::Resource; +use opentelemetry_semantic_conventions as semcov; +use url::Url; + +use self::model::unified_tags::UnifiedTags; +use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::FieldMapping; + +/// Default Datadog collector endpoint +const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126"; + +/// Header name used to inform the Datadog agent of the number of traces in the payload +const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count"; + +/// Header name use to inform datadog as to what version +const DATADOG_META_LANG_HEADER: &str = "Datadog-Meta-Lang"; +const DATADOG_META_TRACER_VERSION_HEADER: &str = "Datadog-Meta-Tracer-Version"; + +// Struct to hold the mapping between Opentelemetry spans and datadog spans. +pub struct Mapping { + resource: Option, + name: Option, + service_name: Option, +} + +impl Mapping { + pub fn new( + resource: Option, + name: Option, + service_name: Option, + ) -> Self { + Mapping { + resource, + name, + service_name, + } + } + pub fn empty() -> Self { + Self::new(None, None, None) + } +} + +/// Datadog span exporter +pub struct DatadogExporter { + client: Arc, + request_url: Uri, + model_config: ModelConfig, + api_version: ApiVersion, + mapping: Mapping, + unified_tags: UnifiedTags, +} + +impl DatadogExporter { + fn new( + model_config: ModelConfig, + request_url: Uri, + api_version: ApiVersion, + client: Arc, + mapping: Mapping, + unified_tags: UnifiedTags, + ) -> Self { + DatadogExporter { + client, + request_url, + model_config, + api_version, + mapping, + unified_tags, + } + } + + fn build_request( + &self, + mut batch: Vec, + ) -> Result>, TraceError> { + let traces: Vec<&[SpanData]> = group_into_traces(&mut batch); + let trace_count = traces.len(); + let data = self.api_version.encode( + &self.model_config, + traces, + &self.mapping, + &self.unified_tags, + )?; + let req = Request::builder() + .method(Method::POST) + .uri(self.request_url.clone()) + .header(http::header::CONTENT_TYPE, self.api_version.content_type()) + .header(DATADOG_TRACE_COUNT_HEADER, trace_count) + .header(DATADOG_META_LANG_HEADER, "rust") + .header( + DATADOG_META_TRACER_VERSION_HEADER, + env!("CARGO_PKG_VERSION"), + ) + .body(data) + .map_err::(Into::into)?; + + Ok(req) + } +} + +impl Debug for DatadogExporter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DatadogExporter") + .field("model_config", &self.model_config) + .field("request_url", &self.request_url) + .field("api_version", &self.api_version) + .field("client", &self.client) + .field("resource_mapping", &mapping_debug(&self.mapping.resource)) + .field("name_mapping", &mapping_debug(&self.mapping.name)) + .field( + "service_name_mapping", + &mapping_debug(&self.mapping.service_name), + ) + .finish() + } +} + +/// Create a new Datadog exporter pipeline builder. +pub fn new_pipeline() -> DatadogPipelineBuilder { + DatadogPipelineBuilder::default() +} + +/// Builder for `ExporterConfig` struct. +pub struct DatadogPipelineBuilder { + agent_endpoint: String, + trace_config: Option, + api_version: ApiVersion, + client: Option>, + mapping: Mapping, + unified_tags: UnifiedTags, +} + +impl Default for DatadogPipelineBuilder { + fn default() -> Self { + DatadogPipelineBuilder { + agent_endpoint: DEFAULT_AGENT_ENDPOINT.to_string(), + trace_config: None, + mapping: Mapping::empty(), + api_version: ApiVersion::Version05, + unified_tags: UnifiedTags::new(), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + not(feature = "surf-client"), + ))] + client: None, + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "surf-client" + ))] + client: Some(Arc::new(surf::Client::new())), + #[cfg(all( + not(feature = "surf-client"), + not(feature = "reqwest-blocking-client"), + feature = "reqwest-client" + ))] + client: Some(Arc::new(reqwest::Client::new())), + #[cfg(feature = "reqwest-blocking-client")] + client: Some(Arc::new(reqwest::blocking::Client::new())), + } + } +} + +impl Debug for DatadogPipelineBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DatadogExporter") + .field("agent_endpoint", &self.agent_endpoint) + .field("trace_config", &self.trace_config) + .field("client", &self.client) + .field("resource_mapping", &mapping_debug(&self.mapping.resource)) + .field("name_mapping", &mapping_debug(&self.mapping.name)) + .field( + "service_name_mapping", + &mapping_debug(&self.mapping.service_name), + ) + .finish() + } +} + +impl DatadogPipelineBuilder { + /// Building a new exporter. + /// + /// This is useful if you are manually constructing a pipeline. + pub fn build_exporter(mut self) -> Result { + let (_, service_name) = self.build_config_and_service_name(); + self.build_exporter_with_service_name(service_name) + } + + fn build_config_and_service_name(&mut self) -> (Config, String) { + let service_name = self.unified_tags.service(); + if let Some(service_name) = service_name { + let config = if let Some(mut cfg) = self.trace_config.take() { + cfg.resource = Cow::Owned(Resource::new( + cfg.resource + .iter() + .filter(|(k, _v)| *k != &semcov::resource::SERVICE_NAME) + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + )); + cfg + } else { + Config { + resource: Cow::Owned(Resource::empty()), + ..Default::default() + } + }; + (config, service_name) + } else { + let service_name = SdkProvidedResourceDetector + .detect(Duration::from_secs(0)) + .get(semcov::resource::SERVICE_NAME) + .unwrap() + .to_string(); + ( + Config { + // use a empty resource to prevent TracerProvider to assign a service name. + resource: Cow::Owned(Resource::empty()), + ..Default::default() + }, + service_name, + ) + } + } + + // parse the endpoint and append the path based on versions. + // keep the query and host the same. + fn build_endpoint(agent_endpoint: &str, version: &str) -> Result { + // build agent endpoint based on version + let mut endpoint = agent_endpoint + .parse::() + .map_err::(Into::into)?; + let mut paths = endpoint + .path_segments() + .map(|c| c.filter(|s| !s.is_empty()).collect::>()) + .unwrap_or_default(); + paths.push(version); + + let path_str = paths.join("/"); + endpoint.set_path(path_str.as_str()); + + Ok(endpoint.as_str().parse().map_err::(Into::into)?) + } + + fn build_exporter_with_service_name( + self, + service_name: String, + ) -> Result { + if let Some(client) = self.client { + let model_config = ModelConfig { service_name }; + + let exporter = DatadogExporter::new( + model_config, + Self::build_endpoint(&self.agent_endpoint, self.api_version.path())?, + self.api_version, + client, + self.mapping, + self.unified_tags, + ); + Ok(exporter) + } else { + Err(Error::NoHttpClient.into()) + } + } + + /// Install the Datadog trace exporter pipeline using a simple span processor. + pub fn install_simple(mut self) -> Result { + let (config, service_name) = self.build_config_and_service_name(); + let exporter = self.build_exporter_with_service_name(service_name)?; + let mut provider_builder = + sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); + provider_builder = provider_builder.with_config(config); + let provider = provider_builder.build(); + let tracer = provider.versioned_tracer( + "opentelemetry-datadog", + Some(env!("CARGO_PKG_VERSION")), + Some(semcov::SCHEMA_URL), + None, + ); + let _ = global::set_tracer_provider(provider); + Ok(tracer) + } + + /// Install the Datadog trace exporter pipeline using a batch span processor with the specified + /// runtime. + pub fn install_batch>( + mut self, + runtime: R, + ) -> Result { + let (config, service_name) = self.build_config_and_service_name(); + let exporter = self.build_exporter_with_service_name(service_name)?; + let mut provider_builder = + sdk::trace::TracerProvider::builder().with_batch_exporter(exporter, runtime); + provider_builder = provider_builder.with_config(config); + let provider = provider_builder.build(); + let tracer = provider.versioned_tracer( + "opentelemetry-datadog", + Some(env!("CARGO_PKG_VERSION")), + Some(semcov::SCHEMA_URL), + None, + ); + let _ = global::set_tracer_provider(provider); + Ok(tracer) + } + + /// Assign the service name under which to group traces + pub fn with_service_name>(mut self, service_name: T) -> Self { + self.unified_tags.set_service(Some(service_name.into())); + self + } + + /// Assign the version under which to group traces + pub fn with_version>(mut self, version: T) -> Self { + self.unified_tags.set_version(Some(version.into())); + self + } + + /// Assign the env under which to group traces + pub fn with_env>(mut self, env: T) -> Self { + self.unified_tags.set_env(Some(env.into())); + self + } + + /// Assign the Datadog collector endpoint. + /// + /// The endpoint of the datadog agent, by default it is `http://127.0.0.1:8126`. + pub fn with_agent_endpoint>(mut self, endpoint: T) -> Self { + self.agent_endpoint = endpoint.into(); + self + } + + /// Choose the http client used by uploader + pub fn with_http_client(mut self, client: T) -> Self { + self.client = Some(Arc::new(client)); + self + } + + /// Assign the SDK trace configuration + pub fn with_trace_config(mut self, config: Config) -> Self { + self.trace_config = Some(config); + self + } + + /// Set version of Datadog trace ingestion API + pub fn with_api_version(mut self, api_version: ApiVersion) -> Self { + self.api_version = api_version; + self + } + + /// Custom the value used for `resource` field in datadog spans. + /// See [`FieldMappingFn`] for details. + pub fn with_resource_mapping(mut self, f: F) -> Self + where + F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static, + { + self.mapping.resource = Some(Arc::new(f)); + self + } + + /// Custom the value used for `name` field in datadog spans. + /// See [`FieldMappingFn`] for details. + pub fn with_name_mapping(mut self, f: F) -> Self + where + F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static, + { + self.mapping.name = Some(Arc::new(f)); + self + } + + /// Custom the value used for `service_name` field in datadog spans. + /// See [`FieldMappingFn`] for details. + pub fn with_service_name_mapping(mut self, f: F) -> Self + where + F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static, + { + self.mapping.service_name = Some(Arc::new(f)); + self + } +} + +fn group_into_traces(spans: &mut [SpanData]) -> Vec<&[SpanData]> { + if spans.is_empty() { + return vec![]; + } + + spans.sort_by_key(|x| x.span_context.trace_id().to_bytes()); + + let mut traces: Vec<&[SpanData]> = Vec::with_capacity(spans.len()); + + let mut start = 0; + let mut start_trace_id = spans[start].span_context.trace_id(); + for (idx, span) in spans.iter().enumerate() { + let current_trace_id = span.span_context.trace_id(); + if start_trace_id != current_trace_id { + traces.push(&spans[start..idx]); + start = idx; + start_trace_id = current_trace_id; + } + } + traces.push(&spans[start..]); + traces +} + +async fn send_request( + client: Arc, + request: http::Request>, +) -> ExportResult { + let _ = client.send(request).await?.error_for_status()?; + Ok(()) +} + +impl SpanExporter for DatadogExporter { + /// Export spans to datadog-agent + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + let request = match self.build_request(batch) { + Ok(req) => req, + Err(err) => return Box::pin(std::future::ready(Err(err))), + }; + + let client = self.client.clone(); + Box::pin(send_request(client, request)) + } +} + +/// Helper struct to custom the mapping between Opentelemetry spans and datadog spans. +/// +/// This struct will be passed to [`FieldMappingFn`] +#[derive(Default, Debug)] +#[non_exhaustive] +pub struct ModelConfig { + pub service_name: String, +} + +fn mapping_debug(f: &Option) -> String { + if f.is_some() { + "custom mapping" + } else { + "default mapping" + } + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::tests::get_span; + use crate::plugins::telemetry::tracing::datadog_exporter::ApiVersion::Version05; + + #[test] + fn test_out_of_order_group() { + let mut batch = vec![get_span(1, 1, 1), get_span(2, 2, 2), get_span(1, 1, 3)]; + let expected = vec![ + vec![get_span(1, 1, 1), get_span(1, 1, 3)], + vec![get_span(2, 2, 2)], + ]; + + let mut traces = group_into_traces(&mut batch); + // We need to sort the output in order to compare, but this is not required by the Datadog agent + traces.sort_by_key(|t| u128::from_be_bytes(t[0].span_context.trace_id().to_bytes())); + + assert_eq!(traces, expected); + } + + #[test] + fn test_agent_endpoint_with_api_version() { + let with_tail_slash = + DatadogPipelineBuilder::build_endpoint("http://localhost:8126/", Version05.path()); + let without_tail_slash = + DatadogPipelineBuilder::build_endpoint("http://localhost:8126", Version05.path()); + let with_query = DatadogPipelineBuilder::build_endpoint( + "http://localhost:8126?api_key=123", + Version05.path(), + ); + let invalid = DatadogPipelineBuilder::build_endpoint( + "http://localhost:klsajfjksfh", + Version05.path(), + ); + + assert_eq!( + with_tail_slash.unwrap().to_string(), + "http://localhost:8126/v0.5/traces" + ); + assert_eq!( + without_tail_slash.unwrap().to_string(), + "http://localhost:8126/v0.5/traces" + ); + assert_eq!( + with_query.unwrap().to_string(), + "http://localhost:8126/v0.5/traces?api_key=123" + ); + assert!(invalid.is_err()) + } + + #[derive(Debug)] + struct DummyClient; + + #[async_trait::async_trait] + impl HttpClient for DummyClient { + async fn send( + &self, + _request: Request>, + ) -> Result, opentelemetry_http::HttpError> { + Ok(http::Response::new("dummy response".into())) + } + } + + #[test] + fn test_custom_http_client() { + new_pipeline() + .with_http_client(DummyClient) + .build_exporter() + .unwrap(); + } + + #[test] + fn test_install_simple() { + new_pipeline() + .with_service_name("test_service") + .with_http_client(DummyClient) + .install_simple() + .unwrap(); + } + + #[test] + fn test_install_batch() { + new_pipeline() + .with_service_name("test_service") + .with_http_client(DummyClient) + .install_batch(opentelemetry_sdk::runtime::AsyncStd {}) + .unwrap(); + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/mod.rs new file mode 100644 index 0000000000..d6db4b72b4 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/mod.rs @@ -0,0 +1,310 @@ +use std::fmt::Debug; + +use http::uri; +use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::export::trace::{self}; +use opentelemetry_sdk::export::ExportError; +use url::ParseError; + +use self::unified_tags::UnifiedTags; +use super::Mapping; +use crate::plugins::telemetry::tracing::datadog_exporter::ModelConfig; + +pub mod unified_tags; +mod v03; +mod v05; + +// todo: we should follow the same mapping defined in https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go + +// https://github.com/DataDog/dd-trace-js/blob/c89a35f7d27beb4a60165409376e170eacb194c5/packages/dd-trace/src/constants.js#L4 +static SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1"; + +// https://github.com/DataDog/datadog-agent/blob/ec96f3c24173ec66ba235bda7710504400d9a000/pkg/trace/traceutil/span.go#L20 +static DD_MEASURED_KEY: &str = "_dd.measured"; + +/// Custom mapping between opentelemetry spans and datadog spans. +/// +/// User can provide custom function to change the mapping. It currently supports customizing the following +/// fields in Datadog span protocol. +/// +/// |field name|default value| +/// |---------------|-------------| +/// |service name| service name configuration from [`ModelConfig`]| +/// |name | opentelemetry instrumentation library name | +/// |resource| opentelemetry name| +/// +/// The function takes a reference to [`SpanData`]() and a reference to [`ModelConfig`]() as parameters. +/// It should return a `&str` which will be used as the value for the field. +/// +/// If no custom mapping is provided. Default mapping detailed above will be used. +/// +/// For example, +/// ```no_run +/// use opentelemetry_datadog::{ApiVersion, new_pipeline}; +/// fn main() -> Result<(), opentelemetry::trace::TraceError> { +/// let tracer = new_pipeline() +/// .with_service_name("my_app") +/// .with_api_version(ApiVersion::Version05) +/// // the custom mapping below will change the all spans' name to datadog spans +/// .with_name_mapping(|span, model_config|{ +/// "datadog spans" +/// }) +/// .with_agent_endpoint("http://localhost:8126") +/// .install_batch(opentelemetry_sdk::runtime::Tokio)?; +/// +/// Ok(()) +/// } +/// ``` +pub type FieldMappingFn = dyn for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync; + +pub(crate) type FieldMapping = std::sync::Arc; + +// Datadog uses some magic tags in their models. There is no recommended mapping defined in +// opentelemetry spec. Below is default mapping we gonna uses. Users can override it by providing +// their own implementations. +fn default_service_name_mapping<'a>(_span: &'a SpanData, config: &'a ModelConfig) -> &'a str { + config.service_name.as_str() +} + +fn default_name_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> &'a str { + span.instrumentation_lib.name.as_ref() +} + +fn default_resource_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> &'a str { + span.name.as_ref() +} + +/// Wrap type for errors from opentelemetry datadog exporter +#[allow(clippy::enum_variant_names)] +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Message pack error + #[error("message pack error")] + MessagePackError, + /// No http client founded. User should provide one or enable features + #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")] + NoHttpClient, + /// Http requests failed with following errors + #[error(transparent)] + RequestError(#[from] http::Error), + /// The Uri was invalid + #[error("invalid url {0}")] + InvalidUri(String), + /// Other errors + #[error("{0}")] + Other(String), +} + +impl ExportError for Error { + fn exporter_name(&self) -> &'static str { + "datadog" + } +} + +impl From for Error { + fn from(_: rmp::encode::ValueWriteError) -> Self { + Self::MessagePackError + } +} + +impl From for Error { + fn from(err: ParseError) -> Self { + Self::InvalidUri(err.to_string()) + } +} + +impl From for Error { + fn from(err: uri::InvalidUri) -> Self { + Self::InvalidUri(err.to_string()) + } +} + +/// Version of datadog trace ingestion API +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub enum ApiVersion { + /// Version 0.3 + Version03, + /// Version 0.5 - requires datadog-agent v7.22.0 or above + Version05, +} + +impl ApiVersion { + pub(crate) fn path(self) -> &'static str { + match self { + ApiVersion::Version03 => "/v0.3/traces", + ApiVersion::Version05 => "/v0.5/traces", + } + } + + pub(crate) fn content_type(self) -> &'static str { + match self { + ApiVersion::Version03 => "application/msgpack", + ApiVersion::Version05 => "application/msgpack", + } + } + + pub(crate) fn encode( + self, + model_config: &ModelConfig, + traces: Vec<&[trace::SpanData]>, + mapping: &Mapping, + unified_tags: &UnifiedTags, + ) -> Result, Error> { + match self { + Self::Version03 => v03::encode( + model_config, + traces, + |span, config| match &mapping.service_name { + Some(f) => f(span, config), + None => default_service_name_mapping(span, config), + }, + |span, config| match &mapping.name { + Some(f) => f(span, config), + None => default_name_mapping(span, config), + }, + |span, config| match &mapping.resource { + Some(f) => f(span, config), + None => default_resource_mapping(span, config), + }, + ), + Self::Version05 => v05::encode( + model_config, + traces, + |span, config| match &mapping.service_name { + Some(f) => f(span, config), + None => default_service_name_mapping(span, config), + }, + |span, config| match &mapping.name { + Some(f) => f(span, config), + None => default_name_mapping(span, config), + }, + |span, config| match &mapping.resource { + Some(f) => f(span, config), + None => default_resource_mapping(span, config), + }, + unified_tags, + ), + } + } +} + +#[cfg(test)] +pub(crate) mod tests { + use std::borrow::Cow; + use std::time::Duration; + use std::time::SystemTime; + + use base64::Engine; + use opentelemetry::trace::SpanContext; + use opentelemetry::trace::SpanId; + use opentelemetry::trace::SpanKind; + use opentelemetry::trace::Status; + use opentelemetry::trace::TraceFlags; + use opentelemetry::trace::TraceId; + use opentelemetry::trace::TraceState; + use opentelemetry::KeyValue; + use opentelemetry_sdk::trace::EvictedHashMap; + use opentelemetry_sdk::trace::EvictedQueue; + use opentelemetry_sdk::InstrumentationLibrary; + use opentelemetry_sdk::Resource; + use opentelemetry_sdk::{self}; + + use super::*; + + fn get_traces() -> Vec> { + vec![vec![get_span(7, 1, 99)]] + } + + pub(crate) fn get_span(trace_id: u128, parent_span_id: u64, span_id: u64) -> trace::SpanData { + let span_context = SpanContext::new( + TraceId::from_u128(trace_id), + SpanId::from_u64(span_id), + TraceFlags::default(), + false, + TraceState::default(), + ); + + let start_time = SystemTime::UNIX_EPOCH; + let end_time = start_time.checked_add(Duration::from_secs(1)).unwrap(); + + let mut attributes: EvictedHashMap = EvictedHashMap::new(1, 1); + attributes.insert(KeyValue::new("span.type", "web")); + let resource = Resource::new(vec![KeyValue::new("host.name", "test")]); + let instrumentation_lib = InstrumentationLibrary::new( + "component", + None::<&'static str>, + None::<&'static str>, + None, + ); + + trace::SpanData { + span_context, + parent_span_id: SpanId::from_u64(parent_span_id), + span_kind: SpanKind::Client, + name: "resource".into(), + start_time, + end_time, + attributes, + events: EvictedQueue::new(0), + links: EvictedQueue::new(0), + status: Status::Ok, + resource: Cow::Owned(resource), + instrumentation_lib, + } + } + + #[test] + fn test_encode_v03() -> Result<(), Box> { + let traces = get_traces(); + let model_config = ModelConfig { + service_name: "service_name".to_string(), + ..Default::default() + }; + let encoded = + base64::engine::general_purpose::STANDARD.encode(ApiVersion::Version03.encode( + &model_config, + traces.iter().map(|x| &x[..]).collect(), + &Mapping::empty(), + &UnifiedTags::new(), + )?); + + assert_eq!(encoded.as_str(), "kZGMpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW\ + 50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAA\ + AAAAABpXN0YXJ00wAAAAAAAAAAqGR1cmF0aW9u0wAAAAA7msoApWVycm9y0gAAAACkbWV0YYKpaG9zdC5uYW1lpHRlc3\ + Spc3Bhbi50eXBlo3dlYqdtZXRyaWNzgbVfc2FtcGxpbmdfcHJpb3JpdHlfdjHLAAAAAAAAAAA="); + + Ok(()) + } + + #[test] + fn test_encode_v05() -> Result<(), Box> { + let traces = get_traces(); + let model_config = ModelConfig { + service_name: "service_name".to_string(), + ..Default::default() + }; + + let mut unified_tags = UnifiedTags::new(); + unified_tags.set_env(Some(String::from("test-env"))); + unified_tags.set_version(Some(String::from("test-version"))); + unified_tags.set_service(Some(String::from("test-service"))); + + let _encoded = + base64::engine::general_purpose::STANDARD.encode(ApiVersion::Version05.encode( + &model_config, + traces.iter().map(|x| &x[..]).collect(), + &Mapping::empty(), + &unified_tags, + )?); + + // TODO: Need someone to generate the expected result or instructions to do so. + // assert_eq!(encoded.as_str(), "kp6jd2VirHNlcnZpY2VfbmFtZaljb21wb25lbnSocmVzb3VyY2WpaG9zdC5uYW\ + // 1lpHRlc3Snc2VydmljZax0ZXN0LXNlcnZpY2WjZW52qHRlc3QtZW52p3ZlcnNpb26sdGVzdC12ZXJzaW9uqXNwYW4udH\ + // lwZbVfc2FtcGxpbmdfcHJpb3JpdHlfdjGRkZzOAAAAAc4AAAACzgAAAAPPAAAAAAAAAAfPAAAAAAAAAGPPAAAAAAAAAA\ + // HTAAAAAAAAAADTAAAAADuaygDSAAAAAIXOAAAABM4AAAAFzgAAAAbOAAAAB84AAAAIzgAAAAnOAAAACs4AAAALzgAAAA\ + // zOAAAAAIHOAAAADcsAAAAAAAAAAM4AAAAA"); + + Ok(()) + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/unified_tags.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/unified_tags.rs new file mode 100644 index 0000000000..e4e835c550 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/unified_tags.rs @@ -0,0 +1,123 @@ +/// Unified tags - See: https://docs.datadoghq.com/getting_started/tagging/unified_service_tagging + +pub struct UnifiedTags { + pub service: UnifiedTagField, + pub env: UnifiedTagField, + pub version: UnifiedTagField, +} + +impl UnifiedTags { + pub fn new() -> Self { + UnifiedTags { + service: UnifiedTagField::new(UnifiedTagEnum::Service), + env: UnifiedTagField::new(UnifiedTagEnum::Env), + version: UnifiedTagField::new(UnifiedTagEnum::Version), + } + } + pub fn set_service(&mut self, service: Option) { + self.service.value = service; + } + pub fn set_version(&mut self, version: Option) { + self.version.value = version; + } + pub fn set_env(&mut self, env: Option) { + self.env.value = env; + } + pub fn service(&self) -> Option { + self.service.value.clone() + } + pub fn compute_attribute_size(&self) -> u32 { + self.service.len() + self.env.len() + self.version.len() + } +} + +pub struct UnifiedTagField { + pub value: Option, + pub kind: UnifiedTagEnum, +} + +impl UnifiedTagField { + pub fn new(kind: UnifiedTagEnum) -> Self { + UnifiedTagField { + value: kind.find_unified_tag_value(), + kind, + } + } + pub fn len(&self) -> u32 { + if self.value.is_some() { + return 1; + } + 0 + } + pub fn get_tag_name(&self) -> &'static str { + self.kind.get_tag_name() + } +} + +pub enum UnifiedTagEnum { + Service, + Version, + Env, +} + +impl UnifiedTagEnum { + fn get_env_variable_name(&self) -> &'static str { + match self { + UnifiedTagEnum::Service => "DD_SERVICE", + UnifiedTagEnum::Version => "DD_VERSION", + UnifiedTagEnum::Env => "DD_ENV", + } + } + fn get_tag_name(&self) -> &'static str { + match self { + UnifiedTagEnum::Service => "service", + UnifiedTagEnum::Version => "version", + UnifiedTagEnum::Env => "env", + } + } + fn find_unified_tag_value(&self) -> Option { + let env_name_to_check = self.get_env_variable_name(); + match std::env::var(env_name_to_check) { + Ok(tag_value) => Some(tag_value.to_lowercase()), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_service() { + std::env::set_var("DD_SERVICE", "test-SERVICE"); + let mut unified_tags = UnifiedTags::new(); + assert_eq!("test-service", unified_tags.service.value.clone().unwrap()); + unified_tags.set_service(Some(String::from("new_service"))); + assert_eq!("new_service", unified_tags.service().unwrap()); + std::env::remove_var("DD_SERVICE"); + } + + #[test] + fn test_env() { + std::env::set_var("DD_ENV", "test-env"); + let mut unified_tags = UnifiedTags::new(); + assert_eq!("test-env", unified_tags.env.value.clone().unwrap()); + unified_tags.set_env(Some(String::from("new_env"))); + assert_eq!("new_env", unified_tags.env.value.unwrap()); + std::env::remove_var("DD_ENV"); + } + + #[test] + fn test_version() { + std::env::set_var("DD_VERSION", "test-version-1.2.3"); + let mut unified_tags = UnifiedTags::new(); + assert_eq!( + "test-version-1.2.3", + unified_tags.version.value.clone().unwrap() + ); + unified_tags.set_version(Some(String::from("new_version"))); + assert_eq!("new_version", unified_tags.version.value.unwrap()); + std::env::remove_var("DD_VERSION"); + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v03.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v03.rs new file mode 100644 index 0000000000..8f7242ea36 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v03.rs @@ -0,0 +1,129 @@ +use std::time::SystemTime; + +use opentelemetry::trace::Status; +use opentelemetry_sdk::export::trace::SpanData; + +use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::SAMPLING_PRIORITY_KEY; +use crate::plugins::telemetry::tracing::datadog_exporter::Error; +use crate::plugins::telemetry::tracing::datadog_exporter::ModelConfig; + +pub(crate) fn encode( + model_config: &ModelConfig, + traces: Vec<&[SpanData]>, + get_service_name: S, + get_name: N, + get_resource: R, +) -> Result, Error> +where + for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, +{ + let mut encoded = Vec::new(); + rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?; + + for trace in traces.into_iter() { + rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?; + + for span in trace { + // Safe until the year 2262 when Datadog will need to change their API + let start = span + .start_time + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + + let duration = span + .end_time + .duration_since(span.start_time) + .map(|x| x.as_nanos() as i64) + .unwrap_or(0); + + let mut span_type_found = false; + for (key, value) in &span.attributes { + if key.as_str() == "span.type" { + span_type_found = true; + rmp::encode::write_map_len(&mut encoded, 12)?; + rmp::encode::write_str(&mut encoded, "type")?; + rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?; + break; + } + } + + if !span_type_found { + rmp::encode::write_map_len(&mut encoded, 11)?; + } + + // Datadog span name is OpenTelemetry component name - see module docs for more information + rmp::encode::write_str(&mut encoded, "service")?; + rmp::encode::write_str(&mut encoded, get_service_name(span, model_config))?; + + rmp::encode::write_str(&mut encoded, "name")?; + rmp::encode::write_str(&mut encoded, get_name(span, model_config))?; + + rmp::encode::write_str(&mut encoded, "resource")?; + rmp::encode::write_str(&mut encoded, get_resource(span, model_config))?; + + rmp::encode::write_str(&mut encoded, "trace_id")?; + rmp::encode::write_u64( + &mut encoded, + u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64, + )?; + + rmp::encode::write_str(&mut encoded, "span_id")?; + rmp::encode::write_u64( + &mut encoded, + u64::from_be_bytes(span.span_context.span_id().to_bytes()), + )?; + + rmp::encode::write_str(&mut encoded, "parent_id")?; + rmp::encode::write_u64( + &mut encoded, + u64::from_be_bytes(span.parent_span_id.to_bytes()), + )?; + + rmp::encode::write_str(&mut encoded, "start")?; + rmp::encode::write_i64(&mut encoded, start)?; + + rmp::encode::write_str(&mut encoded, "duration")?; + rmp::encode::write_i64(&mut encoded, duration)?; + + rmp::encode::write_str(&mut encoded, "error")?; + rmp::encode::write_i32( + &mut encoded, + match span.status { + Status::Error { .. } => 1, + _ => 0, + }, + )?; + + rmp::encode::write_str(&mut encoded, "meta")?; + rmp::encode::write_map_len( + &mut encoded, + (span.attributes.len() + span.resource.len()) as u32, + )?; + for (key, value) in span.resource.iter() { + rmp::encode::write_str(&mut encoded, key.as_str())?; + rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?; + } + for (key, value) in span.attributes.iter() { + rmp::encode::write_str(&mut encoded, key.as_str())?; + rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?; + } + + rmp::encode::write_str(&mut encoded, "metrics")?; + rmp::encode::write_map_len(&mut encoded, 1)?; + rmp::encode::write_str(&mut encoded, SAMPLING_PRIORITY_KEY)?; + rmp::encode::write_f64( + &mut encoded, + if span.span_context.is_sampled() { + 1.0 + } else { + 0.0 + }, + )?; + } + } + + Ok(encoded) +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs new file mode 100644 index 0000000000..fd1590966e --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs @@ -0,0 +1,263 @@ +use std::time::SystemTime; + +use opentelemetry::trace::Status; +use opentelemetry_sdk::export::trace::SpanData; + +use super::unified_tags::UnifiedTagField; +use super::unified_tags::UnifiedTags; +use crate::plugins::telemetry::tracing::datadog_exporter::exporter::intern::StringInterner; +use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::DD_MEASURED_KEY; +use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::SAMPLING_PRIORITY_KEY; +use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; +use crate::plugins::telemetry::tracing::datadog_exporter::Error; +use crate::plugins::telemetry::tracing::datadog_exporter::ModelConfig; + +const SPAN_NUM_ELEMENTS: u32 = 12; +const METRICS_LEN: u32 = 2; +const GIT_META_TAGS_COUNT: u32 = if matches!( + ( + option_env!("DD_GIT_REPOSITORY_URL"), + option_env!("DD_GIT_COMMIT_SHA") + ), + (Some(_), Some(_)) +) { + 2 +} else { + 0 +}; + +// Protocol documentation sourced from https://github.com/DataDog/datadog-agent/blob/c076ea9a1ffbde4c76d35343dbc32aecbbf99cb9/pkg/trace/api/version.go +// +// The payload is an array containing exactly 12 elements: +// +// 1. An array of all unique strings present in the payload (a dictionary referred to by index). +// 2. An array of traces, where each trace is an array of spans. A span is encoded as an array having +// exactly 12 elements, representing all span properties, in this exact order: +// +// 0: Service (uint32) +// 1: Name (uint32) +// 2: Resource (uint32) +// 3: TraceID (uint64) +// 4: SpanID (uint64) +// 5: ParentID (uint64) +// 6: Start (int64) +// 7: Duration (int64) +// 8: Error (int32) +// 9: Meta (map[uint32]uint32) +// 10: Metrics (map[uint32]float64) +// 11: Type (uint32) +// +// Considerations: +// +// - The "uint32" typed values in "Service", "Name", "Resource", "Type", "Meta" and "Metrics" represent +// the index at which the corresponding string is found in the dictionary. If any of the values are the +// empty string, then the empty string must be added into the dictionary. +// +// - None of the elements can be nil. If any of them are unset, they should be given their "zero-value". Here +// is an example of a span with all unset values: +// +// 0: 0 // Service is "" (index 0 in dictionary) +// 1: 0 // Name is "" +// 2: 0 // Resource is "" +// 3: 0 // TraceID +// 4: 0 // SpanID +// 5: 0 // ParentID +// 6: 0 // Start +// 7: 0 // Duration +// 8: 0 // Error +// 9: map[uint32]uint32{} // Meta (empty map) +// 10: map[uint32]float64{} // Metrics (empty map) +// 11: 0 // Type is "" +// +// The dictionary in this case would be []string{""}, having only the empty string at index 0. +// +pub(crate) fn encode( + model_config: &ModelConfig, + traces: Vec<&[SpanData]>, + get_service_name: S, + get_name: N, + get_resource: R, + unified_tags: &UnifiedTags, +) -> Result, Error> +where + for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, +{ + let mut interner = StringInterner::new(); + let mut encoded_traces = encode_traces( + &mut interner, + model_config, + get_service_name, + get_name, + get_resource, + &traces, + unified_tags, + )?; + + let mut payload = Vec::with_capacity(traces.len() * 512); + rmp::encode::write_array_len(&mut payload, 2)?; + + interner.write_dictionary(&mut payload)?; + + payload.append(&mut encoded_traces); + + Ok(payload) +} + +fn write_unified_tags<'a>( + encoded: &mut Vec, + interner: &mut StringInterner<'a>, + unified_tags: &'a UnifiedTags, +) -> Result<(), Error> { + write_unified_tag(encoded, interner, &unified_tags.service)?; + write_unified_tag(encoded, interner, &unified_tags.env)?; + write_unified_tag(encoded, interner, &unified_tags.version)?; + Ok(()) +} + +fn write_unified_tag<'a>( + encoded: &mut Vec, + interner: &mut StringInterner<'a>, + tag: &'a UnifiedTagField, +) -> Result<(), Error> { + if let Some(tag_value) = &tag.value { + rmp::encode::write_u32(encoded, interner.intern(tag.get_tag_name()))?; + rmp::encode::write_u32(encoded, interner.intern(tag_value.as_str().as_ref()))?; + } + Ok(()) +} + +fn get_sampling_priority(span: &SpanData) -> f64 { + if span.span_context.trace_state().priority_sampling_enabled() { + 1.0 + } else { + 0.0 + } +} + +fn get_measuring(span: &SpanData) -> f64 { + if span.span_context.trace_state().measuring_enabled() { + 1.0 + } else { + 0.0 + } +} + +fn encode_traces<'interner, S, N, R>( + interner: &mut StringInterner<'interner>, + model_config: &'interner ModelConfig, + get_service_name: S, + get_name: N, + get_resource: R, + traces: &'interner [&[SpanData]], + unified_tags: &'interner UnifiedTags, +) -> Result, Error> +where + for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, +{ + let mut encoded = Vec::new(); + rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?; + + for trace in traces.iter() { + rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?; + + for span in trace.iter() { + // Safe until the year 2262 when Datadog will need to change their API + let start = span + .start_time + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + + let duration = span + .end_time + .duration_since(span.start_time) + .map(|x| x.as_nanos() as i64) + .unwrap_or(0); + + let mut span_type = interner.intern(""); + for (key, value) in &span.attributes { + if key.as_str() == "span.type" { + span_type = interner.intern_value(value); + break; + } + } + + // Datadog span name is OpenTelemetry component name - see module docs for more information + rmp::encode::write_array_len(&mut encoded, SPAN_NUM_ELEMENTS)?; + rmp::encode::write_u32( + &mut encoded, + interner.intern(get_service_name(span, model_config)), + )?; + rmp::encode::write_u32(&mut encoded, interner.intern(get_name(span, model_config)))?; + rmp::encode::write_u32( + &mut encoded, + interner.intern(get_resource(span, model_config)), + )?; + rmp::encode::write_u64( + &mut encoded, + u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64, + )?; + rmp::encode::write_u64( + &mut encoded, + u64::from_be_bytes(span.span_context.span_id().to_bytes()), + )?; + rmp::encode::write_u64( + &mut encoded, + u64::from_be_bytes(span.parent_span_id.to_bytes()), + )?; + rmp::encode::write_i64(&mut encoded, start)?; + rmp::encode::write_i64(&mut encoded, duration)?; + rmp::encode::write_i32( + &mut encoded, + match span.status { + Status::Error { .. } => 1, + _ => 0, + }, + )?; + + rmp::encode::write_map_len( + &mut encoded, + (span.attributes.len() + span.resource.len()) as u32 + + unified_tags.compute_attribute_size() + + GIT_META_TAGS_COUNT, + )?; + for (key, value) in span.resource.iter() { + rmp::encode::write_u32(&mut encoded, interner.intern(key.as_str()))?; + rmp::encode::write_u32(&mut encoded, interner.intern_value(value))?; + } + + write_unified_tags(&mut encoded, interner, unified_tags)?; + + for (key, value) in span.attributes.iter() { + rmp::encode::write_u32(&mut encoded, interner.intern(key.as_str()))?; + rmp::encode::write_u32(&mut encoded, interner.intern_value(value))?; + } + + if let (Some(repository_url), Some(commit_sha)) = ( + option_env!("DD_GIT_REPOSITORY_URL"), + option_env!("DD_GIT_COMMIT_SHA"), + ) { + rmp::encode::write_u32(&mut encoded, interner.intern("git.repository_url"))?; + rmp::encode::write_u32(&mut encoded, interner.intern(repository_url))?; + rmp::encode::write_u32(&mut encoded, interner.intern("git.commit.sha"))?; + rmp::encode::write_u32(&mut encoded, interner.intern(commit_sha))?; + } + + rmp::encode::write_map_len(&mut encoded, METRICS_LEN)?; + rmp::encode::write_u32(&mut encoded, interner.intern(SAMPLING_PRIORITY_KEY))?; + let sampling_priority = get_sampling_priority(span); + rmp::encode::write_f64(&mut encoded, sampling_priority)?; + + rmp::encode::write_u32(&mut encoded, interner.intern(DD_MEASURED_KEY))?; + let measuring = get_measuring(span); + rmp::encode::write_f64(&mut encoded, measuring)?; + rmp::encode::write_u32(&mut encoded, span_type)?; + } + } + + Ok(encoded) +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs new file mode 100644 index 0000000000..1c586d48c8 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs @@ -0,0 +1,532 @@ +//! # OpenTelemetry Datadog Exporter +//! +//! An OpenTelemetry datadog exporter implementation +//! +//! See the [Datadog Docs](https://docs.datadoghq.com/agent/) for information on how to run the datadog-agent +//! +//! ## Quirks +//! +//! There are currently some incompatibilities between Datadog and OpenTelemetry, and this manifests +//! as minor quirks to this exporter. +//! +//! Firstly Datadog uses operation_name to describe what OpenTracing would call a component. +//! Or to put it another way, in OpenTracing the operation / span name's are relatively +//! granular and might be used to identify a specific endpoint. In datadog, however, they +//! are less granular - it is expected in Datadog that a service will have single +//! primary span name that is the root of all traces within that service, with an additional piece of +//! metadata called resource_name providing granularity. See [here](https://docs.datadoghq.com/tracing/guide/configuring-primary-operation/) +//! +//! The Datadog Golang API takes the approach of using a `resource.name` OpenTelemetry attribute to set the +//! resource_name. See [here](https://github.com/DataDog/dd-trace-go/blob/ecb0b805ef25b00888a2fb62d465a5aa95e7301e/ddtrace/opentracer/tracer.go#L10) +//! +//! Unfortunately, this breaks compatibility with other OpenTelemetry exporters which expect +//! a more granular operation name - as per the OpenTracing specification. +//! +//! This exporter therefore takes a different approach of naming the span with the name of the +//! tracing provider, and using the span name to set the resource_name. This should in most cases +//! lead to the behaviour that users expect. +//! +//! Datadog additionally has a span_type string that alters the rendering of the spans in the web UI. +//! This can be set as the `span.type` OpenTelemetry span attribute. +//! +//! For standard values see [here](https://github.com/DataDog/dd-trace-go/blob/ecb0b805ef25b00888a2fb62d465a5aa95e7301e/ddtrace/ext/app_types.go#L31). +//! +//! If the default mapping is not fit for your use case, you may change some of them by providing [`FieldMappingFn`]s in pipeline. +//! +//! ## Performance +//! +//! For optimal performance, a batch exporter is recommended as the simple exporter will export +//! each span synchronously on drop. You can enable the [`rt-tokio`], [`rt-tokio-current-thread`] +//! or [`rt-async-std`] features and specify a runtime on the pipeline to have a batch exporter +//! configured for you automatically. +//! +//! ```toml +//! [dependencies] +//! opentelemetry = { version = "*", features = ["rt-tokio"] } +//! opentelemetry-datadog = "*" +//! ``` +//! +//! ```no_run +//! # fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! let tracer = opentelemetry_datadog::new_pipeline() +//! .install_batch(opentelemetry_sdk::runtime::Tokio)?; +//! # Ok(()) +//! # } +//! ``` +//! +//! [`rt-tokio`]: https://tokio.rs +//! [`rt-tokio-current-thread`]: https://tokio.rs +//! [`rt-async-std`]: https://async.rs +//! +//! ## Bring your own http client +//! +//! Users can choose appropriate http clients to align with their runtime. +//! +//! Based on the feature enabled. The default http client will be different. If user doesn't specific +//! features or enabled `reqwest-blocking-client` feature. The blocking reqwest http client will be used as +//! default client. If `reqwest-client` feature is enabled. The async reqwest http client will be used. If +//! `surf-client` feature is enabled. The surf http client will be used. +//! +//! Note that async http clients may need specific runtime otherwise it will panic. User should make +//! sure the http client is running in appropriate runime. +//! +//! Users can always use their own http clients by implementing `HttpClient` trait. +//! +//! ## Kitchen Sink Full Configuration +//! +//! Example showing how to override all configuration options. See the +//! [`DatadogPipelineBuilder`] docs for details of each option. +//! +//! [`DatadogPipelineBuilder`]: struct.DatadogPipelineBuilder.html +//! +//! ```no_run +//! use opentelemetry::{KeyValue, trace::Tracer}; +//! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}; +//! use opentelemetry_sdk::export::trace::ExportResult; +//! use opentelemetry::global::shutdown_tracer_provider; +//! use opentelemetry_datadog::{new_pipeline, ApiVersion, Error}; +//! use opentelemetry_http::{HttpClient, HttpError}; +//! use async_trait::async_trait; +//! use bytes::Bytes; +//! use futures_util::io::AsyncReadExt as _; +//! use http::{Request, Response}; +//! use std::convert::TryInto as _; +//! +//! // `reqwest` and `surf` are supported through features, if you prefer an +//! // alternate http client you can add support by implementing `HttpClient` as +//! // shown here. +//! #[derive(Debug)] +//! struct IsahcClient(isahc::HttpClient); +//! +//! #[async_trait] +//! impl HttpClient for IsahcClient { +//! async fn send(&self, request: Request>) -> Result, HttpError> { +//! let mut response = self.0.send_async(request).await?; +//! let status = response.status(); +//! let mut bytes = Vec::with_capacity(response.body().len().unwrap_or(0).try_into()?); +//! isahc::AsyncReadResponseExt::copy_to(&mut response, &mut bytes).await?; +//! +//! Ok(Response::builder() +//! .status(response.status()) +//! .body(bytes.into())?) +//! } +//! } +//! +//! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! let tracer = new_pipeline() +//! .with_service_name("my_app") +//! .with_api_version(ApiVersion::Version05) +//! .with_agent_endpoint("http://localhost:8126") +//! .with_trace_config( +//! trace::config() +//! .with_sampler(Sampler::AlwaysOn) +//! .with_id_generator(RandomIdGenerator::default()) +//! ) +//! .install_batch(opentelemetry_sdk::runtime::Tokio)?; +//! +//! tracer.in_span("doing_work", |cx| { +//! // Traced app logic here... +//! }); +//! +//! shutdown_tracer_provider(); // sending remaining spans before exit +//! +//! Ok(()) +//! } +//! ``` + +mod exporter; + +#[allow(unused_imports)] +pub use exporter::new_pipeline; +#[allow(unused_imports)] +pub use exporter::ApiVersion; +#[allow(unused_imports)] +pub use exporter::DatadogExporter; +#[allow(unused_imports)] +pub use exporter::DatadogPipelineBuilder; +#[allow(unused_imports)] +pub use exporter::Error; +#[allow(unused_imports)] +pub use exporter::FieldMappingFn; +#[allow(unused_imports)] +pub use exporter::ModelConfig; +#[allow(unused_imports)] +pub use propagator::DatadogPropagator; +#[allow(unused_imports)] +pub use propagator::DatadogTraceState; +#[allow(unused_imports)] +pub use propagator::DatadogTraceStateBuilder; + +pub(crate) mod propagator { + use once_cell::sync::Lazy; + use opentelemetry::propagation::text_map_propagator::FieldIter; + use opentelemetry::propagation::Extractor; + use opentelemetry::propagation::Injector; + use opentelemetry::propagation::TextMapPropagator; + use opentelemetry::trace::SpanContext; + use opentelemetry::trace::SpanId; + use opentelemetry::trace::TraceContextExt; + use opentelemetry::trace::TraceFlags; + use opentelemetry::trace::TraceId; + use opentelemetry::trace::TraceState; + use opentelemetry::Context; + + const DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id"; + const DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id"; + const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority"; + + const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02); + const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr"; + pub(crate) const TRACE_STATE_MEASURE: &str = "m"; + pub(crate) const TRACE_STATE_TRUE_VALUE: &str = "1"; + pub(crate) const TRACE_STATE_FALSE_VALUE: &str = "0"; + + static DATADOG_HEADER_FIELDS: Lazy<[String; 3]> = Lazy::new(|| { + [ + DATADOG_TRACE_ID_HEADER.to_string(), + DATADOG_PARENT_ID_HEADER.to_string(), + DATADOG_SAMPLING_PRIORITY_HEADER.to_string(), + ] + }); + + #[derive(Default)] + pub struct DatadogTraceStateBuilder { + priority_sampling: bool, + measuring: bool, + } + + fn boolean_to_trace_state_flag(value: bool) -> &'static str { + if value { + TRACE_STATE_TRUE_VALUE + } else { + TRACE_STATE_FALSE_VALUE + } + } + + fn trace_flag_to_boolean(value: &str) -> bool { + value == TRACE_STATE_TRUE_VALUE + } + + #[allow(clippy::needless_update)] + impl DatadogTraceStateBuilder { + pub fn with_priority_sampling(self, enabled: bool) -> Self { + Self { + priority_sampling: enabled, + ..self + } + } + + pub fn with_measuring(self, enabled: bool) -> Self { + Self { + measuring: enabled, + ..self + } + } + + pub fn build(self) -> TraceState { + let values = [ + ( + TRACE_STATE_MEASURE, + boolean_to_trace_state_flag(self.measuring), + ), + ( + TRACE_STATE_PRIORITY_SAMPLING, + boolean_to_trace_state_flag(self.priority_sampling), + ), + ]; + + TraceState::from_key_value(values).unwrap_or_default() + } + } + + pub trait DatadogTraceState { + fn with_measuring(&self, enabled: bool) -> TraceState; + + fn measuring_enabled(&self) -> bool; + + fn with_priority_sampling(&self, enabled: bool) -> TraceState; + + fn priority_sampling_enabled(&self) -> bool; + } + + impl DatadogTraceState for TraceState { + fn with_measuring(&self, enabled: bool) -> TraceState { + self.insert(TRACE_STATE_MEASURE, boolean_to_trace_state_flag(enabled)) + .unwrap_or_else(|_err| self.clone()) + } + + fn measuring_enabled(&self) -> bool { + self.get(TRACE_STATE_MEASURE) + .map(trace_flag_to_boolean) + .unwrap_or_default() + } + + fn with_priority_sampling(&self, enabled: bool) -> TraceState { + self.insert( + TRACE_STATE_PRIORITY_SAMPLING, + boolean_to_trace_state_flag(enabled), + ) + .unwrap_or_else(|_err| self.clone()) + } + + fn priority_sampling_enabled(&self) -> bool { + self.get(TRACE_STATE_PRIORITY_SAMPLING) + .map(trace_flag_to_boolean) + .unwrap_or_default() + } + } + + enum SamplingPriority { + UserReject = -1, + AutoReject = 0, + AutoKeep = 1, + UserKeep = 2, + } + + #[derive(Debug)] + enum ExtractError { + TraceId, + SpanId, + SamplingPriority, + } + + /// Extracts and injects `SpanContext`s into `Extractor`s or `Injector`s using Datadog's header format. + /// + /// The Datadog header format does not have an explicit spec, but can be divined from the client libraries, + /// such as [dd-trace-go] + /// + /// ## Example + /// + /// ``` + /// use opentelemetry::global; + /// use opentelemetry_datadog::DatadogPropagator; + /// + /// global::set_text_map_propagator(DatadogPropagator::default()); + /// ``` + /// + /// [dd-trace-go]: https://github.com/DataDog/dd-trace-go/blob/v1.28.0/ddtrace/tracer/textmap.go#L293 + #[derive(Clone, Debug, Default)] + pub struct DatadogPropagator { + _private: (), + } + + fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) { + if trace_flags & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED { + (TraceState::default(), trace_flags) + } else { + ( + DatadogTraceStateBuilder::default() + .with_priority_sampling(trace_flags.is_sampled()) + .build(), + TraceFlags::SAMPLED, + ) + } + } + + impl DatadogPropagator { + /// Creates a new `DatadogPropagator`. + pub fn new() -> Self { + DatadogPropagator::default() + } + + fn extract_trace_id(&self, trace_id: &str) -> Result { + trace_id + .parse::() + .map(|id| TraceId::from(id as u128)) + .map_err(|_| ExtractError::TraceId) + } + + fn extract_span_id(&self, span_id: &str) -> Result { + span_id + .parse::() + .map(SpanId::from) + .map_err(|_| ExtractError::SpanId) + } + + fn extract_sampling_priority( + &self, + sampling_priority: &str, + ) -> Result { + let i = sampling_priority + .parse::() + .map_err(|_| ExtractError::SamplingPriority)?; + + match i { + -1 => Ok(SamplingPriority::UserReject), + 0 => Ok(SamplingPriority::AutoReject), + 1 => Ok(SamplingPriority::AutoKeep), + 2 => Ok(SamplingPriority::UserKeep), + _ => Err(ExtractError::SamplingPriority), + } + } + + fn extract_span_context( + &self, + extractor: &dyn Extractor, + ) -> Result { + let trace_id = + self.extract_trace_id(extractor.get(DATADOG_TRACE_ID_HEADER).unwrap_or(""))?; + // If we have a trace_id but can't get the parent span, we default it to invalid instead of completely erroring + // out so that the rest of the spans aren't completely lost + let span_id = self + .extract_span_id(extractor.get(DATADOG_PARENT_ID_HEADER).unwrap_or("")) + .unwrap_or(SpanId::INVALID); + let sampling_priority = self.extract_sampling_priority( + extractor + .get(DATADOG_SAMPLING_PRIORITY_HEADER) + .unwrap_or(""), + ); + let sampled = match sampling_priority { + Ok(SamplingPriority::UserReject) | Ok(SamplingPriority::AutoReject) => { + TraceFlags::default() + } + Ok(SamplingPriority::UserKeep) | Ok(SamplingPriority::AutoKeep) => { + TraceFlags::SAMPLED + } + // Treat the sampling as DEFERRED instead of erroring on extracting the span context + Err(_) => TRACE_FLAG_DEFERRED, + }; + + let (trace_state, trace_flags) = create_trace_state_and_flags(sampled); + + Ok(SpanContext::new( + trace_id, + span_id, + trace_flags, + true, + trace_state, + )) + } + } + + fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority { + if span_context.trace_state().priority_sampling_enabled() { + SamplingPriority::AutoKeep + } else { + SamplingPriority::AutoReject + } + } + + impl TextMapPropagator for DatadogPropagator { + fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) { + let span = cx.span(); + let span_context = span.span_context(); + if span_context.is_valid() { + injector.set( + DATADOG_TRACE_ID_HEADER, + (u128::from_be_bytes(span_context.trace_id().to_bytes()) as u64).to_string(), + ); + injector.set( + DATADOG_PARENT_ID_HEADER, + u64::from_be_bytes(span_context.span_id().to_bytes()).to_string(), + ); + + if span_context.trace_flags() & TRACE_FLAG_DEFERRED != TRACE_FLAG_DEFERRED { + let sampling_priority = get_sampling_priority(span_context); + + injector.set( + DATADOG_SAMPLING_PRIORITY_HEADER, + (sampling_priority as i32).to_string(), + ); + } + } + } + + fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context { + self.extract_span_context(extractor) + .map(|sc| cx.with_remote_span_context(sc)) + .unwrap_or_else(|_| cx.clone()) + } + + fn fields(&self) -> FieldIter<'_> { + FieldIter::new(DATADOG_HEADER_FIELDS.as_ref()) + } + } + + #[cfg(test)] + mod tests { + use std::collections::HashMap; + + use opentelemetry::trace::TraceState; + use opentelemetry_sdk::testing::trace::TestSpan; + + use super::*; + + #[rustfmt::skip] + fn extract_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> { + vec![ + (vec![], SpanContext::empty_context()), + (vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()), + (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())), + ] + } + + #[rustfmt::skip] + fn inject_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> { + vec![ + (vec![], SpanContext::empty_context()), + (vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())), + ] + } + + #[test] + fn test_extract() { + for (header_list, expected) in extract_test_data() { + let map: HashMap = header_list + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + let propagator = DatadogPropagator::default(); + let context = propagator.extract(&map); + assert_eq!(context.span().span_context(), &expected); + } + } + + #[test] + fn test_extract_empty() { + let map: HashMap = HashMap::new(); + let propagator = DatadogPropagator::default(); + let context = propagator.extract(&map); + assert_eq!(context.span().span_context(), &SpanContext::empty_context()) + } + + #[test] + fn test_extract_with_empty_remote_context() { + let map: HashMap = HashMap::new(); + let propagator = DatadogPropagator::default(); + let context = propagator.extract_with_context(&Context::new(), &map); + assert!(!context.has_active_span()) + } + + #[test] + fn test_inject() { + let propagator = DatadogPropagator::default(); + for (header_values, span_context) in inject_test_data() { + let mut injector: HashMap = HashMap::new(); + propagator.inject_context( + &Context::current_with_span(TestSpan(span_context)), + &mut injector, + ); + + if !header_values.is_empty() { + for (k, v) in header_values.into_iter() { + let injected_value: Option<&String> = injector.get(k); + assert_eq!(injected_value, Some(&v.to_string())); + injector.remove(k); + } + } + assert!(injector.is_empty()); + } + } + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/mod.rs b/apollo-router/src/plugins/telemetry/tracing/mod.rs index 99ea905472..0172f3e094 100644 --- a/apollo-router/src/plugins/telemetry/tracing/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/mod.rs @@ -22,6 +22,8 @@ use crate::plugins::telemetry::config::TracingCommon; pub(crate) mod apollo; pub(crate) mod apollo_telemetry; pub(crate) mod datadog; +#[allow(unreachable_pub, dead_code)] +pub(crate) mod datadog_exporter; pub(crate) mod jaeger; pub(crate) mod otlp; pub(crate) mod reload; diff --git a/apollo-router/src/services/layers/query_analysis.rs b/apollo-router/src/services/layers/query_analysis.rs index 71eae898e7..4377efb205 100644 --- a/apollo-router/src/services/layers/query_analysis.rs +++ b/apollo-router/src/services/layers/query_analysis.rs @@ -23,6 +23,7 @@ use crate::graphql::IntoGraphQLErrors; use crate::plugins::authorization::AuthorizationPlugin; use crate::plugins::telemetry::config::ApolloMetricsReferenceMode; use crate::plugins::telemetry::config::Conf as TelemetryConfig; +use crate::plugins::telemetry::consts::QUERY_PARSING_SPAN_NAME; use crate::query_planner::fetch::QueryHash; use crate::query_planner::OperationKind; use crate::services::SupergraphRequest; @@ -33,8 +34,6 @@ use crate::spec::SpecError; use crate::Configuration; use crate::Context; -pub(crate) const QUERY_PARSING_SPAN_NAME: &str = "parse_query"; - /// [`Layer`] for QueryAnalysis implementation. #[derive(Clone)] #[allow(clippy::type_complexity)] diff --git a/apollo-router/tests/integration/telemetry/datadog.rs b/apollo-router/tests/integration/telemetry/datadog.rs index 192e37c629..6e8d9be3d4 100644 --- a/apollo-router/tests/integration/telemetry/datadog.rs +++ b/apollo-router/tests/integration/telemetry/datadog.rs @@ -60,6 +60,8 @@ async fn test_default_span_names() -> Result<(), BoxError> { "http_request", "parse_query", ], + &[], + &[], ) .await?; router.graceful_shutdown().await; @@ -112,6 +114,8 @@ async fn test_override_span_names() -> Result<(), BoxError> { "http_request", "parse_query", ], + &[], + &[], ) .await?; router.graceful_shutdown().await; @@ -164,6 +168,8 @@ async fn test_override_span_names_late() -> Result<(), BoxError> { "http_request", "parse_query", ], + &[], + &[], ) .await?; router.graceful_shutdown().await; @@ -213,6 +219,17 @@ async fn test_basic() -> Result<(), BoxError> { "subgraph server", "parse_query", ], + &[ + "query_planning", + "subgraph", + "http_request", + "subgraph_request", + "router", + "execution", + "supergraph", + "parse_query", + ], + &[], ) .await?; router.graceful_shutdown().await; @@ -250,6 +267,7 @@ async fn test_resource_mapping_default() -> Result<(), BoxError> { false, &[ "parse_query", + "/", "ExampleQuery", "client_request", "execution", @@ -259,6 +277,8 @@ async fn test_resource_mapping_default() -> Result<(), BoxError> { "subgraph server", "ExampleQuery__products__0", ], + &[], + &[], ) .await?; router.graceful_shutdown().await; @@ -306,12 +326,61 @@ async fn test_resource_mapping_override() -> Result<(), BoxError> { "overridden", "ExampleQuery__products__0", ], + &[], + &[], ) .await?; router.graceful_shutdown().await; Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_span_metrics() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Datadog) + .config(include_str!("fixtures/disable_span_metrics.router.yaml")) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, result) = router.execute_query(&query).await; + assert!(!result + .headers() + .get("apollo-custom-trace-id") + .unwrap() + .is_empty()); + validate_trace( + id, + &query, + Some("ExampleQuery"), + &["client", "router", "subgraph"], + false, + &[ + "parse_query", + "ExampleQuery", + "client_request", + "execution", + "query_planning", + "products", + "fetch", + "subgraph server", + "ExampleQuery__products__0", + ], + &["subgraph"], + &["supergraph"], + ) + .await?; + router.graceful_shutdown().await; + Ok(()) +} + +#[allow(clippy::too_many_arguments)] async fn validate_trace( id: TraceId, query: &Value, @@ -319,6 +388,8 @@ async fn validate_trace( services: &[&'static str], custom_span_instrumentation: bool, expected_span_names: &[&'static str], + expected_measured: &[&'static str], + unexpected_measured: &[&'static str], ) -> Result<(), BoxError> { let datadog_id = id.to_datadog(); let url = format!("http://localhost:8126/test/traces?trace_ids={datadog_id}"); @@ -330,6 +401,8 @@ async fn validate_trace( services, custom_span_instrumentation, expected_span_names, + expected_measured, + unexpected_measured, ) .await .is_ok() @@ -345,11 +418,14 @@ async fn validate_trace( services, custom_span_instrumentation, expected_span_names, + expected_measured, + unexpected_measured, ) .await?; Ok(()) } +#[allow(clippy::too_many_arguments)] async fn find_valid_trace( url: &str, _query: &Value, @@ -357,6 +433,8 @@ async fn find_valid_trace( services: &[&'static str], _custom_span_instrumentation: bool, expected_span_names: &[&'static str], + expected_measured: &[&'static str], + unexpected_measured: &[&'static str], ) -> Result<(), BoxError> { // A valid trace has: // * All three services @@ -373,6 +451,55 @@ async fn find_valid_trace( tracing::debug!("{}", serde_json::to_string_pretty(&trace)?); verify_trace_participants(&trace, services)?; verify_spans_present(&trace, operation_name, services, expected_span_names)?; + validate_span_kinds(&trace)?; + validate_measured_spans(&trace, expected_measured, unexpected_measured)?; + Ok(()) +} + +fn validate_measured_spans( + trace: &Value, + expected: &[&'static str], + unexpected: &[&'static str], +) -> Result<(), BoxError> { + for expected in expected { + assert!( + measured_span(trace, expected)?, + "missing measured span {}", + expected + ); + } + for unexpected in unexpected { + assert!( + !measured_span(trace, unexpected)?, + "unexpected measured span {}", + unexpected + ); + } + Ok(()) +} + +fn measured_span(trace: &Value, name: &&str) -> Result { + let binding1 = trace.select_path(&format!( + "$..[?(@.meta.['otel.original_name'] == '{}')].metrics.['_dd.measured']", + name + ))?; + let binding2 = trace.select_path(&format!( + "$..[?(@.name == '{}')].metrics.['_dd.measured']", + name + ))?; + Ok(binding1 + .first() + .or(binding2.first()) + .and_then(|v| v.as_f64()) + .map(|v| v == 1.0) + .unwrap_or_default()) +} + +fn validate_span_kinds(trace: &Value) -> Result<(), BoxError> { + // Validate that the span.kind has been propagated. We can just do this for a selection of spans. + validate_span_kind(trace, "router", "server")?; + validate_span_kind(trace, "supergraph", "internal")?; + validate_span_kind(trace, "http_request", "client")?; Ok(()) } @@ -425,6 +552,31 @@ fn verify_spans_present( Ok(()) } +fn validate_span_kind(trace: &Value, name: &str, kind: &str) -> Result<(), BoxError> { + let binding1 = trace.select_path(&format!( + "$..[?(@.meta.['otel.original_name'] == '{}')].meta.['span.kind']", + name + ))?; + let binding2 = + trace.select_path(&format!("$..[?(@.name == '{}')].meta.['span.kind']", name))?; + let binding = binding1.first().or(binding2.first()); + + assert!( + binding.is_some(), + "span.kind missing or incorrect {}, {}", + name, + trace + ); + assert_eq!( + binding + .expect("expected binding") + .as_str() + .expect("expected string"), + kind + ); + Ok(()) +} + pub(crate) trait DatadogId { fn to_datadog(&self) -> String; } diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml index 396a60fa5d..96160b1831 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml @@ -12,6 +12,12 @@ telemetry: enable_span_mapping: true batch_processor: scheduled_delay: 100ms + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true diff --git a/apollo-router/tests/integration/telemetry/fixtures/disable_span_metrics.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/disable_span_metrics.router.yaml new file mode 100644 index 0000000000..0d47070c4a --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/disable_span_metrics.router.yaml @@ -0,0 +1,26 @@ +telemetry: + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + format: datadog + common: + service_name: router + datadog: + enabled: true + batch_processor: + scheduled_delay: 100ms + span_metrics: + supergraph: false + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + + + diff --git a/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx b/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx index b7f802a7aa..466e0accd1 100644 --- a/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx +++ b/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx @@ -185,6 +185,39 @@ telemetry: my.span.attribute: request_header: x-custom-header ``` +If you have introduced a new span in a custom build of the Router you can enable resource mapping for it by adding it to the `resource_mapping` configuration. + +### `span_metrics` +When set, `span_metrics` allows you to specify which spans will show span metrics in the Datadog APM and Trace view. +By default, span metrics are enabled for: + +* `request` +* `router` +* `supergraph` +* `subgraph` +* `subgraph_request` +* `http_request` +* `query_planning` +* `execution` +* `query_parsing` + +You may override these defaults by specifying `span_metrics` configuration: + +The following will disable span metrics for the supergraph span. +```yaml title="router.yaml" +telemetry: + exporters: + tracing: + datadog: + enabled: true + span_metrics: + # Disable span metrics for supergraph + supergraph: false + # Enable span metrics for my_custom_span + my_custom_span: true +``` + +If you have introduced a new span in a custom build of the Router you can enable span metrics for it by adding it to the `span_metrics` configuration. ### `batch_processor` @@ -209,10 +242,12 @@ telemetry: ## Datadog native configuration reference -| Attribute | Default | Description | -|-----------------------|---------------------------------------|---------------------------------| -| `enabled` | `false` | Enable the OTLP exporter. | -| `enable_span_mapping` | `false` | If span mapping should be used. | -| `endpoint` | `http://localhost:8126/v0.4/traces` | The endpoint to send spans to. | -| `batch_processor` | | The batch processor settings. | +| Attribute | Default | Description | +|-----------------------|-------------------------------------|-----------------------------------------| +| `enabled` | `false` | Enable the OTLP exporter. | +| `enable_span_mapping` | `false` | If span mapping should be used. | +| `endpoint` | `http://localhost:8126/v0.4/traces` | The endpoint to send spans to. | +| `batch_processor` | | The batch processor settings. | +| `resource_mapping` | See [config](#resource_mapping) | A map of span names to attribute names. | +| `span_metrics` | See [config](#span_metrics) | A map of span names to boolean. |