From d5e3251953b15d822ad10acf0a200b824132a695 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 11:09:54 +0100 Subject: [PATCH 1/7] refactor rhai code and add tracing spans for common operations The rhai code had got a bit out of shape recently. This cleans things up a bit. I've also added a couple of tracing spans for tracing request and response processing. --- apollo-router/src/plugins/rhai.rs | 559 ++++++++++++------------------ 1 file changed, 224 insertions(+), 335 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index aff4e19c16..e8c9ba3d5c 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -88,7 +88,7 @@ impl OptionDance for SharedMut { } } -mod router { +mod supergraph { pub(crate) use crate::services::supergraph::*; pub(crate) type Response = super::RhaiSupergraphResponse; pub(crate) type DeferredResponse = super::RhaiSupergraphDeferredResponse; @@ -182,15 +182,15 @@ mod router_plugin_mod { // End of SubgraphRequest specific section #[rhai_fn(get = "headers", pure, return_raw)] - pub(crate) fn get_originating_headers_router_response( - obj: &mut SharedMut, + pub(crate) fn get_originating_headers_supergraph_response( + obj: &mut SharedMut, ) -> Result> { Ok(obj.with_mut(|response| response.response.headers().clone())) } #[rhai_fn(get = "headers", pure, return_raw)] pub(crate) fn get_originating_headers_router_deferred_response( - _obj: &mut SharedMut, + _obj: &mut SharedMut, ) -> Result> { Err("cannot access headers on a deferred response".into()) } @@ -217,8 +217,8 @@ mod router_plugin_mod { } #[rhai_fn(get = "body", pure, return_raw)] - pub(crate) fn get_originating_body_router_response( - obj: &mut SharedMut, + pub(crate) fn get_originating_body_supergraph_response( + obj: &mut SharedMut, ) -> Result> { Ok(obj.with_mut(|response| response.response.body().clone())) } @@ -239,7 +239,7 @@ mod router_plugin_mod { #[rhai_fn(get = "body", pure, return_raw)] pub(crate) fn get_originating_body_router_deferred_response( - obj: &mut SharedMut, + obj: &mut SharedMut, ) -> Result> { Ok(obj.with_mut(|response| response.response.clone())) } @@ -252,8 +252,8 @@ mod router_plugin_mod { } #[rhai_fn(set = "headers", return_raw)] - pub(crate) fn set_originating_headers_router_response( - obj: &mut SharedMut, + pub(crate) fn set_originating_headers_supergraph_response( + obj: &mut SharedMut, headers: HeaderMap, ) -> Result<(), Box> { obj.with_mut(|response| *response.response.headers_mut() = headers); @@ -262,7 +262,7 @@ mod router_plugin_mod { #[rhai_fn(set = "headers", return_raw)] pub(crate) fn set_originating_headers_router_deferred_response( - _obj: &mut SharedMut, + _obj: &mut SharedMut, _headers: HeaderMap, ) -> Result<(), Box> { Err("cannot access headers on a deferred response".into()) @@ -295,8 +295,8 @@ mod router_plugin_mod { } #[rhai_fn(set = "body", return_raw)] - pub(crate) fn set_originating_body_router_response( - obj: &mut SharedMut, + pub(crate) fn set_originating_body_supergraph_response( + obj: &mut SharedMut, body: Response, ) -> Result<(), Box> { obj.with_mut(|response| *response.response.body_mut() = body); @@ -323,7 +323,7 @@ mod router_plugin_mod { #[rhai_fn(set = "body", return_raw)] pub(crate) fn set_originating_body_router_deferred_response( - obj: &mut SharedMut, + obj: &mut SharedMut, body: Response, ) -> Result<(), Box> { obj.with_mut(|response| response.response = body); @@ -388,7 +388,7 @@ impl Plugin for Rhai { Ok(Self { ast, engine }) } - fn supergraph_service(&self, service: router::BoxService) -> router::BoxService { + fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService { const FUNCTION_NAME_SERVICE: &str = "supergraph_service"; if !self.ast_has_function(FUNCTION_NAME_SERVICE) { return service; @@ -398,7 +398,7 @@ impl Plugin for Rhai { if let Err(error) = self.run_rhai_service( FUNCTION_NAME_SERVICE, None, - ServiceStep::Router(shared_service.clone()), + ServiceStep::Supergraph(shared_service.clone()), ) { tracing::error!("service callback failed: {error}"); } @@ -442,7 +442,7 @@ impl Plugin for Rhai { #[derive(Clone, Debug)] pub(crate) enum ServiceStep { - Router(SharedMut), + Supergraph(SharedMut), Execution(SharedMut), Subgraph(SharedMut), } @@ -453,6 +453,7 @@ macro_rules! gen_map_request { $borrow.replace(|service| { ServiceBuilder::new() .checkpoint(move |request: $base::Request| { + let _span = tracing::info_span!("rhai request"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -511,11 +512,57 @@ macro_rules! gen_map_request { }; } +// Actually use the checkpoint function so that we can shortcut requests which fail +macro_rules! gen_map_deferred_request { + ($request: ident, $response: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + $borrow.replace(|service| { + ServiceBuilder::new() + .checkpoint(move |request: $request| { + let _span = tracing::info_span!("rhai deferred request"); + // Let's define a local function to build an error response + fn failure_message( + context: Context, + msg: String, + status: StatusCode, + ) -> Result, BoxError> { + let res = $response::error_builder() + .errors(vec![Error { + message: msg, + ..Default::default() + }]) + .status_code(status) + .context(context) + .build()?; + Ok(ControlFlow::Break(res)) + } + let shared_request = Shared::new(Mutex::new(Some(request))); + let result = execute(&$rhai_service, &$callback, (shared_request.clone(),)); + + if let Err(error) = result { + tracing::error!("map_request callback failed: {error}"); + let mut guard = shared_request.lock().unwrap(); + let request_opt = guard.take(); + return failure_message( + request_opt.unwrap().context, + format!("rhai execution error: '{}'", error), + StatusCode::INTERNAL_SERVER_ERROR, + ); + } + let mut guard = shared_request.lock().unwrap(); + let request_opt = guard.take(); + Ok(ControlFlow::Continue(request_opt.unwrap())) + }) + .service(service) + .boxed() + }) + }; +} macro_rules! gen_map_response { ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { service .map_response(move |response: $base::Response| { + let _span = tracing::info_span!("rhai response"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, @@ -578,6 +625,124 @@ macro_rules! gen_map_response { }; } +macro_rules! gen_map_deferred_response { + ($response: ident, $rhai_response: ident, $rhai_deferred_response: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + $borrow.replace(|service| { + BoxService::new(service.and_then( + |mapped_response: $response| async move { + let _span = tracing::info_span!("rhai deferred response"); + // Let's define a local function to build an error response + // XXX: This isn't ideal. We already have a response, so ideally we'd + // like to append this error into the existing response. However, + // the significantly different treatment of errors in different + // response types makes this extremely painful. This needs to be + // re-visited at some point post GA. + fn failure_message( + context: Context, + msg: String, + status: StatusCode, + ) -> $response { + let res = $response::error_builder() + .errors(vec![Error { + message: msg, + ..Default::default() + }]) + .status_code(status) + .context(context) + .build() + .expect("can't fail to build our error message"); + res + } + + // we split the response stream into headers+first response, then a stream of deferred responses + // for which we will implement mapping later + let $response { response, context } = mapped_response; + let (parts, stream) = http::Response::from(response).into_parts(); + let (first, rest) = stream.into_future().await; + + if first.is_none() { + return Ok(failure_message( + context, + "rhai execution error: empty response".to_string(), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + + let response = $rhai_response { + context, + response: http::Response::from_parts( + parts, + first.expect("already checked"), + ) + .into(), + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + + let result = + execute(&$rhai_service, &$callback, (shared_response.clone(),)); + if let Err(error) = result { + tracing::error!("map_response callback failed: {error}"); + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + return Ok(failure_message( + response_opt.unwrap().context, + format!("rhai execution error: '{}'", error), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let $rhai_response { context, response } = + response_opt.unwrap(); + let (parts, body) = http::Response::from(response).into_parts(); + + let ctx = context.clone(); + + let mapped_stream = rest.filter_map(move |deferred_response| { + let rhai_service = $rhai_service.clone(); + let context = context.clone(); + let callback = $callback.clone(); + async move { + let response = $rhai_deferred_response { + context, + response: deferred_response, + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + + let result = execute( + &rhai_service, + &callback, + (shared_response.clone(),), + ); + if let Err(error) = result { + tracing::error!("map_response callback failed: {error}"); + return None; + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let $rhai_deferred_response { response, .. } = + response_opt.unwrap(); + Some(response) + } + }); + + let response = http::Response::from_parts( + parts, + once(ready(body)).chain(mapped_stream).boxed(), + ) + .into(); + Ok($response { + context: ctx, + response, + }) + }, + )) + }) + }; +} + pub(crate) struct RhaiExecutionResponse { context: Context, response: http_ext::Response, @@ -712,93 +877,23 @@ macro_rules! register_rhai_interface { impl ServiceStep { fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { - ServiceStep::Router(service) => { - //gen_map_request!(router, service, rhai_service, callback); - service.replace(|service| { - ServiceBuilder::new() - .checkpoint(move |request: SupergraphRequest| { - // Let's define a local function to build an error response - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> Result, BoxError> - { - let res = SupergraphResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build()?; - Ok(ControlFlow::Break(res)) - } - let shared_request = Shared::new(Mutex::new(Some(request))); - let result = - execute(&rhai_service, &callback, (shared_request.clone(),)); - - if let Err(error) = result { - tracing::error!("map_request callback failed: {error}"); - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - return failure_message( - request_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - Ok(ControlFlow::Continue(request_opt.unwrap())) - }) - .service(service) - .boxed() - }) + ServiceStep::Supergraph(service) => { + gen_map_deferred_request!( + SupergraphRequest, + SupergraphResponse, + service, + rhai_service, + callback + ); } ServiceStep::Execution(service) => { - //gen_map_request!(execution, service, rhai_service, callback); - service.replace(|service| { - ServiceBuilder::new() - .checkpoint(move |request: ExecutionRequest| { - // Let's define a local function to build an error response - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> Result, BoxError> - { - let res = ExecutionResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build()?; - Ok(ControlFlow::Break(res)) - } - let shared_request = Shared::new(Mutex::new(Some(request))); - let result = - execute(&rhai_service, &callback, (shared_request.clone(),)); - - if let Err(error) = result { - tracing::error!("map_request callback failed: {error}"); - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - return failure_message( - request_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - Ok(ControlFlow::Continue(request_opt.unwrap())) - }) - .service(service) - .boxed() - }) + gen_map_deferred_request!( + ExecutionRequest, + ExecutionResponse, + service, + rhai_service, + callback + ); } ServiceStep::Subgraph(service) => { gen_map_request!(subgraph, service, rhai_service, callback); @@ -808,231 +903,25 @@ impl ServiceStep { fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { - ServiceStep::Router(service) => { - service.replace(|service| { - BoxService::new(service.and_then( - |router_response: SupergraphResponse| async move { - // Let's define a local function to build an error response - // XXX: This isn't ideal. We already have a response, so ideally we'd - // like to append this error into the existing response. However, - // the significantly different treatment of errors in different - // response types makes this extremely painful. This needs to be - // re-visited at some point post GA. - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> SupergraphResponse { - let res = SupergraphResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build() - .expect("can't fail to build our error message"); - res - } - - // we split the response stream into headers+first response, then a stream of deferred responses - // for which we will implement mapping later - let SupergraphResponse { response, context } = router_response; - let (parts, stream) = http::Response::from(response).into_parts(); - let (first, rest) = stream.into_future().await; - - if first.is_none() { - return Ok(failure_message( - context, - "rhai execution error: empty response".to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let response = RhaiSupergraphResponse { - context, - response: http::Response::from_parts( - parts, - first.expect("already checked"), - ) - .into(), - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - - let result = - execute(&rhai_service, &callback, (shared_response.clone(),)); - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - return Ok(failure_message( - response_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiSupergraphResponse { context, response } = - response_opt.unwrap(); - let (parts, body) = http::Response::from(response).into_parts(); - - let ctx = context.clone(); - - let mapped_stream = rest.filter_map(move |deferred_response| { - let rhai_service = rhai_service.clone(); - let context = context.clone(); - let callback = callback.clone(); - async move { - let response = RhaiSupergraphDeferredResponse { - context, - response: deferred_response, - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - - let result = execute( - &rhai_service, - &callback, - (shared_response.clone(),), - ); - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - return None; - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiSupergraphDeferredResponse { response, .. } = - response_opt.unwrap(); - Some(response) - } - }); - - let response = http::Response::from_parts( - parts, - once(ready(body)).chain(mapped_stream).boxed(), - ) - .into(); - Ok(SupergraphResponse { - context: ctx, - response, - }) - }, - )) - }) + ServiceStep::Supergraph(service) => { + gen_map_deferred_response!( + SupergraphResponse, + RhaiSupergraphResponse, + RhaiSupergraphDeferredResponse, + service, + rhai_service, + callback + ); } ServiceStep::Execution(service) => { - service.replace(|service| { - service - .and_then(|execution_response: ExecutionResponse| async move { - // Let's define a local function to build an error response - // XXX: This isn't ideal. We already have a response, so ideally we'd - // like to append this error into the existing response. However, - // the significantly different treatment of errors in different - // response types makes this extremely painful. This needs to be - // re-visited at some point post GA. - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> ExecutionResponse { - ExecutionResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build() - .expect("can't fail to build our error message") - } - - // we split the response stream into headers+first response, then a stream of deferred responses - // for which we will implement mapping later - let ExecutionResponse { response, context } = execution_response; - let (parts, stream) = http::Response::from(response).into_parts(); - let (first, rest) = stream.into_future().await; - - if first.is_none() { - return Ok(failure_message( - context, - "rhai execution error: empty response".to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let response = RhaiExecutionResponse { - context, - response: http::Response::from_parts( - parts, - first.expect("already checked"), - ) - .into(), - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - let result = - execute(&rhai_service, &callback, (shared_response.clone(),)); - - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - return Ok(failure_message( - response_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiExecutionResponse { context, response } = response_opt.unwrap(); - let (parts, body) = http::Response::from(response).into_parts(); - - let ctx = context.clone(); - - let mapped_stream = rest.filter_map(move |deferred_response| { - let rhai_service = rhai_service.clone(); - let context = context.clone(); - let callback = callback.clone(); - async move { - let response = RhaiExecutionDeferredResponse { - context, - response: deferred_response, - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - let result = execute( - &rhai_service, - &callback, - (shared_response.clone(),), - ); - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - return None; - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiExecutionDeferredResponse { response, .. } = - response_opt.unwrap(); - - Some(response) - } - }); - - let response = http::Response::from_parts( - parts, - once(ready(body)).chain(mapped_stream).boxed(), - ) - .into(); - Ok(ExecutionResponse { - context: ctx, - response, - }) - }) - .boxed() - }) + gen_map_deferred_response!( + ExecutionResponse, + RhaiExecutionResponse, + RhaiExecutionDeferredResponse, + service, + rhai_service, + callback + ); } ServiceStep::Subgraph(service) => { gen_map_response!(subgraph, service, rhai_service, callback); @@ -1147,11 +1036,11 @@ impl Rhai { }) .register_fn( "headers_are_available", - |_: &mut SharedMut| -> bool { true }, + |_: &mut SharedMut| -> bool { true }, ) .register_fn( "headers_are_available", - |_: &mut SharedMut| -> bool { false }, + |_: &mut SharedMut| -> bool { false }, ) .register_fn( "headers_are_available", @@ -1404,18 +1293,18 @@ impl Rhai { }) .register_fn("to_string", |x: &mut Uri| -> String { format!("{:?}", x) }); - register_rhai_interface!(engine, router, execution, subgraph); + register_rhai_interface!(engine, supergraph, execution, subgraph); engine .register_get_result( "context", - |obj: &mut SharedMut| { + |obj: &mut SharedMut| { Ok(obj.with_mut(|response| response.context.clone())) }, ) .register_set_result( "context", - |obj: &mut SharedMut, context: Context| { + |obj: &mut SharedMut, context: Context| { obj.with_mut(|response| response.context = context); Ok(()) }, @@ -1490,14 +1379,14 @@ mod tests { let mut router_service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); let context = Context::new(); context.insert("test", 5i64).unwrap(); - let router_req = SupergraphRequest::fake_builder().context(context).build()?; + let supergraph_req = SupergraphRequest::fake_builder().context(context).build()?; - let mut router_resp = router_service.ready().await?.call(router_req).await?; - assert_eq!(router_resp.response.status(), 200); - let headers = router_resp.response.headers().clone(); - let context = router_resp.context.clone(); + let mut supergraph_resp = router_service.ready().await?.call(supergraph_req).await?; + assert_eq!(supergraph_resp.response.status(), 200); + let headers = supergraph_resp.response.headers().clone(); + let context = supergraph_resp.context.clone(); // Check if it fails - let resp = router_resp.next_response().await.unwrap(); + let resp = supergraph_resp.next_response().await.unwrap(); if !resp.errors.is_empty() { panic!( "Contains errors : {}", From 7398cc1cacccc7c7f389b7521dd24889c3747e3e Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 11:29:02 +0100 Subject: [PATCH 2/7] improve messaging to make it clearer what the span measures. --- apollo-router/src/plugins/rhai.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index e8c9ba3d5c..fc193cafbf 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -453,7 +453,7 @@ macro_rules! gen_map_request { $borrow.replace(|service| { ServiceBuilder::new() .checkpoint(move |request: $base::Request| { - let _span = tracing::info_span!("rhai request"); + let _span = tracing::info_span!("rhai request processing"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -518,7 +518,7 @@ macro_rules! gen_map_deferred_request { $borrow.replace(|service| { ServiceBuilder::new() .checkpoint(move |request: $request| { - let _span = tracing::info_span!("rhai deferred request"); + let _span = tracing::info_span!("rhai request processing"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -562,7 +562,7 @@ macro_rules! gen_map_response { $borrow.replace(|service| { service .map_response(move |response: $base::Response| { - let _span = tracing::info_span!("rhai response"); + let _span = tracing::info_span!("rhai response processing"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, @@ -630,7 +630,7 @@ macro_rules! gen_map_deferred_response { $borrow.replace(|service| { BoxService::new(service.and_then( |mapped_response: $response| async move { - let _span = tracing::info_span!("rhai deferred response"); + let _span = tracing::info_span!("rhai response processing"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, From 42e5ce073ffbe6e39fc6697a31d2d52735b73159 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 11:48:37 +0100 Subject: [PATCH 3/7] experiment with instrumenting the request to see what that looks like in jaeger. --- apollo-router/src/plugins/rhai.rs | 77 +++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index fc193cafbf..dc16394660 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -18,6 +18,7 @@ use http::uri::PathAndQuery; use http::HeaderMap; use http::StatusCode; use http::Uri; +use opentelemetry::trace::SpanKind; use rhai::module_resolvers::FileModuleResolver; use rhai::plugin::*; use rhai::serde::from_dynamic; @@ -451,9 +452,43 @@ pub(crate) enum ServiceStep { macro_rules! gen_map_request { ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { + fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone { + // let client_name_header = config.client_name_header; + // let client_version_header = config.client_version_header; + + move |request: &$base::Request| { + let http_request = &request.originating_request; + // let headers = http_request.headers(); + let query = http_request.body().query.clone().unwrap_or_default(); + let operation_name = http_request + .body() + .operation_name + .clone() + .unwrap_or_default(); + // let client_name = headers + // .get(&client_name_header) + // .cloned() + // .unwrap_or_else(|| HeaderValue::from_static("")); + // let client_version = headers + // .get(&client_version_header) + // .cloned() + // .unwrap_or_else(|| HeaderValue::from_static("")); + let span = tracing::info_span!( + "rhai span", + graphql.document = query.as_str(), + // TODO add graphql.operation.type + graphql.operation.name = operation_name.as_str(), + // client_name = client_name.to_str().unwrap_or_default(), + // client_version = client_version.to_str().unwrap_or_default(), + "otel.kind" = %SpanKind::Internal + ); + span + } + } ServiceBuilder::new() + .instrument(rhai_service_span()) .checkpoint(move |request: $base::Request| { - let _span = tracing::info_span!("rhai request processing"); + // let _span = tracing::info_span!("rhai request processing"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -516,9 +551,43 @@ macro_rules! gen_map_request { macro_rules! gen_map_deferred_request { ($request: ident, $response: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { + fn rhai_service_span() -> impl Fn(&$request) -> tracing::Span + Clone { + // let client_name_header = config.client_name_header; + // let client_version_header = config.client_version_header; + + move |request: &$request| { + let http_request = &request.originating_request; + // let headers = http_request.headers(); + let query = http_request.body().query.clone().unwrap_or_default(); + let operation_name = http_request + .body() + .operation_name + .clone() + .unwrap_or_default(); + // let client_name = headers + // .get(&client_name_header) + // .cloned() + // .unwrap_or_else(|| HeaderValue::from_static("")); + // let client_version = headers + // .get(&client_version_header) + // .cloned() + // .unwrap_or_else(|| HeaderValue::from_static("")); + let span = tracing::info_span!( + "rhai span", + graphql.document = query.as_str(), + // TODO add graphql.operation.type + graphql.operation.name = operation_name.as_str(), + // client_name = client_name.to_str().unwrap_or_default(), + // client_version = client_version.to_str().unwrap_or_default(), + "otel.kind" = %SpanKind::Internal + ); + span + } + } ServiceBuilder::new() + .instrument(rhai_service_span()) .checkpoint(move |request: $request| { - let _span = tracing::info_span!("rhai request processing"); + // let _span = tracing::info_span!("rhai request processing"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -562,7 +631,7 @@ macro_rules! gen_map_response { $borrow.replace(|service| { service .map_response(move |response: $base::Response| { - let _span = tracing::info_span!("rhai response processing"); + // let _span = tracing::info_span!("rhai response processing"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, @@ -630,7 +699,7 @@ macro_rules! gen_map_deferred_response { $borrow.replace(|service| { BoxService::new(service.and_then( |mapped_response: $response| async move { - let _span = tracing::info_span!("rhai response processing"); + // let _span = tracing::info_span!("rhai response processing"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, From d9bcf50d3df9c54ea4eb1167f7466663a2a6cc67 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 12:06:44 +0100 Subject: [PATCH 4/7] remove the duplicated data from the rhai span The rhai span doesn't need to duplicate the data from the supergraph span. That means, it doesn't really have any useful information, but that's what it is for now. --- apollo-router/src/plugins/rhai.rs | 66 ++++--------------------------- 1 file changed, 8 insertions(+), 58 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index dc16394660..58a7a9ab47 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -453,36 +453,11 @@ macro_rules! gen_map_request { ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone { - // let client_name_header = config.client_name_header; - // let client_version_header = config.client_version_header; - - move |request: &$base::Request| { - let http_request = &request.originating_request; - // let headers = http_request.headers(); - let query = http_request.body().query.clone().unwrap_or_default(); - let operation_name = http_request - .body() - .operation_name - .clone() - .unwrap_or_default(); - // let client_name = headers - // .get(&client_name_header) - // .cloned() - // .unwrap_or_else(|| HeaderValue::from_static("")); - // let client_version = headers - // .get(&client_version_header) - // .cloned() - // .unwrap_or_else(|| HeaderValue::from_static("")); - let span = tracing::info_span!( - "rhai span", - graphql.document = query.as_str(), - // TODO add graphql.operation.type - graphql.operation.name = operation_name.as_str(), - // client_name = client_name.to_str().unwrap_or_default(), - // client_version = client_version.to_str().unwrap_or_default(), + move |_request: &$base::Request| { + tracing::info_span!( + "rhai plugin", "otel.kind" = %SpanKind::Internal - ); - span + ) } } ServiceBuilder::new() @@ -552,36 +527,11 @@ macro_rules! gen_map_deferred_request { ($request: ident, $response: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { fn rhai_service_span() -> impl Fn(&$request) -> tracing::Span + Clone { - // let client_name_header = config.client_name_header; - // let client_version_header = config.client_version_header; - - move |request: &$request| { - let http_request = &request.originating_request; - // let headers = http_request.headers(); - let query = http_request.body().query.clone().unwrap_or_default(); - let operation_name = http_request - .body() - .operation_name - .clone() - .unwrap_or_default(); - // let client_name = headers - // .get(&client_name_header) - // .cloned() - // .unwrap_or_else(|| HeaderValue::from_static("")); - // let client_version = headers - // .get(&client_version_header) - // .cloned() - // .unwrap_or_else(|| HeaderValue::from_static("")); - let span = tracing::info_span!( - "rhai span", - graphql.document = query.as_str(), - // TODO add graphql.operation.type - graphql.operation.name = operation_name.as_str(), - // client_name = client_name.to_str().unwrap_or_default(), - // client_version = client_version.to_str().unwrap_or_default(), + move |_request: &$request| { + tracing::info_span!( + "rhai plugin", "otel.kind" = %SpanKind::Internal - ); - span + ) } } ServiceBuilder::new() From b5e5c6e8de820dbc248c63f9986fc82a3ed3e4e9 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 12:22:54 +0100 Subject: [PATCH 5/7] remove commented out _span instrument looks like the right way to go, so get rid of the commented out _span commands. --- apollo-router/src/plugins/rhai.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index 58a7a9ab47..6cd28225bb 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -463,7 +463,6 @@ macro_rules! gen_map_request { ServiceBuilder::new() .instrument(rhai_service_span()) .checkpoint(move |request: $base::Request| { - // let _span = tracing::info_span!("rhai request processing"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -537,7 +536,6 @@ macro_rules! gen_map_deferred_request { ServiceBuilder::new() .instrument(rhai_service_span()) .checkpoint(move |request: $request| { - // let _span = tracing::info_span!("rhai request processing"); // Let's define a local function to build an error response fn failure_message( context: Context, @@ -581,7 +579,6 @@ macro_rules! gen_map_response { $borrow.replace(|service| { service .map_response(move |response: $base::Response| { - // let _span = tracing::info_span!("rhai response processing"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, @@ -649,7 +646,6 @@ macro_rules! gen_map_deferred_response { $borrow.replace(|service| { BoxService::new(service.and_then( |mapped_response: $response| async move { - // let _span = tracing::info_span!("rhai response processing"); // Let's define a local function to build an error response // XXX: This isn't ideal. We already have a response, so ideally we'd // like to append this error into the existing response. However, From de4405d44b6e30e2916116d9951ff102886fcbf3 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 12:33:14 +0100 Subject: [PATCH 6/7] add a changelog entry --- NEXT_CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index ccf5c94d48..390d367f34 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -42,6 +42,13 @@ These items have been removed from the public API of `apollo_router::services::e By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/1568 ## 🚀 Features + +### instrument the rhai plugin with a tracing span ([PR #1598](https://github.com/apollographql/router/pull/1598)) + +If you have an active rhai script in your router, you will now see a "rhai plugin" tracing span. + +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/1598 + ## 🐛 Fixes ### Only send one report for a response with deferred responses ([PR #1576](https://github.com/apollographql/router/issues/1576)) From aa6494d97feffdb672b5d5570ee691cbe9a6b7c4 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 24 Aug 2022 12:48:42 +0100 Subject: [PATCH 7/7] improve span by adding which rhai service is being processed If you are processing multiple services in your rhai script, it will be helpful to have an attribute which identifies which service the span relates to. It's already fairly clear from the position in the processing pipeline, but a bit of extra information won't hurt. --- apollo-router/src/plugins/rhai.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index 6cd28225bb..4a07a6b05e 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -456,6 +456,7 @@ macro_rules! gen_map_request { move |_request: &$base::Request| { tracing::info_span!( "rhai plugin", + "rhai service" = stringify!($base::Request), "otel.kind" = %SpanKind::Internal ) } @@ -529,6 +530,7 @@ macro_rules! gen_map_deferred_request { move |_request: &$request| { tracing::info_span!( "rhai plugin", + "rhai service" = stringify!($request), "otel.kind" = %SpanKind::Internal ) }