diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 5fd2d550f6..43f72d59fe 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -57,6 +57,13 @@ These items have been removed from the public API of `apollo_router::services::e By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/1568 ## 🚀 Features + +### instrument the rhai plugin with a tracing span ([PR #1598](https://github.com/apollographql/router/pull/1598)) + +If you have an active rhai script in your router, you will now see a "rhai plugin" tracing span. + +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/1598 + ## 🐛 Fixes ### Only send one report for a response with deferred responses ([PR #1576](https://github.com/apollographql/router/issues/1576)) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index b503a293a8..65a6e92163 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -18,6 +18,7 @@ use http::uri::PathAndQuery; use http::HeaderMap; use http::StatusCode; use http::Uri; +use opentelemetry::trace::SpanKind; use rhai::module_resolvers::FileModuleResolver; use rhai::plugin::*; use rhai::serde::from_dynamic; @@ -88,7 +89,7 @@ impl OptionDance for SharedMut { } } -mod router { +mod supergraph { pub(crate) use crate::services::supergraph::*; pub(crate) type Response = super::RhaiSupergraphResponse; pub(crate) type DeferredResponse = super::RhaiSupergraphDeferredResponse; @@ -182,15 +183,15 @@ mod router_plugin_mod { // End of SubgraphRequest specific section #[rhai_fn(get = "headers", pure, return_raw)] - pub(crate) fn get_originating_headers_router_response( - obj: &mut SharedMut, + pub(crate) fn get_originating_headers_supergraph_response( + obj: &mut SharedMut, ) -> Result> { Ok(obj.with_mut(|response| response.response.headers().clone())) } #[rhai_fn(get = "headers", pure, return_raw)] pub(crate) fn get_originating_headers_router_deferred_response( - _obj: &mut SharedMut, + _obj: &mut SharedMut, ) -> Result> { Err("cannot access headers on a deferred response".into()) } @@ -217,8 +218,8 @@ mod router_plugin_mod { } #[rhai_fn(get = "body", pure, return_raw)] - pub(crate) fn get_originating_body_router_response( - obj: &mut SharedMut, + pub(crate) fn get_originating_body_supergraph_response( + obj: &mut SharedMut, ) -> Result> { Ok(obj.with_mut(|response| response.response.body().clone())) } @@ -239,7 +240,7 @@ mod router_plugin_mod { #[rhai_fn(get = "body", pure, return_raw)] pub(crate) fn get_originating_body_router_deferred_response( - obj: &mut SharedMut, + obj: &mut SharedMut, ) -> Result> { Ok(obj.with_mut(|response| response.response.clone())) } @@ -252,8 +253,8 @@ mod router_plugin_mod { } #[rhai_fn(set = "headers", return_raw)] - pub(crate) fn set_originating_headers_router_response( - obj: &mut SharedMut, + pub(crate) fn set_originating_headers_supergraph_response( + obj: &mut SharedMut, headers: HeaderMap, ) -> Result<(), Box> { obj.with_mut(|response| *response.response.headers_mut() = headers); @@ -262,7 +263,7 @@ mod router_plugin_mod { #[rhai_fn(set = "headers", return_raw)] pub(crate) fn set_originating_headers_router_deferred_response( - _obj: &mut SharedMut, + _obj: &mut SharedMut, _headers: HeaderMap, ) -> Result<(), Box> { Err("cannot access headers on a deferred response".into()) @@ -295,8 +296,8 @@ mod router_plugin_mod { } #[rhai_fn(set = "body", return_raw)] - pub(crate) fn set_originating_body_router_response( - obj: &mut SharedMut, + pub(crate) fn set_originating_body_supergraph_response( + obj: &mut SharedMut, body: Response, ) -> Result<(), Box> { obj.with_mut(|response| *response.response.body_mut() = body); @@ -323,7 +324,7 @@ mod router_plugin_mod { #[rhai_fn(set = "body", return_raw)] pub(crate) fn set_originating_body_router_deferred_response( - obj: &mut SharedMut, + obj: &mut SharedMut, body: Response, ) -> Result<(), Box> { obj.with_mut(|response| response.response = body); @@ -388,7 +389,7 @@ impl Plugin for Rhai { Ok(Self { ast, engine }) } - fn supergraph_service(&self, service: router::BoxService) -> router::BoxService { + fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService { const FUNCTION_NAME_SERVICE: &str = "supergraph_service"; if !self.ast_has_function(FUNCTION_NAME_SERVICE) { return service; @@ -398,7 +399,7 @@ impl Plugin for Rhai { if let Err(error) = self.run_rhai_service( FUNCTION_NAME_SERVICE, None, - ServiceStep::Router(shared_service.clone()), + ServiceStep::Supergraph(shared_service.clone()), ) { tracing::error!("service callback failed: {error}"); } @@ -442,7 +443,7 @@ impl Plugin for Rhai { #[derive(Clone, Debug)] pub(crate) enum ServiceStep { - Router(SharedMut), + Supergraph(SharedMut), Execution(SharedMut), Subgraph(SharedMut), } @@ -451,7 +452,17 @@ pub(crate) enum ServiceStep { macro_rules! gen_map_request { ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { + fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone { + move |_request: &$base::Request| { + tracing::info_span!( + "rhai plugin", + "rhai service" = stringify!($base::Request), + "otel.kind" = %SpanKind::Internal + ) + } + } ServiceBuilder::new() + .instrument(rhai_service_span()) .checkpoint(move |request: $base::Request| { // Let's define a local function to build an error response fn failure_message( @@ -511,6 +522,60 @@ macro_rules! gen_map_request { }; } +// Actually use the checkpoint function so that we can shortcut requests which fail +macro_rules! gen_map_deferred_request { + ($request: ident, $response: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + $borrow.replace(|service| { + fn rhai_service_span() -> impl Fn(&$request) -> tracing::Span + Clone { + move |_request: &$request| { + tracing::info_span!( + "rhai plugin", + "rhai service" = stringify!($request), + "otel.kind" = %SpanKind::Internal + ) + } + } + ServiceBuilder::new() + .instrument(rhai_service_span()) + .checkpoint(move |request: $request| { + // Let's define a local function to build an error response + fn failure_message( + context: Context, + msg: String, + status: StatusCode, + ) -> Result, BoxError> { + let res = $response::error_builder() + .errors(vec![Error { + message: msg, + ..Default::default() + }]) + .status_code(status) + .context(context) + .build()?; + Ok(ControlFlow::Break(res)) + } + let shared_request = Shared::new(Mutex::new(Some(request))); + let result = execute(&$rhai_service, &$callback, (shared_request.clone(),)); + + if let Err(error) = result { + tracing::error!("map_request callback failed: {error}"); + let mut guard = shared_request.lock().unwrap(); + let request_opt = guard.take(); + return failure_message( + request_opt.unwrap().context, + format!("rhai execution error: '{}'", error), + StatusCode::INTERNAL_SERVER_ERROR, + ); + } + let mut guard = shared_request.lock().unwrap(); + let request_opt = guard.take(); + Ok(ControlFlow::Continue(request_opt.unwrap())) + }) + .service(service) + .boxed() + }) + }; +} macro_rules! gen_map_response { ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { $borrow.replace(|service| { @@ -578,6 +643,123 @@ macro_rules! gen_map_response { }; } +macro_rules! gen_map_deferred_response { + ($response: ident, $rhai_response: ident, $rhai_deferred_response: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + $borrow.replace(|service| { + BoxService::new(service.and_then( + |mapped_response: $response| async move { + // Let's define a local function to build an error response + // XXX: This isn't ideal. We already have a response, so ideally we'd + // like to append this error into the existing response. However, + // the significantly different treatment of errors in different + // response types makes this extremely painful. This needs to be + // re-visited at some point post GA. + fn failure_message( + context: Context, + msg: String, + status: StatusCode, + ) -> $response { + let res = $response::error_builder() + .errors(vec![Error { + message: msg, + ..Default::default() + }]) + .status_code(status) + .context(context) + .build() + .expect("can't fail to build our error message"); + res + } + + // we split the response stream into headers+first response, then a stream of deferred responses + // for which we will implement mapping later + let $response { response, context } = mapped_response; + let (parts, stream) = response.into_parts(); + let (first, rest) = stream.into_future().await; + + if first.is_none() { + return Ok(failure_message( + context, + "rhai execution error: empty response".to_string(), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + + let response = $rhai_response { + context, + response: http::Response::from_parts( + parts, + first.expect("already checked"), + ) + .into(), + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + + let result = + execute(&$rhai_service, &$callback, (shared_response.clone(),)); + if let Err(error) = result { + tracing::error!("map_response callback failed: {error}"); + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + return Ok(failure_message( + response_opt.unwrap().context, + format!("rhai execution error: '{}'", error), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let $rhai_response { context, response } = + response_opt.unwrap(); + let (parts, body) = http::Response::from(response).into_parts(); + + let ctx = context.clone(); + + let mapped_stream = rest.filter_map(move |deferred_response| { + let rhai_service = $rhai_service.clone(); + let context = context.clone(); + let callback = $callback.clone(); + async move { + let response = $rhai_deferred_response { + context, + response: deferred_response, + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + + let result = execute( + &rhai_service, + &callback, + (shared_response.clone(),), + ); + if let Err(error) = result { + tracing::error!("map_response callback failed: {error}"); + return None; + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let $rhai_deferred_response { response, .. } = + response_opt.unwrap(); + Some(response) + } + }); + + let response = http::Response::from_parts( + parts, + once(ready(body)).chain(mapped_stream).boxed(), + ) + .into(); + Ok($response { + context: ctx, + response, + }) + }, + )) + }) + }; +} + pub(crate) struct RhaiExecutionResponse { context: Context, response: http_ext::Response, @@ -712,93 +894,23 @@ macro_rules! register_rhai_interface { impl ServiceStep { fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { - ServiceStep::Router(service) => { - //gen_map_request!(router, service, rhai_service, callback); - service.replace(|service| { - ServiceBuilder::new() - .checkpoint(move |request: SupergraphRequest| { - // Let's define a local function to build an error response - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> Result, BoxError> - { - let res = SupergraphResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build()?; - Ok(ControlFlow::Break(res)) - } - let shared_request = Shared::new(Mutex::new(Some(request))); - let result = - execute(&rhai_service, &callback, (shared_request.clone(),)); - - if let Err(error) = result { - tracing::error!("map_request callback failed: {error}"); - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - return failure_message( - request_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - Ok(ControlFlow::Continue(request_opt.unwrap())) - }) - .service(service) - .boxed() - }) + ServiceStep::Supergraph(service) => { + gen_map_deferred_request!( + SupergraphRequest, + SupergraphResponse, + service, + rhai_service, + callback + ); } ServiceStep::Execution(service) => { - //gen_map_request!(execution, service, rhai_service, callback); - service.replace(|service| { - ServiceBuilder::new() - .checkpoint(move |request: ExecutionRequest| { - // Let's define a local function to build an error response - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> Result, BoxError> - { - let res = ExecutionResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build()?; - Ok(ControlFlow::Break(res)) - } - let shared_request = Shared::new(Mutex::new(Some(request))); - let result = - execute(&rhai_service, &callback, (shared_request.clone(),)); - - if let Err(error) = result { - tracing::error!("map_request callback failed: {error}"); - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - return failure_message( - request_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - let mut guard = shared_request.lock().unwrap(); - let request_opt = guard.take(); - Ok(ControlFlow::Continue(request_opt.unwrap())) - }) - .service(service) - .boxed() - }) + gen_map_deferred_request!( + ExecutionRequest, + ExecutionResponse, + service, + rhai_service, + callback + ); } ServiceStep::Subgraph(service) => { gen_map_request!(subgraph, service, rhai_service, callback); @@ -808,229 +920,25 @@ impl ServiceStep { fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { - ServiceStep::Router(service) => { - service.replace(|service| { - BoxService::new(service.and_then( - |router_response: SupergraphResponse| async move { - // Let's define a local function to build an error response - // XXX: This isn't ideal. We already have a response, so ideally we'd - // like to append this error into the existing response. However, - // the significantly different treatment of errors in different - // response types makes this extremely painful. This needs to be - // re-visited at some point post GA. - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> SupergraphResponse { - let res = SupergraphResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build() - .expect("can't fail to build our error message"); - res - } - - // we split the response stream into headers+first response, then a stream of deferred responses - // for which we will implement mapping later - let SupergraphResponse { response, context } = router_response; - let (parts, stream) = response.into_parts(); - let (first, rest) = stream.into_future().await; - - if first.is_none() { - return Ok(failure_message( - context, - "rhai execution error: empty response".to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let response = RhaiSupergraphResponse { - context, - response: http::Response::from_parts( - parts, - first.expect("already checked"), - ) - .into(), - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - - let result = - execute(&rhai_service, &callback, (shared_response.clone(),)); - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - return Ok(failure_message( - response_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiSupergraphResponse { context, response } = - response_opt.unwrap(); - let (parts, body) = http::Response::from(response).into_parts(); - - let ctx = context.clone(); - - let mapped_stream = rest.filter_map(move |deferred_response| { - let rhai_service = rhai_service.clone(); - let context = context.clone(); - let callback = callback.clone(); - async move { - let response = RhaiSupergraphDeferredResponse { - context, - response: deferred_response, - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - - let result = execute( - &rhai_service, - &callback, - (shared_response.clone(),), - ); - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - return None; - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiSupergraphDeferredResponse { response, .. } = - response_opt.unwrap(); - Some(response) - } - }); - - let response = http::Response::from_parts( - parts, - once(ready(body)).chain(mapped_stream).boxed(), - ); - Ok(SupergraphResponse { - context: ctx, - response, - }) - }, - )) - }) + ServiceStep::Supergraph(service) => { + gen_map_deferred_response!( + SupergraphResponse, + RhaiSupergraphResponse, + RhaiSupergraphDeferredResponse, + service, + rhai_service, + callback + ); } ServiceStep::Execution(service) => { - service.replace(|service| { - service - .and_then(|execution_response: ExecutionResponse| async move { - // Let's define a local function to build an error response - // XXX: This isn't ideal. We already have a response, so ideally we'd - // like to append this error into the existing response. However, - // the significantly different treatment of errors in different - // response types makes this extremely painful. This needs to be - // re-visited at some point post GA. - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> ExecutionResponse { - ExecutionResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build() - .expect("can't fail to build our error message") - } - - // we split the response stream into headers+first response, then a stream of deferred responses - // for which we will implement mapping later - let ExecutionResponse { response, context } = execution_response; - let (parts, stream) = response.into_parts(); - let (first, rest) = stream.into_future().await; - - if first.is_none() { - return Ok(failure_message( - context, - "rhai execution error: empty response".to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let response = RhaiExecutionResponse { - context, - response: http::Response::from_parts( - parts, - first.expect("already checked"), - ) - .into(), - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - let result = - execute(&rhai_service, &callback, (shared_response.clone(),)); - - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - return Ok(failure_message( - response_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiExecutionResponse { context, response } = response_opt.unwrap(); - let (parts, body) = http::Response::from(response).into_parts(); - - let ctx = context.clone(); - - let mapped_stream = rest.filter_map(move |deferred_response| { - let rhai_service = rhai_service.clone(); - let context = context.clone(); - let callback = callback.clone(); - async move { - let response = RhaiExecutionDeferredResponse { - context, - response: deferred_response, - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - let result = execute( - &rhai_service, - &callback, - (shared_response.clone(),), - ); - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - return None; - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiExecutionDeferredResponse { response, .. } = - response_opt.unwrap(); - - Some(response) - } - }); - - let response = http::Response::from_parts( - parts, - once(ready(body)).chain(mapped_stream).boxed(), - ); - Ok(ExecutionResponse { - context: ctx, - response, - }) - }) - .boxed() - }) + gen_map_deferred_response!( + ExecutionResponse, + RhaiExecutionResponse, + RhaiExecutionDeferredResponse, + service, + rhai_service, + callback + ); } ServiceStep::Subgraph(service) => { gen_map_response!(subgraph, service, rhai_service, callback); @@ -1145,11 +1053,11 @@ impl Rhai { }) .register_fn( "headers_are_available", - |_: &mut SharedMut| -> bool { true }, + |_: &mut SharedMut| -> bool { true }, ) .register_fn( "headers_are_available", - |_: &mut SharedMut| -> bool { false }, + |_: &mut SharedMut| -> bool { false }, ) .register_fn( "headers_are_available", @@ -1402,18 +1310,18 @@ impl Rhai { }) .register_fn("to_string", |x: &mut Uri| -> String { format!("{:?}", x) }); - register_rhai_interface!(engine, router, execution, subgraph); + register_rhai_interface!(engine, supergraph, execution, subgraph); engine .register_get_result( "context", - |obj: &mut SharedMut| { + |obj: &mut SharedMut| { Ok(obj.with_mut(|response| response.context.clone())) }, ) .register_set_result( "context", - |obj: &mut SharedMut, context: Context| { + |obj: &mut SharedMut, context: Context| { obj.with_mut(|response| response.context = context); Ok(()) }, @@ -1488,14 +1396,14 @@ mod tests { let mut router_service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); let context = Context::new(); context.insert("test", 5i64).unwrap(); - let router_req = SupergraphRequest::fake_builder().context(context).build()?; + let supergraph_req = SupergraphRequest::fake_builder().context(context).build()?; - let mut router_resp = router_service.ready().await?.call(router_req).await?; - assert_eq!(router_resp.response.status(), 200); - let headers = router_resp.response.headers().clone(); - let context = router_resp.context.clone(); + let mut supergraph_resp = router_service.ready().await?.call(supergraph_req).await?; + assert_eq!(supergraph_resp.response.status(), 200); + let headers = supergraph_resp.response.headers().clone(); + let context = supergraph_resp.context.clone(); // Check if it fails - let resp = router_resp.next_response().await.unwrap(); + let resp = supergraph_resp.next_response().await.unwrap(); if !resp.errors.is_empty() { panic!( "Contains errors : {}",