Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,010 changes: 646 additions & 364 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion crates/apollo-mcp-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,20 @@ tracing.workspace = true
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tokio-util = "0.7.15"
tower-http = { version = "0.6.6", features = ["cors"] }
tower-http = { version = "0.6.6", features = ["cors", "trace"] }
url.workspace = true
opentelemetry = "0.30.0"
opentelemetry_sdk = "0.30.0"
opentelemetry-otlp = "0.30.0"
opentelemetry-resource-detectors = "0.9.0"
opentelemetry-appender-log = "0.30.0"
reqwest-tracing = "0.5.8"
reqwest-middleware = "0.4.2"
axum-tracing-opentelemetry = "0.29.0"
opentelemetry-semantic-conventions = "0.30.0"
tracing-core.workspace = true
tracing-opentelemetry = "0.31.0"
opentelemetry-stdout = "0.30.0"

[dev-dependencies]
chrono = { version = "0.4.41", default-features = false, features = ["now"] }
Expand Down
17 changes: 16 additions & 1 deletion crates/apollo-mcp-server/src/graphql.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
//! Execute GraphQL operations from an MCP tool

use crate::errors::McpError;
use opentelemetry::trace::FutureExt;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest_middleware::{ClientBuilder, Extension};
use reqwest_tracing::{OtelName, TracingMiddleware};
use rmcp::model::{CallToolResult, Content, ErrorCode};
use serde_json::{Map, Value};
use url::Url;

