Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
49b08bd
(feat) Allocation tracking
Oct 31, 2025
366b6ff
(feat) Allocation tracking
Oct 31, 2025
67a7ee0
(feat) Allocation tracking
Oct 31, 2025
571d431
Add attributes to remove warnings and test for dealloc/realloc/zeroing
rohan-b99 Nov 7, 2025
03dc08a
Add some extra with_memory_tracking calls
rohan-b99 Nov 7, 2025
ac47466
Remove extra tracking/debug code
rohan-b99 Nov 21, 2025
75e5b3e
format/lint
rohan-b99 Nov 21, 2025
b4ee066
Remove extra compute job tracking
rohan-b99 Dec 3, 2025
556a424
Implement memory-limit based cooperative cancellation
rohan-b99 Dec 15, 2025
39927d0
Add tests for when both timeout and memory limit are enabled
rohan-b99 Dec 19, 2025
6ab911f
Clippy
rohan-b99 Dec 19, 2025
3165feb
Flag limit check
rohan-b99 Dec 19, 2025
730e379
Log warning with full query when limit exceeded
rohan-b99 Jan 14, 2026
326419d
un-feature-gate AllocationLimit fields
rohan-b99 Jan 14, 2026
3399864
Remove feature gate flag for Ordering
rohan-b99 Jan 14, 2026
d9bd794
Feature gate tests to avoid windows execution
rohan-b99 Jan 14, 2026
696530b
More feature gates so tests work across platforms
rohan-b99 Jan 14, 2026
2d843f9
Update caching_query_planner.rs
rohan-b99 Jan 14, 2026
cc6eda0
Reorder import
rohan-b99 Jan 14, 2026
3995773
Add changeset
rohan-b99 Jan 14, 2026
f803f68
Merge branch 'dev' into rohan-b99/cooperative-cancellation-memory-limit
rohan-b99 Jan 14, 2026
4f0a75c
Log errors instead of panicking if stats not set, use match instead o…
rohan-b99 Jan 21, 2026
75bae5f
Merge branch 'dev' into rohan-b99/cooperative-cancellation-memory-limit
rohan-b99 Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
### feat: add memory limit option for cooperative cancellation ([PR #8808](https://github.com/apollographql/router/pull/8808))

Adds a `memory_limit` option to the `experimental_cooperative_cancellation` configuration that allows you to set a maximum memory allocation limit for query planning operations. When the memory limit is exceeded during query planning, the router will:

- **In enforce mode**: Cancel the query planning task and return an error to the client
- **In measure mode**: Record the cancellation outcome in metrics but allow the query planning to complete

In both modes, the query will be logged in a warn message.

The memory limit works alongside the existing `timeout` option, and whichever limit is reached first will trigger cancellation. This feature helps prevent excessive memory usage from complex queries or query planning operations that consume too much memory.

**Platform requirements**: This feature is only available on Unix platforms when the `global-allocator` feature is enabled and `dhat-heap` is not enabled (same requirements as memory tracking metrics).

**Example configuration:**

```yaml
supergraph:
query_planning:
experimental_cooperative_cancellation:
enabled: true
mode: enforce # or "measure" to only record metrics
memory_limit: 50mb # Supports formats like "50mb", "1gb", "1024kb", etc.
timeout: 5s # Optional: can be combined with memory_limit
```

By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8808
53 changes: 51 additions & 2 deletions apollo-router/src/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,34 @@ use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix))]
use std::sync::atomic::Ordering;
use std::task::Context;
use std::task::Poll;

#[cfg(feature = "dhat-heap")]
use parking_lot::Mutex;

#[allow(dead_code)] // some fields are only used if feature global-allocator is enabled
pub(crate) struct AllocationLimit {
bytes: usize,
on_exceeded: Box<dyn Fn(usize) + Send + Sync>,
exceeded: AtomicBool,
}

impl std::fmt::Debug for AllocationLimit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"AllocationLimit {{ bytes: {}, exceeded: {} }}",
self.bytes,
self.exceeded.load(Ordering::Relaxed)
)
}
}

