From 318fff6bce2dbfcbf96164043c6fff15d7971f27 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 26 Nov 2025 12:12:18 +0000 Subject: [PATCH 01/13] fix(coprocessor): context keys/values not deleted across stages --- apollo-router/src/context/mod.rs | 4 + .../src/plugins/coprocessor/execution.rs | 35 ++- apollo-router/src/plugins/coprocessor/mod.rs | 126 ++++++++--- .../src/plugins/coprocessor/supergraph.rs | 35 ++- apollo-router/src/plugins/coprocessor/test.rs | 210 ++++++++++++++++++ .../tests/integration/coprocessor.rs | 166 ++++++++++++++ 6 files changed, 512 insertions(+), 64 deletions(-) diff --git a/apollo-router/src/context/mod.rs b/apollo-router/src/context/mod.rs index 5b33b66146..df98c35e6e 100644 --- a/apollo-router/src/context/mod.rs +++ b/apollo-router/src/context/mod.rs @@ -255,6 +255,10 @@ impl Context { } } + pub(crate) fn remove>(&self, key: K) -> Option<(String, Value)> { + self.entries.remove(&key.into()) + } + /// Read only access to the executable document for internal router plugins. pub(crate) fn executable_document(&self) -> Option>> { self.extensions() diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 09f5b6b2cd..3fed18f785 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -364,6 +364,8 @@ where .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); let payload = Externalizable::execution_builder() @@ -409,16 +411,12 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &response.context, + &keys_sent, + context, + &response_config.context, + )?; } if let Some(headers) = co_processor_output.headers { @@ -445,6 +443,9 @@ where .expect("serialization will not fail") }); let context_to_send = response_config_context.get_context(&generator_map_context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = + extract_context_keys_sent(&generator_map_context, &response_config_context); // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and @@ -487,14 +488,12 @@ where )?; if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config_context - { - key = context_key_from_deprecated(key); - } - generator_map_context.upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &generator_map_context, + &keys_sent, + context, + &response_config_context, + )?; } // We return the deferred_response into our stream of response chunks diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index bde7e6a64b..d760fae397 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -495,6 +495,80 @@ fn default_response_validation() -> bool { true } +/// Extract the keys that would be sent to the coprocessor from the target context. +/// Returns the actual keys (not deprecated names) that would be sent. +/// This extracts keys directly from target_context based on the config, avoiding +/// the need to translate deprecated names since we only need the actual keys for deletion tracking. +pub(crate) fn extract_context_keys_sent( + target_context: &Context, + context_config: &ContextConf, +) -> HashSet { + match context_config { + ContextConf::NewContextConf(NewContextConf::All) => { + // All keys are sent, use actual keys directly + target_context.iter().map(|elt| elt.key().clone()).collect() + } + ContextConf::NewContextConf(NewContextConf::Deprecated) | ContextConf::Deprecated(true) => { + // All keys are sent, but with deprecated names. However, we want actual keys + // for deletion tracking, so just extract actual keys directly + target_context.iter().map(|elt| elt.key().clone()).collect() + } + ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => { + // Only selected keys are sent, use actual keys directly + target_context + .iter() + .filter_map(|elt| context_keys.get(elt.key()).map(|_| elt.key().clone())) + .collect() + } + ContextConf::Deprecated(false) => { + // No keys sent + HashSet::new() + } + } +} + +/// Update the target context based on the context returned from the coprocessor. +/// This function handles both updates/inserts and deletions: +/// - Keys present in the returned context (with non-null values) are updated/inserted +/// - Keys that were sent to the coprocessor but are missing from the returned context are deleted +/// - Keys with null values in the returned context are deleted +pub(crate) fn update_context_from_coprocessor( + target_context: &Context, + keys_sent: &HashSet, + context_returned: Context, + context_config: &ContextConf, +) -> Result<(), BoxError> { + // Collect keys that are in the returned context + let mut keys_returned = HashSet::new(); + let mut keys_with_null = HashSet::new(); + + for (mut key, value) in context_returned.try_into_iter()? { + // Handle deprecated key names - convert back to actual key names + if let ContextConf::NewContextConf(NewContextConf::Deprecated) = context_config { + key = context_key_from_deprecated(key); + } + + // Check if value is null (indicates deletion) + if matches!(value, Value::Null) { + keys_with_null.insert(key); + } else { + keys_returned.insert(key.clone()); + // Update/insert the key + target_context.upsert_json_value(key, move |_current| value); + } + } + + // Delete keys that were sent but are missing from the returned context + // or have null values + for key in keys_sent { + if !keys_returned.contains(key) || keys_with_null.contains(key) { + target_context.remove(key); + } + } + + Ok(()) +} + fn record_coprocessor_duration(stage: PipelineStep, duration: Duration) { f64_histogram!( "apollo.router.operations.coprocessor.duration", @@ -984,6 +1058,8 @@ where .transpose()?; let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); let payload = Externalizable::router_builder() @@ -1025,16 +1101,12 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &response.context, + &keys_sent, + context, + &response_config.context, + )?; } if let Some(headers) = co_processor_output.headers { @@ -1067,6 +1139,8 @@ where .transpose()?; let generator_map_context = generator_map_context.clone(); let context_to_send = context_conf.get_context(&generator_map_context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = extract_context_keys_sent(&generator_map_context, &context_conf); // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and @@ -1099,14 +1173,12 @@ where }; if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &context_conf - { - key = context_key_from_deprecated(key); - } - generator_map_context.upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &generator_map_context, + &keys_sent, + context, + &context_conf, + )?; } // We return the final_bytes into our stream of response chunks @@ -1319,6 +1391,8 @@ where .then(|| serde_json_bytes::to_value(&body)) .transpose()?; let context_to_send = response_config.context.get_context(&response.context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let service_name = response_config.service_name.then_some(service_name); let subgraph_request_id = response_config .subgraph_request_id @@ -1368,16 +1442,12 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &response.context, + &keys_sent, + context, + &response_config.context, + )?; } if let Some(headers) = co_processor_output.headers { diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index da8020cfa1..cde0f97377 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -368,6 +368,8 @@ where .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); let payload = Externalizable::supergraph_builder() @@ -413,16 +415,12 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &response.context, + &keys_sent, + context, + &response_config.context, + )?; } if let Some(headers) = co_processor_output.headers { @@ -454,6 +452,9 @@ where .expect("serialization will not fail") }); let context_to_send = response_config_context.get_context(&generator_map_context); + // Extract keys that would be sent from target_context directly (no translation needed) + let keys_sent = + extract_context_keys_sent(&generator_map_context, &response_config_context); // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and @@ -499,14 +500,12 @@ where )?; if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config_context - { - key = context_key_from_deprecated(key); - } - generator_map_context.upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &generator_map_context, + &keys_sent, + context, + &response_config_context, + )?; } // We return the deferred_response into our stream of response chunks diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 753155f77e..90130ea9e0 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -3999,4 +3999,214 @@ mod tests { .is_ok() ); } + + // Tests for context key deletion functionality + + #[test] + fn test_extract_context_keys_sent() { + use crate::Context; + use crate::plugins::coprocessor::extract_context_keys_sent; + + let context = Context::new(); + context.insert("k1", "v1".to_string()).unwrap(); + context.insert("k2", "v2".to_string()).unwrap(); + context.insert("k3", "v3".to_string()).unwrap(); + + // Test with All context config + let keys = + extract_context_keys_sent(&context, &ContextConf::NewContextConf(NewContextConf::All)); + assert_eq!(keys.len(), 3); + assert!(keys.contains("k1")); + assert!(keys.contains("k2")); + assert!(keys.contains("k3")); + + // Test with Selective context config - only selected keys should be extracted + let selective_keys: std::collections::HashSet = + ["k1".to_string(), "k2".to_string()].into(); + let keys = extract_context_keys_sent( + &context, + &ContextConf::NewContextConf(NewContextConf::Selective(Arc::new(selective_keys))), + ); + // Only selected keys should be extracted + assert_eq!(keys.len(), 2); + assert!(keys.contains("k1")); + assert!(keys.contains("k2")); + assert!(!keys.contains("k3")); + + // Test with Deprecated config - all keys should be extracted (actual keys, not deprecated) + let keys = extract_context_keys_sent( + &context, + &ContextConf::NewContextConf(NewContextConf::Deprecated), + ); + assert_eq!(keys.len(), 3); + assert!(keys.contains("k1")); + assert!(keys.contains("k2")); + assert!(keys.contains("k3")); + + // Test with Deprecated(false) - no keys should be extracted + let keys = extract_context_keys_sent(&context, &ContextConf::Deprecated(false)); + assert_eq!(keys.len(), 0); + } + + #[test] + fn test_update_context_from_coprocessor_deletes_missing_keys() { + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + target_context.insert("k2", "v2".to_string()).unwrap(); + target_context.insert("k3", "v3".to_string()).unwrap(); + + // Keys that were sent to coprocessor + let keys_sent: std::collections::HashSet = + ["k1".to_string(), "k2".to_string(), "k3".to_string()].into(); + + // Coprocessor returns context without k2 (deleted) + let returned_context = Context::new(); + returned_context + .insert("k1", "v1_updated".to_string()) + .unwrap(); + // k2 is missing (deleted) + returned_context.insert("k3", "v3".to_string()).unwrap(); + + // Update context + update_context_from_coprocessor( + &target_context, + &keys_sent, + returned_context, + &ContextConf::NewContextConf(NewContextConf::All), + ) + .unwrap(); + + // k1 should be updated + assert_eq!( + target_context.get_json_value("k1"), + Some(serde_json_bytes::json!("v1_updated")) + ); + // k2 should be deleted + assert!(!target_context.contains_key("k2")); + // k3 should remain + assert_eq!( + target_context.get_json_value("k3"), + Some(serde_json_bytes::json!("v3")) + ); + } + + #[test] + fn test_update_context_from_coprocessor_deletes_null_keys() { + use crate::Context; + use crate::json_ext::Value; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + target_context.insert("k2", "v2".to_string()).unwrap(); + + // Keys that were sent to coprocessor + let keys_sent: std::collections::HashSet = + ["k1".to_string(), "k2".to_string()].into(); + + // Coprocessor returns context with k2 set to null (indicating deletion) + let returned_context = Context::new(); + returned_context + .insert("k1", "v1_updated".to_string()) + .unwrap(); + returned_context.insert_json_value("k2", Value::Null); + + // Update context + update_context_from_coprocessor( + &target_context, + &keys_sent, + returned_context, + &ContextConf::NewContextConf(NewContextConf::All), + ) + .unwrap(); + + // k1 should be updated + assert_eq!( + target_context.get_json_value("k1"), + Some(serde_json_bytes::json!("v1_updated")) + ); + // k2 should be deleted (was null) + assert!(!target_context.contains_key("k2")); + } + + #[test] + fn test_update_context_from_coprocessor_adds_new_keys() { + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + + // Keys that were sent to coprocessor + let keys_sent: std::collections::HashSet = ["k1".to_string()].into(); + + // Coprocessor returns context with a new key + let returned_context = Context::new(); + returned_context + .insert("k1", "v1_updated".to_string()) + .unwrap(); + returned_context.insert("k2", "v2_new".to_string()).unwrap(); + + // Update context + update_context_from_coprocessor( + &target_context, + &keys_sent, + returned_context, + &ContextConf::NewContextConf(NewContextConf::All), + ) + .unwrap(); + + // k1 should be updated + assert_eq!( + target_context.get_json_value("k1"), + Some(serde_json_bytes::json!("v1_updated")) + ); + // k2 should be added + assert_eq!( + target_context.get_json_value("k2"), + Some(serde_json_bytes::json!("v2_new")) + ); + } + + #[test] + fn test_update_context_from_coprocessor_preserves_keys_not_sent() { + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + target_context + .insert("key_not_sent", "preserved_value".to_string()) + .unwrap(); + + // Only k1 was sent to coprocessor + let keys_sent: std::collections::HashSet = ["k1".to_string()].into(); + + // Coprocessor returns context without k1 (deleted) + let returned_context = Context::new(); + + // Update context + update_context_from_coprocessor( + &target_context, + &keys_sent, + returned_context, + &ContextConf::NewContextConf(NewContextConf::All), + ) + .unwrap(); + + // k1 should be deleted (was sent but missing from returned context) + assert!(!target_context.contains_key("k1")); + // key_not_sent should be preserved (wasn't sent to coprocessor) + assert_eq!( + target_context.get_json_value("key_not_sent"), + Some(serde_json_bytes::json!("preserved_value")) + ); + } } diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 7bda3487a5..1d294c378c 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -489,3 +489,169 @@ mod on_graphql_error_selector { Ok(()) } } + +#[tokio::test(flavor = "multi_thread")] +async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mock_server = wiremock::MockServer::start().await; + let coprocessor_address = mock_server.uri(); + + // Track the context keys received in each stage + let router_response_context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let router_response_context_clone = router_response_context.clone(); + + // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse + Mock::given(method("POST")) + .and(path("/")) + .respond_with(move |req: &wiremock::Request| { + let body = req.body_json::().expect("body"); + let stage = body.get("stage").and_then(|s| s.as_str()).unwrap_or(""); + + let mut response = body.clone(); + + // Ensure Request stages have a control field + if stage.ends_with("Request") + && !response.as_object().unwrap().contains_key("control") + && let Some(obj) = response.as_object_mut() + { + obj.insert("control".to_string(), serde_json::json!("continue")); + } + + if stage == "SubgraphResponse" { + // Return context without "myValue" (deleted) + if let Some(obj) = response.as_object_mut() + && let Some(ctx) = obj.get_mut("context").and_then(|c| c.as_object_mut()) + && let Some(entries) = ctx.get_mut("entries").and_then(|e| e.as_object_mut()) + { + entries.remove("myValue"); + } + } else if stage == "RouterResponse" { + // Track the context received in RouterResponse + let context = body.get("context").and_then(|c| c.get("entries")); + if let Some(ctx) = context { + *router_response_context_clone.lock().unwrap() = Some(ctx.clone()); + } + } + // For all other stages, just pass through + + ResponseTemplate::new(200).set_body_json(response) + }) + .mount(&mock_server) + .await; + + let mut router = IntegrationTest::builder() + .config( + include_str!("fixtures/coprocessor.router.yaml") + .replace("", &coprocessor_address), + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // Execute a query that will trigger both SubgraphResponse and RouterResponse stages + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + + // Verify that RouterResponse does NOT have "myValue" (it was deleted in SubgraphResponse) + if let Some(ctx) = router_response_context.lock().unwrap().as_ref() + && let Some(entries) = ctx.as_object() + { + assert!( + !entries.contains_key("myValue"), + "myValue should be deleted in RouterResponse stage, but was found: {:?}", + entries + ); + } + + router.graceful_shutdown().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_coprocessor_context_key_deletion_with_null() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mock_server = wiremock::MockServer::start().await; + let coprocessor_address = mock_server.uri(); + + // Track the context keys received in RouterResponse + let router_response_context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let router_response_context_clone = router_response_context.clone(); + + // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse + Mock::given(method("POST")) + .and(path("/")) + .respond_with(move |req: &wiremock::Request| { + let body = req.body_json::().expect("body"); + let stage = body.get("stage").and_then(|s| s.as_str()).unwrap_or(""); + + let mut response = body.clone(); + + // Ensure Request stages have a control field + if stage.ends_with("Request") + && !response.as_object().unwrap().contains_key("control") + && let Some(obj) = response.as_object_mut() + { + obj.insert("control".to_string(), serde_json::json!("continue")); + } + + if stage == "SubgraphResponse" { + // Return context with "myValue" set to null (indicating deletion) + if let Some(obj) = response.as_object_mut() + && let Some(ctx) = obj.get_mut("context").and_then(|c| c.as_object_mut()) + && let Some(entries) = ctx.get_mut("entries").and_then(|e| e.as_object_mut()) + { + entries.insert("myValue".to_string(), serde_json::Value::Null); + } + } else if stage == "RouterResponse" { + // Track the context received in RouterResponse + let context = body.get("context").and_then(|c| c.get("entries")); + if let Some(ctx) = context { + *router_response_context_clone.lock().unwrap() = Some(ctx.clone()); + } + } + // For all other stages, just pass through + + ResponseTemplate::new(200).set_body_json(response) + }) + .mount(&mock_server) + .await; + + let mut router = IntegrationTest::builder() + .config( + include_str!("fixtures/coprocessor.router.yaml") + .replace("", &coprocessor_address), + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // Execute a query that will trigger both SubgraphResponse and RouterResponse stages + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + + // Verify that RouterResponse does NOT have "myValue" (it was set to null in SubgraphResponse) + if let Some(ctx) = router_response_context.lock().unwrap().as_ref() + && let Some(entries) = ctx.as_object() + { + assert!( + !entries.contains_key("myValue"), + "myValue should be deleted in RouterResponse stage (was set to null), but was found: {:?}", + entries + ); + } + + router.graceful_shutdown().await; + + Ok(()) +} From 1402303b9156ece96160ccbe4195fe48c7cd7621 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:12:41 +0000 Subject: [PATCH 02/13] Remove need to key strings separately (avoid allocations) --- ...99_coprocessor_context_keys_not_deleted.md | 5 ++ apollo-router/src/context/mod.rs | 4 +- .../src/plugins/coprocessor/execution.rs | 13 +-- apollo-router/src/plugins/coprocessor/mod.rs | 78 +++++------------- .../src/plugins/coprocessor/supergraph.rs | 7 -- apollo-router/src/plugins/coprocessor/test.rs | 79 +++---------------- 6 files changed, 38 insertions(+), 148 deletions(-) create mode 100644 .changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md diff --git a/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md b/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md new file mode 100644 index 0000000000..77db88fb8a --- /dev/null +++ b/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md @@ -0,0 +1,5 @@ +### Ensure that coprocessor context keys/values removed in a stage do not re-appear in later stages ([PR #8679](https://github.com/apollographql/router/pull/8679)) + +Ensure that coprocessor keys that are deleted or set to null in a previous stage do not re-appear in later stages + +By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8679 diff --git a/apollo-router/src/context/mod.rs b/apollo-router/src/context/mod.rs index df98c35e6e..ceb9bb9f43 100644 --- a/apollo-router/src/context/mod.rs +++ b/apollo-router/src/context/mod.rs @@ -255,8 +255,8 @@ impl Context { } } - pub(crate) fn remove>(&self, key: K) -> Option<(String, Value)> { - self.entries.remove(&key.into()) + pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) { + self.entries.retain(|k, v| f(k, v)); } /// Read only access to the executable document for internal router plugins. diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 3fed18f785..2904fa3816 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -364,8 +364,6 @@ where .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); let payload = Externalizable::execution_builder() @@ -411,12 +409,7 @@ where } if let Some(context) = co_processor_output.context { - update_context_from_coprocessor( - &response.context, - &keys_sent, - context, - &response_config.context, - )?; + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { @@ -443,9 +436,6 @@ where .expect("serialization will not fail") }); let context_to_send = response_config_context.get_context(&generator_map_context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = - extract_context_keys_sent(&generator_map_context, &response_config_context); // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and @@ -490,7 +480,6 @@ where if let Some(context) = co_processor_output.context { update_context_from_coprocessor( &generator_map_context, - &keys_sent, context, &response_config_context, )?; diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index d760fae397..b2db839d6b 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -495,38 +495,6 @@ fn default_response_validation() -> bool { true } -/// Extract the keys that would be sent to the coprocessor from the target context. -/// Returns the actual keys (not deprecated names) that would be sent. -/// This extracts keys directly from target_context based on the config, avoiding -/// the need to translate deprecated names since we only need the actual keys for deletion tracking. -pub(crate) fn extract_context_keys_sent( - target_context: &Context, - context_config: &ContextConf, -) -> HashSet { - match context_config { - ContextConf::NewContextConf(NewContextConf::All) => { - // All keys are sent, use actual keys directly - target_context.iter().map(|elt| elt.key().clone()).collect() - } - ContextConf::NewContextConf(NewContextConf::Deprecated) | ContextConf::Deprecated(true) => { - // All keys are sent, but with deprecated names. However, we want actual keys - // for deletion tracking, so just extract actual keys directly - target_context.iter().map(|elt| elt.key().clone()).collect() - } - ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => { - // Only selected keys are sent, use actual keys directly - target_context - .iter() - .filter_map(|elt| context_keys.get(elt.key()).map(|_| elt.key().clone())) - .collect() - } - ContextConf::Deprecated(false) => { - // No keys sent - HashSet::new() - } - } -} - /// Update the target context based on the context returned from the coprocessor. /// This function handles both updates/inserts and deletions: /// - Keys present in the returned context (with non-null values) are updated/inserted @@ -534,7 +502,6 @@ pub(crate) fn extract_context_keys_sent( /// - Keys with null values in the returned context are deleted pub(crate) fn update_context_from_coprocessor( target_context: &Context, - keys_sent: &HashSet, context_returned: Context, context_config: &ContextConf, ) -> Result<(), BoxError> { @@ -558,12 +525,26 @@ pub(crate) fn update_context_from_coprocessor( } } - // Delete keys that were sent but are missing from the returned context - // or have null values - for key in keys_sent { - if !keys_returned.contains(key) || keys_with_null.contains(key) { - target_context.remove(key); + // Delete keys that match any of the following conditions: + // - were sent but are missing from the returned context + // - have null values + match context_config { + ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => { + target_context.retain(|key, _v| { + if keys_with_null.contains(key) { + return false; + } + if keys_returned.contains(key) { + return true; + } + if context_keys.contains(key) && !keys_returned.contains(key) { + return false; + } + true + }); } + _ => target_context + .retain(|key, _v| keys_returned.contains(key) && !keys_with_null.contains(key)), } Ok(()) @@ -1058,8 +1039,6 @@ where .transpose()?; let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); let payload = Externalizable::router_builder() @@ -1101,12 +1080,7 @@ where } if let Some(context) = co_processor_output.context { - update_context_from_coprocessor( - &response.context, - &keys_sent, - context, - &response_config.context, - )?; + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { @@ -1139,8 +1113,6 @@ where .transpose()?; let generator_map_context = generator_map_context.clone(); let context_to_send = context_conf.get_context(&generator_map_context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = extract_context_keys_sent(&generator_map_context, &context_conf); // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and @@ -1175,7 +1147,6 @@ where if let Some(context) = co_processor_output.context { update_context_from_coprocessor( &generator_map_context, - &keys_sent, context, &context_conf, )?; @@ -1391,8 +1362,6 @@ where .then(|| serde_json_bytes::to_value(&body)) .transpose()?; let context_to_send = response_config.context.get_context(&response.context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let service_name = response_config.service_name.then_some(service_name); let subgraph_request_id = response_config .subgraph_request_id @@ -1442,12 +1411,7 @@ where } if let Some(context) = co_processor_output.context { - update_context_from_coprocessor( - &response.context, - &keys_sent, - context, - &response_config.context, - )?; + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index cde0f97377..754f2b3ee8 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -368,8 +368,6 @@ where .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = extract_context_keys_sent(&response.context, &response_config.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); let payload = Externalizable::supergraph_builder() @@ -417,7 +415,6 @@ where if let Some(context) = co_processor_output.context { update_context_from_coprocessor( &response.context, - &keys_sent, context, &response_config.context, )?; @@ -452,9 +449,6 @@ where .expect("serialization will not fail") }); let context_to_send = response_config_context.get_context(&generator_map_context); - // Extract keys that would be sent from target_context directly (no translation needed) - let keys_sent = - extract_context_keys_sent(&generator_map_context, &response_config_context); // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and @@ -502,7 +496,6 @@ where if let Some(context) = co_processor_output.context { update_context_from_coprocessor( &generator_map_context, - &keys_sent, context, &response_config_context, )?; diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 90130ea9e0..38703b65a5 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -4002,52 +4002,6 @@ mod tests { // Tests for context key deletion functionality - #[test] - fn test_extract_context_keys_sent() { - use crate::Context; - use crate::plugins::coprocessor::extract_context_keys_sent; - - let context = Context::new(); - context.insert("k1", "v1".to_string()).unwrap(); - context.insert("k2", "v2".to_string()).unwrap(); - context.insert("k3", "v3".to_string()).unwrap(); - - // Test with All context config - let keys = - extract_context_keys_sent(&context, &ContextConf::NewContextConf(NewContextConf::All)); - assert_eq!(keys.len(), 3); - assert!(keys.contains("k1")); - assert!(keys.contains("k2")); - assert!(keys.contains("k3")); - - // Test with Selective context config - only selected keys should be extracted - let selective_keys: std::collections::HashSet = - ["k1".to_string(), "k2".to_string()].into(); - let keys = extract_context_keys_sent( - &context, - &ContextConf::NewContextConf(NewContextConf::Selective(Arc::new(selective_keys))), - ); - // Only selected keys should be extracted - assert_eq!(keys.len(), 2); - assert!(keys.contains("k1")); - assert!(keys.contains("k2")); - assert!(!keys.contains("k3")); - - // Test with Deprecated config - all keys should be extracted (actual keys, not deprecated) - let keys = extract_context_keys_sent( - &context, - &ContextConf::NewContextConf(NewContextConf::Deprecated), - ); - assert_eq!(keys.len(), 3); - assert!(keys.contains("k1")); - assert!(keys.contains("k2")); - assert!(keys.contains("k3")); - - // Test with Deprecated(false) - no keys should be extracted - let keys = extract_context_keys_sent(&context, &ContextConf::Deprecated(false)); - assert_eq!(keys.len(), 0); - } - #[test] fn test_update_context_from_coprocessor_deletes_missing_keys() { use crate::Context; @@ -4059,10 +4013,6 @@ mod tests { target_context.insert("k2", "v2".to_string()).unwrap(); target_context.insert("k3", "v3".to_string()).unwrap(); - // Keys that were sent to coprocessor - let keys_sent: std::collections::HashSet = - ["k1".to_string(), "k2".to_string(), "k3".to_string()].into(); - // Coprocessor returns context without k2 (deleted) let returned_context = Context::new(); returned_context @@ -4074,7 +4024,6 @@ mod tests { // Update context update_context_from_coprocessor( &target_context, - &keys_sent, returned_context, &ContextConf::NewContextConf(NewContextConf::All), ) @@ -4105,10 +4054,6 @@ mod tests { target_context.insert("k1", "v1".to_string()).unwrap(); target_context.insert("k2", "v2".to_string()).unwrap(); - // Keys that were sent to coprocessor - let keys_sent: std::collections::HashSet = - ["k1".to_string(), "k2".to_string()].into(); - // Coprocessor returns context with k2 set to null (indicating deletion) let returned_context = Context::new(); returned_context @@ -4119,7 +4064,6 @@ mod tests { // Update context update_context_from_coprocessor( &target_context, - &keys_sent, returned_context, &ContextConf::NewContextConf(NewContextConf::All), ) @@ -4143,9 +4087,6 @@ mod tests { let target_context = Context::new(); target_context.insert("k1", "v1".to_string()).unwrap(); - // Keys that were sent to coprocessor - let keys_sent: std::collections::HashSet = ["k1".to_string()].into(); - // Coprocessor returns context with a new key let returned_context = Context::new(); returned_context @@ -4156,7 +4097,6 @@ mod tests { // Update context update_context_from_coprocessor( &target_context, - &keys_sent, returned_context, &ContextConf::NewContextConf(NewContextConf::All), ) @@ -4178,6 +4118,8 @@ mod tests { fn test_update_context_from_coprocessor_preserves_keys_not_sent() { use crate::Context; use crate::plugins::coprocessor::update_context_from_coprocessor; + use std::collections::HashSet; + use std::sync::Arc; // Create a context with some keys let target_context = Context::new(); @@ -4186,20 +4128,17 @@ mod tests { .insert("key_not_sent", "preserved_value".to_string()) .unwrap(); - // Only k1 was sent to coprocessor - let keys_sent: std::collections::HashSet = ["k1".to_string()].into(); - // Coprocessor returns context without k1 (deleted) let returned_context = Context::new(); + // Use Selective config to only send "k1", not "key_not_sent" + let selective_keys: HashSet = ["k1".to_string()].into(); + let context_config = + ContextConf::NewContextConf(NewContextConf::Selective(Arc::new(selective_keys))); + // Update context - update_context_from_coprocessor( - &target_context, - &keys_sent, - returned_context, - &ContextConf::NewContextConf(NewContextConf::All), - ) - .unwrap(); + update_context_from_coprocessor(&target_context, returned_context, &context_config) + .unwrap(); // k1 should be deleted (was sent but missing from returned context) assert!(!target_context.contains_key("k1")); From f5c43549c01a41036d9491e166525394bbeffbdc Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:15:55 +0000 Subject: [PATCH 03/13] Remove behaviour that considers setting to null as deletion --- ...99_coprocessor_context_keys_not_deleted.md | 2 +- apollo-router/src/plugins/coprocessor/mod.rs | 27 ++---- apollo-router/src/plugins/coprocessor/test.rs | 35 -------- .../tests/integration/coprocessor.rs | 83 ------------------- 4 files changed, 8 insertions(+), 139 deletions(-) diff --git a/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md b/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md index 77db88fb8a..8bd434359d 100644 --- a/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md +++ b/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md @@ -1,5 +1,5 @@ ### Ensure that coprocessor context keys/values removed in a stage do not re-appear in later stages ([PR #8679](https://github.com/apollographql/router/pull/8679)) -Ensure that coprocessor keys that are deleted or set to null in a previous stage do not re-appear in later stages +Ensure that coprocessor keys that are deleted in a previous stage do not re-appear in later stages By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8679 diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index b2db839d6b..b49291e3a4 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -499,7 +499,6 @@ fn default_response_validation() -> bool { /// This function handles both updates/inserts and deletions: /// - Keys present in the returned context (with non-null values) are updated/inserted /// - Keys that were sent to the coprocessor but are missing from the returned context are deleted -/// - Keys with null values in the returned context are deleted pub(crate) fn update_context_from_coprocessor( target_context: &Context, context_returned: Context, @@ -507,7 +506,6 @@ pub(crate) fn update_context_from_coprocessor( ) -> Result<(), BoxError> { // Collect keys that are in the returned context let mut keys_returned = HashSet::new(); - let mut keys_with_null = HashSet::new(); for (mut key, value) in context_returned.try_into_iter()? { // Handle deprecated key names - convert back to actual key names @@ -515,36 +513,25 @@ pub(crate) fn update_context_from_coprocessor( key = context_key_from_deprecated(key); } - // Check if value is null (indicates deletion) - if matches!(value, Value::Null) { - keys_with_null.insert(key); - } else { - keys_returned.insert(key.clone()); - // Update/insert the key - target_context.upsert_json_value(key, move |_current| value); - } + keys_returned.insert(key.clone()); + // Update/insert the key + target_context.upsert_json_value(key, move |_current| value); } - // Delete keys that match any of the following conditions: - // - were sent but are missing from the returned context - // - have null values + // Delete keys that were sent but are missing from the returned context + // If the context config is selective, only delete keys that are in the selective list match context_config { ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => { target_context.retain(|key, _v| { - if keys_with_null.contains(key) { - return false; - } if keys_returned.contains(key) { return true; - } - if context_keys.contains(key) && !keys_returned.contains(key) { + } else if context_keys.contains(key) { return false; } true }); } - _ => target_context - .retain(|key, _v| keys_returned.contains(key) && !keys_with_null.contains(key)), + _ => target_context.retain(|key, _v| keys_returned.contains(key)), } Ok(()) diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 38703b65a5..13639b8f73 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -4043,41 +4043,6 @@ mod tests { ); } - #[test] - fn test_update_context_from_coprocessor_deletes_null_keys() { - use crate::Context; - use crate::json_ext::Value; - use crate::plugins::coprocessor::update_context_from_coprocessor; - - // Create a context with some keys - let target_context = Context::new(); - target_context.insert("k1", "v1".to_string()).unwrap(); - target_context.insert("k2", "v2".to_string()).unwrap(); - - // Coprocessor returns context with k2 set to null (indicating deletion) - let returned_context = Context::new(); - returned_context - .insert("k1", "v1_updated".to_string()) - .unwrap(); - returned_context.insert_json_value("k2", Value::Null); - - // Update context - update_context_from_coprocessor( - &target_context, - returned_context, - &ContextConf::NewContextConf(NewContextConf::All), - ) - .unwrap(); - - // k1 should be updated - assert_eq!( - target_context.get_json_value("k1"), - Some(serde_json_bytes::json!("v1_updated")) - ); - // k2 should be deleted (was null) - assert!(!target_context.contains_key("k2")); - } - #[test] fn test_update_context_from_coprocessor_adds_new_keys() { use crate::Context; diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 1d294c378c..531d566cac 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -572,86 +572,3 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { Ok(()) } - -#[tokio::test(flavor = "multi_thread")] -async fn test_coprocessor_context_key_deletion_with_null() -> Result<(), BoxError> { - if !graph_os_enabled() { - return Ok(()); - } - - let mock_server = wiremock::MockServer::start().await; - let coprocessor_address = mock_server.uri(); - - // Track the context keys received in RouterResponse - let router_response_context = std::sync::Arc::new(std::sync::Mutex::new(None)); - let router_response_context_clone = router_response_context.clone(); - - // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse - Mock::given(method("POST")) - .and(path("/")) - .respond_with(move |req: &wiremock::Request| { - let body = req.body_json::().expect("body"); - let stage = body.get("stage").and_then(|s| s.as_str()).unwrap_or(""); - - let mut response = body.clone(); - - // Ensure Request stages have a control field - if stage.ends_with("Request") - && !response.as_object().unwrap().contains_key("control") - && let Some(obj) = response.as_object_mut() - { - obj.insert("control".to_string(), serde_json::json!("continue")); - } - - if stage == "SubgraphResponse" { - // Return context with "myValue" set to null (indicating deletion) - if let Some(obj) = response.as_object_mut() - && let Some(ctx) = obj.get_mut("context").and_then(|c| c.as_object_mut()) - && let Some(entries) = ctx.get_mut("entries").and_then(|e| e.as_object_mut()) - { - entries.insert("myValue".to_string(), serde_json::Value::Null); - } - } else if stage == "RouterResponse" { - // Track the context received in RouterResponse - let context = body.get("context").and_then(|c| c.get("entries")); - if let Some(ctx) = context { - *router_response_context_clone.lock().unwrap() = Some(ctx.clone()); - } - } - // For all other stages, just pass through - - ResponseTemplate::new(200).set_body_json(response) - }) - .mount(&mock_server) - .await; - - let mut router = IntegrationTest::builder() - .config( - include_str!("fixtures/coprocessor.router.yaml") - .replace("", &coprocessor_address), - ) - .build() - .await; - - router.start().await; - router.assert_started().await; - - // Execute a query that will trigger both SubgraphResponse and RouterResponse stages - let (_trace_id, response) = router.execute_default_query().await; - assert_eq!(response.status(), 200); - - // Verify that RouterResponse does NOT have "myValue" (it was set to null in SubgraphResponse) - if let Some(ctx) = router_response_context.lock().unwrap().as_ref() - && let Some(entries) = ctx.as_object() - { - assert!( - !entries.contains_key("myValue"), - "myValue should be deleted in RouterResponse stage (was set to null), but was found: {:?}", - entries - ); - } - - router.graceful_shutdown().await; - - Ok(()) -} From 733d56f949ab67be760d4c1195b233fb5c476034 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:25:23 +0000 Subject: [PATCH 04/13] fmt --- apollo-router/src/plugins/coprocessor/supergraph.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 754f2b3ee8..600217ac3a 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -413,11 +413,7 @@ where } if let Some(context) = co_processor_output.context { - update_context_from_coprocessor( - &response.context, - context, - &response_config.context, - )?; + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { From 45780feb64924b17474c13d1d05f41454f98415a Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:28:01 +0000 Subject: [PATCH 05/13] Reorder imports --- apollo-router/src/plugins/coprocessor/test.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 13639b8f73..054269b985 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -4081,11 +4081,12 @@ mod tests { #[test] fn test_update_context_from_coprocessor_preserves_keys_not_sent() { - use crate::Context; - use crate::plugins::coprocessor::update_context_from_coprocessor; use std::collections::HashSet; use std::sync::Arc; + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + // Create a context with some keys let target_context = Context::new(); target_context.insert("k1", "v1".to_string()).unwrap(); From f14af6bb9227886fb68f1ffd070801cb333b961f Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:39:39 +0000 Subject: [PATCH 06/13] Set capacity for keys_returned, insert instead of upsert --- apollo-router/src/plugins/coprocessor/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index b49291e3a4..6c8c7ee6fc 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -505,7 +505,7 @@ pub(crate) fn update_context_from_coprocessor( context_config: &ContextConf, ) -> Result<(), BoxError> { // Collect keys that are in the returned context - let mut keys_returned = HashSet::new(); + let mut keys_returned = HashSet::with_capacity(context_returned.len()); for (mut key, value) in context_returned.try_into_iter()? { // Handle deprecated key names - convert back to actual key names @@ -514,8 +514,7 @@ pub(crate) fn update_context_from_coprocessor( } keys_returned.insert(key.clone()); - // Update/insert the key - target_context.upsert_json_value(key, move |_current| value); + target_context.insert_json_value(key, value); } // Delete keys that were sent but are missing from the returned context From 2a213e6bd3fea89903cd37ccc1f01d2ce15c4520 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 09:06:34 +0000 Subject: [PATCH 07/13] Add missing len function --- apollo-router/src/context/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apollo-router/src/context/mod.rs b/apollo-router/src/context/mod.rs index ceb9bb9f43..1b76e49583 100644 --- a/apollo-router/src/context/mod.rs +++ b/apollo-router/src/context/mod.rs @@ -259,6 +259,10 @@ impl Context { self.entries.retain(|k, v| f(k, v)); } + pub(crate) fn len(&self) -> usize { + self.entries.len() + } + /// Read only access to the executable document for internal router plugins. pub(crate) fn executable_document(&self) -> Option>> { self.extensions() From 20b0d11208b11f20a04f3de5701f8c0dd3a6919e Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 11:00:13 +0000 Subject: [PATCH 08/13] Improve test by ensuring failure if router context is none or not an object --- .../tests/integration/coprocessor.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 531d566cac..7296f9fb27 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -558,15 +558,16 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { assert_eq!(response.status(), 200); // Verify that RouterResponse does NOT have "myValue" (it was deleted in SubgraphResponse) - if let Some(ctx) = router_response_context.lock().unwrap().as_ref() - && let Some(entries) = ctx.as_object() - { - assert!( - !entries.contains_key("myValue"), - "myValue should be deleted in RouterResponse stage, but was found: {:?}", - entries - ); - } + assert!( + !router_response_context + .lock() + .unwrap() + .as_ref() + .expect("router response context was None") + .as_object() + .expect("router response context was not an object") + .contains_key("myValue") + ); router.graceful_shutdown().await; From d85613d564c4e84ab7168329cd4bc03ccf0a38fd Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 12:13:57 +0000 Subject: [PATCH 09/13] Use channel in test to avoid timing issue --- apollo-router/tests/integration/coprocessor.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 7296f9fb27..4f0c9beb1e 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -499,10 +499,9 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let mock_server = wiremock::MockServer::start().await; let coprocessor_address = mock_server.uri(); - // Track the context keys received in each stage - let router_response_context = std::sync::Arc::new(std::sync::Mutex::new(None)); - let router_response_context_clone = router_response_context.clone(); - + // Send the response context into this channel when RouterResponse stage is reached + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse Mock::given(method("POST")) .and(path("/")) @@ -532,7 +531,7 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { // Track the context received in RouterResponse let context = body.get("context").and_then(|c| c.get("entries")); if let Some(ctx) = context { - *router_response_context_clone.lock().unwrap() = Some(ctx.clone()); + tx.try_send(ctx.clone()).unwrap(); } } // For all other stages, just pass through @@ -557,13 +556,10 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 200); + let router_response_context = rx.blocking_recv().unwrap(); // Verify that RouterResponse does NOT have "myValue" (it was deleted in SubgraphResponse) assert!( !router_response_context - .lock() - .unwrap() - .as_ref() - .expect("router response context was None") .as_object() .expect("router response context was not an object") .contains_key("myValue") From 313c0cc3d3cda2a39c522b0aa823467f15a1a7e5 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 12:15:59 +0000 Subject: [PATCH 10/13] Update coprocessor.rs --- apollo-router/tests/integration/coprocessor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 4f0c9beb1e..c38dd58d69 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -501,7 +501,7 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { // Send the response context into this channel when RouterResponse stage is reached let (tx, mut rx) = tokio::sync::mpsc::channel(1); - + // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse Mock::given(method("POST")) .and(path("/")) From e4d02bddde4503ea7ac1ea298469488ba1e76684 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 13:29:49 +0000 Subject: [PATCH 11/13] Use await instead of blocking_recv --- apollo-router/tests/integration/coprocessor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index c38dd58d69..240d455472 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -556,7 +556,7 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 200); - let router_response_context = rx.blocking_recv().unwrap(); + let router_response_context = rx.recv().await.unwrap(); // Verify that RouterResponse does NOT have "myValue" (it was deleted in SubgraphResponse) assert!( !router_response_context From 12ae23541d3b6f6dd43a77ecd1e0dd393d77cd1e Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 13:54:43 +0000 Subject: [PATCH 12/13] Add recv timeout --- apollo-router/tests/integration/coprocessor.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 240d455472..ce542ee3ae 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -556,7 +556,12 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 200); - let router_response_context = rx.recv().await.unwrap(); + let router_response_context = + tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv()) + .await + .expect("timeout waiting for router response context") + .expect("router response context was not received"); + // Verify that RouterResponse does NOT have "myValue" (it was deleted in SubgraphResponse) assert!( !router_response_context From dc89f3e6749a778cb085783379595b13a796174f Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 27 Nov 2025 15:45:08 +0000 Subject: [PATCH 13/13] Ensure test actually sends/receives context from coprocessor --- .../tests/integration/coprocessor.rs | 61 ++++++++++++------- .../fixtures/coprocessor_context.router.yaml | 33 ++++++++++ 2 files changed, 73 insertions(+), 21 deletions(-) create mode 100644 apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index ce542ee3ae..2d5f8516b4 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -499,8 +499,9 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let mock_server = wiremock::MockServer::start().await; let coprocessor_address = mock_server.uri(); - // Send the response context into this channel when RouterResponse stage is reached - let (tx, mut rx) = tokio::sync::mpsc::channel(1); + // Track the context keys received in each stage + let router_response_context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let router_response_context_clone = router_response_context.clone(); // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse Mock::given(method("POST")) @@ -519,23 +520,43 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { obj.insert("control".to_string(), serde_json::json!("continue")); } - if stage == "SubgraphResponse" { - // Return context without "myValue" (deleted) - if let Some(obj) = response.as_object_mut() - && let Some(ctx) = obj.get_mut("context").and_then(|c| c.as_object_mut()) - && let Some(entries) = ctx.get_mut("entries").and_then(|e| e.as_object_mut()) - { - entries.remove("myValue"); - } + if stage == "RouterRequest" { + // Add a context entry to the router request + response + .as_object_mut() + .expect("response was not an object") + .entry("context") + .or_insert_with(|| serde_json::Value::Object(Default::default())) + .as_object_mut() + .expect("context was not an object") + .entry("entries") + .or_insert_with(|| serde_json::Value::Object(Default::default())) + .as_object_mut() + .expect("entries was not an object") + .insert("k1".to_string(), serde_json::json!("v1")); + } else if stage == "SubgraphResponse" { + // Return context without "k1" (deleted) + response + .as_object_mut() + .expect("response was not an object") + .get_mut("context") + .expect("context was not found") + .as_object_mut() + .expect("context was not an object") + .get_mut("entries") + .expect("entries was not found") + .as_object_mut() + .expect("entries was not an object") + .remove("k1"); } else if stage == "RouterResponse" { // Track the context received in RouterResponse let context = body.get("context").and_then(|c| c.get("entries")); if let Some(ctx) = context { - tx.try_send(ctx.clone()).unwrap(); + *router_response_context_clone.lock().unwrap() = Some(ctx.clone()); } } - // For all other stages, just pass through + // For all other stages, just pass through ResponseTemplate::new(200).set_body_json(response) }) .mount(&mock_server) @@ -543,7 +564,7 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() .config( - include_str!("fixtures/coprocessor.router.yaml") + include_str!("fixtures/coprocessor_context.router.yaml") .replace("", &coprocessor_address), ) .build() @@ -556,18 +577,16 @@ async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 200); - let router_response_context = - tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv()) - .await - .expect("timeout waiting for router response context") - .expect("router response context was not received"); - - // Verify that RouterResponse does NOT have "myValue" (it was deleted in SubgraphResponse) + // Verify that RouterResponse does NOT have "k1" (it was deleted in SubgraphResponse) assert!( !router_response_context + .lock() + .unwrap() + .as_ref() + .expect("router response context was None") .as_object() .expect("router response context was not an object") - .contains_key("myValue") + .contains_key("k1") ); router.graceful_shutdown().await; diff --git a/apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml b/apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml new file mode 100644 index 0000000000..368604b57a --- /dev/null +++ b/apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml @@ -0,0 +1,33 @@ +# This coprocessor url will be updated to a test-scoped mock server +# This config enables context for stages that need to test context propagation +coprocessor: + url: "" + router: + request: + body: true + context: all + response: + body: true + context: all + supergraph: + request: + body: true + context: all + response: + body: true + context: all + subgraph: + all: + request: + body: true + context: all + response: + body: true + context: all + execution: + request: + body: true + context: all + response: + body: true + context: all