diff --git a/.changesets/feat_allow_attribute_configuration_alocay.md b/.changesets/feat_allow_attribute_configuration_alocay.md new file mode 100644 index 00000000..29ddfdba --- /dev/null +++ b/.changesets/feat_allow_attribute_configuration_alocay.md @@ -0,0 +1,70 @@ +### feat: adding ability to omit attributes for traces and metrics - @alocay PR #358 + +Adding ability to configure which attributes are omitted from telemetry traces and metrics. + +1. Using a Rust build script (`build.rs`) to auto-generate telemetry attribute code based on the data found in `telemetry.toml`. +2. Utilizing an enum for attributes so typos in the config file raise an error. +3. Omitting trace attributes by filtering it out in a custom exporter. +4. Omitting metric attributes by indicating which attributes are allowed via a view. +5. Created `telemetry_attributes.rs` to map `TelemetryAttribute` enum to a OTEL `Key`. + +The `telemetry.toml` file includes attributes (both for metrics and traces) as well as list of metrics gathered. An example would look like the following: +``` +[attributes.apollo.mcp] +my_attribute = "Some attribute info" + +[metrics.apollo.mcp] +some.count = "Some metric count info" +``` +This would generate a file that looks like the following: +``` +/// All TelemetryAttribute values +pub const ALL_ATTRS: &[TelemetryAttribute; 1usize] = &[ + TelemetryAttribute::MyAttribute +]; +#[derive(Debug, ::serde::Deserialize, ::schemars::JsonSchema,, Clone, Eq, PartialEq, Hash, Copy)] +pub enum TelemetryAttribute { + ///Some attribute info + #[serde(alias = "my_attribute")] + MyAttribute, +} +impl TelemetryAttribute { + /// Supported telemetry attribute (tags) values + pub const fn as_str(&self) -> &'static str { + match self { + TelemetryAttribute::MyAttribute => "apollo.mcp.my_attribute", + } + } +} +#[derive(Debug, ::serde::Deserialize, ::schemars::JsonSchema,, Clone, Eq, PartialEq, Hash, Copy)] +pub enum TelemetryMetric { + ///Some metric count info + #[serde(alias = "some.count")] + SomeCount, +} +impl TelemetryMetric { + /// Converts TelemetryMetric to &str + pub const fn as_str(&self) -> &'static str { + match self { + TelemetryMetric::SomeCount => "apollo.mcp.some.count", + } + } +} +``` +An example configuration that omits `tool_name` attribute for metrics and `request_id` for tracing would look like the following: +``` +telemetry: + exporters: + metrics: + otlp: + endpoint: "http://localhost:4317" + protocol: "grpc" + omitted_attributes: + - tool_name + tracing: + otlp: + endpoint: "http://localhost:4317" + protocol: "grpc" + omitted_attributes: + - request_id +``` diff --git a/Cargo.lock b/Cargo.lock index 08e362ae..cb4df6d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,6 +211,7 @@ dependencies = [ "apollo-federation", "apollo-mcp-registry", "apollo-schema-index", + "async-trait", "axum", "axum-extra", "axum-otel-metrics", @@ -218,6 +219,7 @@ dependencies = [ "bon", "chrono", "clap", + "cruet", "figment", "futures", "headers", @@ -235,6 +237,8 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry-stdout", "opentelemetry_sdk", + "prettyplease", + "quote", "regex", "reqwest", "reqwest-middleware", @@ -244,9 +248,11 @@ dependencies = [ "schemars 1.0.4", "serde", "serde_json", + "syn 2.0.106", "thiserror 2.0.16", "tokio", "tokio-util", + "toml", "tower-http", "tracing", "tracing-appender", @@ -830,6 +836,16 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "cruet" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7a9ae414b9768aada1b316493261653e41af05c9d2ccc9c504a8fc051c6a790" +dependencies = [ + "once_cell", + "regex", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -3390,6 +3406,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40734c41988f7306bb04f0ecf60ec0f3f1caa34290e4e8ea471dcd3346483b83" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3950,12 +3975,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75129e1dc5000bfbaa9fee9d1b21f974f9fbad9daec557a521ee6e080825f6e8" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime 0.7.0", + "toml_parser", + "toml_writer", + "winnow", +] + [[package]] name = "toml_datetime" version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +[[package]] +name = "toml_datetime" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bade1c3e902f58d73d3f294cd7f20391c1cb2fbcb643b73566bc773971df91e3" +dependencies = [ + "serde", +] + [[package]] name = "toml_edit" version = "0.22.27" @@ -3963,10 +4012,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ "indexmap", - "toml_datetime", + "toml_datetime 0.6.11", "winnow", ] +[[package]] +name = "toml_parser" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b551886f449aa90d4fe2bdaa9f4a2577ad2dde302c61ecf262d80b116db95c10" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" + [[package]] name = "tonic" version = "0.13.1" diff --git a/crates/apollo-mcp-server/Cargo.toml b/crates/apollo-mcp-server/Cargo.toml index 0aea5615..1f465dc4 100644 --- a/crates/apollo-mcp-server/Cargo.toml +++ b/crates/apollo-mcp-server/Cargo.toml @@ -6,6 +6,7 @@ license-file.workspace = true repository.workspace = true rust-version.workspace = true version.workspace = true +build = "build.rs" default-run = "apollo-mcp-server" @@ -41,7 +42,9 @@ opentelemetry-otlp = { version = "0.30.0", features = [ opentelemetry-resource-detectors = "0.9.0" opentelemetry-semantic-conventions = "0.30.0" opentelemetry-stdout = "0.30.0" -opentelemetry_sdk = "0.30.0" +opentelemetry_sdk = { version = "0.30.0", features = [ + "spec_unstable_metrics_views", +] } regex = "1.11.1" reqwest-middleware = "0.4.2" reqwest-tracing = { version = "0.5.8", features = ["opentelemetry_0_30"] } @@ -65,6 +68,7 @@ tracing-opentelemetry = "0.31.0" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } tracing.workspace = true url.workspace = true +async-trait = "0.1.89" [dev-dependencies] chrono = { version = "0.4.41", default-features = false, features = ["now"] } @@ -76,6 +80,14 @@ rstest.workspace = true tokio.workspace = true tracing-test = "0.2.5" +[build-dependencies] +cruet = "0.15.0" +prettyplease = "0.2.37" +quote = "1.0.40" +serde.workspace = true +syn = "2.0.106" +toml = "0.9.5" + [lints] workspace = true diff --git a/crates/apollo-mcp-server/build.rs b/crates/apollo-mcp-server/build.rs new file mode 100644 index 00000000..e8b5d44a --- /dev/null +++ b/crates/apollo-mcp-server/build.rs @@ -0,0 +1,173 @@ +#![allow(clippy::unwrap_used)] +#![allow(clippy::expect_used)] +#![allow(clippy::panic)] + +//! Build Script for the Apollo MCP Server +//! +//! This mostly compiles all the available telemetry attributes +use quote::__private::TokenStream; +use quote::quote; +use serde::Deserialize; +use std::io::Write; +use std::{collections::VecDeque, io::Read as _}; +use syn::{Ident, parse2}; + +#[derive(Deserialize)] +struct TelemetryTomlData { + attributes: toml::Table, + metrics: toml::Table, +} + +#[derive(Eq, PartialEq, Debug, Clone)] +struct TelemetryData { + name: String, + alias: String, + value: String, + description: String, +} + +fn flatten(table: toml::Table) -> Vec { + let mut to_visit = VecDeque::from_iter(table.into_iter().map(|(key, val)| (vec![key], val))); + let mut telemetry_data = Vec::new(); + + while let Some((key, value)) = to_visit.pop_front() { + match value { + toml::Value::String(val) => { + let last_key = key.last().unwrap().clone(); + telemetry_data.push(TelemetryData { + name: cruet::to_pascal_case(last_key.as_str()), + alias: last_key, + value: key.join("."), + description: val, + }); + } + toml::Value::Table(map) => to_visit.extend( + map.into_iter() + .map(|(nested_key, value)| ([key.clone(), vec![nested_key]].concat(), value)), + ), + + _ => panic!("telemetry values should be string descriptions"), + }; + } + + telemetry_data +} + +fn generate_enum(telemetry_data: &[TelemetryData]) -> Vec { + telemetry_data + .iter() + .map(|t| { + let enum_value_ident = quote::format_ident!("{}", &t.name); + let alias = &t.alias; + let doc_message = &t.description; + quote! { + #[doc = #doc_message] + #[serde(alias = #alias)] + #enum_value_ident + } + }) + .collect::>() +} + +fn generate_enum_as_str_matches( + telemetry_data: &[TelemetryData], + enum_ident: Ident, +) -> Vec { + telemetry_data + .iter() + .map(|t| { + let name_ident = quote::format_ident!("{}", &t.name); + let value = &t.value; + quote! { + #enum_ident::#name_ident => #value + } + }) + .collect::>() +} + +fn main() { + // Parse the telemetry file + let telemetry: TelemetryTomlData = { + let mut raw = String::new(); + std::fs::File::open("telemetry.toml") + .expect("could not open telemetry file") + .read_to_string(&mut raw) + .expect("could not read telemetry file"); + + toml::from_str(&raw).expect("could not parse telemetry file") + }; + + // Generate the keys + let telemetry_attribute_data = flatten(telemetry.attributes); + let telemetry_metrics_data = flatten(telemetry.metrics); + println!( + "a {:?} | m {:?}", + telemetry_attribute_data, telemetry_metrics_data + ); + + // Write out the generated keys + let out_dir = std::env::var_os("OUT_DIR").expect("could not retrieve output directory"); + let dest_path = std::path::Path::new(&out_dir).join("telemetry_attributes.rs"); + let mut generated_file = + std::fs::File::create(&dest_path).expect("could not create generated code file"); + + let attribute_keys_len = telemetry_attribute_data.len(); + let attribute_enum_keys = generate_enum(&telemetry_attribute_data); + let all_attribute_enum_values = &telemetry_attribute_data + .iter() + .map(|t| quote::format_ident!("{}", t.name)); + let all_attribute_enum_values = (*all_attribute_enum_values).clone(); + let attribute_enum_name = quote::format_ident!("{}", "TelemetryAttribute"); + let attribute_enum_as_str_matches = + generate_enum_as_str_matches(&telemetry_attribute_data, attribute_enum_name.clone()); + + let metric_enum_name = quote::format_ident!("{}", "TelemetryMetric"); + let metric_enum_keys = generate_enum(&telemetry_metrics_data); + let metric_enum_as_str_matches = + generate_enum_as_str_matches(&telemetry_metrics_data, metric_enum_name.clone()); + + let tokens = quote! { + /// All TelemetryAttribute values + pub const ALL_ATTRS: &[TelemetryAttribute; #attribute_keys_len] = &[#(TelemetryAttribute::#all_attribute_enum_values),*]; + + /// Supported telemetry attribute (tags) values + #[derive(Debug, ::serde::Deserialize, ::schemars::JsonSchema, Clone, Eq, PartialEq, Hash, Copy)] + pub enum #attribute_enum_name { + #(#attribute_enum_keys),* + } + + impl #attribute_enum_name { + /// Converts TelemetryAttribute to &str + pub const fn as_str(&self) -> &'static str { + match self { + #(#attribute_enum_as_str_matches),* + } + } + } + + /// Supported telemetry metrics + #[derive(Debug, ::serde::Deserialize, ::schemars::JsonSchema, Clone, Eq, PartialEq, Hash, Copy)] + pub enum #metric_enum_name { + #(#metric_enum_keys),* + } + + impl #metric_enum_name { + /// Converts TelemetryMetric to &str + pub const fn as_str(&self) -> &'static str { + match self { + #(#metric_enum_as_str_matches),* + } + } + } + }; + + let file = parse2(tokens).expect("Could not parse TokenStream"); + let code = prettyplease::unparse(&file); + + write!(generated_file, "{}", code).expect("Failed to write generated code"); + + // Inform cargo that we only want this to run when either this file or the telemetry + // one changes. + println!("cargo::rerun-if-changed=build.rs"); + println!("cargo::rerun-if-changed=telemetry.toml"); +} diff --git a/crates/apollo-mcp-server/src/graphql.rs b/crates/apollo-mcp-server/src/graphql.rs index aae5fd38..be97ac7f 100644 --- a/crates/apollo-mcp-server/src/graphql.rs +++ b/crates/apollo-mcp-server/src/graphql.rs @@ -1,5 +1,6 @@ //! Execute GraphQL operations from an MCP tool +use crate::generated::telemetry::{TelemetryAttribute, TelemetryMetric}; use crate::{errors::McpError, meter::get_meter}; use opentelemetry::KeyValue; use reqwest::header::{HeaderMap, HeaderValue}; @@ -37,7 +38,7 @@ pub trait Executable { fn headers(&self, default_headers: &HeaderMap) -> HeaderMap; /// Execute as a GraphQL operation using the endpoint and headers - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self, request))] async fn execute(&self, request: Request<'_>) -> Result { let meter = get_meter(); let start = std::time::Instant::now(); @@ -127,12 +128,15 @@ pub trait Executable { // Record response metrics let attributes = vec![ KeyValue::new( - "success", + TelemetryAttribute::Success.to_key(), result.as_ref().is_ok_and(|r| r.is_error != Some(true)), ), - KeyValue::new("operation.id", op_id.unwrap_or("unknown".to_string())), KeyValue::new( - "operation.type", + TelemetryAttribute::OperationId.to_key(), + op_id.unwrap_or("unknown".to_string()), + ), + KeyValue::new( + TelemetryAttribute::OperationSource.to_key(), if self.persisted_query_id().is_some() { "persisted_query" } else { @@ -141,11 +145,11 @@ pub trait Executable { ), ]; meter - .f64_histogram("apollo.mcp.operation.duration") + .f64_histogram(TelemetryMetric::OperationDuration.as_str()) .build() .record(start.elapsed().as_millis() as f64, &attributes); meter - .u64_counter("apollo.mcp.operation.count") + .u64_counter(TelemetryMetric::OperationCount.as_str()) .build() .add(1, &attributes); diff --git a/crates/apollo-mcp-server/src/lib.rs b/crates/apollo-mcp-server/src/lib.rs index d34520c4..bd5f195d 100644 --- a/crates/apollo-mcp-server/src/lib.rs +++ b/crates/apollo-mcp-server/src/lib.rs @@ -12,3 +12,10 @@ pub mod operations; pub mod sanitize; pub(crate) mod schema_tree_shake; pub mod server; +pub mod telemetry_attributes; + +pub mod generated { + pub mod telemetry { + include!(concat!(env!("OUT_DIR"), "/telemetry_attributes.rs")); + } +} diff --git a/crates/apollo-mcp-server/src/runtime.rs b/crates/apollo-mcp-server/src/runtime.rs index e5cd668f..45db2558 100644 --- a/crates/apollo-mcp-server/src/runtime.rs +++ b/crates/apollo-mcp-server/src/runtime.rs @@ -5,6 +5,7 @@ mod config; mod endpoint; +mod filtering_exporter; mod graphos; mod introspection; pub mod logging; @@ -131,8 +132,13 @@ mod test { let config = " endpoint: http://from_file:4000/ "; + let saved_path = std::env::var("PATH").unwrap_or_default(); + let workspace = env!("CARGO_MANIFEST_DIR"); figment::Jail::expect_with(move |jail| { + jail.clear_env(); + jail.set_env("PATH", &saved_path); + jail.set_env("INSTA_WORKSPACE_ROOT", workspace); let path = "config.yaml"; jail.create_file(path, config)?; diff --git a/crates/apollo-mcp-server/src/runtime/filtering_exporter.rs b/crates/apollo-mcp-server/src/runtime/filtering_exporter.rs new file mode 100644 index 00000000..d698617d --- /dev/null +++ b/crates/apollo-mcp-server/src/runtime/filtering_exporter.rs @@ -0,0 +1,233 @@ +use opentelemetry::{Key, KeyValue}; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::trace::{SpanData, SpanExporter}; +use std::collections::HashSet; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct FilteringExporter { + inner: E, + omitted: HashSet, +} + +impl FilteringExporter { + pub fn new(inner: E, omitted: impl IntoIterator) -> Self { + Self { + inner, + omitted: omitted.into_iter().collect(), + } + } +} + +impl SpanExporter for FilteringExporter +where + E: SpanExporter + Send + Sync, +{ + fn export(&self, mut batch: Vec) -> impl Future + Send { + for span in &mut batch { + span.attributes + .retain(|kv| filter_omitted_apollo_attributes(kv, &self.omitted)); + + // TODO: while not strictly necessary for dealing with high-cardinality, do we want to + // filter out from span.events.events as well? + // for ev in &mut span.events.events { + // ev.attributes.retain(|kv| filter_omitted_apollo_attributes(kv, &self.allow)); + // } + } + + self.inner.export(batch) + } + + fn shutdown(&mut self) -> OTelSdkResult { + self.inner.shutdown() + } + fn force_flush(&mut self) -> OTelSdkResult { + self.inner.force_flush() + } + fn set_resource(&mut self, r: &Resource) { + self.inner.set_resource(r) + } +} + +fn filter_omitted_apollo_attributes(kv: &KeyValue, omitted_attributes: &HashSet) -> bool { + !kv.key.as_str().starts_with("apollo.") || !omitted_attributes.contains(&kv.key) +} + +#[cfg(test)] +mod tests { + use crate::runtime::filtering_exporter::FilteringExporter; + use opentelemetry::trace::{SpanContext, SpanKind, Status, TraceState}; + use opentelemetry::{InstrumentationScope, Key, KeyValue, SpanId, TraceFlags, TraceId}; + use opentelemetry_sdk::Resource; + use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; + use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanExporter, SpanLinks}; + use std::collections::HashSet; + use std::fmt::Debug; + use std::future::ready; + use std::time::SystemTime; + + fn create_mock_span_data() -> SpanData { + let span_context: SpanContext = SpanContext::new( + TraceId::from_u128(1), + SpanId::from_u64(12345), + TraceFlags::default(), + true, // is_remote + TraceState::default(), + ); + + SpanData { + span_context, + parent_span_id: SpanId::from_u64(54321), + span_kind: SpanKind::Internal, + name: "test-span".into(), + start_time: SystemTime::UNIX_EPOCH, + end_time: SystemTime::UNIX_EPOCH, + attributes: vec![ + KeyValue::new("http.method", "GET"), + KeyValue::new("apollo.mock", "mock"), + ], + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Ok, + instrumentation_scope: InstrumentationScope::builder("test-service") + .with_version("1.0.0") + .build(), + } + } + + #[tokio::test] + async fn filtering_exporter_filters_omitted_apollo_attributes() { + #[derive(Debug)] + struct TestExporter {} + + impl SpanExporter for TestExporter { + fn export(&self, batch: Vec) -> impl Future + Send { + batch.into_iter().for_each(|span| { + if span + .attributes + .iter() + .any(|kv| kv.key.as_str().starts_with("apollo.")) + { + panic!("Omitted attributes were not filtered"); + } + }); + + ready(Ok(())) + } + + fn shutdown(&mut self) -> OTelSdkResult { + Ok(()) + } + + fn force_flush(&mut self) -> OTelSdkResult { + Ok(()) + } + + fn set_resource(&mut self, _resource: &Resource) {} + } + + let mut omitted = HashSet::new(); + omitted.insert(Key::from_static_str("apollo.mock")); + let mock_exporter = TestExporter {}; + let mock_span_data = create_mock_span_data(); + + let filtering_exporter = FilteringExporter::new(mock_exporter, omitted); + filtering_exporter + .export(vec![mock_span_data]) + .await + .expect("Export error"); + } + + #[tokio::test] + async fn filtering_exporter_calls_inner_exporter_on_shutdown() { + #[derive(Debug)] + struct TestExporter {} + + impl SpanExporter for TestExporter { + fn export(&self, _batch: Vec) -> impl Future + Send { + ready(Err(OTelSdkError::InternalFailure( + "unexpected call".to_string(), + ))) + } + + fn shutdown(&mut self) -> OTelSdkResult { + Ok(()) + } + + fn force_flush(&mut self) -> OTelSdkResult { + Err(OTelSdkError::InternalFailure("unexpected call".to_string())) + } + + fn set_resource(&mut self, _resource: &Resource) { + unreachable!("should not be called"); + } + } + + let mock_exporter = TestExporter {}; + + let mut filtering_exporter = FilteringExporter::new(mock_exporter, HashSet::new()); + assert!(filtering_exporter.shutdown().is_ok()); + } + + #[tokio::test] + async fn filtering_exporter_calls_inner_exporter_on_force_flush() { + #[derive(Debug)] + struct TestExporter {} + + impl SpanExporter for TestExporter { + fn export(&self, _batch: Vec) -> impl Future + Send { + ready(Err(OTelSdkError::InternalFailure( + "unexpected call".to_string(), + ))) + } + + fn shutdown(&mut self) -> OTelSdkResult { + Err(OTelSdkError::InternalFailure("unexpected call".to_string())) + } + + fn force_flush(&mut self) -> OTelSdkResult { + Ok(()) + } + + fn set_resource(&mut self, _resource: &Resource) { + unreachable!("should not be called"); + } + } + + let mock_exporter = TestExporter {}; + + let mut filtering_exporter = FilteringExporter::new(mock_exporter, HashSet::new()); + assert!(filtering_exporter.force_flush().is_ok()); + } + + #[tokio::test] + async fn filtering_exporter_calls_inner_exporter_on_set_resource() { + #[derive(Debug)] + struct TestExporter {} + + impl SpanExporter for TestExporter { + fn export(&self, _batch: Vec) -> impl Future + Send { + ready(Err(OTelSdkError::InternalFailure( + "unexpected call".to_string(), + ))) + } + + fn shutdown(&mut self) -> OTelSdkResult { + Err(OTelSdkError::InternalFailure("unexpected call".to_string())) + } + + fn force_flush(&mut self) -> OTelSdkResult { + Err(OTelSdkError::InternalFailure("unexpected call".to_string())) + } + + fn set_resource(&mut self, _resource: &Resource) {} + } + + let mock_exporter = TestExporter {}; + + let mut filtering_exporter = FilteringExporter::new(mock_exporter, HashSet::new()); + filtering_exporter.set_resource(&Resource::builder_empty().build()); + } +} diff --git a/crates/apollo-mcp-server/src/runtime/telemetry.rs b/crates/apollo-mcp-server/src/runtime/telemetry.rs index ce518883..11deee20 100644 --- a/crates/apollo-mcp-server/src/runtime/telemetry.rs +++ b/crates/apollo-mcp-server/src/runtime/telemetry.rs @@ -1,24 +1,26 @@ -use opentelemetry::{KeyValue, global, trace::TracerProvider as _}; +use crate::runtime::Config; +use crate::runtime::filtering_exporter::FilteringExporter; +use crate::runtime::logging::Logging; +use apollo_mcp_server::generated::telemetry::TelemetryAttribute; +use opentelemetry::{Key, KeyValue, global, trace::TracerProvider as _}; use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::metrics::{Instrument, Stream}; use opentelemetry_sdk::{ Resource, metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, propagation::TraceContextPropagator, trace::{RandomIdGenerator, SdkTracerProvider}, }; - use opentelemetry_semantic_conventions::{ SCHEMA_URL, attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION}, }; use schemars::JsonSchema; use serde::Deserialize; +use std::collections::HashSet; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use crate::runtime::Config; -use crate::runtime::logging::Logging; - /// Telemetry related options #[derive(Debug, Deserialize, JsonSchema, Default)] pub struct Telemetry { @@ -36,6 +38,7 @@ pub struct Exporters { #[derive(Debug, Deserialize, JsonSchema)] pub struct MetricsExporters { otlp: Option, + omitted_attributes: Option>, } #[derive(Debug, Deserialize, JsonSchema)] @@ -56,6 +59,7 @@ impl Default for OTLPMetricExporter { #[derive(Debug, Deserialize, JsonSchema)] pub struct TracingExporters { otlp: Option, + omitted_attributes: Option>, } #[derive(Debug, Deserialize, JsonSchema)] @@ -99,14 +103,17 @@ fn resource(telemetry: &Telemetry) -> Resource { } fn init_meter_provider(telemetry: &Telemetry) -> Result { - let otlp = telemetry + let metrics_exporters = telemetry .exporters .as_ref() - .and_then(|exporters| exporters.metrics.as_ref()) + .and_then(|exporters| exporters.metrics.as_ref()); + + let otlp = metrics_exporters .and_then(|metrics_exporters| metrics_exporters.otlp.as_ref()) .ok_or_else(|| { anyhow::anyhow!("No metrics exporters configured, at least one is required") })?; + let exporter = match otlp.protocol.as_str() { "grpc" => opentelemetry_otlp::MetricExporter::builder() .with_tonic() @@ -123,27 +130,50 @@ fn init_meter_provider(telemetry: &Telemetry) -> Result = metrics_exporters + .and_then(|exporters| exporters.omitted_attributes.clone()) + .unwrap_or_default(); + let included_attributes: Vec = TelemetryAttribute::included_attributes(omitted_attributes) + .iter() + .map(|a| a.to_key()) + .collect(); + let reader = PeriodicReader::builder(exporter) .with_interval(std::time::Duration::from_secs(30)) .build(); + let filtered_view = move |i: &Instrument| { + if i.name().starts_with("apollo.") { + Stream::builder() + .with_allowed_attribute_keys(included_attributes.clone()) // if available in your version + .build() + .ok() + } else { + None + } + }; + let meter_provider = MeterProviderBuilder::default() .with_resource(resource(telemetry)) .with_reader(reader) + .with_view(filtered_view) .build(); Ok(meter_provider) } fn init_tracer_provider(telemetry: &Telemetry) -> Result { - let otlp = telemetry + let tracer_exporters = telemetry .exporters .as_ref() - .and_then(|exporters| exporters.tracing.as_ref()) + .and_then(|exporters| exporters.tracing.as_ref()); + + let otlp = tracer_exporters .and_then(|tracing_exporters| tracing_exporters.otlp.as_ref()) .ok_or_else(|| { anyhow::anyhow!("No tracing exporters configured, at least one is required") })?; + let exporter = match otlp.protocol.as_str() { "grpc" => opentelemetry_otlp::SpanExporter::builder() .with_tonic() @@ -160,10 +190,17 @@ fn init_tracer_provider(telemetry: &Telemetry) -> Result = tracer_exporters + .and_then(|exporters| exporters.omitted_attributes.clone()) + .map(|set| set.iter().map(|a| a.to_key()).collect()) + .unwrap_or_default(); + + let filtering_exporter = FilteringExporter::new(exporter, omitted_attributes); + let tracer_provider = SdkTracerProvider::builder() .with_id_generator(RandomIdGenerator::default()) .with_resource(resource(telemetry)) - .with_batch_exporter(exporter) + .with_batch_exporter(filtering_exporter) .build(); Ok(tracer_provider) @@ -251,15 +288,20 @@ mod tests { } #[tokio::test] - async fn guard_is_provided_when_tracing_configued() { + async fn guard_is_provided_when_tracing_configured() { + let mut ommitted = HashSet::new(); + ommitted.insert(TelemetryAttribute::RequestId); + let config = test_config( Some("test-config"), Some("1.0.0"), Some(MetricsExporters { otlp: Some(OTLPMetricExporter::default()), + omitted_attributes: None, }), Some(TracingExporters { otlp: Some(OTLPTracingExporter::default()), + omitted_attributes: Some(ommitted), }), ); // init_tracing_subscriber can only be called once in the test suite to avoid @@ -278,6 +320,7 @@ mod tests { protocol: "bogus".to_string(), endpoint: "http://localhost:4317".to_string(), }), + omitted_attributes: None, }), None, ); @@ -301,6 +344,7 @@ mod tests { protocol: "bogus".to_string(), endpoint: "http://localhost:4317".to_string(), }), + omitted_attributes: None, }), ); let result = init_tracer_provider(&config.telemetry); diff --git a/crates/apollo-mcp-server/src/server/states/running.rs b/crates/apollo-mcp-server/src/server/states/running.rs index 362da813..d56bbfa2 100644 --- a/crates/apollo-mcp-server/src/server/states/running.rs +++ b/crates/apollo-mcp-server/src/server/states/running.rs @@ -21,6 +21,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error}; use url::Url; +use crate::generated::telemetry::{TelemetryAttribute, TelemetryMetric}; use crate::{ auth::ValidToken, custom_scalar_map::CustomScalarMap, @@ -172,7 +173,7 @@ impl Running { } impl ServerHandler for Running { - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self, _request))] async fn initialize( &self, _request: InitializeRequestParam, @@ -180,7 +181,7 @@ impl ServerHandler for Running { ) -> Result { let meter = get_meter(); meter - .u64_counter("apollo.mcp.initialize.count") + .u64_counter(TelemetryMetric::InitializeCount.as_str()) .build() .add(1, &[]); // TODO: how to remove these? @@ -189,7 +190,7 @@ impl ServerHandler for Running { Ok(self.get_info()) } - #[tracing::instrument(skip(self, context), fields(tool_name = request.name.as_ref(), request_id = %context.id.clone()))] + #[tracing::instrument(skip(self, context, request), fields(apollo.mcp.tool_name = request.name.as_ref(), apollo.mcp.request_id = %context.id.clone()))] async fn call_tool( &self, request: CallToolRequestParam, @@ -289,18 +290,18 @@ impl ServerHandler for Running { let attributes = vec![ KeyValue::new( - "success", + TelemetryAttribute::Success.to_key(), result.as_ref().is_ok_and(|r| r.is_error != Some(true)), ), - KeyValue::new("tool_name", tool_name), + KeyValue::new(TelemetryAttribute::ToolName.to_key(), tool_name), ]; // Record response time and status meter - .f64_histogram("apollo.mcp.tool.duration") + .f64_histogram(TelemetryMetric::ToolDuration.as_str()) .build() .record(start.elapsed().as_millis() as f64, &attributes); meter - .u64_counter("apollo.mcp.tool.count") + .u64_counter(TelemetryMetric::ToolCount.as_str()) .build() .add(1, &attributes); @@ -315,7 +316,7 @@ impl ServerHandler for Running { ) -> Result { let meter = get_meter(); meter - .u64_counter("apollo.mcp.list_tools.count") + .u64_counter(TelemetryMetric::ListToolsCount.as_str()) .build() .add(1, &[]); Ok(ListToolsResult { @@ -338,7 +339,7 @@ impl ServerHandler for Running { fn get_info(&self) -> ServerInfo { let meter = get_meter(); meter - .u64_counter("apollo.mcp.get_info.count") + .u64_counter(TelemetryMetric::GetInfoCount.as_str()) .build() .add(1, &[]); ServerInfo { diff --git a/crates/apollo-mcp-server/src/telemetry_attributes.rs b/crates/apollo-mcp-server/src/telemetry_attributes.rs new file mode 100644 index 00000000..a21d9171 --- /dev/null +++ b/crates/apollo-mcp-server/src/telemetry_attributes.rs @@ -0,0 +1,33 @@ +use crate::generated::telemetry::{ALL_ATTRS, TelemetryAttribute}; +use opentelemetry::Key; +use std::collections::HashSet; + +impl TelemetryAttribute { + pub const fn to_key(self) -> Key { + match self { + TelemetryAttribute::ToolName => { + Key::from_static_str(TelemetryAttribute::ToolName.as_str()) + } + TelemetryAttribute::OperationId => { + Key::from_static_str(TelemetryAttribute::OperationId.as_str()) + } + TelemetryAttribute::OperationSource => { + Key::from_static_str(TelemetryAttribute::OperationSource.as_str()) + } + TelemetryAttribute::Success => { + Key::from_static_str(TelemetryAttribute::Success.as_str()) + } + TelemetryAttribute::RequestId => { + Key::from_static_str(TelemetryAttribute::RequestId.as_str()) + } + } + } + + pub fn included_attributes(omitted: HashSet) -> Vec { + ALL_ATTRS + .iter() + .copied() + .filter(|a| !omitted.contains(a)) + .collect() + } +} diff --git a/crates/apollo-mcp-server/telemetry.toml b/crates/apollo-mcp-server/telemetry.toml new file mode 100644 index 00000000..368793fe --- /dev/null +++ b/crates/apollo-mcp-server/telemetry.toml @@ -0,0 +1,15 @@ +[attributes.apollo.mcp] +tool_name = "The tool name" +operation_id = "The operation id - either persisted query id, operation name, or unknown" +operation_source = "The operation source - either operation (local file/op collection), persisted query, or LLM generated" +request_id = "The request id" +success = "Sucess flag indicator" + +[metrics.apollo.mcp] +"initialize.count" = "Number of times initialize has been called" +"tool.count" = "Number of times call_tool has been called" +"tool.duration" = "Duration of call_tool" +"list_tools.count" = "Number of times list_tools has been called" +"get_info.count" = "Number of times get_info has been called" +"operation.duration" = "Duration of graphql execute" +"operation.count" = "Number of times graphql execute has been called"