/// Thread-local allocation statistics that can be shared across threads.
///
/// Supports nested tracking where allocations in a child context are also tracked
Expand All @@ -27,7 +46,7 @@ use parking_lot::Mutex;
/// that share the same Arc<AllocationStats>. This is critical for performance in the global
/// allocator hot path where even an uncontended Mutex would add significant overhead.
#[derive(Debug)]
#[allow(dead_code)] // bytes_* fields are only read if feature global-allocator is enabled
#[allow(dead_code)] // some fields are only used if feature global-allocator is enabled
pub(crate) struct AllocationStats {
/// Context name used for metric labeling
name: &'static str,
Expand All @@ -37,6 +56,7 @@ pub(crate) struct AllocationStats {
bytes_deallocated: AtomicUsize,
bytes_zeroed: AtomicUsize,
bytes_reallocated: AtomicUsize,
allocation_limit: Arc<OnceLock<AllocationLimit>>,
}

impl AllocationStats {
Expand All @@ -49,18 +69,21 @@ impl AllocationStats {
bytes_deallocated: AtomicUsize::new(0),
bytes_zeroed: AtomicUsize::new(0),
bytes_reallocated: AtomicUsize::new(0),
allocation_limit: Arc::new(OnceLock::new()),
}
}

/// Create a new child allocation stats context that tracks to a parent.
fn with_parent(name: &'static str, parent: Arc<AllocationStats>) -> Self {
let allocation_limit = parent.allocation_limit.clone();
Self {
name,
parent: Some(parent),
bytes_allocated: AtomicUsize::new(0),
bytes_deallocated: AtomicUsize::new(0),
bytes_zeroed: AtomicUsize::new(0),
bytes_reallocated: AtomicUsize::new(0),
allocation_limit,
}
}

Expand Down Expand Up @@ -156,6 +179,19 @@ impl AllocationStats {
let deallocated = self.bytes_deallocated();
allocated.saturating_add(zeroed).saturating_sub(deallocated)
}

/// Set the allocation limit for this allocation stats context.
pub(crate) fn set_allocation_limit(
&self,
bytes: usize,
on_exceeded: Box<dyn Fn(usize) + Send + Sync>,
) {
let _ = self.allocation_limit.set(AllocationLimit {
bytes,
on_exceeded,
exceeded: AtomicBool::new(false),
});
}
}

