Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changesets/feat_telemetry_operations_trace_metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Telemetry: Trace operations and auth - @swcollard PR #375

* Adds traces for the MCP server generating Tools from Operations and performing authorization
* Includes the HTTP status code to the top level HTTP trace
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl From<&OperationCollectionDefaultEntry> for OperationData {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum CollectionSource {
Id(String, PlatformApiConfig),
Default(String, PlatformApiConfig),
Expand Down
1 change: 1 addition & 0 deletions crates/apollo-mcp-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ mockito = "1.7.0"
opentelemetry_sdk = { version = "0.30.0", features = ["testing"] }
rstest.workspace = true
tokio.workspace = true
tower = "0.5.2"
tracing-test = "0.2.5"

[build-dependencies]
Expand Down
81 changes: 75 additions & 6 deletions crates/apollo-mcp-server/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Config {
}

/// Validate that requests made have a corresponding bearer JWT token
#[tracing::instrument(skip_all, fields(status_code, reason))]
async fn oauth_validate(
State(auth_config): State<Config>,
token: Option<TypedHeader<Authorization<Bearer>>>,
Expand All @@ -104,17 +105,85 @@ async fn oauth_validate(
};

let validator = NetworkedTokenValidator::new(&auth_config.audiences, &auth_config.servers);
let token = token.ok_or_else(unauthorized_error)?;

let valid_token = validator
.validate(token.0)
.await
.ok_or_else(unauthorized_error)?;
let token = token.ok_or_else(|| {
tracing::Span::current().record("reason", "missing_token");
tracing::Span::current().record("status_code", StatusCode::UNAUTHORIZED.as_u16());
unauthorized_error()
})?;

let valid_token = validator.validate(token.0).await.ok_or_else(|| {
tracing::Span::current().record("reason", "invalid_token");
tracing::Span::current().record("status_code", StatusCode::UNAUTHORIZED.as_u16());
unauthorized_error()
})?;

// Insert new context to ensure that handlers only use our enforced token verification
// for propagation
request.extensions_mut().insert(valid_token);

let response = next.run(request).await;
tracing::Span::current().record("status_code", response.status().as_u16());
Ok(response)
}

#[cfg(test)]
mod tests {
use super::*;
use axum::middleware::from_fn_with_state;
use axum::routing::get;
use axum::{
Router,
body::Body,
http::{Request, StatusCode},
};
use http::header::{AUTHORIZATION, WWW_AUTHENTICATE};
use tower::ServiceExt; // for .oneshot()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! 👍

use url::Url;

fn test_config() -> Config {
Config {
servers: vec![Url::parse("http://localhost:1234").unwrap()],
audiences: vec!["test-audience".to_string()],
resource: Url::parse("http://localhost:4000").unwrap(),
resource_documentation: None,
scopes: vec!["read".to_string()],
disable_auth_token_passthrough: false,
}
}

fn test_router(config: Config) -> Router {
Router::new()
.route("/test", get(|| async { "ok" }))
.layer(from_fn_with_state(config, oauth_validate))
}

#[tokio::test]
async fn missing_token_returns_unauthorized() {
let config = test_config();
let app = test_router(config.clone());
let req = Request::builder().uri("/test").body(Body::empty()).unwrap();
let res = app.oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
let headers = res.headers();
let www_auth = headers.get(WWW_AUTHENTICATE).unwrap().to_str().unwrap();
assert!(www_auth.contains("Bearer"));
assert!(www_auth.contains("resource_metadata"));
}

#[tokio::test]
async fn invalid_token_returns_unauthorized() {
let config = test_config();
let app = test_router(config.clone());
let req = Request::builder()
.uri("/test")
.header(AUTHORIZATION, "Bearer invalidtoken")
.body(Body::empty())
.unwrap();
let res = app.oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
let headers = res.headers();
let www_auth = headers.get(WWW_AUTHENTICATE).unwrap().to_str().unwrap();
assert!(www_auth.contains("Bearer"));
assert!(www_auth.contains("resource_metadata"));
}
}
6 changes: 6 additions & 0 deletions crates/apollo-mcp-server/src/operations/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl Operation {
self.inner
}

#[tracing::instrument(skip(graphql_schema, custom_scalar_map))]
pub fn from_document(
raw_operation: RawOperation,
graphql_schema: &GraphqlSchema,
Expand Down Expand Up @@ -138,6 +139,7 @@ impl Operation {
}

/// Generate a description for an operation based on documentation in the schema
#[tracing::instrument(skip(comments, tree_shaker, graphql_schema))]
fn tool_description(
comments: Option<String>,
tree_shaker: &mut SchemaTreeShaker,
Expand Down Expand Up @@ -335,6 +337,7 @@ impl graphql::Executable for Operation {
}

#[allow(clippy::type_complexity)]
#[tracing::instrument(skip_all)]
pub fn operation_defs(
source_text: &str,
allow_mutations: bool,
Expand Down Expand Up @@ -424,6 +427,7 @@ pub fn operation_name(
.to_string())
}

#[tracing::instrument(skip(source_text))]
pub fn variable_description_overrides(
source_text: &str,
operation_definition: &Node<OperationDefinition>,
Expand Down Expand Up @@ -455,6 +459,7 @@ pub fn variable_description_overrides(
argument_overrides_map
}

#[tracing::instrument(skip(source_text))]
pub fn find_opening_parens_offset(
source_text: &str,
operation_definition: &Node<OperationDefinition>,
Expand Down Expand Up @@ -512,6 +517,7 @@ fn tool_character_length(tool: &Tool) -> Result<usize, serde_json::Error> {
+ tool_schema_string.len())
}

#[tracing::instrument(skip_all)]
fn get_json_schema(
operation: &Node<OperationDefinition>,
schema_argument_descriptions: &HashMap<String, Vec<String>>,
Expand Down
4 changes: 3 additions & 1 deletion crates/apollo-mcp-server/src/operations/operation_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::RawOperation;
const OPERATION_DOCUMENT_EXTENSION: &str = "graphql";

/// The source of the operations exposed as MCP tools
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum OperationSource {
/// GraphQL document files
Files(Vec<PathBuf>),
Expand All @@ -38,6 +38,7 @@ pub enum OperationSource {
}

impl OperationSource {
#[tracing::instrument(skip_all, fields(operation_source = ?self))]
pub async fn into_stream(self) -> impl Stream<Item = Event> {
match self {
OperationSource::Files(paths) => Self::stream_file_changes(paths).boxed(),
Expand Down Expand Up @@ -73,6 +74,7 @@ impl OperationSource {
}
}

#[tracing::instrument]
fn stream_file_changes(paths: Vec<PathBuf>) -> impl Stream<Item = Event> {
let path_count = paths.len();
let state = Arc::new(Mutex::new(HashMap::<PathBuf, Vec<RawOperation>>::new()));
Expand Down
2 changes: 2 additions & 0 deletions crates/apollo-mcp-server/src/server/states/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl Running {
Ok(self)
}

#[tracing::instrument(skip_all)]
pub(super) async fn update_operations(
self,
operations: Vec<RawOperation>,
Expand Down Expand Up @@ -146,6 +147,7 @@ impl Running {
}

/// Notify any peers that tools have changed. Drops unreachable peers from the list.
#[tracing::instrument(skip_all)]
async fn notify_tool_list_changed(peers: Arc<RwLock<Vec<Peer<RoleServer>>>>) {
let mut peers = peers.write().await;
if !peers.is_empty() {
Expand Down
76 changes: 67 additions & 9 deletions crates/apollo-mcp-server/src/server/states/starting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,27 @@ impl Starting {
//start OpenTelemetry trace on incoming request
.layer(OtelAxumLayer::default())
// Add tower-http tracing layer for additional HTTP-level tracing
.layer(TraceLayer::new_for_http().make_span_with(
|request: &axum::http::Request<_>| {
tracing::info_span!(
"mcp_server",
method = %request.method(),
uri = %request.uri(),
)
},
));
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &axum::http::Request<_>| {
tracing::info_span!(
"mcp_server",
method = %request.method(),
uri = %request.uri(),
status = tracing::field::Empty,
)
})
.on_response(
|response: &axum::http::Response<_>,
_latency: std::time::Duration,
span: &tracing::Span| {
span.record(
"status",
tracing::field::display(response.status()),
);
},
),
);

// Add health check endpoint if configured
if let Some(health_check) = health_check.filter(|h| h.config().enabled) {
Expand Down Expand Up @@ -297,3 +309,49 @@ async fn health_endpoint(

Ok((status_code, Json(json!(health))))
}

#[cfg(test)]
mod tests {
use http::HeaderMap;
use url::Url;

use crate::health::HealthCheckConfig;

use super::*;

#[tokio::test]
async fn start_basic_server() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let starting = Starting {
config: Config {
transport: Transport::StreamableHttp {
auth: None,
address: "127.0.0.1".parse().unwrap(),
port: 7799,
stateful_mode: false,
},
endpoint: Url::parse("http://localhost:4000").expect("valid url"),
mutation_mode: MutationMode::All,
execute_introspection: true,
headers: HeaderMap::new(),
validate_introspection: true,
introspect_introspection: true,
search_introspection: true,
introspect_minify: false,
search_minify: false,
explorer_graph_ref: None,
custom_scalar_map: None,
disable_type_description: false,
disable_schema_description: false,
disable_auth_token_passthrough: false,
search_leaf_depth: 5,
index_memory_bytes: 1024 * 1024 * 1024,
health_check: HealthCheckConfig::default(),
},
schema: Schema::parse_and_validate("type Query { hello: String }", "test.graphql")
.expect("Valid schema"),
operations: vec![],
};
let running = starting.start();
assert!(running.await.is_ok());
}
}
3 changes: 3 additions & 0 deletions crates/apollo-mcp-server/src/telemetry_attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ impl TelemetryAttribute {
TelemetryAttribute::RequestId => {
Key::from_static_str(TelemetryAttribute::RequestId.as_str())
}
TelemetryAttribute::RawOperation => {
Key::from_static_str(TelemetryAttribute::RawOperation.as_str())
}
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/apollo-mcp-server/telemetry.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ operation_id = "The operation id - either persisted query id, operation name, or
operation_source = "The operation source - either operation (local file/op collection), persisted query, or LLM generated"
request_id = "The request id"
success = "Sucess flag indicator"
raw_operation = "Graphql operation text and metadata used for Tool generation"

[metrics.apollo.mcp]
"initialize.count" = "Number of times initialize has been called"
Expand Down