diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 014b8148c8..7a63c0d27b 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -1,6 +1,7 @@ --- source: apollo-router/src/configuration/tests.rs expression: "&schema" +snapshot_kind: text --- { "$schema": "http://json-schema.org/draft-07/schema#", @@ -2311,6 +2312,7 @@ expression: "&schema" }, "type": "object" }, +<<<<<<< HEAD "ErrorsForward": { "additionalProperties": false, "properties": { @@ -2332,6 +2334,9 @@ expression: "&schema" "type": "object" }, "EventLevel": { +======= + "EventLevelConfig": { +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) "enum": [ "info", "warn", @@ -2373,6 +2378,40 @@ expression: "&schema" } ] }, +<<<<<<< HEAD +======= + "Event_for_ConnectorAttributes_and_ConnectorSelector": { + "description": "An event that can be logged as part of a trace. The event has an implicit `type` attribute that matches the name of the event in the yaml and a message that can be used to provide additional information.", + "properties": { + "attributes": { + "$ref": "#/definitions/extendable_attribute_apollo_router::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes_apollo_router::plugins::telemetry::config_new::connector::selectors::ConnectorSelector", + "description": "#/definitions/extendable_attribute_apollo_router::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes_apollo_router::plugins::telemetry::config_new::connector::selectors::ConnectorSelector" + }, + "condition": { + "$ref": "#/definitions/Condition_for_ConnectorSelector", + "description": "#/definitions/Condition_for_ConnectorSelector" + }, + "level": { + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" + }, + "message": { + "description": "The event message.", + "type": "string" + }, + "on": { + "$ref": "#/definitions/EventOn", + "description": "#/definitions/EventOn" + } + }, + "required": [ + "level", + "message", + "on" + ], + "type": "object" + }, +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) "Event_for_RouterAttributes_and_RouterSelector": { "description": "An event that can be logged as part of a trace. The event has an implicit `type` attribute that matches the name of the event in the yaml and a message that can be used to provide additional information.", "properties": { @@ -2385,8 +2424,8 @@ expression: "&schema" "description": "#/definitions/Condition_for_RouterSelector" }, "level": { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" }, "message": { "description": "The event message.", @@ -2416,8 +2455,8 @@ expression: "&schema" "description": "#/definitions/Condition_for_SubgraphSelector" }, "level": { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" }, "message": { "description": "The event message.", @@ -2447,8 +2486,8 @@ expression: "&schema" "description": "#/definitions/Condition_for_SupergraphSelector" }, "level": { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" }, "message": { "description": "The event message.", @@ -5449,11 +5488,39 @@ expression: "&schema" } ] }, +<<<<<<< HEAD +======= + "StandardEventConfig_for_ConnectorSelector": { + "anyOf": [ + { + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" + }, + { + "properties": { + "condition": { + "$ref": "#/definitions/Condition_for_ConnectorSelector", + "description": "#/definitions/Condition_for_ConnectorSelector" + }, + "level": { + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" + } + }, + "required": [ + "condition", + "level" + ], + "type": "object" + } + ] + }, +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) "StandardEventConfig_for_RouterSelector": { "anyOf": [ { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" }, { "properties": { @@ -5462,8 +5529,8 @@ expression: "&schema" "description": "#/definitions/Condition_for_RouterSelector" }, "level": { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" } }, "required": [ @@ -5477,8 +5544,8 @@ expression: "&schema" "StandardEventConfig_for_SubgraphSelector": { "anyOf": [ { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" }, { "properties": { @@ -5487,8 +5554,8 @@ expression: "&schema" "description": "#/definitions/Condition_for_SubgraphSelector" }, "level": { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" } }, "required": [ @@ -5502,8 +5569,8 @@ expression: "&schema" "StandardEventConfig_for_SupergraphSelector": { "anyOf": [ { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" }, { "properties": { @@ -5512,8 +5579,8 @@ expression: "&schema" "description": "#/definitions/Condition_for_SupergraphSelector" }, "level": { - "$ref": "#/definitions/EventLevel", - "description": "#/definitions/EventLevel" + "$ref": "#/definitions/EventLevelConfig", + "description": "#/definitions/EventLevelConfig" } }, "required": [ diff --git a/apollo-router/src/plugins/connectors/handle_responses.rs b/apollo-router/src/plugins/connectors/handle_responses.rs new file mode 100644 index 0000000000..03bc295577 --- /dev/null +++ b/apollo-router/src/plugins/connectors/handle_responses.rs @@ -0,0 +1,1474 @@ +use std::sync::Arc; + +use apollo_compiler::collections::HashMap; +use apollo_federation::sources::connect::Connector; +use apollo_federation::sources::connect::JSONSelection; +use axum::body::HttpBody; +use http::header::CONTENT_LENGTH; +use itertools::Itertools; +use opentelemetry::KeyValue; +use parking_lot::Mutex; +use serde_json_bytes::ByteString; +use serde_json_bytes::Value; +use tracing::Span; + +use crate::Context; +use crate::graphql; +use crate::json_ext::Path; +use crate::plugins::connectors::make_requests::ResponseKey; +use crate::plugins::connectors::mapping::Problem; +use crate::plugins::connectors::mapping::aggregate_apply_to_errors; +use crate::plugins::connectors::plugin::debug::ConnectorContext; +use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; +use crate::plugins::connectors::plugin::debug::SelectionData; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION; +use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse; +use crate::plugins::telemetry::config_new::events::log_event; +use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; +use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR; +use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK; +use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event; +use crate::services::connect::Response; +use crate::services::connector; +use crate::services::connector::request_service::Error; +use crate::services::connector::request_service::TransportResponse; +use crate::services::connector::request_service::transport::http::HttpResponse; +use crate::services::fetch::AddSubgraphNameExt; +use crate::services::router; + +const ENTITIES: &str = "_entities"; +const TYPENAME: &str = "__typename"; + +// --- ERRORS ------------------------------------------------------------------ + +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub(crate) enum HandleResponseError { + /// Merge error: {0} + MergeError(String), +} + +// --- RAW RESPONSE ------------------------------------------------------------ + +enum RawResponse { + /// This error type is used if: + /// 1. We didn't even make the request (we hit the request limit) + /// 2. We couldn't deserialize the response body + Error { + error: graphql::Error, + key: ResponseKey, + }, + /// Contains the response data directly from the HTTP response. We'll apply + /// a selection to convert this into either `data` or `errors` based on + /// whether it's successful or not. + Data { + parts: http::response::Parts, + data: Value, + key: ResponseKey, + debug_request: Option, + }, +} + +impl RawResponse { + /// Returns a response with data transformed by the selection mapping. + /// + /// As a side effect, this will also write to the debug context. + fn map_response( + self, + result: Result, + connector: Arc, + context: &Context, + debug_context: &Option>>, + supergraph_request: Arc>, + ) -> connector::request_service::Response { + let mapped_response = match self { + RawResponse::Error { error, key } => MappedResponse::Error { error, key }, + RawResponse::Data { + data, + key, + parts, + debug_request, + } => { + let inputs = key.inputs().merge( + &connector.response_variables, + &connector.response_headers, + connector.config.as_ref(), + context, + Some(parts.status.as_u16()), + supergraph_request, + Some(&parts), + ); + + let (res, apply_to_errors) = key.selection().apply_with_vars(&data, &inputs); + + let mapping_problems = aggregate_apply_to_errors(&apply_to_errors); + + if let Some(debug) = debug_context { + debug.lock().push_response( + debug_request.clone(), + &parts, + &data, + Some(SelectionData { + source: connector.selection.to_string(), + transformed: key.selection().to_string(), + result: res.clone(), + errors: mapping_problems.clone(), + }), + ); + } + + MappedResponse::Data { + key, + data: res.unwrap_or_else(|| Value::Null), + problems: mapping_problems, + } + } + }; + + connector::request_service::Response { + context: context.clone(), + connector: connector.clone(), + transport_result: result, + mapped_response, + } + } + + /// Returns a `MappedResponse` with a GraphQL error. + /// + /// As a side effect, this will also write to the debug context. + // TODO: This is where we'd map the response to a top-level GraphQL error + // once we have an error mapping. For now, it just creates a basic top-level + // error with the status code. + fn map_error( + self, + result: Result, + connector: Arc, + context: &Context, + debug_context: &Option>>, + ) -> connector::request_service::Response { + use serde_json_bytes::*; + + let mapped_response = match self { + RawResponse::Error { error, key } => MappedResponse::Error { error, key }, + RawResponse::Data { + key, + parts, + debug_request, + data, + } => { + let error = graphql::Error::builder() + .message("Request failed".to_string()) + .extension_code("CONNECTOR_FETCH") + .extension("service", connector.id.subgraph_name.clone()) + .extension( + "http", + Value::Object(Map::from_iter([( + "status".into(), + Value::Number(parts.status.as_u16().into()), + )])), + ) + .extension( + "connector", + Value::Object(Map::from_iter([( + "coordinate".into(), + Value::String(connector.id.coordinate().into()), + )])), + ) + .path::((&key).into()) + .build() + .add_subgraph_name(&connector.id.subgraph_name); // for include_subgraph_errors + + if let Some(debug) = debug_context { + debug + .lock() + .push_response(debug_request.clone(), &parts, &data, None); + } + + MappedResponse::Error { error, key } + } + }; + + if let MappedResponse::Error { + error: ref mapped_error, + key: _, + } = mapped_response + { + if let Some(Value::String(error_code)) = mapped_error.extensions.get("code") { + emit_error_event(error_code.as_str(), "Connector error occurred"); + } + } + + connector::request_service::Response { + context: context.clone(), + connector: connector.clone(), + transport_result: result, + mapped_response, + } + } +} + +// --- MAPPED RESPONSE --------------------------------------------------------- +#[derive(Debug)] +pub(crate) enum MappedResponse { + /// This is equivalent to RawResponse::Error, but it also represents errors + /// when the request is semantically unsuccessful (e.g. 404, 500). + Error { + error: graphql::Error, + key: ResponseKey, + }, + /// The response data after applying the selection mapping. + Data { + data: Value, + key: ResponseKey, + problems: Vec, + }, +} + +impl MappedResponse { + /// Adds the response data to the `data` map or the error to the `errors` + /// array. How data is added depends on the `ResponseKey`: it's either a + /// property directly on the map, or stored in the `_entities` array. + fn add_to_data( + self, + data: &mut serde_json_bytes::Map, + errors: &mut Vec, + count: usize, + ) -> Result<(), HandleResponseError> { + match self { + Self::Error { error, key, .. } => { + match key { + // add a null to the "_entities" array at the right index + ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => { + let entities = data + .entry(ENTITIES) + .or_insert(Value::Array(Vec::with_capacity(count))); + entities + .as_array_mut() + .ok_or_else(|| { + HandleResponseError::MergeError("_entities is not an array".into()) + })? + .insert(index, Value::Null); + } + _ => {} + }; + errors.push(error); + } + Self::Data { + data: value, key, .. + } => match key { + ResponseKey::RootField { ref name, .. } => { + data.insert(name.clone(), value); + } + ResponseKey::Entity { index, .. } => { + let entities = data + .entry(ENTITIES) + .or_insert(Value::Array(Vec::with_capacity(count))); + entities + .as_array_mut() + .ok_or_else(|| { + HandleResponseError::MergeError("_entities is not an array".into()) + })? + .insert(index, value); + } + ResponseKey::EntityField { + index, + ref field_name, + ref typename, + .. + } => { + let entities = data + .entry(ENTITIES) + .or_insert(Value::Array(Vec::with_capacity(count))) + .as_array_mut() + .ok_or_else(|| { + HandleResponseError::MergeError("_entities is not an array".into()) + })?; + + match entities.get_mut(index) { + Some(Value::Object(entity)) => { + entity.insert(field_name.clone(), value); + } + _ => { + let mut entity = serde_json_bytes::Map::new(); + if let Some(typename) = typename { + entity.insert(TYPENAME, Value::String(typename.as_str().into())); + } + entity.insert(field_name.clone(), value); + entities.insert(index, Value::Object(entity)); + } + }; + } + ResponseKey::BatchEntity { keys, inputs, .. } => { + let Value::Array(values) = value else { + return Err(HandleResponseError::MergeError( + "Response for a batch request does not map to an array".into(), + )); + }; + + let key_selection: Result = keys.try_into(); + let key_selection = key_selection + .map_err(|e| HandleResponseError::MergeError(e.to_string()))?; + + // Convert representations into keys for use in the map + let key_values = inputs.batch.iter().map(|v| { + key_selection + .apply_to(&Value::Object(v.clone())) + .0 + .unwrap_or(Value::Null) + }); + + // Create a map of keys to entities + let mut map = values + .into_iter() + .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v))) + .collect::>(); + + // Make a list of entities that matches the representations list + let new_entities = key_values + .map(|key| map.remove(&key).unwrap_or(Value::Null)) + .collect_vec(); + + // Because we may have multiple batch entities requests, we should add to ENTITIES as the requests come in so it is additive + let entities = data + .entry(ENTITIES) + .or_insert(Value::Array(Vec::with_capacity(count))); + + entities + .as_array_mut() + .ok_or_else(|| { + HandleResponseError::MergeError("_entities is not an array".into()) + })? + .extend(new_entities); + } + }, + } + + Ok(()) + } +} + +// --- handle_responses -------------------------------------------------------- + +pub(crate) async fn process_response( + result: Result, Error>, + response_key: ResponseKey, + connector: Arc, + context: &Context, + debug_request: Option, + debug_context: &Option>>, + supergraph_request: Arc>, +) -> connector::request_service::Response { + match result { + // This occurs when we short-circuit the request when over the limit + Err(error) => { + let raw = RawResponse::Error { + error: error.to_graphql_error(connector.clone(), Some((&response_key).into())), + key: response_key, + }; + Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); + raw.map_error(Err(error), connector, context, debug_context) + } + Ok(response) => { + let (parts, body) = response.into_parts(); + + let result = Ok(TransportResponse::Http(HttpResponse { + inner: parts.clone(), + })); + + // If this errors, it will write to the debug context because it + // has access to the raw bytes, so we can't write to it again + // in any RawResponse::Error branches. + let raw = match deserialize_response( + body, + &parts, + connector.clone(), + context, + &response_key, + debug_context, + &debug_request, + ) + .await + { + Ok(data) => RawResponse::Data { + parts, + data, + key: response_key, + debug_request, + }, + Err(error) => RawResponse::Error { + error, + key: response_key, + }, + }; + let is_success = match &raw { + RawResponse::Error { .. } => false, + RawResponse::Data { parts, .. } => parts.status.is_success(), + }; + if is_success { + Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK); + raw.map_response( + result, + connector, + context, + debug_context, + supergraph_request, + ) + } else { + Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); + raw.map_error(result, connector, context, debug_context) + } + } + } +} + +pub(crate) fn aggregate_responses( + responses: Vec, +) -> Result { + let mut data = serde_json_bytes::Map::new(); + let mut errors = Vec::new(); + let count = responses.len(); + + for mapped in responses { + mapped.add_to_data(&mut data, &mut errors, count)?; + } + + let data = if data.is_empty() { + Value::Null + } else { + Value::Object(data) + }; + + Span::current().record( + OTEL_STATUS_CODE, + if errors.is_empty() { + OTEL_STATUS_CODE_OK + } else { + OTEL_STATUS_CODE_ERROR + }, + ); + + Ok(Response { + response: http::Response::builder() + .body( + graphql::Response::builder() + .data(data) + .errors(errors) + .build(), + ) + .unwrap(), + }) +} + +/// Converts the response body to bytes and deserializes it into a json Value. +/// This is the last time we have access to the original bytes, so it's the only +/// opportunity to write the invalid response to the debug context. +async fn deserialize_response( + body: T, + parts: &http::response::Parts, + connector: Arc, + context: &Context, + response_key: &ResponseKey, + debug_context: &Option>>, + debug_request: &Option, +) -> Result { + use serde_json_bytes::*; + + let make_err = |path: Path| { + graphql::Error::builder() + .message("Request failed".to_string()) + .extension_code("CONNECTOR_FETCH") + .extension("service", connector.id.subgraph_name.clone()) + .extension( + "http", + Value::Object(Map::from_iter([( + "status".into(), + Value::Number(parts.status.as_u16().into()), + )])), + ) + .extension( + "connector", + Value::Object(Map::from_iter([( + "coordinate".into(), + Value::String(connector.id.coordinate().into()), + )])), + ) + .path(path) + .build() + .add_subgraph_name(&connector.id.subgraph_name) // for include_subgraph_errors + }; + + let path: Path = response_key.into(); + let body = &router::body::into_bytes(body) + .await + .map_err(|_| make_err(path.clone()))?; + + let log_response_level = context + .extensions() + .with_lock(|lock| lock.get::().cloned()) + .and_then(|event| { + // Create a temporary response here so we can evaluate the condition. This response + // is missing any information about the mapped response, because we don't have that + // yet. This means that we cannot correctly evaluate any condition that relies on + // the mapped response data or mapping problems. But we can't wait until we do have + // that information, because this is the only place we have the body bytes (without + // making an expensive clone of the body). So we either need to not expose any + // selector which can be used as a condition that requires mapping information, or + // we must document that such selectors cannot be used as conditions on standard + // connectors events. + + let response = connector::request_service::Response { + context: context.clone(), + connector: connector.clone(), + transport_result: Ok(TransportResponse::Http(HttpResponse { + inner: parts.clone(), + })), + mapped_response: MappedResponse::Data { + data: Value::Null, + key: response_key.clone(), + problems: vec![], + }, + }; + if event.condition.evaluate_response(&response) { + Some(event.level) + } else { + None + } + }); + + if let Some(level) = log_response_level { + let mut attrs = Vec::with_capacity(4); + #[cfg(test)] + let headers = { + let mut headers: indexmap::IndexMap = parts + .headers + .clone() + .into_iter() + .filter_map(|(name, val)| Some((name?.to_string(), val))) + .collect(); + headers.sort_keys(); + headers + }; + #[cfg(not(test))] + let headers = &parts.headers; + + attrs.push(KeyValue::new( + HTTP_RESPONSE_HEADERS, + opentelemetry::Value::String(format!("{:?}", headers).into()), + )); + attrs.push(KeyValue::new( + HTTP_RESPONSE_STATUS, + opentelemetry::Value::String(format!("{}", parts.status).into()), + )); + attrs.push(KeyValue::new( + HTTP_RESPONSE_VERSION, + opentelemetry::Value::String(format!("{:?}", parts.version).into()), + )); + attrs.push(KeyValue::new( + HTTP_RESPONSE_BODY, + opentelemetry::Value::String( + String::from_utf8(body.clone().to_vec()) + .unwrap_or_default() + .into(), + ), + )); + + log_event( + level, + "connector.response", + attrs, + &format!( + "Response from connector {label:?}", + label = connector.id.label + ), + ); + } + + // If the body is obviously empty, don't try to parse it + if let Some(content_length) = parts + .headers + .get(CONTENT_LENGTH) + .and_then(|len| len.to_str().ok()) + .and_then(|s| s.parse::().ok()) + { + if content_length == 0 { + return Ok(Value::Null); + } + } + + match serde_json::from_slice::(body) { + Ok(json_data) => Ok(json_data), + Err(_) => { + if let Some(debug_context) = debug_context { + debug_context + .lock() + .push_invalid_response(debug_request.clone(), parts, body); + } + + Err(make_err(path)) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use apollo_compiler::Schema; + use apollo_compiler::name; + use apollo_federation::sources::connect::ConnectId; + use apollo_federation::sources::connect::ConnectSpec; + use apollo_federation::sources::connect::Connector; + use apollo_federation::sources::connect::EntityResolver; + use apollo_federation::sources::connect::HTTPMethod; + use apollo_federation::sources::connect::HttpJsonTransport; + use apollo_federation::sources::connect::JSONSelection; + use insta::assert_debug_snapshot; + use itertools::Itertools; + use url::Url; + + use crate::Context; + use crate::graphql; + use crate::plugins::connectors::handle_responses::process_response; + use crate::plugins::connectors::make_requests::RequestInputs; + use crate::plugins::connectors::make_requests::ResponseKey; + use crate::services::router; + use crate::services::router::body::RouterBody; + + #[tokio::test] + async fn test_handle_responses_root_fields() { + let connector = Arc::new(Connector { + spec: ConnectSpec::V0_1, + id: ConnectId::new( + "subgraph_name".into(), + None, + name!(Query), + name!(hello), + 0, + "test label", + ), + transport: HttpJsonTransport { + source_url: Some(Url::parse("http://localhost/api").unwrap()), + connect_template: "/path".parse().unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: Default::default(), + }, + selection: JSONSelection::parse("$.data").unwrap(), + entity_resolver: None, + config: Default::default(), + max_requests: None, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + + let response1: http::Response = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":"world"}"#)) + .unwrap(); + let response_key1 = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let response2 = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":"world"}"#)) + .unwrap(); + let response_key2 = ResponseKey::RootField { + name: "hello2".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let supergraph_request = Arc::new( + http::Request::builder() + .body(graphql::Request::builder().build()) + .unwrap(), + ); + + let res = super::aggregate_responses(vec![ + process_response( + Ok(response1), + response_key1, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request.clone(), + ) + .await + .mapped_response, + process_response( + Ok(response2), + response_key2, + connector, + &Context::default(), + None, + &None, + supergraph_request, + ) + .await + .mapped_response, + ]) + .unwrap(); + + assert_debug_snapshot!(res, @r###" + Response { + response: Response { + status: 200, + version: HTTP/1.1, + headers: {}, + body: Response { + label: None, + data: Some( + Object({ + "hello": String( + "world", + ), + "hello2": String( + "world", + ), + }), + ), + path: None, + errors: [], + extensions: {}, + has_next: None, + subscribed: None, + created_at: None, + incremental: [], + }, + }, + } + "###); + } + + #[tokio::test] + async fn test_handle_responses_entities() { + let connector = Arc::new(Connector { + spec: ConnectSpec::V0_1, + id: ConnectId::new( + "subgraph_name".into(), + None, + name!(Query), + name!(user), + 0, + "test label", + ), + transport: HttpJsonTransport { + source_url: Some(Url::parse("http://localhost/api").unwrap()), + connect_template: "/path".parse().unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: Default::default(), + }, + selection: JSONSelection::parse("$.data { id }").unwrap(), + entity_resolver: Some(EntityResolver::Explicit), + config: Default::default(), + max_requests: None, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + + let response1: http::Response = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#)) + .unwrap(); + let response_key1 = ResponseKey::Entity { + index: 0, + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let response2 = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#)) + .unwrap(); + let response_key2 = ResponseKey::Entity { + index: 1, + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let supergraph_request = Arc::new( + http::Request::builder() + .body(graphql::Request::builder().build()) + .unwrap(), + ); + + let res = super::aggregate_responses(vec![ + process_response( + Ok(response1), + response_key1, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request.clone(), + ) + .await + .mapped_response, + process_response( + Ok(response2), + response_key2, + connector, + &Context::default(), + None, + &None, + supergraph_request, + ) + .await + .mapped_response, + ]) + .unwrap(); + + assert_debug_snapshot!(res, @r###" + Response { + response: Response { + status: 200, + version: HTTP/1.1, + headers: {}, + body: Response { + label: None, + data: Some( + Object({ + "_entities": Array([ + Object({ + "id": String( + "1", + ), + }), + Object({ + "id": String( + "2", + ), + }), + ]), + }), + ), + path: None, + errors: [], + extensions: {}, + has_next: None, + subscribed: None, + created_at: None, + incremental: [], + }, + }, + } + "###); + } + + #[tokio::test] + async fn test_handle_responses_batch() { + let connector = Arc::new(Connector { + spec: ConnectSpec::V0_2, + id: ConnectId::new_on_object( + "subgraph_name".into(), + None, + name!(User), + 0, + "test label", + ), + transport: HttpJsonTransport { + source_url: Some(Url::parse("http://localhost/api").unwrap()), + connect_template: "/path".parse().unwrap(), + method: HTTPMethod::Post, + headers: Default::default(), + body: Some(JSONSelection::parse("ids: $batch.id").unwrap()), + }, + selection: JSONSelection::parse("$.data { id name }").unwrap(), + entity_resolver: Some(EntityResolver::TypeBatch), + config: Default::default(), + max_requests: None, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + + let keys = connector + .resolvable_key( + &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "") + .unwrap(), + ) + .unwrap() + .unwrap(); + + let response1: http::Response = http::Response::builder() + // different order from the request inputs + .body(router::body::from_bytes( + r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#, + )) + .unwrap(); + + let mut inputs: RequestInputs = RequestInputs::default(); + let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]); + inputs.batch = representations + .as_array() + .unwrap() + .iter() + .cloned() + .map(|v| v.as_object().unwrap().clone()) + .collect_vec(); + + let response_key1 = ResponseKey::BatchEntity { + selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()), + keys, + inputs, + }; + + let supergraph_request = Arc::new( + http::Request::builder() + .body(graphql::Request::builder().build()) + .unwrap(), + ); + + let res = super::aggregate_responses(vec![ + process_response( + Ok(response1), + response_key1, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request, + ) + .await + .mapped_response, + ]) + .unwrap(); + + assert_debug_snapshot!(res, @r#" + Response { + response: Response { + status: 200, + version: HTTP/1.1, + headers: {}, + body: Response { + label: None, + data: Some( + Object({ + "_entities": Array([ + Object({ + "id": String( + "1", + ), + "name": String( + "A", + ), + }), + Object({ + "id": String( + "2", + ), + "name": String( + "B", + ), + }), + ]), + }), + ), + path: None, + errors: [], + extensions: {}, + has_next: None, + subscribed: None, + created_at: None, + incremental: [], + }, + }, + } + "#); + } + + #[tokio::test] + async fn test_handle_responses_entity_field() { + let connector = Arc::new(Connector { + spec: ConnectSpec::V0_1, + id: ConnectId::new( + "subgraph_name".into(), + None, + name!(User), + name!(field), + 0, + "test label", + ), + transport: HttpJsonTransport { + source_url: Some(Url::parse("http://localhost/api").unwrap()), + connect_template: "/path".parse().unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: Default::default(), + }, + selection: JSONSelection::parse("$.data").unwrap(), + entity_resolver: Some(EntityResolver::Implicit), + config: Default::default(), + max_requests: None, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + + let response1: http::Response = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":"value1"}"#)) + .unwrap(); + let response_key1 = ResponseKey::EntityField { + index: 0, + inputs: Default::default(), + field_name: "field".to_string(), + typename: Some(name!("User")), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let response2 = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":"value2"}"#)) + .unwrap(); + let response_key2 = ResponseKey::EntityField { + index: 1, + inputs: Default::default(), + field_name: "field".to_string(), + typename: Some(name!("User")), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let supergraph_request = Arc::new( + http::Request::builder() + .body(graphql::Request::builder().build()) + .unwrap(), + ); + + let res = super::aggregate_responses(vec![ + process_response( + Ok(response1), + response_key1, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request.clone(), + ) + .await + .mapped_response, + process_response( + Ok(response2), + response_key2, + connector, + &Context::default(), + None, + &None, + supergraph_request, + ) + .await + .mapped_response, + ]) + .unwrap(); + + assert_debug_snapshot!(res, @r###" + Response { + response: Response { + status: 200, + version: HTTP/1.1, + headers: {}, + body: Response { + label: None, + data: Some( + Object({ + "_entities": Array([ + Object({ + "__typename": String( + "User", + ), + "field": String( + "value1", + ), + }), + Object({ + "__typename": String( + "User", + ), + "field": String( + "value2", + ), + }), + ]), + }), + ), + path: None, + errors: [], + extensions: {}, + has_next: None, + subscribed: None, + created_at: None, + incremental: [], + }, + }, + } + "###); + } + + #[tokio::test] + async fn test_handle_responses_errors() { + let connector = Arc::new(Connector { + spec: ConnectSpec::V0_1, + id: ConnectId::new( + "subgraph_name".into(), + None, + name!(Query), + name!(user), + 0, + "test label", + ), + transport: HttpJsonTransport { + source_url: Some(Url::parse("http://localhost/api").unwrap()), + connect_template: "/path".parse().unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: Default::default(), + }, + selection: JSONSelection::parse("$.data").unwrap(), + entity_resolver: Some(EntityResolver::Explicit), + config: Default::default(), + max_requests: None, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + + let response_plaintext: http::Response = http::Response::builder() + .body(router::body::from_bytes(r#"plain text"#)) + .unwrap(); + let response_key_plaintext = ResponseKey::Entity { + index: 0, + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let response1: http::Response = http::Response::builder() + .status(404) + .body(router::body::from_bytes(r#"{"error":"not found"}"#)) + .unwrap(); + let response_key1 = ResponseKey::Entity { + index: 1, + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let response2 = http::Response::builder() + .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#)) + .unwrap(); + let response_key2 = ResponseKey::Entity { + index: 2, + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let response3: http::Response = http::Response::builder() + .status(500) + .body(router::body::from_bytes(r#"{"error":"whoops"}"#)) + .unwrap(); + let response_key3 = ResponseKey::Entity { + index: 3, + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + + let supergraph_request = Arc::new( + http::Request::builder() + .body(graphql::Request::builder().build()) + .unwrap(), + ); + + let res = super::aggregate_responses(vec![ + process_response( + Ok(response_plaintext), + response_key_plaintext, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request.clone(), + ) + .await + .mapped_response, + process_response( + Ok(response1), + response_key1, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request.clone(), + ) + .await + .mapped_response, + process_response( + Ok(response2), + response_key2, + connector.clone(), + &Context::default(), + None, + &None, + supergraph_request.clone(), + ) + .await + .mapped_response, + process_response( + Ok(response3), + response_key3, + connector, + &Context::default(), + None, + &None, + supergraph_request, + ) + .await + .mapped_response, + ]) + .unwrap(); + + assert_debug_snapshot!(res, @r###" + Response { + response: Response { + status: 200, + version: HTTP/1.1, + headers: {}, + body: Response { + label: None, + data: Some( + Object({ + "_entities": Array([ + Null, + Null, + Object({ + "id": String( + "2", + ), + }), + Null, + ]), + }), + ), + path: None, + errors: [ + Error { + message: "Request failed", + locations: [], + path: Some( + Path( + [ + Key( + "_entities", + None, + ), + Index( + 0, + ), + ], + ), + ), + extensions: { + "service": String( + "subgraph_name", + ), + "http": Object({ + "status": Number(200), + }), + "connector": Object({ + "coordinate": String( + "subgraph_name:Query.user@connect[0]", + ), + }), + "code": String( + "CONNECTOR_FETCH", + ), + "fetch_subgraph_name": String( + "subgraph_name", + ), + }, + }, + Error { + message: "Request failed", + locations: [], + path: Some( + Path( + [ + Key( + "_entities", + None, + ), + Index( + 1, + ), + ], + ), + ), + extensions: { + "service": String( + "subgraph_name", + ), + "http": Object({ + "status": Number(404), + }), + "connector": Object({ + "coordinate": String( + "subgraph_name:Query.user@connect[0]", + ), + }), + "code": String( + "CONNECTOR_FETCH", + ), + "fetch_subgraph_name": String( + "subgraph_name", + ), + }, + }, + Error { + message: "Request failed", + locations: [], + path: Some( + Path( + [ + Key( + "_entities", + None, + ), + Index( + 3, + ), + ], + ), + ), + extensions: { + "service": String( + "subgraph_name", + ), + "http": Object({ + "status": Number(500), + }), + "connector": Object({ + "coordinate": String( + "subgraph_name:Query.user@connect[0]", + ), + }), + "code": String( + "CONNECTOR_FETCH", + ), + "fetch_subgraph_name": String( + "subgraph_name", + ), + }, + }, + ], + extensions: {}, + has_next: None, + subscribed: None, + created_at: None, + incremental: [], + }, + }, + } + "###); + } + + #[tokio::test] + async fn test_handle_responses_status() { + let selection = JSONSelection::parse("$status").unwrap(); + let connector = Arc::new(Connector { + spec: ConnectSpec::V0_1, + id: ConnectId::new( + "subgraph_name".into(), + None, + name!(Query), + name!(hello), + 0, + "test label", + ), + transport: HttpJsonTransport { + source_url: Some(Url::parse("http://localhost/api").unwrap()), + connect_template: "/path".parse().unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: Default::default(), + }, + selection: selection.clone(), + entity_resolver: None, + config: Default::default(), + max_requests: None, + request_variables: Default::default(), + batch_settings: None, + response_variables: selection + .variable_references() + .map(|var_ref| var_ref.namespace.namespace) + .collect(), + request_headers: Default::default(), + response_headers: Default::default(), + }); + + let response1: http::Response = http::Response::builder() + .status(201) + .body(router::body::from_bytes(r#"{}"#)) + .unwrap(); + let response_key1 = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$status").unwrap()), + }; + + let supergraph_request = Arc::new( + http::Request::builder() + .body(graphql::Request::builder().build()) + .unwrap(), + ); + + let res = super::aggregate_responses(vec![ + process_response( + Ok(response1), + response_key1, + connector, + &Context::default(), + None, + &None, + supergraph_request, + ) + .await + .mapped_response, + ]) + .unwrap(); + + assert_debug_snapshot!(res, @r###" + Response { + response: Response { + status: 200, + version: HTTP/1.1, + headers: {}, + body: Response { + label: None, + data: Some( + Object({ + "hello": Number(201), + }), + ), + path: None, + errors: [], + extensions: {}, + has_next: None, + subscribed: None, + created_at: None, + incremental: [], + }, + }, + } + "###); + } +} diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/events.rs b/apollo-router/src/plugins/telemetry/config_new/connector/events.rs new file mode 100644 index 0000000000..22faabca97 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/connector/events.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; + +use opentelemetry::Key; +use opentelemetry::KeyValue; +use parking_lot::Mutex; +use schemars::JsonSchema; +use serde::Deserialize; +use tower::BoxError; + +use crate::Context; +use crate::plugins::telemetry::config_new::conditions::Condition; +use crate::plugins::telemetry::config_new::connector::ConnectorRequest; +use crate::plugins::telemetry::config_new::connector::ConnectorResponse; +use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes; +use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector; +use crate::plugins::telemetry::config_new::events::CustomEvent; +use crate::plugins::telemetry::config_new::events::CustomEvents; +use crate::plugins::telemetry::config_new::events::Event; +use crate::plugins::telemetry::config_new::events::EventLevel; +use crate::plugins::telemetry::config_new::events::StandardEvent; +use crate::plugins::telemetry::config_new::events::StandardEventConfig; +use crate::plugins::telemetry::config_new::events::log_event; +use crate::plugins::telemetry::config_new::extendable::Extendable; + +#[derive(Clone)] +pub(crate) struct ConnectorEventRequest { + // XXX(@IvanGoncharov): As part of removing Mutex from StandardEvent I moved it here + // I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent + pub(crate) level: EventLevel, + pub(crate) condition: Arc>>, +} + +#[derive(Clone)] +pub(crate) struct ConnectorEventResponse { + // XXX(@IvanGoncharov): As part of removing Arc from StandardEvent I moved it here + // I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent + pub(crate) level: EventLevel, + pub(crate) condition: Arc>, +} + +#[derive(Clone, Deserialize, JsonSchema, Debug, Default)] +#[serde(deny_unknown_fields, default)] +pub(crate) struct ConnectorEventsConfig { + /// Log the connector HTTP request + pub(crate) request: StandardEventConfig, + /// Log the connector HTTP response + pub(crate) response: StandardEventConfig, + /// Log the connector HTTP error + pub(crate) error: StandardEventConfig, +} + +pub(crate) type ConnectorEvents = + CustomEvents; + +pub(crate) fn new_connector_events( + config: &Extendable>, +) -> ConnectorEvents { + let custom_events = config + .custom + .iter() + .filter_map(|(name, config)| CustomEvent::from_config(name, config)) + .collect(); + + ConnectorEvents { + request: StandardEvent::from_config(&config.attributes.request), + response: StandardEvent::from_config(&config.attributes.response), + error: StandardEvent::from_config(&config.attributes.error), + custom: custom_events, + } +} + +impl CustomEvents { + pub(crate) fn on_request(&mut self, request: &ConnectorRequest) { + // Any condition on the request is NOT evaluated here. It must be evaluated later when + // getting the ConnectorEventRequest from the context. The request context is shared + // between all connector requests, so any request could find this ConnectorEventRequest in + // the context. Its presence on the context cannot be conditional on an individual request. + if let Some(request_event) = self.request.take() { + request.context.extensions().with_lock(|lock| { + lock.insert(ConnectorEventRequest { + level: request_event.level, + condition: Arc::new(Mutex::new(request_event.condition)), + }) + }); + } + + if let Some(response_event) = self.response.take() { + request.context.extensions().with_lock(|lock| { + lock.insert(ConnectorEventResponse { + level: response_event.level, + condition: Arc::new(response_event.condition), + }) + }); + } + + for custom_event in &mut self.custom { + custom_event.on_request(request); + } + } + + pub(crate) fn on_response(&mut self, response: &ConnectorResponse) { + for custom_event in &mut self.custom { + custom_event.on_response(response); + } + } + + pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) { + if let Some(error_event) = &mut self.error { + if error_event.condition.evaluate_error(error, ctx) { + log_event( + error_event.level, + "connector.http.error", + vec![KeyValue::new( + Key::from_static_str("error"), + opentelemetry::Value::String(error.to_string().into()), + )], + "", + ); + } + } + for custom_event in &mut self.custom { + custom_event.on_error(error, ctx); + } + } +} diff --git a/apollo-router/src/plugins/telemetry/config_new/events.rs b/apollo-router/src/plugins/telemetry/config_new/events.rs index e0a9b108ab..6bacdfdda6 100644 --- a/apollo-router/src/plugins/telemetry/config_new/events.rs +++ b/apollo-router/src/plugins/telemetry/config_new/events.rs @@ -16,7 +16,6 @@ use tracing::info_span; use super::Selector; use super::Selectors; use super::Stage; -use super::instruments::Instrumented; use crate::Context; use crate::graphql; use crate::plugins::telemetry::config_new::attributes::RouterAttributes; @@ -32,6 +31,16 @@ use crate::services::router; use crate::services::subgraph; use crate::services::supergraph; +<<<<<<< HEAD +======= +#[derive(Clone)] +pub(crate) struct DisplayRouterRequest(pub(crate) EventLevel); +#[derive(Default, Clone)] +pub(crate) struct DisplayRouterResponse(pub(crate) bool); +#[derive(Default, Clone)] +pub(crate) struct RouterResponseBodyExtensionType(pub(crate) String); + +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) /// Events are #[derive(Deserialize, JsonSchema, Clone, Default, Debug)] #[serde(deny_unknown_fields, default)] @@ -50,27 +59,13 @@ impl Events { .router .custom .iter() - .filter_map(|(event_name, event_cfg)| match &event_cfg.level { - EventLevel::Off => None, - _ => Some(CustomEvent { - inner: Mutex::new(CustomEventInner { - name: event_name.clone(), - level: event_cfg.level, - event_on: event_cfg.on, - message: event_cfg.message.clone(), - selectors: event_cfg.attributes.clone().into(), - condition: event_cfg.condition.clone(), - attributes: Vec::new(), - _phantom: PhantomData, - }), - }), - }) + .filter_map(|(name, config)| CustomEvent::from_config(name, config)) .collect(); RouterEvents { - request: self.router.attributes.request.clone().into(), - response: self.router.attributes.response.clone().into(), - error: self.router.attributes.error.clone().into(), + request: StandardEvent::from_config(&self.router.attributes.request), + response: StandardEvent::from_config(&self.router.attributes.response), + error: StandardEvent::from_config(&self.router.attributes.error), custom: custom_events, } } @@ -80,27 +75,13 @@ impl Events { .supergraph .custom .iter() - .filter_map(|(event_name, event_cfg)| match &event_cfg.level { - EventLevel::Off => None, - _ => Some(CustomEvent { - inner: Mutex::new(CustomEventInner { - name: event_name.clone(), - level: event_cfg.level, - event_on: event_cfg.on, - message: event_cfg.message.clone(), - selectors: event_cfg.attributes.clone().into(), - condition: event_cfg.condition.clone(), - attributes: Vec::new(), - _phantom: PhantomData, - }), - }), - }) + .filter_map(|(name, config)| CustomEvent::from_config(name, config)) .collect(); SupergraphEvents { - request: self.supergraph.attributes.request.clone().into(), - response: self.supergraph.attributes.response.clone().into(), - error: self.supergraph.attributes.error.clone().into(), + request: StandardEvent::from_config(&self.supergraph.attributes.request), + response: StandardEvent::from_config(&self.supergraph.attributes.response), + error: StandardEvent::from_config(&self.supergraph.attributes.error), custom: custom_events, } } @@ -110,32 +91,19 @@ impl Events { .subgraph .custom .iter() - .filter_map(|(event_name, event_cfg)| match &event_cfg.level { - EventLevel::Off => None, - _ => Some(CustomEvent { - inner: Mutex::new(CustomEventInner { - name: event_name.clone(), - level: event_cfg.level, - event_on: event_cfg.on, - message: event_cfg.message.clone(), - selectors: event_cfg.attributes.clone().into(), - condition: event_cfg.condition.clone(), - attributes: Vec::new(), - _phantom: PhantomData, - }), - }), - }) + .filter_map(|(name, config)| CustomEvent::from_config(name, config)) .collect(); SubgraphEvents { - request: self.subgraph.attributes.request.clone().into(), - response: self.subgraph.attributes.response.clone().into(), - error: self.subgraph.attributes.error.clone().into(), + request: StandardEvent::from_config(&self.subgraph.attributes.request), + response: StandardEvent::from_config(&self.subgraph.attributes.response), + error: StandardEvent::from_config(&self.subgraph.attributes.error), custom: custom_events, } } pub(crate) fn validate(&self) -> Result<(), String> { +<<<<<<< HEAD if let StandardEventConfig::Conditional { condition, .. } = &self.router.attributes.request { condition.validate(Some(Stage::Request))?; @@ -164,6 +132,40 @@ impl Events { { condition.validate(Some(Stage::Response))?; } +======= + self.router + .attributes + .request + .validate(Some(Stage::Request))?; + self.router + .attributes + .response + .validate(Some(Stage::Response))?; + self.supergraph + .attributes + .request + .validate(Some(Stage::Request))?; + self.supergraph + .attributes + .response + .validate(Some(Stage::Response))?; + self.subgraph + .attributes + .request + .validate(Some(Stage::Request))?; + self.subgraph + .attributes + .response + .validate(Some(Stage::Response))?; + self.connector + .attributes + .request + .validate(Some(Stage::Request))?; + self.connector + .attributes + .response + .validate(Some(Stage::Response))?; +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) for (name, custom_event) in &self.router.custom { custom_event.validate().map_err(|err| { format!("configuration error for router custom event {name:?}: {err}") @@ -203,25 +205,24 @@ where Attributes: Selectors + Default, Sel: Selector + Debug, { +<<<<<<< HEAD request: StandardEvent, response: StandardEvent, error: StandardEvent, custom: Vec>, +======= + pub(super) request: Option>, + pub(super) response: Option>, + pub(super) error: Option>, + pub(super) custom: Vec>, +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) } -impl Instrumented - for CustomEvents -{ - type Request = router::Request; - type Response = router::Response; - type EventResponse = (); - - fn on_request(&self, request: &Self::Request) { - if self.request.level() != EventLevel::Off { - if let Some(condition) = self.request.condition() { - if condition.lock().evaluate_request(request) != Some(true) { - return; - } +impl CustomEvents { + pub(crate) fn on_request(&mut self, request: &router::Request) { + if let Some(request_event) = &mut self.request { + if request_event.condition.evaluate_request(request) != Some(true) { + return; } let mut attrs = Vec::with_capacity(5); #[cfg(test)] @@ -237,6 +238,7 @@ impl Instrumented #[cfg(not(test))] let headers = request.router_request.headers(); +<<<<<<< HEAD attrs.push(KeyValue::new( Key::from_static_str("http.request.headers"), opentelemetry::Value::String(format!("{:?}", headers).into()), @@ -260,18 +262,28 @@ impl Instrumented opentelemetry::Value::String(format!("{:?}", request.router_request.body()).into()), )); log_event(self.request.level(), "router.request", attrs, ""); +======= + request + .context + .extensions() + .with_lock(|ext| ext.insert(DisplayRouterRequest(request_event.level))); } - for custom_event in &self.custom { + if self.response.is_some() { + request + .context + .extensions() + .with_lock(|ext| ext.insert(DisplayRouterResponse(true))); +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) + } + for custom_event in &mut self.custom { custom_event.on_request(request); } } - fn on_response(&self, response: &Self::Response) { - if self.response.level() != EventLevel::Off { - if let Some(condition) = self.response.condition() { - if !condition.lock().evaluate_response(response) { - return; - } + pub(crate) fn on_response(&mut self, response: &router::Response) { + if let Some(response_event) = &self.response { + if !response_event.condition.evaluate_response(response) { + return; } let mut attrs = Vec::with_capacity(4); @@ -299,26 +311,40 @@ impl Instrumented Key::from_static_str("http.response.version"), opentelemetry::Value::String(format!("{:?}", response.response.version()).into()), )); +<<<<<<< HEAD attrs.push(KeyValue::new( Key::from_static_str("http.response.body"), opentelemetry::Value::String(format!("{:?}", response.response.body()).into()), )); log_event(self.response.level(), "router.response", attrs, ""); +======= + + if let Some(body) = response + .context + .extensions() + .with_lock(|ext| ext.remove::()) + { + attrs.push(KeyValue::new( + HTTP_RESPONSE_BODY, + opentelemetry::Value::String(body.0.into()), + )); + } + + log_event(response_event.level, "router.response", attrs, ""); +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) } - for custom_event in &self.custom { + for custom_event in &mut self.custom { custom_event.on_response(response); } } - fn on_error(&self, error: &BoxError, ctx: &Context) { - if self.error.level() != EventLevel::Off { - if let Some(condition) = self.error.condition() { - if !condition.lock().evaluate_error(error, ctx) { - return; - } + pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) { + if let Some(error_event) = &self.error { + if !error_event.condition.evaluate_error(error, ctx) { + return; } log_event( - self.error.level(), + error_event.level, "router.error", vec![KeyValue::new( Key::from_static_str("error"), @@ -327,31 +353,25 @@ impl Instrumented "", ); } - for custom_event in &self.custom { + for custom_event in &mut self.custom { custom_event.on_error(error, ctx); } } } -impl Instrumented - for CustomEvents< +impl + CustomEvents< supergraph::Request, supergraph::Response, - crate::graphql::Response, + graphql::Response, SupergraphAttributes, SupergraphSelector, > { - type Request = supergraph::Request; - type Response = supergraph::Response; - type EventResponse = crate::graphql::Response; - - fn on_request(&self, request: &Self::Request) { - if self.request.level() != EventLevel::Off { - if let Some(condition) = self.request.condition() { - if condition.lock().evaluate_request(request) != Some(true) { - return; - } + pub(crate) fn on_request(&mut self, request: &supergraph::Request) { + if let Some(request_event) = &mut self.request { + if request_event.condition.evaluate_request(request) != Some(true) { + return; } let mut attrs = Vec::with_capacity(5); #[cfg(test)] @@ -396,40 +416,48 @@ impl Instrumented .into(), ), )); - log_event(self.request.level(), "supergraph.request", attrs, ""); + log_event(request_event.level, "supergraph.request", attrs, ""); } +<<<<<<< HEAD if self.response.level() != EventLevel::Off { request .context .extensions() .with_lock(|mut lock| lock.insert(SupergraphEventResponse(self.response.clone()))); +======= + if let Some(response_event) = self.response.take() { + request.context.extensions().with_lock(|lock| { + lock.insert(SupergraphEventResponse { + level: response_event.level, + condition: Arc::new(response_event.condition), + }) + }); +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) } - for custom_event in &self.custom { + for custom_event in &mut self.custom { custom_event.on_request(request); } } - fn on_response(&self, response: &Self::Response) { - for custom_event in &self.custom { + pub(crate) fn on_response(&mut self, response: &supergraph::Response) { + for custom_event in &mut self.custom { custom_event.on_response(response); } } - fn on_response_event(&self, response: &Self::EventResponse, ctx: &Context) { + pub(crate) fn on_response_event(&self, response: &graphql::Response, ctx: &Context) { for custom_event in &self.custom { custom_event.on_response_event(response, ctx); } } - fn on_error(&self, error: &BoxError, ctx: &Context) { - if self.error.level() != EventLevel::Off { - if let Some(condition) = self.error.condition() { - if !condition.lock().evaluate_error(error, ctx) { - return; - } + pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) { + if let Some(error_event) = &self.error { + if !error_event.condition.evaluate_error(error, ctx) { + return; } log_event( - self.error.level(), + error_event.level, "supergraph.error", vec![KeyValue::new( Key::from_static_str("error"), @@ -438,31 +466,26 @@ impl Instrumented "", ); } - for custom_event in &self.custom { + for custom_event in &mut self.custom { custom_event.on_error(error, ctx); } } } -impl Instrumented - for CustomEvents< - subgraph::Request, - subgraph::Response, - (), - SubgraphAttributes, - SubgraphSelector, - > -{ - type Request = subgraph::Request; - type Response = subgraph::Response; - type EventResponse = (); - - fn on_request(&self, request: &Self::Request) { - if let Some(condition) = self.request.condition() { - if condition.lock().evaluate_request(request) != Some(true) { +impl CustomEvents { + pub(crate) fn on_request(&mut self, request: &subgraph::Request) { + if let Some(mut request_event) = self.request.take() { + if request_event.condition.evaluate_request(request) != Some(true) { return; } + request.context.extensions().with_lock(|lock| { + lock.insert(SubgraphEventRequest { + level: request_event.level, + condition: Arc::new(Mutex::new(request_event.condition)), + }) + }); } +<<<<<<< HEAD if self.request.level() != EventLevel::Off { request .context @@ -476,25 +499,34 @@ impl Instrumented .with_lock(|mut lock| lock.insert(SubgraphEventResponse(self.response.clone()))); } for custom_event in &self.custom { +======= + if let Some(response_event) = self.response.take() { + request.context.extensions().with_lock(|lock| { + lock.insert(SubgraphEventResponse { + level: response_event.level, + condition: Arc::new(response_event.condition), + }) + }); + } + for custom_event in &mut self.custom { +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) custom_event.on_request(request); } } - fn on_response(&self, response: &Self::Response) { - for custom_event in &self.custom { + pub(crate) fn on_response(&mut self, response: &subgraph::Response) { + for custom_event in &mut self.custom { custom_event.on_response(response); } } - fn on_error(&self, error: &BoxError, ctx: &Context) { - if self.error.level() != EventLevel::Off { - if let Some(condition) = self.error.condition() { - if !condition.lock().evaluate_error(error, ctx) { - return; - } + pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) { + if let Some(error_event) = &self.error { + if !error_event.condition.evaluate_error(error, ctx) { + return; } log_event( - self.error.level(), + error_event.level, "subgraph.error", vec![KeyValue::new( Key::from_static_str("error"), @@ -503,7 +535,7 @@ impl Instrumented "", ); } - for custom_event in &self.custom { + for custom_event in &mut self.custom { custom_event.on_error(error, ctx); } } @@ -521,11 +553,28 @@ struct RouterEventsConfig { } #[derive(Clone)] -pub(crate) struct SupergraphEventResponse(pub(crate) StandardEvent); +pub(crate) struct SupergraphEventResponse { + // XXX(@IvanGoncharov): As part of removing Arc from StandardEvent I moved it here + // I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent + pub(crate) level: EventLevel, + pub(crate) condition: Arc>, +} + #[derive(Clone)] -pub(crate) struct SubgraphEventResponse(pub(crate) StandardEvent); +pub(crate) struct SubgraphEventResponse { + // XXX(@IvanGoncharov): As part of removing Arc from StandardEvent I moved it here + // I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent + pub(crate) level: EventLevel, + pub(crate) condition: Arc>, +} + #[derive(Clone)] -pub(crate) struct SubgraphEventRequest(pub(crate) StandardEvent); +pub(crate) struct SubgraphEventRequest { + // XXX(@IvanGoncharov): As part of removing Mutex from StandardEvent I moved it here + // I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent + pub(crate) level: EventLevel, + pub(crate) condition: Arc>>, +} #[derive(Clone, Deserialize, JsonSchema, Debug, Default)] #[serde(deny_unknown_fields, default)] @@ -552,59 +601,54 @@ struct SubgraphEventsConfig { #[derive(Deserialize, JsonSchema, Clone, Debug)] #[serde(untagged)] pub(crate) enum StandardEventConfig { - Level(EventLevel), + Level(EventLevelConfig), Conditional { - level: EventLevel, + level: EventLevelConfig, condition: Condition, }, } -#[derive(Debug, Clone)] -pub(crate) enum StandardEvent { - Level(EventLevel), - Conditional { - level: EventLevel, - condition: Arc>>, - }, -} - -impl From> for StandardEvent { - fn from(value: StandardEventConfig) -> Self { - match value { - StandardEventConfig::Level(level) => StandardEvent::Level(level), - StandardEventConfig::Conditional { level, condition } => StandardEvent::Conditional { - level, - condition: Arc::new(Mutex::new(condition)), - }, +impl StandardEventConfig { + fn validate(&self, restricted_stage: Option) -> Result<(), String> { + if let Self::Conditional { condition, .. } = self { + condition.validate(restricted_stage) + } else { + Ok(()) } } } impl Default for StandardEventConfig { fn default() -> Self { - Self::Level(EventLevel::default()) + Self::Level(EventLevelConfig::default()) } } -impl StandardEvent { - pub(crate) fn level(&self) -> EventLevel { - match self { - Self::Level(level) => *level, - Self::Conditional { level, .. } => *level, - } - } +#[derive(Debug)] +pub(crate) struct StandardEvent { + pub(crate) level: EventLevel, + pub(crate) condition: Condition, +} - pub(crate) fn condition(&self) -> Option<&Arc>>> { - match self { - Self::Level(_) => None, - Self::Conditional { condition, .. } => Some(condition), +impl StandardEvent { + pub(crate) fn from_config(config: &StandardEventConfig) -> Option { + match &config { + StandardEventConfig::Level(level) => EventLevel::from_config(level).map(|level| Self { + level, + condition: Condition::True, + }), + StandardEventConfig::Conditional { level, condition } => EventLevel::from_config(level) + .map(|level| Self { + level, + condition: condition.clone(), + }), } } } #[derive(Deserialize, JsonSchema, Clone, Debug, Default, PartialEq, Copy)] #[serde(rename_all = "snake_case")] -pub(crate) enum EventLevel { +pub(crate) enum EventLevelConfig { Info, Warn, Error, @@ -612,6 +656,24 @@ pub(crate) enum EventLevel { Off, } +#[derive(Debug, PartialEq, Clone, Copy)] +pub(crate) enum EventLevel { + Info, + Warn, + Error, +} + +impl EventLevel { + pub(crate) fn from_config(config: &EventLevelConfig) -> Option { + match config { + EventLevelConfig::Off => None, + EventLevelConfig::Info => Some(EventLevel::Info), + EventLevelConfig::Warn => Some(EventLevel::Warn), + EventLevelConfig::Error => Some(EventLevel::Error), + } + } +} + /// An event that can be logged as part of a trace. /// The event has an implicit `type` attribute that matches the name of the event in the yaml /// and a message that can be used to provide additional information. @@ -622,7 +684,11 @@ where E: Debug, { /// The log level of the event. +<<<<<<< HEAD level: EventLevel, +======= + pub(super) level: EventLevelConfig, +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) /// The event message. message: Arc, @@ -671,6 +737,7 @@ where A: Selectors + Default, T: Selector + Debug, { +<<<<<<< HEAD inner: Mutex>, } @@ -687,106 +754,100 @@ where condition: Condition, attributes: Vec, _phantom: PhantomData, +======= + pub(super) name: String, + pub(super) level: EventLevel, + pub(super) event_on: EventOn, + pub(super) message: Arc, + pub(super) selectors: Arc>, + pub(super) condition: Condition, + pub(super) attributes: Vec, + pub(super) _phantom: PhantomData, +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) } -impl Instrumented - for CustomEvent +impl CustomEvent where - A: Selectors + Default, + A: Selectors + Default + Clone + Debug, T: Selector + Debug - + Debug, + + Clone, { - type Request = Request; - type Response = Response; - type EventResponse = EventResponse; - - fn on_request(&self, request: &Self::Request) { - let mut inner = self.inner.lock(); - if inner.condition.evaluate_request(request) != Some(true) - && inner.event_on == EventOn::Request + pub(crate) fn from_config(name: &str, config: &Event) -> Option { + EventLevel::from_config(&config.level).map(|level| Self { + name: name.to_owned(), + level, + event_on: config.on, + message: config.message.clone(), + selectors: config.attributes.clone(), + condition: config.condition.clone(), + attributes: Vec::new(), + _phantom: PhantomData, + }) + } + + pub(crate) fn on_request(&mut self, request: &Request) { + if self.condition.evaluate_request(request) != Some(true) + && self.event_on == EventOn::Request { return; } - if let Some(selectors) = &inner.selectors { - inner.attributes = selectors.on_request(request); - } + self.attributes = self.selectors.on_request(request); - if inner.event_on == EventOn::Request - && inner.condition.evaluate_request(request) != Some(false) + if self.event_on == EventOn::Request + && self.condition.evaluate_request(request) != Some(false) { - let attrs = std::mem::take(&mut inner.attributes); - inner.send_event(attrs); + let attrs = std::mem::take(&mut self.attributes); + log_event(self.level, &self.name, attrs, &self.message); } } - fn on_response(&self, response: &Self::Response) { - let mut inner = self.inner.lock(); - if inner.event_on != EventOn::Response { + pub(crate) fn on_response(&mut self, response: &Response) { + if self.event_on != EventOn::Response { return; } - if !inner.condition.evaluate_response(response) { + if !self.condition.evaluate_response(response) { return; } - if let Some(selectors) = &inner.selectors { - let mut new_attributes = selectors.on_response(response); - inner.attributes.append(&mut new_attributes); - } + let mut new_attributes = self.selectors.on_response(response); + self.attributes.append(&mut new_attributes); - let attrs = std::mem::take(&mut inner.attributes); - inner.send_event(attrs); + let attrs = std::mem::take(&mut self.attributes); + log_event(self.level, &self.name, attrs, &self.message); } - fn on_response_event(&self, response: &Self::EventResponse, ctx: &Context) { - let inner = self.inner.lock(); - if inner.event_on != EventOn::EventResponse { + pub(crate) fn on_response_event(&self, response: &EventResponse, ctx: &Context) { + if self.event_on != EventOn::EventResponse { return; } - if !inner.condition.evaluate_event_response(response, ctx) { + if !self.condition.evaluate_event_response(response, ctx) { return; } - let mut attributes = inner.attributes.clone(); - if let Some(selectors) = &inner.selectors { - let mut new_attributes = selectors.on_response_event(response, ctx); - attributes.append(&mut new_attributes); - } + let mut attributes = self.attributes.clone(); + let mut new_attributes = self.selectors.on_response_event(response, ctx); + attributes.append(&mut new_attributes); // Stub span to make sure the custom attributes are saved in current span extensions // It won't be extracted or sampled at all if Span::current().is_none() { let span = info_span!("supergraph_event_send_event"); let _entered = span.enter(); - inner.send_event(attributes); + log_event(self.level, &self.name, attributes, &self.message); } else { - inner.send_event(attributes); + log_event(self.level, &self.name, attributes, &self.message); } } - fn on_error(&self, error: &BoxError, ctx: &Context) { - let mut inner = self.inner.lock(); - if inner.event_on != EventOn::Error { + pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) { + if self.event_on != EventOn::Error { return; } - if let Some(selectors) = &inner.selectors { - let mut new_attributes = selectors.on_error(error, ctx); - inner.attributes.append(&mut new_attributes); - } - - let attrs = std::mem::take(&mut inner.attributes); - inner.send_event(attrs); - } -} + let mut new_attributes = self.selectors.on_error(error, ctx); + self.attributes.append(&mut new_attributes); -impl - CustomEventInner -where - A: Selectors + Default, - T: Selector + Debug + Debug, -{ - #[inline] - fn send_event(&self, attributes: Vec) { - log_event(self.level, &self.name, attributes, &self.message); + let attrs = std::mem::take(&mut self.attributes); + log_event(self.level, &self.name, attrs, &self.message); } } @@ -809,7 +870,6 @@ pub(crate) fn log_event(level: EventLevel, kind: &str, attributes: Vec EventLevel::Error => { ::tracing::error!(%kind, "{}", message) } - EventLevel::Off => {} } } diff --git a/apollo-router/src/plugins/telemetry/fmt_layer.rs b/apollo-router/src/plugins/telemetry/fmt_layer.rs index 5b3b451801..187d1ed4d0 100644 --- a/apollo-router/src/plugins/telemetry/fmt_layer.rs +++ b/apollo-router/src/plugins/telemetry/fmt_layer.rs @@ -266,6 +266,13 @@ mod tests { use http::HeaderValue; use http::header::CONTENT_LENGTH; +<<<<<<< HEAD +======= + use parking_lot::Mutex; + use parking_lot::MutexGuard; + use tests::events::EventLevel; + use tests::events::RouterResponseBodyExtensionType; +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) use tracing::error; use tracing::info; use tracing::info_span; @@ -275,9 +282,7 @@ mod tests { use super::*; use crate::graphql; use crate::plugins::telemetry::config_new::events; - use crate::plugins::telemetry::config_new::events::EventLevel; use crate::plugins::telemetry::config_new::events::log_event; - use crate::plugins::telemetry::config_new::instruments::Instrumented; use crate::plugins::telemetry::config_new::logging::JsonFormat; use crate::plugins::telemetry::config_new::logging::RateLimit; use crate::plugins::telemetry::config_new::logging::TextFormat; @@ -706,7 +711,7 @@ subgraph: error!(http.method = "GET", "Hello from test"); - let router_events = event_config.new_router_events(); + let mut router_events = event_config.new_router_events(); let router_req = router::Request::fake_builder() .header(CONTENT_LENGTH, "0") .header("custom-header", "val1") @@ -724,7 +729,7 @@ subgraph: .expect("expecting valid response"); router_events.on_response(&router_resp); - let supergraph_events = event_config.new_supergraph_events(); + let mut supergraph_events = event_config.new_supergraph_events(); let supergraph_req = supergraph::Request::fake_builder() .query("query { foo }") .header("x-log-request", HeaderValue::from_static("log")) @@ -740,7 +745,7 @@ subgraph: .expect("expecting valid response"); supergraph_events.on_response(&supergraph_resp); - let subgraph_events = event_config.new_subgraph_events(); + let mut subgraph_events = event_config.new_subgraph_events(); let mut subgraph_req = http::Request::new( graphql::Request::fake_builder() .query("query { foo }") @@ -764,7 +769,7 @@ subgraph: .expect("expecting valid response"); subgraph_events.on_response(&subgraph_resp); - let subgraph_events = event_config.new_subgraph_events(); + let mut subgraph_events = event_config.new_subgraph_events(); let mut subgraph_req = http::Request::new( graphql::Request::fake_builder() .query("query { foo }") @@ -787,6 +792,221 @@ subgraph: .build() .expect("expecting valid response"); subgraph_events.on_response(&subgraph_resp); +<<<<<<< HEAD +======= + + let context = crate::Context::default(); + let mut http_request = http::Request::builder().body("".into()).unwrap(); + http_request + .headers_mut() + .insert("x-log-request", HeaderValue::from_static("log")); + let transport_request = TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Arc::new(Connector { + id: ConnectId::new( + "connector_subgraph".into(), + Some("source".into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str("/test").unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + let connector_request = Request { + context: context.clone(), + connector: connector.clone(), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], + supergraph_request: Default::default(), + }; + let mut connector_events = event_config.new_connector_events(); + connector_events.on_request(&connector_request); + + let connector_response = Response { + context, + connector: connector.clone(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() + .status(200) + .header("x-log-response", HeaderValue::from_static("log")) + .body(body::empty()) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key, + problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], + }, + }; + connector_events.on_response(&connector_response); + }, + ); + + insta::assert_snapshot!(buff.to_string()); + } + + #[tokio::test] + async fn test_json_logging_deduplicates_attributes() { + let buff = LogBuffer::default(); + let text_format = JsonFormat { + display_span_list: false, + display_current_span: false, + display_resource: false, + ..Default::default() + }; + let format = Json::new(Default::default(), text_format); + let fmt_layer = FmtLayer::new( + RateLimitFormatter::new(format, &RateLimit::default()), + buff.clone(), + ) + .boxed(); + + let event_config: events::Events = serde_yaml::from_str( + r#" +subgraph: + request: info + response: warn + error: error + event.with.duplicate.attribute: + message: "this event has a duplicate attribute" + level: error + on: response + attributes: + subgraph.name: true + static: foo # This shows up twice without attribute deduplication + "#, + ) + .unwrap(); + + ::tracing::subscriber::with_default( + fmt::Subscriber::new() + .with(otel::layer().force_sampling()) + .with(fmt_layer), + move || { + let test_span = info_span!("test"); + let _enter = test_span.enter(); + + let mut router_events = event_config.new_router_events(); + let mut supergraph_events = event_config.new_supergraph_events(); + let mut subgraph_events = event_config.new_subgraph_events(); + + // In: Router -> Supergraph -> Subgraphs + let router_req = router::Request::fake_builder().build().unwrap(); + router_events.on_request(&router_req); + + let supergraph_req = supergraph::Request::fake_builder() + .query("query { foo }") + .build() + .unwrap(); + supergraph_events.on_request(&supergraph_req); + + let subgraph_req_1 = subgraph::Request::fake_builder() + .subgraph_name("subgraph") + .subgraph_request(http::Request::new( + graphql::Request::fake_builder() + .query("query { foo }") + .build(), + )) + .build(); + subgraph_events.on_request(&subgraph_req_1); + + let subgraph_req_2 = subgraph::Request::fake_builder() + .subgraph_name("subgraph_bis") + .subgraph_request(http::Request::new( + graphql::Request::fake_builder() + .query("query { foo }") + .build(), + )) + .build(); + subgraph_events.on_request(&subgraph_req_2); + + // Out: Subgraphs -> Supergraph -> Router + let subgraph_resp_1 = subgraph::Response::fake2_builder() + .data(serde_json::json!({"products": [{"id": 1234, "name": "first_name"}, {"id": 567, "name": "second_name"}]})) + .build() + .expect("expecting valid response"); + subgraph_events.on_response(&subgraph_resp_1); + + let subgraph_resp_2 = subgraph::Response::fake2_builder() + .data(serde_json::json!({"products": [{"id": 1234, "name": "first_name"}, {"id": 567, "name": "second_name"}], "other": {"foo": "bar"}})) + .build() + .expect("expecting valid response"); + subgraph_events.on_response(&subgraph_resp_2); + + let supergraph_resp = supergraph::Response::fake_builder() + .data(serde_json::json!({"data": "res"}).to_string()) + .build() + .expect("expecting valid response"); + supergraph_events.on_response(&supergraph_resp); + + let router_resp = router::Response::fake_builder() + .data(serde_json_bytes::json!({"data": "res"})) + .build() + .expect("expecting valid response"); + router_events.on_response(&router_resp); +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) }, ); @@ -845,7 +1065,7 @@ subgraph: error!(http.method = "GET", "Hello from test"); - let router_events = event_config.new_router_events(); + let mut router_events = event_config.new_router_events(); let router_req = router::Request::fake_builder() .header(CONTENT_LENGTH, "0") .header("custom-header", "val1") @@ -863,7 +1083,7 @@ subgraph: .expect("expecting valid response"); router_events.on_response(&router_resp); - let supergraph_events = event_config.new_supergraph_events(); + let mut supergraph_events = event_config.new_supergraph_events(); let supergraph_req = supergraph::Request::fake_builder() .query("query { foo }") .header("x-log-request", HeaderValue::from_static("log")) @@ -879,7 +1099,7 @@ subgraph: .expect("expecting valid response"); supergraph_events.on_response(&supergraph_resp); - let subgraph_events = event_config.new_subgraph_events(); + let mut subgraph_events = event_config.new_subgraph_events(); let mut subgraph_req = http::Request::new( graphql::Request::fake_builder() .query("query { foo }") @@ -903,7 +1123,7 @@ subgraph: .expect("expecting valid response"); subgraph_events.on_response(&subgraph_resp); - let subgraph_events = event_config.new_subgraph_events(); + let mut subgraph_events = event_config.new_subgraph_events(); let mut subgraph_req = http::Request::new( graphql::Request::fake_builder() .query("query { foo }") @@ -926,6 +1146,116 @@ subgraph: .build() .expect("expecting valid response"); subgraph_events.on_response(&subgraph_resp); +<<<<<<< HEAD +======= + + let context = crate::Context::default(); + let mut http_request = http::Request::builder().body("".into()).unwrap(); + http_request + .headers_mut() + .insert("x-log-request", HeaderValue::from_static("log")); + let transport_request = TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Arc::new(Connector { + id: ConnectId::new( + "connector_subgraph".into(), + Some("source".into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str("/test").unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + batch_settings: None, + request_headers: Default::default(), + response_headers: Default::default(), + }); + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + let connector_request = Request { + context: context.clone(), + connector: connector.clone(), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], + supergraph_request: Default::default(), + }; + let mut connector_events = event_config.new_connector_events(); + connector_events.on_request(&connector_request); + + let connector_response = Response { + context, + connector: connector.clone(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() + .status(200) + .header("x-log-response", HeaderValue::from_static("log")) + .body(body::empty()) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key, + problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], + }, + }; + connector_events.on_response(&connector_response); +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) }, ); diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index ebd7b5d501..cfc4726fcc 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -443,7 +443,7 @@ impl PluginPrivate for Telemetry { .new_router_instruments(static_router_instruments.clone()); custom_instruments.on_request(request); - let custom_events: RouterEvents = + let mut custom_events: RouterEvents = config_request.instrumentation.events.new_router_events(); custom_events.on_request(request); @@ -454,7 +454,7 @@ impl PluginPrivate for Telemetry { request.context.clone(), ) }, - move |(custom_attributes, custom_instruments, custom_events, ctx): ( + move |(custom_attributes, custom_instruments, mut custom_events, ctx): ( Vec, RouterInstruments, RouterEvents, @@ -651,12 +651,30 @@ impl PluginPrivate for Telemetry { .instruments.new_graphql_instruments(static_graphql_instruments.clone()); custom_graphql_instruments.on_request(req); - let supergraph_events = config.instrumentation.events.new_supergraph_events(); + let mut supergraph_events = + config.instrumentation.events.new_supergraph_events(); supergraph_events.on_request(req); (req.context.clone(), custom_instruments, custom_attributes, supergraph_events, custom_graphql_instruments) }, +<<<<<<< HEAD move |(ctx, custom_instruments, mut custom_attributes, supergraph_events, custom_graphql_instruments): (Context, SupergraphInstruments, Vec, SupergraphEvents, GraphQLInstruments), fut| { +======= + move |( + ctx, + custom_instruments, + mut custom_attributes, + mut supergraph_events, + custom_graphql_instruments, + ): ( + Context, + SupergraphInstruments, + Vec, + SupergraphEvents, + GraphQLInstruments, + ), + fut| { +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) let config = config_map_res.clone(); let sender = metrics_sender.clone(); let start = Instant::now(); @@ -766,7 +784,7 @@ impl PluginPrivate for Telemetry { .instruments .new_subgraph_instruments(static_subgraph_instruments.clone()); custom_instruments.on_request(sub_request); - let custom_events = config.instrumentation.events.new_subgraph_events(); + let mut custom_events = config.instrumentation.events.new_subgraph_events(); custom_events.on_request(sub_request); let custom_cache_instruments: CacheInstruments = config @@ -787,7 +805,7 @@ impl PluginPrivate for Telemetry { context, custom_instruments, custom_attributes, - custom_events, + mut custom_events, custom_cache_instruments, ): ( Context, @@ -856,6 +874,104 @@ impl PluginPrivate for Telemetry { .boxed() } +<<<<<<< HEAD +======= + fn connector_request_service( + &self, + service: connector::request_service::BoxService, + source_name: String, + ) -> connector::request_service::BoxService { + let req_fn_config = self.config.clone(); + let res_fn_config = self.config.clone(); + let span_mode = self.config.instrumentation.spans.mode; + let static_connector_instruments = self + .builtin_instruments + .read() + .connector_custom_instruments + .clone(); + ServiceBuilder::new() + .instrument(move |_req: &connector::request_service::Request| { + span_mode.create_connector(source_name.as_str()) + }) + .map_future_with_request_data( + move |request: &connector::request_service::Request| { + let custom_instruments = req_fn_config + .instrumentation + .instruments + .new_connector_instruments(static_connector_instruments.clone()); + custom_instruments.on_request(request); + let mut custom_events = + req_fn_config.instrumentation.events.new_connector_events(); + custom_events.on_request(request); + + let custom_span_attributes = req_fn_config + .instrumentation + .spans + .connector + .attributes + .on_request(request); + + ( + request.context.clone(), + Some((custom_instruments, custom_events, custom_span_attributes)), + ) + }, + move |(context, custom_telemetry): ( + Context, + Option<(ConnectorInstruments, ConnectorEvents, Vec)>, + ), + f: BoxFuture< + 'static, + Result, + >| { + let conf = res_fn_config.clone(); + async move { + match custom_telemetry { + Some(( + custom_instruments, + mut custom_events, + custom_span_attributes, + )) => { + let span = Span::current(); + span.set_span_dyn_attributes(custom_span_attributes); + + let result = f.await; + match &result { + Ok(response) => { + span.set_span_dyn_attributes( + conf.instrumentation + .spans + .connector + .attributes + .on_response(response), + ); + custom_instruments.on_response(response); + custom_events.on_response(response); + } + Err(err) => { + span.set_span_dyn_attributes( + conf.instrumentation + .spans + .connector + .attributes + .on_error(err, &context), + ); + custom_instruments.on_error(err, &context); + custom_events.on_error(err, &context); + } + } + result + } + _ => f.await, + } + } + }, + ) + .service(service) + .boxed() + } + +>>>>>>> e7d8e7bb (Simplify implementation of telementry's events (#7280)) fn web_endpoints(&self) -> MultiMap { self.custom_endpoints.clone() } diff --git a/apollo-router/src/services/connector/request_service.rs b/apollo-router/src/services/connector/request_service.rs new file mode 100644 index 0000000000..b01377b8d4 --- /dev/null +++ b/apollo-router/src/services/connector/request_service.rs @@ -0,0 +1,483 @@ +//! Service which makes individual requests to Apollo Connectors over some transport + +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; +use std::task::Poll; + +use apollo_federation::sources::connect::Connector; +use futures::future::BoxFuture; +use http::HeaderMap; +use http::HeaderValue; +use indexmap::IndexMap; +use opentelemetry::KeyValue; +use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD; +use parking_lot::Mutex; +use serde_json_bytes::Value; +use static_assertions::assert_impl_all; +use tower::BoxError; +use tower::ServiceExt; +use tower::buffer::Buffer; + +use crate::Context; +use crate::error::FetchError; +use crate::graphql; +use crate::graphql::ErrorExtension; +use crate::json_ext::Path; +use crate::layers::DEFAULT_BUFFER_SIZE; +use crate::plugins::connectors::handle_responses::MappedResponse; +use crate::plugins::connectors::handle_responses::process_response; +use crate::plugins::connectors::make_requests::ResponseKey; +use crate::plugins::connectors::mapping::Problem; +use crate::plugins::connectors::plugin::debug::ConnectorContext; +use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; +use crate::plugins::connectors::request_limit::RequestLimits; +use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_BODY; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_HEADERS; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_URI; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_VERSION; +use crate::plugins::telemetry::config_new::connector::events::ConnectorEventRequest; +use crate::plugins::telemetry::config_new::events::EventLevel; +use crate::plugins::telemetry::config_new::events::log_event; +use crate::services::Plugins; +use crate::services::connector::request_service::transport::http::HttpRequest; +use crate::services::connector::request_service::transport::http::HttpResponse; +use crate::services::http::HttpClientServiceFactory; +use crate::services::router; + +pub(crate) mod transport; + +pub(crate) type BoxService = tower::util::BoxService; +pub(crate) type ServiceResult = Result; + +assert_impl_all!(Request: Send); +assert_impl_all!(Response: Send); + +/// Request type for a single connector request +#[derive(Debug)] +#[non_exhaustive] +pub struct Request { + /// The request context + pub(crate) context: Context, + + /// The connector associated with this request + // If this service moves into the public API, consider whether this exposes too much + // internal information about the connector. A new type may be needed which exposes only + // what is necessary for customizations. + pub(crate) connector: Arc, + + /// The service name for this connector + #[allow(dead_code)] + pub(crate) service_name: String, + + /// The request to the underlying transport + pub(crate) transport_request: TransportRequest, + + /// Information about how to map the response to GraphQL + pub(crate) key: ResponseKey, + + /// Mapping problems encountered when creating the transport request + pub(crate) mapping_problems: Vec, + + /// Original request to the Router. + pub(crate) supergraph_request: Arc>, +} + +/// Response type for a connector +#[derive(Debug)] +#[non_exhaustive] +pub struct Response { + /// The response context + #[allow(dead_code)] + pub(crate) context: Context, + + /// The connector associated with this response + #[allow(dead_code)] + pub(crate) connector: Arc, + + /// The result of the transport request + pub(crate) transport_result: Result, + + /// The mapped response, including any mapping problems encountered when processing the response + pub(crate) mapped_response: MappedResponse, +} + +#[buildstructor::buildstructor] +impl Response { + #[builder(visibility = "pub")] + pub(crate) fn error_new( + context: Context, + connector: Arc, + error: Error, + message: String, + response_key: ResponseKey, + ) -> Self { + let graphql_error = graphql::Error::builder() + .message(message) + .extension_code(error.extension_code()) + .build(); + + let mapped_response = MappedResponse::Error { + error: graphql_error, + key: response_key, + }; + + Self { + context, + connector, + transport_result: Err(error), + mapped_response, + } + } + + #[builder(visibility = "pub")] + pub(crate) fn test_new( + context: Context, + connector: Arc, + response_key: ResponseKey, + problems: Vec, + data: Value, + headers: Option>, + ) -> Self { + let mapped_response = MappedResponse::Data { + data: data.clone(), + problems, + key: response_key, + }; + + let mut response_builder = http::Response::builder(); + if let Some(headers) = headers { + for (header_name, header_value) in headers.iter() { + response_builder = response_builder.header(header_name, header_value); + } + } + let (parts, _value) = response_builder.body(data).unwrap().into_parts(); + let http_response = HttpResponse { inner: parts }; + + Self { + context, + connector, + transport_result: Ok(http_response.into()), + mapped_response, + } + } +} + +/// Request to an underlying transport +#[derive(Debug)] +#[non_exhaustive] +pub(crate) enum TransportRequest { + /// A request to an HTTP transport + Http(HttpRequest), +} + +/// Response from an underlying transport +#[derive(Debug)] +#[non_exhaustive] +pub(crate) enum TransportResponse { + /// A response from an HTTP transport + Http(HttpResponse), +} + +impl From for TransportRequest { + fn from(value: HttpRequest) -> Self { + Self::Http(value) + } +} + +impl From for TransportResponse { + fn from(value: HttpResponse) -> Self { + Self::Http(value) + } +} + +/// An error sending a connector request. This represents a problem with sending the request +/// to the connector, rather than an error returned from the connector itself. +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub(crate) enum Error { + /// Request limit exceeded + RequestLimitExceeded, + + /// Rate limit exceeded + RateLimited, + + /// Timeout + GatewayTimeout, + + /// {0} + TransportFailure(#[from] BoxError), +} + +impl Clone for Error { + fn clone(&self) -> Self { + match self { + Self::TransportFailure(err) => Self::TransportFailure(BoxError::from(err.to_string())), + err => err.clone(), + } + } +} + +impl Error { + /// Create a GraphQL error from this error. + #[must_use] + pub(crate) fn to_graphql_error( + &self, + connector: Arc, + path: Option, + ) -> crate::error::Error { + use serde_json_bytes::*; + + let builder = graphql::Error::builder() + .message(self.to_string()) + .extension_code(self.extension_code()) + .extension("service", connector.id.subgraph_name.clone()) + .extension( + "connector", + Value::Object(Map::from_iter([( + "coordinate".into(), + Value::String(connector.id.coordinate().into()), + )])), + ); + if let Some(path) = path { + builder.path(path).build() + } else { + builder.build() + } + } +} + +impl ErrorExtension for Error { + fn extension_code(&self) -> String { + match self { + Self::RequestLimitExceeded => "REQUEST_LIMIT_EXCEEDED", + Self::TransportFailure(_) => "HTTP_CLIENT_ERROR", + Self::RateLimited => "REQUEST_RATE_LIMITED", + Self::GatewayTimeout => "GATEWAY_TIMEOUT", + } + .to_string() + } +} + +#[derive(Clone)] +pub(crate) struct ConnectorRequestServiceFactory { + pub(crate) services: Arc>>>, +} + +impl ConnectorRequestServiceFactory { + pub(crate) fn new( + http_client_service_factory: Arc>, + plugins: Arc, + connector_sources: Arc>, + ) -> Self { + let mut map = HashMap::with_capacity(connector_sources.len()); + for source in connector_sources.iter() { + let service = Buffer::new( + plugins + .iter() + .rev() + .fold( + ConnectorRequestService { + http_client_service_factory: http_client_service_factory.clone(), + } + .boxed(), + |acc, (_, e)| e.connector_request_service(acc, source.clone()), + ) + .boxed(), + DEFAULT_BUFFER_SIZE, + ); + map.insert(source.clone(), service); + } + + Self { + services: Arc::new(map), //connector_sources, + } + } + + pub(crate) fn create(&self, source_name: String) -> BoxService { + // Note: We have to box our cloned service to erase the type of the Buffer. + self.services + .get(&source_name) + .map(|svc| svc.clone().boxed()) + .expect("We should always get a service, even if it is a blank/default one") + } +} + +/// A service for executing individual requests to Apollo Connectors +#[derive(Clone)] +pub(crate) struct ConnectorRequestService { + pub(crate) http_client_service_factory: Arc>, +} + +impl tower::Service for ConnectorRequestService { + type Response = Response; + type Error = BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + let original_subgraph_name = request.connector.id.subgraph_name.to_string(); + let http_client_service_factory = self.http_client_service_factory.clone(); + + // Load the information needed from the context + let (debug, connector_request_event, request_limit) = + request.context.extensions().with_lock(|lock| { + ( + lock.get::>>().cloned(), + lock.get::().cloned(), + lock.get::>() + .map(|limits| { + limits.get( + (&request.connector.id).into(), + request.connector.max_requests, + ) + }) + .unwrap_or(None), + ) + }); + + let log_request_level = connector_request_event.and_then(|s| { + if s.condition.lock().evaluate_request(&request) == Some(true) { + Some(s.level) + } else { + None + } + }); + + Box::pin(async move { + let mut debug_request: Option = None; + let result = if request_limit.is_some_and(|request_limit| !request_limit.allow()) { + Err(Error::RequestLimitExceeded) + } else { + let result = match request.transport_request { + TransportRequest::Http(http_request) => { + debug_request = http_request.debug; + + log_request( + &http_request.inner, + log_request_level, + &request.connector.id.label, + ); + + let source_name = request.connector.source_config_key(); + + if let Some(http_client_service_factory) = + http_client_service_factory.get(&source_name).cloned() + { + let (parts, body) = http_request.inner.into_parts(); + let http_request = + http::Request::from_parts(parts, router::body::from_bytes(body)); + + http_client_service_factory + .create(&original_subgraph_name) + .oneshot(crate::services::http::HttpRequest { + http_request, + context: request.context.clone(), + }) + .await + .map(|result| result.http_response) + .map_err(|e| replace_subgraph_name(e, &request.connector).into()) + } else { + Err(Error::TransportFailure("no http client found".into())) + } + } + }; + + u64_counter!( + "apollo.router.operations.connectors", + "Total number of requests to connectors", + 1, + "connector.type" = CONNECTOR_TYPE_HTTP, + "subgraph.name" = original_subgraph_name + ); + + result + }; + + Ok(process_response( + result, + request.key.clone(), + request.connector, + &request.context, + debug_request, + &debug, + request.supergraph_request, + ) + .await) + }) + } +} + +/// Log an event for this request, if configured +fn log_request( + request: &http::Request, + log_request_level: Option, + label: &str, +) { + if let Some(level) = log_request_level { + let mut attrs = Vec::with_capacity(5); + + #[cfg(test)] + let headers = { + let mut headers: IndexMap = request + .headers() + .clone() + .into_iter() + .filter_map(|(name, val)| Some((name?.to_string(), val))) + .collect(); + headers.sort_keys(); + headers + }; + #[cfg(not(test))] + let headers = request.headers().clone(); + + attrs.push(KeyValue::new( + HTTP_REQUEST_HEADERS, + opentelemetry::Value::String(format!("{:?}", headers).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_METHOD, + opentelemetry::Value::String(request.method().as_str().to_string().into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_URI, + opentelemetry::Value::String(format!("{}", request.uri()).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_VERSION, + opentelemetry::Value::String(format!("{:?}", request.version()).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_BODY, + opentelemetry::Value::String(request.body().clone().into()), + )); + log_event( + level, + "connector.request", + attrs, + &format!("Request to connector {label:?}"), + ); + } +} + +/// Replace the internal subgraph name in an error with the connector label +fn replace_subgraph_name(err: BoxError, connector: &Connector) -> BoxError { + match err.downcast::() { + Ok(inner) => match *inner { + FetchError::SubrequestHttpError { + status_code, + service: _, + reason, + } => Box::new(FetchError::SubrequestHttpError { + status_code, + service: connector.id.subgraph_source(), + reason, + }), + _ => inner, + }, + Err(e) => e, + } +} diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 9974cf19d5..5c364caa85 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -484,15 +484,12 @@ async fn call_websocket( let subgraph_request_event = context .extensions() .with_lock(|lock| lock.get::().cloned()); - let log_request_level = subgraph_request_event.and_then(|s| match s.0.condition() { - Some(condition) => { - if condition.lock().evaluate_request(&request) == Some(true) { - Some(s.0.level()) - } else { - None - } + let log_request_level = subgraph_request_event.and_then(|s| { + if s.condition.lock().evaluate_request(&request) == Some(true) { + Some(s.level) + } else { + None } - None => Some(s.0.level()), }); let SubgraphRequest { @@ -959,7 +956,7 @@ pub(crate) async fn process_batch( let subgraph_response_event = batch_context .extensions() .with_lock(|lock| lock.get::().cloned()); - if let Some(level) = subgraph_response_event { + if let Some(event) = subgraph_response_event { let mut attrs = Vec::with_capacity(5); attrs.push(KeyValue::new( Key::from_static_str("http.response.headers"), @@ -984,7 +981,7 @@ pub(crate) async fn process_batch( opentelemetry::Value::String(service.clone().into()), )); log_event( - level.0.level(), + event.level, "subgraph.response", attrs, &format!("Raw response from subgraph {service:?} received"), @@ -1275,15 +1272,12 @@ pub(crate) async fn call_single_http( let subgraph_request_event = context .extensions() .with_lock(|lock| lock.get::().cloned()); - let log_request_level = subgraph_request_event.and_then(|s| match s.0.condition() { - Some(condition) => { - if condition.lock().evaluate_request(&request) == Some(true) { - Some(s.0.level()) - } else { - None - } + let log_request_level = subgraph_request_event.and_then(|s| { + if s.condition.lock().evaluate_request(&request) == Some(true) { + Some(s.level) + } else { + None } - None => Some(s.0.level()), }); let SubgraphRequest { @@ -1405,25 +1399,25 @@ pub(crate) async fn call_single_http( } if let Some(subgraph_response_event) = subgraph_response_event { - let mut should_log = true; - if let Some(condition) = subgraph_response_event.0.condition() { - // We have to do this in order to use selectors - let mut resp_builder = http::Response::builder() - .status(parts.status) - .version(parts.version); - if let Some(headers) = resp_builder.headers_mut() { - *headers = parts.headers.clone(); - } - let subgraph_response = SubgraphResponse::new_from_response( - resp_builder - .body(graphql::Response::default()) - .expect("it won't fail everything is coming from an existing response"), - context.clone(), - service_name.to_owned(), - subgraph_request_id.clone(), - ); - should_log = condition.lock().evaluate_response(&subgraph_response); + // We have to do this in order to use selectors + let mut resp_builder = http::Response::builder() + .status(parts.status) + .version(parts.version); + if let Some(headers) = resp_builder.headers_mut() { + *headers = parts.headers.clone(); } + let subgraph_response = SubgraphResponse::new_from_response( + resp_builder + .body(graphql::Response::default()) + .expect("it won't fail everything is coming from an existing response"), + context.clone(), + service_name.to_owned(), + subgraph_request_id.clone(), + ); + + let should_log = subgraph_response_event + .condition + .evaluate_response(&subgraph_response); if should_log { let mut attrs = Vec::with_capacity(5); attrs.push(KeyValue::new( @@ -1449,7 +1443,7 @@ pub(crate) async fn call_single_http( opentelemetry::Value::String(service_name.to_string().into()), )); log_event( - subgraph_response_event.0.level(), + subgraph_response_event.level, "subgraph.response", attrs, &format!("Raw response from subgraph {service_name:?} received"), diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index de3661db33..dc97d521ec 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -392,10 +392,11 @@ async fn service_call( )); let ctx = context.clone(); let response_stream = Box::pin(response_stream.inspect(move |resp| { - if let Some(condition) = supergraph_response_event.0.condition() { - if !condition.lock().evaluate_event_response(resp, &ctx) { - return; - } + if !supergraph_response_event + .condition + .evaluate_event_response(resp, &ctx) + { + return; } attrs.push(KeyValue::new( Key::from_static_str("http.response.body"), @@ -404,7 +405,7 @@ async fn service_call( ), )); log_event( - supergraph_response_event.0.level(), + supergraph_response_event.level, "supergraph.response", attrs.clone(), "", diff --git a/apollo-router/tests/integration/telemetry/fixtures/trace_id_via_header.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/trace_id_via_header.router.yaml index a213522b36..b244673a47 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/trace_id_via_header.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/trace_id_via_header.router.yaml @@ -1,9 +1,6 @@ telemetry: - instrumentation: - spans: - mode: spec_compliant router: attributes: @@ -30,4 +27,3 @@ telemetry: display_span_id: true ansi_escape_codes: false display_current_span: true - diff --git a/apollo-router/tests/integration/telemetry/propagation.rs b/apollo-router/tests/integration/telemetry/propagation.rs index 2be357cdd4..64fa0ecc2e 100644 --- a/apollo-router/tests/integration/telemetry/propagation.rs +++ b/apollo-router/tests/integration/telemetry/propagation.rs @@ -1,9 +1,7 @@ -use serde_json::json; use tower::BoxError; use crate::integration::common::IntegrationTest; use crate::integration::common::Query; -use crate::integration::common::Telemetry; use crate::integration::common::graph_os_enabled; #[tokio::test(flavor = "multi_thread")] @@ -13,11 +11,13 @@ async fn test_trace_id_via_header() -> Result<(), BoxError> { return Ok(()); } async fn make_call(router: &mut IntegrationTest, trace_id: &str) { - let _ = router.execute_query(Query::builder().body(json!({"query":"query {topProducts{name, name, name, name, name, name, name, name, name, name}}","variables":{}})).header("id_from_header".to_string(), trace_id.to_string()).build()).await; + let query = Query::builder() + .header("id_from_header".to_string(), trace_id.to_string()) + .build(); + let _ = router.execute_query(query).await; } let mut router = IntegrationTest::builder() - .telemetry(Telemetry::None) .config(include_str!("fixtures/trace_id_via_header.router.yaml")) .build() .await;