// Thread-local to track the current task's allocation stats.
Expand Down Expand Up @@ -353,6 +389,7 @@ struct CustomAllocator {

#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix))]
impl CustomAllocator {
#[allow(dead_code)] // only used if feature global-allocator is enabled
const fn new() -> Self {
Self {
inner: tikv_jemallocator::Jemalloc,
Expand All @@ -379,6 +416,17 @@ unsafe impl GlobalAlloc for CustomAllocator {
CURRENT_TASK_STATS.with(|cell| {
if let Some(stats_ptr) = cell.get() {
stats_ptr.as_ref().track_alloc(layout.size());

#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix))]
if let Some(limit) = stats_ptr.as_ref().allocation_limit.get() {
let bytes_allocated = stats_ptr.as_ref().bytes_allocated();
if bytes_allocated > limit.bytes
&& !limit.exceeded.load(Ordering::Relaxed)
{
limit.exceeded.store(true, Ordering::Relaxed);
(limit.on_exceeded)(bytes_allocated);
}
}
}
});
}
Expand Down Expand Up @@ -495,6 +543,7 @@ static malloc_conf: Option<&'static libc::c_char> = Some(unsafe {
#[cfg(test)]
#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix))]
mod tests {

use std::ffi::CStr;
use std::thread;

Expand Down
73 changes: 65 additions & 8 deletions apollo-router/src/configuration/cooperative_cancellation.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use bytesize::ByteSize;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -19,6 +20,9 @@ pub(crate) struct CooperativeCancellation {
#[schemars(with = "Option<String>")]
/// Enable timeout for query planning.
timeout: Option<Duration>,
/// Enable memory limit for query planning.
#[schemars(with = "Option<String>", default)]
memory_limit: Option<ByteSize>,
}

impl Default for CooperativeCancellation {
Expand All @@ -27,6 +31,7 @@ impl Default for CooperativeCancellation {
enabled: true,
mode: Mode::Measure,
timeout: None,
memory_limit: None,
}
}
}
Expand All @@ -37,13 +42,19 @@ impl CooperativeCancellation {
self.timeout
}

/// Returns the memory limit, if configured.
pub(crate) fn memory_limit(&self) -> Option<ByteSize> {
self.memory_limit
}

#[cfg(test)]
/// Create a new `CooperativeCancellation` config in enforcement mode.
pub(crate) fn enabled() -> Self {
Self {
enabled: true,
mode: Mode::Enforce,
timeout: None,
memory_limit: None,
}
}

Expand All @@ -52,14 +63,8 @@ impl CooperativeCancellation {
self.enabled
}

/// Returns true if this config is in measure mode.
pub(crate) fn is_measure_mode(&self) -> bool {
self.mode.is_measure_mode()
}

/// Returns true if this config is in enforce mode.
pub(crate) fn is_enforce_mode(&self) -> bool {
self.mode.is_enforce_mode()
pub(crate) fn mode(&self) -> Mode {
self.mode
}

#[cfg(test)]
Expand All @@ -69,6 +74,7 @@ impl CooperativeCancellation {
enabled: true,
mode: Mode::Enforce,
timeout: Some(timeout),
memory_limit: None,
}
}

Expand All @@ -79,6 +85,57 @@ impl CooperativeCancellation {
enabled: true,
mode: Mode::Measure,
timeout: Some(timeout),
memory_limit: None,
}
}

#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix, test))]
/// Create a new `CooperativeCancellation` config in enforce mode with a memory limit.
pub(crate) fn enforce_with_memory_limit(memory_limit: ByteSize) -> Self {
Self {
enabled: true,
mode: Mode::Enforce,
timeout: None,
memory_limit: Some(memory_limit),
}
}

/// Create a new `CooperativeCancellation` config in measure mode with a memory limit.
#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix, test))]
pub(crate) fn measure_with_memory_limit(memory_limit: ByteSize) -> Self {
Self {
enabled: true,
mode: Mode::Measure,
timeout: None,
memory_limit: Some(memory_limit),
}
}

#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix, test))]
/// Create a new `CooperativeCancellation` config in enforcement mode with both timeout and memory limit.
pub(crate) fn enforce_with_timeout_and_memory_limit(
timeout: Duration,
memory_limit: ByteSize,
) -> Self {
Self {
enabled: true,
mode: Mode::Enforce,
timeout: Some(timeout),
memory_limit: Some(memory_limit),
}
}

#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix, test))]
/// Create a new `CooperativeCancellation` config in measure mode with both timeout and memory limit.
pub(crate) fn measure_with_timeout_and_memory_limit(
timeout: Duration,
memory_limit: ByteSize,
) -> Self {
Self {
enabled: true,
mode: Mode::Measure,
timeout: Some(timeout),
memory_limit: Some(memory_limit),
}
}
}
12 changes: 0 additions & 12 deletions apollo-router/src/configuration/mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,3 @@ pub(crate) enum Mode {
Measure,
Enforce,
}

impl Mode {
/// Returns true if this config is in measure mode.
pub(crate) fn is_measure_mode(&self) -> bool {
matches!(self, Mode::Measure)
}

/// Returns true if this config is in enforce mode.
pub(crate) fn is_enforce_mode(&self) -> bool {
matches!(self, Mode::Enforce)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,14 @@ expression: "&schema"
"description": "When true, cooperative cancellation is enabled.",
"type": "boolean"
},
"memory_limit": {
"default": null,
"description": "Enable memory limit for query planning.",
"type": [
"string",
"null"
]
},
"mode": {
"allOf": [
{
Expand Down Expand Up @@ -7597,6 +7605,7 @@ expression: "&schema"
],
"default": {
"enabled": true,
"memory_limit": null,
"mode": "measure",
"timeout": null
},
Expand Down Expand Up @@ -10683,6 +10692,7 @@ expression: "&schema"
},
"experimental_cooperative_cancellation": {
"enabled": true,
"memory_limit": null,
"mode": "measure",
"timeout": null
},
Expand Down Expand Up @@ -12239,6 +12249,7 @@ expression: "&schema"
},
"experimental_cooperative_cancellation": {
"enabled": true,
"memory_limit": null,
"mode": "measure",
"timeout": null
},
Expand Down
3 changes: 3 additions & 0 deletions apollo-router/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ pub(crate) enum QueryPlannerError {

/// Query planning timed out: {0}
Timeout(String),

/// Query planning memory limit exceeded: {0}
MemoryLimitExceeded(String),
}

impl From<FederationErrorBridge> for QueryPlannerError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ where
async move {
let result = fut.await;

// Record allocation metrics if stats are available
#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix))]
if let Some(stats) = crate::allocator::current() {
record_metrics(&stats);
Expand Down
Loading