diff --git a/.changesets/fix_bnjjj_add_operation_name_subscription_metric.md b/.changesets/fix_bnjjj_add_operation_name_subscription_metric.md new file mode 100644 index 0000000000..7bdae49ded --- /dev/null +++ b/.changesets/fix_bnjjj_add_operation_name_subscription_metric.md @@ -0,0 +1,5 @@ +### Add graphql.operation.name label to apollo.router.opened.subscriptions counter ([PR #7606](https://github.com/apollographql/router/pull/7606)) + +`apollo.router.opened.subscriptions` metric contains `graphql.operation.name` label to know exactly which subscription is still opened. + +By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/7606 \ No newline at end of file diff --git a/apollo-router/src/metrics/mod.rs b/apollo-router/src/metrics/mod.rs index 749fdc33f3..cdd1210a51 100644 --- a/apollo-router/src/metrics/mod.rs +++ b/apollo-router/src/metrics/mod.rs @@ -16,8 +16,8 @@ //! * Instruments that measure a count of something should only use annotations with curly braces to //! give additional meaning. For example, use `{packet}`, `{error}`, `{fault}`, etc., not `packet`, //! `error`, `fault`, etc. -//! * Other instrument units should be specified using the UCUM case sensitive (“c/s”) variant. For -//! example, “Cel” for the unit with full name “degree Celsius”. +//! * Other instrument units should be specified using the UCUM case sensitive (c/s) variant. For +//! example, Cel for the unit with full name degree Celsius. //! * When instruments are measuring durations, seconds (i.e. s) should be used. //! * Instruments should use non-prefixed units (i.e. By instead of MiBy) unless there is good //! technical reason to not do so. @@ -1522,10 +1522,27 @@ macro_rules! assert_non_zero_metrics_snapshot { #[cfg(test)] pub(crate) type MetricFuture = Pin::Output>>>; -#[cfg(test)] +/// Extension trait for Futures that wish to test metrics. 
pub(crate) trait FutureMetricsExt { - /// See [dev-docs/metrics.md](https://github.com/apollographql/router/blob/dev/dev-docs/metrics.md#testing-async) - /// for details on this function. + /// Wraps a Future with metrics collection capabilities. + /// + /// This method creates a new Future that will: + /// 1. Initialize the meter provider before executing the Future + /// 2. Execute the original Future + /// 3. Shutdown the meter provider after completion + /// + /// This is useful for testing scenarios where you need to ensure metrics are properly + /// collected throughout the entire Future's execution. + /// + /// # Example + /// ```rust + /// # use apollo_router::metrics::FutureMetricsExt; + /// # async fn example() { + /// let future = async { /* your async code that produces metrics */ }; + /// let result = future.with_metrics().await; + /// # } + /// ``` + #[cfg(test)] fn with_metrics( self, ) -> tokio::task::futures::TaskLocalFuture< @@ -1539,6 +1556,8 @@ pub(crate) trait FutureMetricsExt { test_utils::AGGREGATE_METER_PROVIDER_ASYNC.scope( Default::default(), async move { + // We want to eagerly create the meter provider, the reason is that this will be shared among subtasks that use `with_current_meter_provider`. + let _ = meter_provider_internal(); let result = self.await; let _ = tokio::task::spawn_blocking(|| { meter_provider_internal().shutdown(); @@ -1549,9 +1568,53 @@ pub(crate) trait FutureMetricsExt { .boxed_local(), ) } + + /// Propagates the current meter provider to child tasks during test execution. + /// + /// This method ensures that the meter provider is properly shared across tasks + /// during test scenarios. In non-test contexts, it returns the original Future + /// unchanged. 
+ /// + /// # Example + /// ```rust + /// # use apollo_router::metrics::FutureMetricsExt; + /// # async fn example() { + /// let result = tokio::task::spawn(async { /* your async code that produces metrics */ }.with_current_meter_provider()).await; + /// # } + /// ``` + #[cfg(test)] + fn with_current_meter_provider( + self, + ) -> tokio::task::futures::TaskLocalFuture< + OnceLock<(AggregateMeterProvider, test_utils::ClonableManualReader)>, + Self, + > + where + Self: Sized + Future + 'static, + ::Output: 'static, + { + // We need to determine if the meter was set. If not then we can use default provider which is empty + let meter_provider_set = test_utils::AGGREGATE_METER_PROVIDER_ASYNC + .try_with(|_| {}) + .is_ok(); + if meter_provider_set { + test_utils::AGGREGATE_METER_PROVIDER_ASYNC + .scope(test_utils::AGGREGATE_METER_PROVIDER_ASYNC.get(), self) + } else { + test_utils::AGGREGATE_METER_PROVIDER_ASYNC.scope(Default::default(), self) + } + } + + #[cfg(not(test))] + fn with_current_meter_provider(self) -> Self + where + Self: Sized + Future + 'static, + { + // This is intentionally a noop. In the real world meter provider is a global variable. 
+ self + } } -#[cfg(test)] impl FutureMetricsExt for T where T: Future {} #[cfg(test)] @@ -2270,4 +2333,29 @@ mod test { .with_metrics() .await; } + + #[tokio::test] + async fn test_metrics_across_tasks() { + async { + // Initial metric in the main task + u64_counter!("apollo.router.test", "metric", 1); + assert_counter!("apollo.router.test", 1); + + // Spawn a task that also records metrics + let handle = tokio::spawn( + async move { + u64_counter!("apollo.router.test", "metric", 2); + } + .with_current_meter_provider(), + ); + + // Wait for the spawned task to complete + handle.await.unwrap(); + + // The metric should now be 3 since both tasks contributed + assert_counter!("apollo.router.test", 3); + } + .with_metrics() + .await; + } } diff --git a/apollo-router/src/notification.rs b/apollo-router/src/notification.rs index 6e8b03604e..759316e13e 100644 --- a/apollo-router/src/notification.rs +++ b/apollo-router/src/notification.rs @@ -28,6 +28,7 @@ use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use crate::Configuration; use crate::graphql; +use crate::metrics::FutureMetricsExt; use crate::spec::Schema; static NOTIFY_CHANNEL_SIZE: usize = 1024; @@ -66,6 +67,8 @@ pub(crate) enum Notification { // To know if it has been created or re-used response_sender: ResponseSenderWithCreated, heartbeat_enabled: bool, + // Useful for the metric we create + operation_name: Option, }, Subscribe { topic: K, @@ -153,7 +156,9 @@ where ) -> Notify { let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE); let receiver_stream: ReceiverStream> = ReceiverStream::new(receiver); - tokio::task::spawn(task(receiver_stream, ttl, heartbeat_error_message)); + tokio::task::spawn( + task(receiver_stream, ttl, heartbeat_error_message).with_current_meter_provider(), + ); Notify { sender, queue_size, @@ -212,6 +217,7 @@ where &mut self, topic: K, heartbeat_enabled: bool, + operation_name: Option, ) -> Result<(Handle, bool), NotifyError> { let (sender, _receiver) = 
broadcast::channel(self.queue_size.unwrap_or(DEFAULT_MSG_CHANNEL_SIZE)); @@ -223,6 +229,7 @@ where msg_sender: sender, response_sender: tx, heartbeat_enabled, + operation_name, }) .await?; @@ -617,8 +624,8 @@ async fn task( match message { Notification::Unsubscribe { topic } => pubsub.unsubscribe(topic), Notification::ForceDelete { topic } => pubsub.force_delete(topic), - Notification::CreateOrSubscribe { topic, msg_sender, response_sender, heartbeat_enabled } => { - pubsub.subscribe_or_create(topic, msg_sender, response_sender, heartbeat_enabled); + Notification::CreateOrSubscribe { topic, msg_sender, response_sender, heartbeat_enabled, operation_name } => { + pubsub.subscribe_or_create(topic, msg_sender, response_sender, heartbeat_enabled, operation_name); } Notification::Subscribe { topic, @@ -697,14 +704,20 @@ struct Subscription { msg_sender: broadcast::Sender>, heartbeat_enabled: bool, updated_at: Instant, + operation_name: Option, } impl Subscription { - fn new(msg_sender: broadcast::Sender>, heartbeat_enabled: bool) -> Self { + fn new( + msg_sender: broadcast::Sender>, + heartbeat_enabled: bool, + operation_name: Option, + ) -> Self { Self { msg_sender, heartbeat_enabled, updated_at: Instant::now(), + operation_name, } } // Update the updated_at value @@ -751,16 +764,21 @@ where topic: K, sender: broadcast::Sender>, heartbeat_enabled: bool, + operation_name: Option, ) { let existed = self .subscriptions - .insert(topic, Subscription::new(sender, heartbeat_enabled)) + .insert( + topic, + Subscription::new(sender, heartbeat_enabled, operation_name.clone()), + ) .is_some(); if !existed { i64_up_down_counter!( "apollo.router.opened.subscriptions", "Number of opened subscriptions", - 1 + 1, + graphql.operation.name = operation_name.unwrap_or_default() ); } } @@ -785,6 +803,7 @@ where msg_sender: broadcast::Sender>, sender: ResponseSenderWithCreated, heartbeat_enabled: bool, + operation_name: Option, ) { match self.subscriptions.get(&topic) { Some(subscription) 
=> { @@ -795,7 +814,7 @@ where )); } None => { - self.create_topic(topic, msg_sender.clone(), heartbeat_enabled); + self.create_topic(topic, msg_sender.clone(), heartbeat_enabled, operation_name); let _ = sender.send((msg_sender.clone(), msg_sender.subscribe(), true)); } @@ -813,11 +832,12 @@ where #[allow(clippy::collapsible_if)] if topic_to_delete { tracing::trace!("deleting subscription from unsubscribe"); - if self.subscriptions.remove(&topic).is_some() { + if let Some(sub) = self.subscriptions.remove(&topic) { i64_up_down_counter!( "apollo.router.opened.subscriptions", "Number of opened subscriptions", - -1 + -1, + graphql.operation.name = sub.operation_name.unwrap_or_default() ); } }; @@ -890,7 +910,8 @@ where i64_up_down_counter!( "apollo.router.opened.subscriptions", "Number of opened subscriptions", - -1 + -1, + graphql.operation.name = subscription.operation_name.unwrap_or_default() ); if let Some(heartbeat_error_message) = &heartbeat_error_message { let _ = subscription @@ -920,7 +941,8 @@ where i64_up_down_counter!( "apollo.router.opened.subscriptions", "Number of opened subscriptions", - -1 + -1, + graphql.operation.name = sub.operation_name.unwrap_or_default() ); let _ = sub.msg_sender.send(None); } @@ -991,6 +1013,7 @@ mod tests { use uuid::Uuid; use super::*; + use crate::metrics::FutureMetricsExt; #[tokio::test] async fn subscribe() { @@ -998,9 +1021,15 @@ mod tests { let topic_1 = Uuid::new_v4(); let topic_2 = Uuid::new_v4(); - let (handle1, created) = notify.create_or_subscribe(topic_1, false).await.unwrap(); + let (handle1, created) = notify + .create_or_subscribe(topic_1, false, None) + .await + .unwrap(); assert!(created); - let (_handle2, created) = notify.create_or_subscribe(topic_2, false).await.unwrap(); + let (_handle2, created) = notify + .create_or_subscribe(topic_2, false, None) + .await + .unwrap(); assert!(created); let handle_1_bis = notify.subscribe(topic_1).await.unwrap(); @@ -1037,9 +1066,15 @@ mod tests { let topic_1 = 
Uuid::new_v4(); let topic_2 = Uuid::new_v4(); - let (handle1, created) = notify.create_or_subscribe(topic_1, true).await.unwrap(); + let (handle1, created) = notify + .create_or_subscribe(topic_1, true, None) + .await + .unwrap(); assert!(created); - let (_handle2, created) = notify.create_or_subscribe(topic_2, true).await.unwrap(); + let (_handle2, created) = notify + .create_or_subscribe(topic_2, true, None) + .await + .unwrap(); assert!(created); let mut _handle_1_bis = notify.subscribe(topic_1).await.unwrap(); @@ -1074,6 +1109,99 @@ mod tests { assert_eq!(subscriptions_nb, 0); } + #[tokio::test] + async fn it_subscribe_and_delete_metrics() { + async { + let mut notify = Notify::builder().build(); + let topic_1 = Uuid::new_v4(); + let topic_2 = Uuid::new_v4(); + + let (handle1, created) = notify + .create_or_subscribe(topic_1, true, Some("TestSubscription".to_string())) + .await + .unwrap(); + assert!(created); + let (_handle2, created) = notify + .create_or_subscribe(topic_2, true, Some("TestSubscriptionBis".to_string())) + .await + .unwrap(); + assert!(created); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 1i64, + "graphql.operation.name" = "TestSubscription" + ); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 1i64, + "graphql.operation.name" = "TestSubscriptionBis" + ); + + let mut _handle_1_bis = notify.subscribe(topic_1).await.unwrap(); + let mut _handle_1_other = notify.subscribe(topic_1).await.unwrap(); + let mut cloned_notify = notify.clone(); + let mut handle = cloned_notify.subscribe(topic_1).await.unwrap().into_sink(); + handle + .send_sync(serde_json_bytes::json!({"test": "ok"})) + .unwrap(); + drop(handle); + assert!(notify.exist(topic_1).await.unwrap()); + drop(_handle_1_bis); + drop(_handle_1_other); + + notify.try_delete(topic_1).unwrap(); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 1i64, + "graphql.operation.name" = "TestSubscription" + ); + assert_up_down_counter!( + 
"apollo.router.opened.subscriptions", + 1i64, + "graphql.operation.name" = "TestSubscriptionBis" + ); + + let subscriptions_nb = notify.debug().await.unwrap(); + assert_eq!(subscriptions_nb, 1); + + assert!(!notify.exist(topic_1).await.unwrap()); + + notify.force_delete(topic_1).await.unwrap(); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 0i64, + "graphql.operation.name" = "TestSubscription" + ); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 1i64, + "graphql.operation.name" = "TestSubscriptionBis" + ); + + let mut handle1 = handle1.into_stream(); + let new_msg = handle1.next().await.unwrap(); + assert_eq!(new_msg, serde_json_bytes::json!({"test": "ok"})); + assert!(handle1.next().await.is_none()); + assert!(notify.exist(topic_2).await.unwrap()); + notify.try_delete(topic_2).unwrap(); + + let subscriptions_nb = notify.debug().await.unwrap(); + assert_eq!(subscriptions_nb, 0); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 0i64, + "graphql.operation.name" = "TestSubscription" + ); + assert_up_down_counter!( + "apollo.router.opened.subscriptions", + 0i64, + "graphql.operation.name" = "TestSubscriptionBis" + ); + } + .with_metrics() + .await; + } + #[tokio::test] async fn it_test_ttl() { let mut notify = Notify::builder() @@ -1083,9 +1211,15 @@ mod tests { let topic_1 = Uuid::new_v4(); let topic_2 = Uuid::new_v4(); - let (handle1, created) = notify.create_or_subscribe(topic_1, true).await.unwrap(); + let (handle1, created) = notify + .create_or_subscribe(topic_1, true, None) + .await + .unwrap(); assert!(created); - let (_handle2, created) = notify.create_or_subscribe(topic_2, true).await.unwrap(); + let (_handle2, created) = notify + .create_or_subscribe(topic_2, true, None) + .await + .unwrap(); assert!(created); let handle_1_bis = notify.subscribe(topic_1).await.unwrap(); diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index 6e832caab5..e5865364ae 
100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -810,7 +810,7 @@ mod tests { assert_eq!(resp.status(), http::StatusCode::NOT_FOUND); let new_sub_id = uuid::Uuid::new_v4().to_string(); let (handler, _created) = notify - .create_or_subscribe(new_sub_id.clone(), true) + .create_or_subscribe(new_sub_id.clone(), true, None) .await .unwrap(); let verifier = create_verifier(&new_sub_id).unwrap(); @@ -953,7 +953,7 @@ mod tests { assert_eq!(resp.status(), http::StatusCode::NOT_FOUND); let new_sub_id = uuid::Uuid::new_v4().to_string(); let (_handler, _created) = notify - .create_or_subscribe(new_sub_id.clone(), true) + .create_or_subscribe(new_sub_id.clone(), true, None) .await .unwrap(); let verifier = String::from("XXX"); @@ -1043,7 +1043,7 @@ mod tests { assert_eq!(resp.status(), http::StatusCode::NOT_FOUND); let new_sub_id = uuid::Uuid::new_v4().to_string(); let (handler, _created) = notify - .create_or_subscribe(new_sub_id.clone(), true) + .create_or_subscribe(new_sub_id.clone(), true, None) .await .unwrap(); let verifier = create_verifier(&new_sub_id).unwrap(); diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index ebffe2cc2e..a1c27dd61a 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -54,6 +54,7 @@ use crate::batching::assemble_batch; use crate::configuration::Batching; use crate::configuration::BatchingMode; use crate::configuration::TlsClientAuth; +use crate::context::OPERATION_NAME; use crate::error::FetchError; use crate::error::SubgraphBatchingError; use crate::graphql; @@ -289,9 +290,11 @@ impl tower::Service for SubgraphService { // Hash the subgraph_request let subscription_id = hashed_request; + let operation_name = + context.get::<_, String>(OPERATION_NAME).ok().flatten(); // Call create_or_subscribe on notify let (handle, created) = notify - 
.create_or_subscribe(subscription_id.clone(), true) + .create_or_subscribe(subscription_id.clone(), true, operation_name) .await?; // If it existed before just send the right stream (handle) and early return @@ -501,9 +504,9 @@ async fn call_websocket( service: service_name.clone(), reason: "cannot get the websocket stream".to_string(), })?; - + let supergraph_operation_name = context.get::<_, String>(OPERATION_NAME).ok().flatten(); let (handle, created) = notify - .create_or_subscribe(subscription_hash.clone(), false) + .create_or_subscribe(subscription_hash.clone(), false, supergraph_operation_name) .await?; u64_counter!( "apollo.router.operations.subscriptions", diff --git a/apollo-router/src/services/supergraph/tests.rs b/apollo-router/src/services/supergraph/tests.rs index a668f20103..e4f8af792e 100644 --- a/apollo-router/src/services/supergraph/tests.rs +++ b/apollo-router/src/services/supergraph/tests.rs @@ -974,7 +974,7 @@ async fn root_typename_with_defer() { async fn subscription_with_callback() { let mut notify = Notify::builder().build(); let (handle, _) = notify - .create_or_subscribe("TEST_TOPIC".to_string(), false) + .create_or_subscribe("TEST_TOPIC".to_string(), false, None) .await .unwrap(); let subgraphs = MockedSubgraphs([ @@ -1055,7 +1055,7 @@ async fn subscription_with_callback() { async fn subscription_callback_schema_reload() { let mut notify = Notify::builder().build(); let (handle, _) = notify - .create_or_subscribe("TEST_TOPIC".to_string(), false) + .create_or_subscribe("TEST_TOPIC".to_string(), false, None) .await .unwrap(); let orga_subgraph = MockSubgraph::builder().with_json( @@ -1144,7 +1144,7 @@ async fn subscription_callback_schema_reload() { async fn subscription_with_callback_with_limit() { let mut notify = Notify::builder().build(); let (handle, _) = notify - .create_or_subscribe("TEST_TOPIC".to_string(), false) + .create_or_subscribe("TEST_TOPIC".to_string(), false, None) .await .unwrap(); let subgraphs = MockedSubgraphs([ diff 
--git a/apollo-router/tests/integration/mod.rs b/apollo-router/tests/integration/mod.rs index 2c5281448f..250c79a62c 100644 --- a/apollo-router/tests/integration/mod.rs +++ b/apollo-router/tests/integration/mod.rs @@ -23,7 +23,7 @@ mod typename; #[cfg(any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux")))] mod redis; mod rhai; -mod subscription; +mod subscription_load_test; mod telemetry; mod validation; diff --git a/apollo-router/tests/integration/subscription.rs b/apollo-router/tests/integration/subscription_load_test.rs similarity index 98% rename from apollo-router/tests/integration/subscription.rs rename to apollo-router/tests/integration/subscription_load_test.rs index eba1fd8dd3..2d537b76d0 100644 --- a/apollo-router/tests/integration/subscription.rs +++ b/apollo-router/tests/integration/subscription_load_test.rs @@ -1,4 +1,4 @@ -//! This file is to load test subscriptions and should be launched manually, not in our CI +//! This file is to load-test subscriptions and should be launched manually, not in our CI use futures::StreamExt; use http::HeaderValue; use serde_json::json; diff --git a/dev-docs/metrics.md b/dev-docs/metrics.md index 401a99eda9..e5e4ee33a4 100644 --- a/dev-docs/metrics.md +++ b/dev-docs/metrics.md @@ -156,6 +156,9 @@ When using the macro in a test you will need a different pattern depending on if Make sure to use `.with_metrics()` method on the async block to ensure that the metrics are stored in a task local. 
*Tests will silently fail to record metrics if this is not done.* + +For testing metrics across spawned tasks, use `.with_current_meter_provider()` to propagate the meter provider to child tasks: + ```rust use crate::metrics::FutureMetricsExt; @@ -180,10 +183,29 @@ Make sure to use `.with_metrics()` method on the async block to ensure that the .with_metrics() .await; } + + #[tokio::test] + async fn test_metrics_across_tasks() { + async { + u64_counter!("apollo.router.test", "metric", 1); + assert_counter!("apollo.router.test", 1); + + // Use with_current_meter_provider to propagate metrics to spawned task + tokio::spawn(async move { + u64_counter!("apollo.router.test", "metric", 2); + }.with_current_meter_provider()) + .await + .unwrap(); + + // Now the metric correctly resolves to 3 since the meter provider was propagated + assert_counter!("apollo.router.test", 3); + } + .with_metrics() + .await; + } ``` -Note: this relies on metrics being updated within the same thread. Metrics that are updated from multiple threads will -not be collected correctly. +Note: Without using `with_current_meter_provider()`, metrics updated from spawned tasks will not be collected correctly: ```rust #[tokio::test] diff --git a/docs/source/routing/observability/telemetry/instrumentation/standard-instruments.mdx b/docs/source/routing/observability/telemetry/instrumentation/standard-instruments.mdx index 743374198e..ba64d3fc47 100644 --- a/docs/source/routing/observability/telemetry/instrumentation/standard-instruments.mdx +++ b/docs/source/routing/observability/telemetry/instrumentation/standard-instruments.mdx @@ -106,7 +106,7 @@ The initial call to Uplink during router startup is not reflected in metrics. 
-- `apollo.router.opened.subscriptions` - Number of different opened subscriptions (not the number of clients with an opened subscriptions in case it's deduplicated) +- `apollo.router.opened.subscriptions` - Number of distinct open subscriptions (not the number of clients with an open subscription, since subscriptions may be deduplicated). This metric carries a `graphql.operation.name` label (empty when the operation has no name) so you can tell exactly which subscription is still open. - `apollo.router.skipped.event.count` - Number of subscription events that has been skipped because too many events have been received from the subgraph but not yet sent to the client. ### Batching