diff --git a/.changesets/config_telemetry_config.md b/.changesets/config_telemetry_config.md new file mode 100644 index 00000000..2f1132cc --- /dev/null +++ b/.changesets/config_telemetry_config.md @@ -0,0 +1,7 @@ +### Add basic config file options to otel telemetry - @swcollard PR #330 + +Adds new Configuration options for setting up configuration beyond the standard OTEL environment variables needed before. + +* Renames trace->telemetry +* Adds OTLP options for metrics and tracing to choose grpc or http upload protocols and setting the endpoints +* This configuration is all optional, so by default nothing will be logged \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index f9f0a395..08e362ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1570,6 +1570,19 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -2425,6 +2438,8 @@ dependencies = [ "prost", "reqwest", "thiserror 2.0.16", + "tokio", + "tonic", "tracing", ] @@ -3964,10 +3979,15 @@ dependencies = [ "http", "http-body", "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", "percent-encoding", "pin-project", "prost", + "tokio", "tokio-stream", + "tower", "tower-layer", "tower-service", "tracing", @@ -3981,9 +4001,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/crates/apollo-mcp-server/Cargo.toml b/crates/apollo-mcp-server/Cargo.toml index ea1f80b4..0aea5615 100644 --- a/crates/apollo-mcp-server/Cargo.toml +++ b/crates/apollo-mcp-server/Cargo.toml @@ -31,7 +31,13 @@ jwks = "0.4.0" lz-str = "0.2.1" opentelemetry = "0.30.0" opentelemetry-appender-log = "0.30.0" -opentelemetry-otlp = "0.30.0" +opentelemetry-otlp = { version = "0.30.0", features = [ + "grpc-tonic", + "tonic", + "http-proto", + "metrics", + "trace", +] } opentelemetry-resource-detectors = "0.9.0" opentelemetry-semantic-conventions = "0.30.0" opentelemetry-stdout = "0.30.0" diff --git a/crates/apollo-mcp-server/src/graphql.rs b/crates/apollo-mcp-server/src/graphql.rs index 38f2acff..aae5fd38 100644 --- a/crates/apollo-mcp-server/src/graphql.rs +++ b/crates/apollo-mcp-server/src/graphql.rs @@ -447,27 +447,27 @@ mod test { .find(|scope_metrics| scope_metrics.scope().name() == "apollo.mcp") { for metric in scope_metrics.metrics() { - if metric.name() == "apollo.mcp.operation.count" { - if let AggregatedMetrics::U64(MetricData::Sum(data)) = metric.data() { - for point in data.data_points() { - let attributes = point.attributes(); - let mut attr_map = std::collections::HashMap::new(); - for kv in attributes { - attr_map.insert(kv.key.as_str(), kv.value.as_str()); - } - assert_eq!( - attr_map.get("operation.id").map(|s| s.as_ref()), - Some("mock_operation") - ); - assert_eq!( - attr_map.get("operation.type").map(|s| s.as_ref()), - Some("persisted_query") - ); - assert_eq!( - attr_map.get("success"), - Some(&std::borrow::Cow::Borrowed("false")) - ); + if metric.name() == "apollo.mcp.operation.count" + && let AggregatedMetrics::U64(MetricData::Sum(data)) = metric.data() + { + for point in data.data_points() { + let attributes = point.attributes(); + let mut attr_map = std::collections::HashMap::new(); + for kv in attributes { + attr_map.insert(kv.key.as_str(), kv.value.as_str()); } + assert_eq!( + attr_map.get("operation.id").map(|s| s.as_ref()), + Some("mock_operation") + ); + assert_eq!( + attr_map.get("operation.type").map(|s| s.as_ref()), + Some("persisted_query") + ); + assert_eq!( + attr_map.get("success"), + Some(&std::borrow::Cow::Borrowed("false")) + ); } } } diff --git a/crates/apollo-mcp-server/src/main.rs b/crates/apollo-mcp-server/src/main.rs index 2ac652ad..11952f7d 100644 --- a/crates/apollo-mcp-server/src/main.rs +++ b/crates/apollo-mcp-server/src/main.rs @@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> { None => runtime::read_config_from_env().unwrap_or_default(), }; - let _guard = runtime::trace::init_tracing_subscriber(&config)?; + let _guard = runtime::telemetry::init_tracing_subscriber(&config)?; info!( "Apollo MCP Server v{} // (c) Apollo Graph, Inc. // Licensed under MIT", diff --git a/crates/apollo-mcp-server/src/runtime.rs b/crates/apollo-mcp-server/src/runtime.rs index 92afc236..e5cd668f 100644 --- a/crates/apollo-mcp-server/src/runtime.rs +++ b/crates/apollo-mcp-server/src/runtime.rs @@ -12,7 +12,7 @@ mod operation_source; mod overrides; mod schema_source; mod schemas; -pub mod trace; +pub mod telemetry; use std::path::Path; @@ -243,6 +243,11 @@ mod test { path: None, rotation: Hourly, }, + telemetry: Telemetry { + exporters: None, + service_name: None, + version: None, + }, operations: Infer, overrides: Overrides { disable_type_description: false, diff --git a/crates/apollo-mcp-server/src/runtime/config.rs b/crates/apollo-mcp-server/src/runtime/config.rs index 8f64f518..d5c044b0 100644 --- a/crates/apollo-mcp-server/src/runtime/config.rs +++ b/crates/apollo-mcp-server/src/runtime/config.rs @@ -8,7 +8,7 @@ use url::Url; use super::{ OperationSource, SchemaSource, endpoint::Endpoint, graphos::GraphOSConfig, - introspection::Introspection, logging::Logging, overrides::Overrides, + introspection::Introspection, logging::Logging, overrides::Overrides, telemetry::Telemetry, }; /// Configuration for the MCP server @@ -40,6 +40,9 @@ pub struct Config { /// Logging configuration pub logging: Logging, + /// Telemetry configuration + pub telemetry: Telemetry, + /// Operations pub operations: OperationSource, diff --git a/crates/apollo-mcp-server/src/runtime/telemetry.rs b/crates/apollo-mcp-server/src/runtime/telemetry.rs new file mode 100644 index 00000000..ce518883 --- /dev/null +++ b/crates/apollo-mcp-server/src/runtime/telemetry.rs @@ -0,0 +1,314 @@ +use opentelemetry::{KeyValue, global, trace::TracerProvider as _}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + Resource, + metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, + propagation::TraceContextPropagator, + trace::{RandomIdGenerator, SdkTracerProvider}, +}; + +use opentelemetry_semantic_conventions::{ + SCHEMA_URL, + attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION}, +}; +use schemars::JsonSchema; +use serde::Deserialize; +use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::runtime::Config; +use crate::runtime::logging::Logging; + +/// Telemetry related options +#[derive(Debug, Deserialize, JsonSchema, Default)] +pub struct Telemetry { + exporters: Option, + service_name: Option, + version: Option, +} + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct Exporters { + metrics: Option, + tracing: Option, +} + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct MetricsExporters { + otlp: Option, +} + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct OTLPMetricExporter { + endpoint: String, + protocol: String, +} + +impl Default for OTLPMetricExporter { + fn default() -> Self { + Self { + endpoint: "http://localhost:4317".into(), + protocol: "grpc".into(), + } + } +} + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct TracingExporters { + otlp: Option, +} + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct OTLPTracingExporter { + endpoint: String, + protocol: String, +} + +impl Default for OTLPTracingExporter { + fn default() -> Self { + Self { + endpoint: "http://localhost:4317".into(), + protocol: "grpc".into(), + } + } +} + +fn resource(telemetry: &Telemetry) -> Resource { + let service_name = telemetry + .service_name + .clone() + .unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string()); + + let service_version = telemetry + .version + .clone() + .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string()); + + let deployment_env = std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()); + + Resource::builder() + .with_service_name(service_name) + .with_schema_url( + [ + KeyValue::new(SERVICE_VERSION, service_version), + KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, deployment_env), + ], + SCHEMA_URL, + ) + .build() +} + +fn init_meter_provider(telemetry: &Telemetry) -> Result { + let otlp = telemetry + .exporters + .as_ref() + .and_then(|exporters| exporters.metrics.as_ref()) + .and_then(|metrics_exporters| metrics_exporters.otlp.as_ref()) + .ok_or_else(|| { + anyhow::anyhow!("No metrics exporters configured, at least one is required") + })?; + let exporter = match otlp.protocol.as_str() { + "grpc" => opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(otlp.endpoint.clone()) + .build()?, + "http/protobuf" => opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_endpoint(otlp.endpoint.clone()) + .build()?, + other => { + return Err(anyhow::anyhow!( + "Unsupported OTLP protocol: {other}. Supported protocols are: grpc, http/protobuf" + )); + } + }; + + let reader = PeriodicReader::builder(exporter) + .with_interval(std::time::Duration::from_secs(30)) + .build(); + + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource(telemetry)) + .with_reader(reader) + .build(); + + Ok(meter_provider) +} + +fn init_tracer_provider(telemetry: &Telemetry) -> Result { + let otlp = telemetry + .exporters + .as_ref() + .and_then(|exporters| exporters.tracing.as_ref()) + .and_then(|tracing_exporters| tracing_exporters.otlp.as_ref()) + .ok_or_else(|| { + anyhow::anyhow!("No tracing exporters configured, at least one is required") + })?; + let exporter = match otlp.protocol.as_str() { + "grpc" => opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(otlp.endpoint.clone()) + .build()?, + "http/protobuf" => opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_endpoint(otlp.endpoint.clone()) + .build()?, + other => { + return Err(anyhow::anyhow!( + "Unsupported OTLP protocol: {other}. Supported protocols are: grpc, http/protobuf" + )); + } + }; + + let tracer_provider = SdkTracerProvider::builder() + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource(telemetry)) + .with_batch_exporter(exporter) + .build(); + + Ok(tracer_provider) +} + +/// Initialize tracing-subscriber and return TelemetryGuard for logging and opentelemetry-related termination processing +pub fn init_tracing_subscriber(config: &Config) -> Result { + let tracer_provider = if let Some(exporters) = &config.telemetry.exporters { + if let Some(_tracing_exporters) = &exporters.tracing { + init_tracer_provider(&config.telemetry)? + } else { + SdkTracerProvider::builder().build() + } + } else { + SdkTracerProvider::builder().build() + }; + let meter_provider = if let Some(exporters) = &config.telemetry.exporters { + if let Some(_metrics_exporters) = &exporters.metrics { + init_meter_provider(&config.telemetry)? + } else { + SdkMeterProvider::builder().build() + } + } else { + SdkMeterProvider::builder().build() + }; + let env_filter = Logging::env_filter(&config.logging)?; + let (logging_layer, logging_guard) = Logging::logging_layer(&config.logging)?; + + let tracer = tracer_provider.tracer("apollo-mcp-trace"); + + global::set_meter_provider(meter_provider.clone()); + global::set_text_map_propagator(TraceContextPropagator::new()); + global::set_tracer_provider(tracer_provider.clone()); + + tracing_subscriber::registry() + .with(logging_layer) + .with(env_filter) + .with(MetricsLayer::new(meter_provider.clone())) + .with(OpenTelemetryLayer::new(tracer)) + .try_init()?; + + Ok(TelemetryGuard { + tracer_provider, + meter_provider, + logging_guard, + }) +} + +pub struct TelemetryGuard { + tracer_provider: SdkTracerProvider, + meter_provider: SdkMeterProvider, + logging_guard: Option, +} + +impl Drop for TelemetryGuard { + fn drop(&mut self) { + if let Err(err) = self.tracer_provider.shutdown() { + tracing::error!("{err:?}"); + } + if let Err(err) = self.meter_provider.shutdown() { + tracing::error!("{err:?}"); + } + drop(self.logging_guard.take()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config( + service_name: Option<&str>, + version: Option<&str>, + metrics: Option, + tracing: Option, + ) -> Config { + Config { + telemetry: Telemetry { + exporters: Some(Exporters { metrics, tracing }), + service_name: service_name.map(|s| s.to_string()), + version: version.map(|v| v.to_string()), + }, + ..Default::default() + } + } + + #[tokio::test] + async fn guard_is_provided_when_tracing_configued() { + let config = test_config( + Some("test-config"), + Some("1.0.0"), + Some(MetricsExporters { + otlp: Some(OTLPMetricExporter::default()), + }), + Some(TracingExporters { + otlp: Some(OTLPTracingExporter::default()), + }), + ); + // init_tracing_subscriber can only be called once in the test suite to avoid + // panic when calling global::set_tracer_provider multiple times + let guard = init_tracing_subscriber(&config); + assert!(guard.is_ok()); + } + + #[tokio::test] + async fn unknown_protocol_raises_meter_provider_error() { + let config = test_config( + None, + None, + Some(MetricsExporters { + otlp: Some(OTLPMetricExporter { + protocol: "bogus".to_string(), + endpoint: "http://localhost:4317".to_string(), + }), + }), + None, + ); + let result = init_meter_provider(&config.telemetry); + assert!( + result + .err() + .map(|e| e.to_string().contains("Unsupported OTLP protocol")) + .unwrap_or(false) + ); + } + + #[tokio::test] + async fn unknown_protocol_raises_tracer_provider_error() { + let config = test_config( + None, + None, + None, + Some(TracingExporters { + otlp: Some(OTLPTracingExporter { + protocol: "bogus".to_string(), + endpoint: "http://localhost:4317".to_string(), + }), + }), + ); + let result = init_tracer_provider(&config.telemetry); + assert!( + result + .err() + .map(|e| e.to_string().contains("Unsupported OTLP protocol")) + .unwrap_or(false) + ); + } +} diff --git a/crates/apollo-mcp-server/src/runtime/trace.rs b/crates/apollo-mcp-server/src/runtime/trace.rs deleted file mode 100644 index 0e50ab64..00000000 --- a/crates/apollo-mcp-server/src/runtime/trace.rs +++ /dev/null @@ -1,115 +0,0 @@ -use opentelemetry::{KeyValue, global, trace::TracerProvider as _}; -use opentelemetry_sdk::{ - Resource, - metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, - propagation::TraceContextPropagator, - trace::{RandomIdGenerator, SdkTracerProvider}, -}; - -use opentelemetry_semantic_conventions::{ - SCHEMA_URL, - attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION}, -}; -use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -use super::{Config, logging::Logging}; - -/// Create a Resource that captures information about the entity for which telemetry is recorded. -fn resource() -> Resource { - Resource::builder() - .with_service_name(env!("CARGO_PKG_NAME")) - .with_schema_url( - [ - KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), - KeyValue::new( - DEPLOYMENT_ENVIRONMENT_NAME, - std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()), - ), - ], - SCHEMA_URL, - ) - .build() -} - -/// Construct MeterProvider for MetricsLayer -fn init_meter_provider() -> Result { - let exporter = opentelemetry_otlp::MetricExporter::builder() - .with_http() - .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) - .build()?; - - let reader = PeriodicReader::builder(exporter) - .with_interval(std::time::Duration::from_secs(30)) - .build(); - - let meter_provider = MeterProviderBuilder::default() - .with_resource(resource()) - .with_reader(reader) - .build(); - - global::set_meter_provider(meter_provider.clone()); - - Ok(meter_provider) -} - -/// Construct TracerProvider for OpenTelemetryLayer -fn init_tracer_provider() -> Result { - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_http() - .build()?; - - let trace_provider = SdkTracerProvider::builder() - // TODO: Should this use session information to group spans to a request? - .with_id_generator(RandomIdGenerator::default()) - .with_resource(resource()) - .with_batch_exporter(exporter) - .build(); - - // Set the global propagator (usually early in main()) - global::set_text_map_propagator(TraceContextPropagator::new()); - global::set_tracer_provider(trace_provider.clone()); - - Ok(trace_provider) -} - -/// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing -pub fn init_tracing_subscriber(config: &Config) -> Result { - let tracer_provider = init_tracer_provider()?; - let meter_provider = init_meter_provider()?; - let env_filter = Logging::env_filter(&config.logging)?; - let (logging_layer, logging_guard) = Logging::logging_layer(&config.logging)?; - - let tracer = tracer_provider.tracer("tracing-otel-subscriber"); - - tracing_subscriber::registry() - .with(logging_layer) - .with(env_filter) - .with(MetricsLayer::new(meter_provider.clone())) - .with(OpenTelemetryLayer::new(tracer)) - .init(); - - Ok(TelemetryGuard { - tracer_provider, - meter_provider, - logging_guard, - }) -} - -pub struct TelemetryGuard { - tracer_provider: SdkTracerProvider, - meter_provider: SdkMeterProvider, - logging_guard: Option, -} - -impl Drop for TelemetryGuard { - fn drop(&mut self) { - if let Err(err) = self.tracer_provider.shutdown() { - tracing::error!("{err:?}"); - } - if let Err(err) = self.meter_provider.shutdown() { - tracing::error!("{err:?}"); - } - drop(self.logging_guard.take()); - } -}