diff --git a/.changesets/feat_rreg_experimental_subgraph_metrics.md b/.changesets/feat_rreg_experimental_subgraph_metrics.md new file mode 100644 index 0000000000..481da985e6 --- /dev/null +++ b/.changesets/feat_rreg_experimental_subgraph_metrics.md @@ -0,0 +1,18 @@ +### [Subgraph Insights] Experimental Apollo Subgraph Fetch Histogram ([PR #8013](https://github.com/apollographql/router/pull/8013)) + +This change adds a new, experimental histogram to capture subgraph fetch duration for GraphOS. This will +eventually be used to power subgraph-level insights in Apollo Studio. + +This can be toggled on using a new boolean config flag: + +```yaml +telemetry: + apollo: + experimental_subgraph_metrics: true +``` + +The new instrument is only sent to GraphOS and is not available in 3rd-party OTel export targets. It is not currently +customizable. Users requiring a customizable alternative can use the existing `http.client.request.duration` +instrument, which measures the same value. + +By [@rregitsky](https://github.com/rregitsky) in https://github.com/apollographql/router/pull/8013 \ No newline at end of file diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index c0853a466a..ec32c83768 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -1,6 +1,7 @@ --- source: apollo-router/src/configuration/tests.rs expression: "&schema" +snapshot_kind: text --- { "$schema": "http://json-schema.org/draft-07/schema#", @@ -1496,6 +1497,11 @@ expression: "&schema" "$ref": "#/definitions/Protocol", "description": "#/definitions/Protocol" }, + "experimental_subgraph_metrics": { + "default": false, + "description": "Enable sending additional subgraph 
metrics to Apollo Studio via OTLP", + "type": "boolean" + }, "field_level_instrumentation_sampler": { "$ref": "#/definitions/SamplerOption", "description": "#/definitions/SamplerOption" diff --git a/apollo-router/src/metrics/filter.rs b/apollo-router/src/metrics/filter.rs index 1b46ee8135..dd9115648e 100644 --- a/apollo-router/src/metrics/filter.rs +++ b/apollo-router/src/metrics/filter.rs @@ -89,7 +89,8 @@ impl FilterMeterProvider { } fn get_private_realtime_regex() -> Regex { - Regex::new(r"apollo\.router\.operations\.error").expect("regex should have been valid") + Regex::new(r"apollo\.router\.operations\.(?:error|fetch\.duration)") + .expect("regex should have been valid") } pub(crate) fn private_realtime>(delegate: T) -> Self { diff --git a/apollo-router/src/plugins/telemetry/apollo.rs b/apollo-router/src/plugins/telemetry/apollo.rs index fad1094f17..9b3c700451 100644 --- a/apollo-router/src/plugins/telemetry/apollo.rs +++ b/apollo-router/src/plugins/telemetry/apollo.rs @@ -119,6 +119,9 @@ pub(crate) struct Config { /// Enable field metrics that are generated without FTV1 to be sent to Apollo Studio. 
pub(crate) experimental_local_field_metrics: bool, + + /// Enable sending additional subgraph metrics to Apollo Studio via OTLP + pub(crate) experimental_subgraph_metrics: bool, } #[derive(Debug, Clone, Deserialize, JsonSchema, Default)] @@ -253,6 +256,7 @@ impl Default for Config { signature_normalization_algorithm: ApolloSignatureNormalizationAlgorithm::default(), experimental_local_field_metrics: false, metrics_reference_mode: ApolloMetricsReferenceMode::default(), + experimental_subgraph_metrics: false, } } } diff --git a/apollo-router/src/plugins/telemetry/config_new/apollo/instruments.rs b/apollo-router/src/plugins/telemetry/config_new/apollo/instruments.rs new file mode 100644 index 0000000000..3b2d457e91 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/apollo/instruments.rs @@ -0,0 +1,176 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use opentelemetry::metrics::MeterProvider; +use tokio::time::Instant; +use tower::BoxError; + +use crate::Context; +use crate::metrics; +use crate::plugins::telemetry::APOLLO_CLIENT_NAME_ATTRIBUTE; +use crate::plugins::telemetry::APOLLO_CLIENT_VERSION_ATTRIBUTE; +use crate::plugins::telemetry::APOLLO_HAS_ERRORS_ATTRIBUTE; +use crate::plugins::telemetry::APOLLO_OPERATION_ID_ATTRIBUTE; +use crate::plugins::telemetry::CLIENT_NAME; +use crate::plugins::telemetry::CLIENT_VERSION; +use crate::plugins::telemetry::GRAPHQL_OPERATION_NAME_ATTRIBUTE; +use crate::plugins::telemetry::GRAPHQL_OPERATION_TYPE_ATTRIBUTE; +use crate::plugins::telemetry::apollo::Config; +use crate::plugins::telemetry::config_new::attributes::StandardAttribute; +use crate::plugins::telemetry::config_new::extendable::Extendable; +use crate::plugins::telemetry::config_new::instruments::APOLLO_ROUTER_OPERATIONS_FETCH_DURATION; +use crate::plugins::telemetry::config_new::instruments::CustomHistogram; +use crate::plugins::telemetry::config_new::instruments::Increment; +use crate::plugins::telemetry::config_new::instruments::Instrumented; 
+use crate::plugins::telemetry::config_new::instruments::METER_NAME; +use crate::plugins::telemetry::config_new::instruments::StaticInstrument; +use crate::plugins::telemetry::config_new::selectors::OperationKind; +use crate::plugins::telemetry::config_new::selectors::OperationName; +use crate::plugins::telemetry::config_new::subgraph::attributes::SubgraphAttributes; +use crate::plugins::telemetry::config_new::subgraph::selectors::SubgraphSelector; +use crate::query_planner::APOLLO_OPERATION_ID; +use crate::services::subgraph; + +pub(crate) struct ApolloSubgraphInstruments { + pub(crate) apollo_router_operations_fetch_duration: Option< + CustomHistogram< + subgraph::Request, + subgraph::Response, + (), + SubgraphAttributes, + SubgraphSelector, + >, + >, +} + +impl ApolloSubgraphInstruments { + pub(crate) fn new( + static_instruments: Arc<HashMap<String, StaticInstrument>>, + apollo_config: Config, + ) -> Self { + let apollo_router_operations_fetch_duration = + apollo_config.experimental_subgraph_metrics.then(|| { + // Built lazily: `new` runs for each subgraph request, even with the flag off. + let selectors = Extendable { + attributes: SubgraphAttributes::builder() + .subgraph_name(StandardAttribute::Bool(true)) + .build(), + custom: HashMap::from([ + ( + APOLLO_CLIENT_NAME_ATTRIBUTE.to_string(), + SubgraphSelector::ResponseContext { + response_context: CLIENT_NAME.to_string(), + redact: None, + default: None, + }, + ), + ( + APOLLO_CLIENT_VERSION_ATTRIBUTE.to_string(), + SubgraphSelector::ResponseContext { + response_context: CLIENT_VERSION.to_string(), + redact: None, + default: None, + }, + ), + ( + GRAPHQL_OPERATION_NAME_ATTRIBUTE.to_string(), + SubgraphSelector::SupergraphOperationName { + supergraph_operation_name: OperationName::String, + redact: None, + default: None, + }, + ), + ( + GRAPHQL_OPERATION_TYPE_ATTRIBUTE.to_string(), + SubgraphSelector::SupergraphOperationKind { + supergraph_operation_kind: OperationKind::String, + }, + ), + ( + APOLLO_OPERATION_ID_ATTRIBUTE.to_string(), + SubgraphSelector::ResponseContext { + response_context: APOLLO_OPERATION_ID.to_string(), + redact: None, + default: None, + }, + ), + ( + APOLLO_HAS_ERRORS_ATTRIBUTE.to_string(), +
SubgraphSelector::OnGraphQLError { + subgraph_on_graphql_error: true, + }, + ), + ]), + }; + let attribute_count = selectors.custom.len() + 1; // 1 for subgraph_name attribute + CustomHistogram::builder() + .increment(Increment::Duration(Instant::now())) + .attributes(Vec::with_capacity(attribute_count)) + .selectors(Arc::new(selectors)) + .histogram(static_instruments + .get(APOLLO_ROUTER_OPERATIONS_FETCH_DURATION) + .expect( + "cannot get apollo static instrument for subgraph; this should not happen", + ) + .as_histogram() + .cloned() + .expect( + "cannot convert apollo instrument to histogram for subgraph; this should not happen", + ) + ) + .build() + }); + + Self { + apollo_router_operations_fetch_duration, + } + } + + pub(crate) fn new_builtin() -> HashMap<String, StaticInstrument> { + let meter = metrics::meter_provider().meter(METER_NAME); + let mut static_instruments = HashMap::with_capacity(1); + + static_instruments.insert( + APOLLO_ROUTER_OPERATIONS_FETCH_DURATION.to_string(), + StaticInstrument::Histogram( + meter + .f64_histogram(APOLLO_ROUTER_OPERATIONS_FETCH_DURATION) + .with_unit("s") + .with_description("Duration of a subgraph fetch.") + .init(), + ), + ); + + static_instruments + } +} + +impl Instrumented for ApolloSubgraphInstruments { + type Request = subgraph::Request; + type Response = subgraph::Response; + type EventResponse = (); + + fn on_request(&self, request: &Self::Request) { + if let Some(apollo_router_operations_fetch_duration) = + &self.apollo_router_operations_fetch_duration + { + apollo_router_operations_fetch_duration.on_request(request); + } + } + + fn on_response(&self, response: &Self::Response) { + if let Some(apollo_router_operations_fetch_duration) = + &self.apollo_router_operations_fetch_duration + { + apollo_router_operations_fetch_duration.on_response(response); + } + } + + fn on_error(&self, error: &BoxError, ctx: &Context) { + if let
Some(apollo_router_operations_fetch_duration) = + &self.apollo_router_operations_fetch_duration + { + apollo_router_operations_fetch_duration.on_error(error, ctx); + } + } +} diff --git a/apollo-router/src/plugins/telemetry/config_new/apollo/mod.rs b/apollo-router/src/plugins/telemetry/config_new/apollo/mod.rs new file mode 100644 index 0000000000..0cac8eccc3 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/apollo/mod.rs @@ -0,0 +1 @@ +pub(crate) mod instruments; diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/metrics.snap b/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/metrics.snap new file mode 100644 index 0000000000..df1e2d4808 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/metrics.snap @@ -0,0 +1,29 @@ +--- +source: apollo-router/src/plugins/telemetry/config_new/instruments.rs +description: Apollo subgraph fetch duration histogram +expression: "&metrics.all()" +info: + telemetry: + apollo: + experimental_subgraph_metrics: true + instrumentation: + instruments: + subgraph: + http.client.request.duration: false +snapshot_kind: text +--- +- name: apollo.router.operations.fetch.duration + description: Duration of a subgraph fetch. 
+ unit: s + data: + datapoints: + - sum: 0.1 + count: 1 + attributes: + apollo.client.name: myClient + apollo.client.version: v0.1.0 + apollo.operation.id: myOperationID + graphql.operation.name: Test + graphql.operation.type: query + has_errors: false + subgraph.name: products diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/router.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/router.yaml new file mode 100644 index 0000000000..ca75086292 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/router.yaml @@ -0,0 +1,7 @@ +telemetry: + apollo: + experimental_subgraph_metrics: true + instrumentation: + instruments: + subgraph: + http.client.request.duration: false \ No newline at end of file diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/test.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/test.yaml new file mode 100644 index 0000000000..8a615ee9a1 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/apollo/subgraph_fetch_duration/test.yaml @@ -0,0 +1,18 @@ +description: Apollo subgraph fetch duration histogram +events: + - - context: + map: + "apollo::supergraph::operation_name": "Test" + "apollo::supergraph::operation_id": "myOperationID" + "apollo::supergraph::operation_kind": "query" + "apollo::telemetry::client_name": "myClient" + "apollo::telemetry::client_version": "v0.1.0" + - subgraph_request: + query: "query { hello }" + operation_name: "Products" + operation_kind: query + subgraph_name: "products" + - subgraph_response: + status: 200 + data: + hello: "world" \ No newline at end of file diff --git a/apollo-router/src/plugins/telemetry/config_new/instruments.rs b/apollo-router/src/plugins/telemetry/config_new/instruments.rs index 6b36bbbb60..99e00f75b3 100644 --- 
a/apollo-router/src/plugins/telemetry/config_new/instruments.rs +++ b/apollo-router/src/plugins/telemetry/config_new/instruments.rs @@ -43,7 +43,9 @@ use crate::axum_factory::connection_handle::ConnectionState; use crate::axum_factory::connection_handle::OPEN_CONNECTIONS_METRIC; use crate::metrics; use crate::metrics::meter_provider; +use crate::plugins::telemetry::apollo::Config; use crate::plugins::telemetry::config_new::Selectors; +use crate::plugins::telemetry::config_new::apollo::instruments::ApolloSubgraphInstruments; use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel; use crate::plugins::telemetry::config_new::conditions::Condition; use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes; @@ -121,6 +123,8 @@ pub(super) const HTTP_CLIENT_REQUEST_DURATION_METRIC: &str = "http.client.reques pub(super) const HTTP_CLIENT_REQUEST_BODY_SIZE_METRIC: &str = "http.client.request.body.size"; pub(super) const HTTP_CLIENT_RESPONSE_BODY_SIZE_METRIC: &str = "http.client.response.body.size"; +pub(super) const APOLLO_ROUTER_OPERATIONS_FETCH_DURATION: &str = + "apollo.router.operations.fetch.duration"; impl InstrumentsConfig { pub(crate) fn validate(&self) -> Result<(), String> { for (name, custom) in &self.router.custom { @@ -717,6 +721,20 @@ impl InstrumentsConfig { } } + pub(crate) fn new_builtin_apollo_subgraph_instruments( + &self, + ) -> HashMap { + ApolloSubgraphInstruments::new_builtin() + } + + pub(crate) fn new_apollo_subgraph_instruments( + &self, + static_instruments: Arc>, + apollo_config: Config, + ) -> ApolloSubgraphInstruments { + ApolloSubgraphInstruments::new(static_instruments, apollo_config) + } + pub(crate) fn new_connector_instruments( &self, static_instruments: Arc>, @@ -2138,6 +2156,37 @@ where pub(crate) _phantom: PhantomData, } +#[buildstructor::buildstructor] +impl + CustomHistogram +where + A: Selectors + Default, + T: Selector, +{ + #[builder(visibility = "pub")] + fn new( + 
increment: Increment, + condition: Option>, + selector: Option>, + selectors: Option>>, + histogram: Option>, + attributes: Vec, + ) -> Self { + Self { + inner: Mutex::new(CustomHistogramInner { + increment, + condition: condition.unwrap_or(Condition::True), + attributes, + selector, + selectors, + histogram, + updated: false, + _phantom: PhantomData, + }), + } + } +} + impl Instrumented for CustomHistogram where @@ -2802,6 +2851,7 @@ mod tests { let mut config = load_config(&router_config_file.data); config.update_defaults(); + let apollo_config = load_apollo_config(&router_config_file.data); for request in test_definition.events { // each array of actions is a separate request @@ -2809,6 +2859,7 @@ mod tests { let mut supergraph_instruments = None; let mut subgraph_instruments = None; let mut connector_instruments = None; + let mut apollo_subgraph_instruments = None; let mut cache_instruments: Option = None; let graphql_instruments: GraphQLInstruments = config .new_graphql_instruments(Arc::new( @@ -2926,6 +2977,10 @@ mod tests { subgraph_instruments = Some(config.new_subgraph_instruments( Arc::new(config.new_builtin_subgraph_instruments()), )); + apollo_subgraph_instruments = Some(config.new_apollo_subgraph_instruments( + Arc::new(config.new_builtin_apollo_subgraph_instruments()), + apollo_config.clone() + )); cache_instruments = Some(config.new_cache_instruments( Arc::new(config.new_builtin_cache_instruments()), )); @@ -2946,6 +3001,7 @@ mod tests { .build(); subgraph_instruments.as_mut().unwrap().on_request(&request); + apollo_subgraph_instruments.as_mut().unwrap().on_request(&request); cache_instruments.as_mut().unwrap().on_request(&request); } Event::SubgraphResponse { @@ -2970,6 +3026,10 @@ mod tests { .take() .expect("subgraph request must have been made first") .on_response(&response); + apollo_subgraph_instruments + .take() + .expect("subgraph request must have been made first") + .on_response(&response); cache_instruments .take() .expect("subgraph 
request must have been made first") @@ -3233,6 +3293,12 @@ mod tests { serde_json::from_value(instruments.clone()).unwrap() } + fn load_apollo_config(config: &[u8]) -> Config { + let val: serde_json::Value = serde_yaml::from_slice(config).unwrap(); + let apollo_config = &val["telemetry"]["apollo"]; + serde_json::from_value(apollo_config.clone()).unwrap_or_default() + } + #[test] fn write_schema() { // Write a json schema for the above test diff --git a/apollo-router/src/plugins/telemetry/config_new/mod.rs b/apollo-router/src/plugins/telemetry/config_new/mod.rs index 68aea5a8ac..bb9fc5dbf0 100644 --- a/apollo-router/src/plugins/telemetry/config_new/mod.rs +++ b/apollo-router/src/plugins/telemetry/config_new/mod.rs @@ -18,6 +18,7 @@ use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequireme pub(crate) mod attributes; pub(crate) mod conditions; +pub(crate) mod apollo; pub(crate) mod cache; mod conditional; pub(crate) mod connector; diff --git a/apollo-router/src/plugins/telemetry/config_new/subgraph/attributes.rs b/apollo-router/src/plugins/telemetry/config_new/subgraph/attributes.rs index e71e05a16e..b1ebcddc8b 100644 --- a/apollo-router/src/plugins/telemetry/config_new/subgraph/attributes.rs +++ b/apollo-router/src/plugins/telemetry/config_new/subgraph/attributes.rs @@ -24,7 +24,7 @@ pub(crate) const SUBGRAPH_GRAPHQL_OPERATION_NAME: Key = pub(crate) const SUBGRAPH_GRAPHQL_OPERATION_TYPE: Key = Key::from_static_str("subgraph.graphql.operation.type"); -#[derive(Deserialize, JsonSchema, Clone, Default, Debug)] +#[derive(Deserialize, JsonSchema, Clone, Default, Debug, buildstructor::Builder)] #[serde(deny_unknown_fields, default)] pub(crate) struct SubgraphAttributes { /// The name of the subgraph diff --git a/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs b/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs index aa58d874ba..7578103318 100644 --- a/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs +++ 
b/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs @@ -9,6 +9,7 @@ use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::Resource; use opentelemetry_sdk::metrics::PeriodicReader; use opentelemetry_sdk::runtime; +use prometheus::exponential_buckets; use sys_info::hostname; use tonic::metadata::MetadataMap; use tonic::transport::ClientTlsConfig; @@ -185,9 +186,14 @@ impl Config { Box::new(CustomTemporalitySelector( opentelemetry_sdk::metrics::data::Temporality::Delta, )), + // This aggregation uses the Apollo histogram format where a duration, x, in μs is + // counted in the bucket of index max(0, min(ceil(ln(x)/ln(1.1)), 383)). Box::new( CustomAggregationSelector::builder() - .boundaries(default_buckets()) + .boundaries( + // Returns [~1.4ms ... ~5min] + exponential_buckets(0.001399084909, 1.1, 129).unwrap(), + ) .build(), ), )?; diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index c5f53d6622..e430203c71 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -103,6 +103,7 @@ use crate::plugins::telemetry::config::AttributeValue; use crate::plugins::telemetry::config::MetricsCommon; use crate::plugins::telemetry::config::TracingCommon; use crate::plugins::telemetry::config_new::DatadogId; +use crate::plugins::telemetry::config_new::apollo::instruments::ApolloSubgraphInstruments; use crate::plugins::telemetry::config_new::connector::events::ConnectorEvents; use crate::plugins::telemetry::config_new::cost::add_cost_attributes; use crate::plugins::telemetry::config_new::graphql::GraphQLInstruments; @@ -202,6 +203,14 @@ pub(crate) const APOLLO_PRIVATE_QUERY_HEIGHT: Key = pub(crate) const APOLLO_PRIVATE_QUERY_ROOT_FIELDS: Key = Key::from_static_str("apollo_private.query.root_fields"); +// Standard Apollo Otel Metric Attribute Names +pub(crate) const APOLLO_CLIENT_NAME_ATTRIBUTE: &str = "apollo.client.name"; +pub(crate) const 
APOLLO_CLIENT_VERSION_ATTRIBUTE: &str = "apollo.client.version"; +pub(crate) const GRAPHQL_OPERATION_NAME_ATTRIBUTE: &str = "graphql.operation.name"; +pub(crate) const GRAPHQL_OPERATION_TYPE_ATTRIBUTE: &str = "graphql.operation.type"; +pub(crate) const APOLLO_OPERATION_ID_ATTRIBUTE: &str = "apollo.operation.id"; +pub(crate) const APOLLO_HAS_ERRORS_ATTRIBUTE: &str = "has_errors"; + #[doc(hidden)] // Only public for integration tests pub(crate) struct Telemetry { pub(crate) config: Arc, @@ -310,6 +319,7 @@ struct BuiltinInstruments { router_custom_instruments: Arc>, supergraph_custom_instruments: Arc>, subgraph_custom_instruments: Arc>, + apollo_subgraph_instruments: Arc>, connector_custom_instruments: Arc>, cache_custom_instruments: Arc>, _pipeline_instruments: Arc>, @@ -321,6 +331,7 @@ fn create_builtin_instruments(config: &InstrumentsConfig) -> BuiltinInstruments router_custom_instruments: Arc::new(config.new_builtin_router_instruments()), supergraph_custom_instruments: Arc::new(config.new_builtin_supergraph_instruments()), subgraph_custom_instruments: Arc::new(config.new_builtin_subgraph_instruments()), + apollo_subgraph_instruments: Arc::new(config.new_builtin_apollo_subgraph_instruments()), connector_custom_instruments: Arc::new(config.new_builtin_connector_instruments()), cache_custom_instruments: Arc::new(config.new_builtin_cache_instruments()), _pipeline_instruments: Arc::new(config.new_pipeline_instruments()), @@ -904,6 +915,11 @@ impl PluginPrivate for Telemetry { .read() .subgraph_custom_instruments .clone(); + let static_apollo_subgraph_instruments = self + .builtin_instruments + .read() + .apollo_subgraph_instruments + .clone(); let static_cache_instruments = self .builtin_instruments .read() @@ -929,6 +945,15 @@ impl PluginPrivate for Telemetry { let mut custom_events = config.instrumentation.events.new_subgraph_events(); custom_events.on_request(sub_request); + let apollo_instruments: ApolloSubgraphInstruments = config + .instrumentation + 
.instruments + .new_apollo_subgraph_instruments( + static_apollo_subgraph_instruments.clone(), + config.apollo.clone(), + ); + apollo_instruments.on_request(sub_request); + let custom_cache_instruments: CacheInstruments = config .instrumentation .instruments @@ -940,6 +965,7 @@ impl PluginPrivate for Telemetry { custom_instruments, custom_attributes, custom_events, + apollo_instruments, custom_cache_instruments, ) }, @@ -948,12 +974,14 @@ impl PluginPrivate for Telemetry { custom_instruments, custom_attributes, mut custom_events, + apollo_instruments, custom_cache_instruments, ): ( Context, SubgraphInstruments, Vec, SubgraphEvents, + ApolloSubgraphInstruments, CacheInstruments, ), f: BoxFuture<'static, Result>| { @@ -977,6 +1005,7 @@ impl PluginPrivate for Telemetry { .attributes .on_response(resp), ); + apollo_instruments.on_response(resp); custom_cache_instruments.on_response(resp); custom_instruments.on_response(resp); custom_events.on_response(resp); @@ -991,6 +1020,7 @@ impl PluginPrivate for Telemetry { .attributes .on_error(err, &context), ); + apollo_instruments.on_error(err, &context); custom_cache_instruments.on_error(err, &context); custom_instruments.on_error(err, &context); custom_events.on_error(err, &context); diff --git a/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs b/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs index 43b08def9f..8030237e4a 100644 --- a/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs +++ b/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs @@ -13,6 +13,7 @@ use opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue; use opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue; use opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue; use opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue; +use opentelemetry_proto::tonic::metrics::v1::HistogramDataPoint; use 
opentelemetry_proto::tonic::metrics::v1::NumberDataPoint; use opentelemetry_proto::tonic::metrics::v1::metric; use opentelemetry_proto::tonic::metrics::v1::number_data_point; @@ -493,25 +494,93 @@ async fn test_router_layer_error_emits_metric() { router.graceful_shutdown().await; } +#[tokio::test(flavor = "multi_thread")] +async fn test_subgraph_request_emits_histogram() { + if !graph_os_enabled() { + return; + } + let expected_operation_name = "ExampleQuery"; + let expected_client_name = "myClient"; + let expected_client_version = "v0.14"; + let expected_service = "products"; + let expected_operation_type = "query"; + + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { endpoint: None }) + .config( + r#" + telemetry: + apollo: + experimental_otlp_metrics_protocol: http + batch_processor: + scheduled_delay: 10ms + experimental_subgraph_metrics: true + include_subgraph_errors: + all: true + "#, + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, _response) = router + .execute_query( + Query::builder() + .header("apollographql-client-name", expected_client_name) + .header("apollographql-client-version", expected_client_version) + .build(), + ) + .await; + + let metrics = router + .wait_for_emitted_otel_metrics(Duration::from_millis(20)) + .await; + assert!(!metrics.is_empty()); + assert_metrics_contain( + &metrics, + Metric::builder() + .name("apollo.router.operations.fetch.duration".to_string()) + .attribute("graphql.operation.name", expected_operation_name) + .attribute("apollo.client.name", expected_client_name) + .attribute("apollo.client.version", expected_client_version) + .attribute("subgraph.name", expected_service) + .attribute("graphql.operation.type", expected_operation_type) + .attribute("has_errors", false) + .count(1) + .build(), + ); + router.graceful_shutdown().await; +} + /// Assert that the given metric exists in the list of Otel requests. 
This is a crude attempt at /// replicating _some_ assert_counter!() functionality since that test util can't be accessed here. fn assert_metrics_contain(actual_metrics: &[ExportMetricsServiceRequest], expected_metric: Metric) { let expected_name = &expected_metric.name.clone(); let actual_metric = find_metric(expected_name, actual_metrics) .unwrap_or_else(|| panic!("Metric '{}' not found", expected_name)); - let sum = match &actual_metric.data { - Some(metric::Data::Sum(sum)) => sum, - _ => panic!("Metric '{}' is not a sum", expected_name), - }; - let actual_metrics: Vec = sum - .data_points - .iter() - .map(|dp| Metric::from_datapoint(expected_name, dp)) - .collect(); + let actual_metrics: Vec = match &actual_metric.data { + Some(metric::Data::Sum(sum)) => sum + .data_points + .iter() + .map(|dp| Metric::from_number_datapoint(expected_name, dp)) + .collect(), + Some(metric::Data::Histogram(histogram)) => histogram + .data_points + .iter() + .map(|dp| Metric::from_histogram_datapoint(expected_name, dp)) + .collect(), + _ => panic!("Metric type for '{}' is not yet implemented", expected_name), + }; let metric_found = actual_metrics.iter().any(|m| { - m.value == expected_metric.value && m.attributes_contain(&expected_metric.attributes) + // Only match values and attributes that are explicitly set + expected_metric.value.is_none_or(|v| Some(v) == m.value) + && expected_metric.sum.is_none_or(|s| Some(s) == m.sum) + && expected_metric.count.is_none_or(|c| Some(c) == m.count) + && m.attributes_contain(&expected_metric.attributes) }); assert!( @@ -542,26 +611,30 @@ fn find_metric<'a>( struct Metric { pub name: String, pub attributes: HashMap, - pub value: i64, + pub value: Option, + pub sum: Option, + pub count: Option, } #[buildstructor::buildstructor] impl Metric { #[builder] - fn new(name: String, attributes: HashMap, value: i64) -> Self - where - V: Into, - { + fn new( + name: String, + attributes: HashMap, + value: Option, + sum: Option, + count: Option, + ) -> Self 
{ Metric { name, - attributes: attributes - .into_iter() - .map(|(k, v)| (k, v.into().into())) - .collect(), + attributes: attributes.into_iter().map(|(k, v)| (k, v.into())).collect(), value, + sum, + count, } } - fn from_datapoint(name: &str, datapoint: &NumberDataPoint) -> Self { + fn from_number_datapoint(name: &str, datapoint: &NumberDataPoint) -> Self { Metric { name: name.to_string(), attributes: datapoint @@ -570,9 +643,24 @@ impl Metric { .map(|kv| (kv.key.clone(), kv.value.clone().unwrap())) .collect::>(), value: match datapoint.value { - Some(number_data_point::Value::AsInt(value)) => value, + Some(number_data_point::Value::AsInt(value)) => Some(value), _ => panic!("expected integer datapoint"), }, + sum: None, + count: None, + } + } + fn from_histogram_datapoint(name: &str, datapoint: &HistogramDataPoint) -> Self { + Metric { + name: name.to_string(), + attributes: datapoint + .attributes + .iter() + .map(|kv| (kv.key.clone(), kv.value.clone().unwrap())) + .collect::>(), + value: None, + sum: datapoint.sum, + count: Some(datapoint.count as i64), } } fn attributes_contain(&self, other_attributes: &HashMap) -> bool { @@ -586,10 +674,12 @@ impl Display for Metric { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "name: {}, value: {}, attributes: [", - self.name, self.value + "name: {},\nvalue: {:?},\ncount: {:?},\nsum: {:?}, \nattributes: [", + self.name, self.value, self.count, self.sum )?; - for (i, (key, any)) in self.attributes.iter().enumerate() { + let mut attrs: Vec<_> = self.attributes.iter().collect(); + attrs.sort_by(|a, b| a.0.cmp(b.0)); + for (i, (key, any)) in attrs.into_iter().enumerate() { if i > 0 { write!(f, ", ")?; } @@ -604,8 +694,8 @@ impl Display for Metric { other => format!("{:?}", other), }) .unwrap_or_else(|| "nil".into()); - write!(f, "{}={}", key, value)?; + write!(f, "\n\t{}={}", key, value)?; } - write!(f, "]") + write!(f, "\n]") } }