#[derive(Debug)]
pub struct Request<'a> {
pub input: Value,
pub endpoint: &'a Url,
Expand Down Expand Up @@ -33,6 +37,7 @@ pub trait Executable {
fn headers(&self, default_headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue>;

/// Execute as a GraphQL operation using the endpoint and headers
#[tracing::instrument(skip(self))]
async fn execute(&self, request: Request<'_>) -> Result<CallToolResult, McpError> {
let client_metadata = serde_json::json!({
"name": "mcp",
Expand Down Expand Up @@ -74,11 +79,21 @@ pub trait Executable {
}
}

reqwest::Client::new()
let client = ClientBuilder::new(reqwest::Client::new())
.with_init(Extension(OtelName("graphql-client".into())))
// Insert the tracing middleware
.with(TracingMiddleware::default())
.build();

client
.post(request.endpoint.as_str())
.with_extension(OtelName(
format!("POST {}", request.endpoint.as_str()).into(),
))
.headers(self.headers(&request.headers))
.body(Value::Object(request_body).to_string())
.send()
.with_current_context()
.await
.map_err(|reqwest_error| {
McpError::new(
Expand Down
5 changes: 1 addition & 4 deletions crates/apollo-mcp-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use clap::Parser;
use clap::builder::Styles;
use clap::builder::styling::{AnsiColor, Effects};
use runtime::IdOrDefault;
use runtime::logging::Logging;
use tracing::{info, warn};

mod runtime;
Expand Down Expand Up @@ -42,9 +41,7 @@ async fn main() -> anyhow::Result<()> {
None => runtime::read_config_from_env().unwrap_or_default(),
};

// WorkerGuard is not used but needed to be at least defined or else the guard
// is cleaned up too early and file appender logging does not work
let _guard = Logging::setup(&config)?;
let _guard = runtime::trace::init_tracing_subscriber(&config)?;

info!(
"Apollo MCP Server v{} // (c) Apollo Graph, Inc. // Licensed under MIT",
Expand Down
1 change: 1 addition & 0 deletions crates/apollo-mcp-server/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod operation_source;
mod overrides;
mod schema_source;
mod schemas;
pub mod trace;

use std::path::Path;

Expand Down
40 changes: 25 additions & 15 deletions crates/apollo-mcp-server/src/runtime/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ use schemars::JsonSchema;
use serde::Deserialize;
use std::path::PathBuf;
use tracing::Level;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::fmt::writer::BoxMakeWriter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

use super::Config;

Expand Down Expand Up @@ -53,7 +51,7 @@ impl Default for Logging {
}

impl Logging {
pub fn setup(config: &Config) -> Result<Option<WorkerGuard>, anyhow::Error> {
pub fn env_filter(config: &Config) -> Result<EnvFilter, anyhow::Error> {
let mut env_filter =
EnvFilter::from_default_env().add_directive(config.logging.level.into());

Expand All @@ -62,7 +60,23 @@ impl Logging {
.add_directive("rmcp=warn".parse()?)
.add_directive("tantivy=warn".parse()?);
}
Ok(env_filter)
}

pub fn logging_layer(
config: &Config,
) -> Result<
(
Layer<
tracing_subscriber::Registry,
tracing_subscriber::fmt::format::DefaultFields,
tracing_subscriber::fmt::format::Format,
BoxMakeWriter,
>,
Option<tracing_appender::non_blocking::WorkerGuard>,
),
anyhow::Error,
> {
macro_rules! log_error {
() => {
|e| eprintln!("Failed to setup logging: {e:?}")
Expand Down Expand Up @@ -98,17 +112,13 @@ impl Logging {
None => (BoxMakeWriter::new(std::io::stdout), None, true),
};

tracing_subscriber::registry()
.with(env_filter)
.with(
tracing_subscriber::fmt::layer()
.with_writer(writer)
.with_ansi(with_ansi)
.with_target(false),
)
.init();

Ok(guard)
Ok((
tracing_subscriber::fmt::layer()
.with_writer(writer)
.with_ansi(with_ansi)
.with_target(false),
guard,
))
}
}

Expand Down
109 changes: 109 additions & 0 deletions crates/apollo-mcp-server/src/runtime/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
use opentelemetry_sdk::{
Resource,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, SdkTracerProvider},
};
use opentelemetry_semantic_conventions::{
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION},
};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use crate::runtime::{Config, logging::Logging};

// Create a Resource that captures information about the entity for which telemetry is recorded.
fn resource() -> Resource {
Resource::builder()
.with_service_name(env!("CARGO_PKG_NAME"))
Copy link

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Using env!("CARGO_PKG_NAME") will expand to "apollo-mcp-server" at compile time. Consider using a more user-friendly service name like "apollo-mcp-server" or making this configurable through the Config struct.

Suggested change
.with_service_name(env!("CARGO_PKG_NAME"))
fn resource(service_name: &str) -> Resource {
Resource::builder()
.with_service_name(service_name)

Copilot uses AI. Check for mistakes.
.with_schema_url(
[
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(
DEPLOYMENT_ENVIRONMENT_NAME,
std::env::var("ENVIRONMENT").unwrap_or_else(|_| "develop".to_string()),
),
],
SCHEMA_URL,
)
.build()
}

// Construct MeterProvider for MetricsLayer
fn init_meter_provider() -> Result<SdkMeterProvider, anyhow::Error> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()?;

let reader = PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(30))
.build();

let meter_provider = MeterProviderBuilder::default()
.with_resource(resource())
.with_reader(reader)
.build();

global::set_meter_provider(meter_provider.clone());

Ok(meter_provider)
}

// Construct TracerProvider for OpenTelemetryLayer
fn init_tracer_provider() -> Result<SdkTracerProvider, anyhow::Error> {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.build()?;

let trace_provider = SdkTracerProvider::builder()
// If export trace to AWS X-Ray, you can use XrayIdGenerator
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource())
.with_batch_exporter(exporter)
.build();

Ok(trace_provider)
}

// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
pub fn init_tracing_subscriber(config: &Config) -> Result<TelemetryGuard, anyhow::Error> {
let tracer_provider = init_tracer_provider()?;
let meter_provider = init_meter_provider()?;
let env_filter = Logging::env_filter(config)?;
let (logging_layer, logging_guard) = Logging::logging_layer(config)?;
Copy link

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logging setup logic is split between this module and the Logging module, creating tight coupling. Consider consolidating all tracing setup logic in one place or making the dependency more explicit through better separation of concerns.

Suggested change
let (logging_layer, logging_guard) = Logging::logging_layer(config)?;
let env_filter = build_env_filter(config)?;
let (logging_layer, logging_guard) = build_logging_layer(config)?;

Copilot uses AI. Check for mistakes.

let tracer = tracer_provider.tracer("tracing-otel-subscriber");

tracing_subscriber::registry()
.with(logging_layer)
.with(env_filter)
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.init();

Ok(TelemetryGuard {
tracer_provider,
meter_provider,
logging_guard,
})
}

pub struct TelemetryGuard {
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
logging_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}

impl Drop for TelemetryGuard {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("{err:?}");
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
}
self.logging_guard.take();
Copy link

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking the logging_guard without dropping it explicitly may not properly clean up the logging worker. The guard should be allowed to drop naturally or explicitly dropped to ensure proper cleanup.

Suggested change
self.logging_guard.take();
drop(self.logging_guard.take());

Copilot uses AI. Check for mistakes.
}
}
5 changes: 5 additions & 0 deletions crates/apollo-mcp-server/src/server/states/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::Arc;

use apollo_compiler::{Schema, validation::Valid};
use headers::HeaderMapExt as _;
use opentelemetry::Context;
use opentelemetry::trace::FutureExt;
use reqwest::header::HeaderMap;
use rmcp::model::Implementation;
use rmcp::{
Expand Down Expand Up @@ -169,6 +171,7 @@ impl Running {
}

impl ServerHandler for Running {
#[tracing::instrument(skip(self))]
async fn initialize(
&self,
_request: InitializeRequestParam,
Expand All @@ -180,6 +183,7 @@ impl ServerHandler for Running {
Ok(self.get_info())
}

#[tracing::instrument(skip(self, context), fields(tool_name = request.name.as_ref(), request_id = %context.id.clone()))]
Copy link

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context.id.clone() in the tracing fields creates an unnecessary clone on every function call. Consider using a reference or moving the field extraction into the span creation to avoid the clone overhead.

Suggested change
#[tracing::instrument(skip(self, context), fields(tool_name = request.name.as_ref(), request_id = %context.id.clone()))]
#[tracing::instrument(skip(self, context), fields(tool_name = request.name.as_ref(), request_id = %context.id))]

Copilot uses AI. Check for mistakes.
async fn call_tool(
&self,
request: CallToolRequestParam,
Expand Down Expand Up @@ -259,6 +263,7 @@ impl ServerHandler for Running {
.find(|op| op.as_ref().name == request.name)
.ok_or(tool_not_found(&request.name))?
.execute(graphql_request)
.with_context(Context::current())
.await
}
};
Expand Down
18 changes: 17 additions & 1 deletion crates/apollo-mcp-server/src/server/states/starting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc};

use apollo_compiler::{Name, Schema, ast::OperationType, validation::Valid};
use axum::{Router, extract::Query, http::StatusCode, response::Json, routing::get};
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use rmcp::{
Expand All @@ -11,6 +12,7 @@ use rmcp::{
use serde_json::json;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
use tracing::{Instrument as _, debug, error, info, trace};

use crate::{
Expand Down Expand Up @@ -177,7 +179,21 @@ impl Starting {
Default::default(),
);
let mut router =
with_auth!(axum::Router::new().nest_service("/mcp", service), auth);
with_auth!(axum::Router::new().nest_service("/mcp", service), auth)
// include trace context as header into the response
.layer(OtelInResponseLayer)
//start OpenTelemetry trace on incoming request
.layer(OtelAxumLayer::default())
// Add tower-http tracing layer for additional HTTP-level tracing
.layer(TraceLayer::new_for_http().make_span_with(
|request: &axum::http::Request<_>| {
tracing::info_span!(
"http_request",
method = %request.method(),
uri = %request.uri(),
)
},
));

// Add health check endpoint if configured
if let Some(health_check) = health_check.filter(|h| h.config().enabled) {
Expand Down
Loading