Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Add graphql.operation.name label to apollo.router.opened.subscriptions counter ([PR #7606](https://github.com/apollographql/router/pull/7606))

`apollo.router.opened.subscriptions` metric contains `graphql.operation.name` label to know exactly which subscription is still opened.

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/7606
100 changes: 94 additions & 6 deletions apollo-router/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
//! * Instruments that measure a count of something should only use annotations with curly braces to
//! give additional meaning. For example, use `{packet}`, `{error}`, `{fault}`, etc., not `packet`,
//! `error`, `fault`, etc.
//! * Other instrument units should be specified using the UCUM case sensitive (c/s) variant. For
//! example, Cel for the unit with full name degree Celsius.
//! * Other instrument units should be specified using the UCUM case sensitive (c/s) variant. For
//! example, Cel for the unit with full name degree Celsius.
//! * When instruments are measuring durations, seconds (i.e. s) should be used.
//! * Instruments should use non-prefixed units (i.e. By instead of MiBy) unless there is good
//! technical reason to not do so.
Expand Down Expand Up @@ -1522,10 +1522,27 @@ macro_rules! assert_non_zero_metrics_snapshot {
#[cfg(test)]
pub(crate) type MetricFuture<T> = Pin<Box<dyn Future<Output = <T as Future>::Output>>>;

#[cfg(test)]
/// Extension trait for Futures that wish to test metrics.
pub(crate) trait FutureMetricsExt<T> {
/// See [dev-docs/metrics.md](https://github.com/apollographql/router/blob/dev/dev-docs/metrics.md#testing-async)
/// for details on this function.
/// Wraps a Future with metrics collection capabilities.
///
/// This method creates a new Future that will:
/// 1. Initialize the meter provider before executing the Future
/// 2. Execute the original Future
/// 3. Shutdown the meter provider after completion
///
/// This is useful for testing scenarios where you need to ensure metrics are properly
/// collected throughout the entire Future's execution.
///
/// # Example
/// ```rust
/// # use apollo_router::metrics::FutureMetricsExt;
/// # async fn example() {
/// let future = async { /* your async code that produces metrics */ };
/// let result = future.with_metrics().await;
/// # }
/// ```
#[cfg(test)]
fn with_metrics(
self,
) -> tokio::task::futures::TaskLocalFuture<
Expand All @@ -1539,6 +1556,8 @@ pub(crate) trait FutureMetricsExt<T> {
test_utils::AGGREGATE_METER_PROVIDER_ASYNC.scope(
Default::default(),
async move {
// We want to eagerly create the meter provider, the reason is that this will be shared among subtasks that use `with_current_meter_provider`.
let _ = meter_provider_internal();
let result = self.await;
let _ = tokio::task::spawn_blocking(|| {
meter_provider_internal().shutdown();
Expand All @@ -1549,9 +1568,53 @@ pub(crate) trait FutureMetricsExt<T> {
.boxed_local(),
)
}

/// Propagates the current meter provider to child tasks during test execution.
///
/// This method ensures that the meter provider is properly shared across tasks
/// during test scenarios. In non-test contexts, it returns the original Future
/// unchanged.
///
/// # Example
/// ```rust
/// # use apollo_router::metrics::FutureMetricsExt;
/// # async fn example() {
/// let result = tokio::task::spawn(async { /* your async code that produces metrics */ }.with_current_meter_provider()).await;
/// # }
/// ```
#[cfg(test)]
fn with_current_meter_provider(
self,
) -> tokio::task::futures::TaskLocalFuture<
OnceLock<(AggregateMeterProvider, test_utils::ClonableManualReader)>,
Self,
>
where
Self: Sized + Future + 'static,
<Self as Future>::Output: 'static,
{
// We need to determine if the meter was set. If not then we can use default provider which is empty
let meter_provider_set = test_utils::AGGREGATE_METER_PROVIDER_ASYNC
.try_with(|_| {})
.is_ok();
if meter_provider_set {
test_utils::AGGREGATE_METER_PROVIDER_ASYNC
.scope(test_utils::AGGREGATE_METER_PROVIDER_ASYNC.get(), self)
} else {
test_utils::AGGREGATE_METER_PROVIDER_ASYNC.scope(Default::default(), self)
}
}

#[cfg(not(test))]
fn with_current_meter_provider(self) -> Self
where
Self: Sized + Future + 'static,
{
// This is intentionally a noop. In the real world meter provider is a global variable.
self
}
}

#[cfg(test)]
impl<T> FutureMetricsExt<T> for T where T: Future {}

#[cfg(test)]
Expand Down Expand Up @@ -2270,4 +2333,29 @@ mod test {
.with_metrics()
.await;
}

#[tokio::test]
async fn test_metrics_across_tasks() {
async {
// Initial metric in the main task
u64_counter!("apollo.router.test", "metric", 1);
assert_counter!("apollo.router.test", 1);

// Spawn a task that also records metrics
let handle = tokio::spawn(
async move {
u64_counter!("apollo.router.test", "metric", 2);
}
.with_current_meter_provider(),
);

// Wait for the spawned task to complete
handle.await.unwrap();

// The metric should now be 3 since both tasks contributed
assert_counter!("apollo.router.test", 3);
}
.with_metrics()
.await;
}
}
Loading