diff --git a/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md b/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md new file mode 100644 index 0000000000..8bd434359d --- /dev/null +++ b/.changesets/fix_rohan_b99_coprocessor_context_keys_not_deleted.md @@ -0,0 +1,5 @@ +### Ensure that coprocessor context keys/values removed in a stage do not re-appear in later stages ([PR #8679](https://github.com/apollographql/router/pull/8679)) + +Ensure that coprocessor keys that are deleted in a previous stage do not re-appear in later stages + +By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8679 diff --git a/apollo-router/src/context/mod.rs b/apollo-router/src/context/mod.rs index 5b33b66146..1b76e49583 100644 --- a/apollo-router/src/context/mod.rs +++ b/apollo-router/src/context/mod.rs @@ -255,6 +255,14 @@ impl Context { } } + pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) { + self.entries.retain(|k, v| f(k, v)); + } + + pub(crate) fn len(&self) -> usize { + self.entries.len() + } + /// Read only access to the executable document for internal router plugins. pub(crate) fn executable_document(&self) -> Option>> { self.extensions() diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 09f5b6b2cd..2904fa3816 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -409,16 +409,7 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { @@ -487,14 +478,11 @@ where )?; if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config_context - { - key = context_key_from_deprecated(key); - } - generator_map_context.upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &generator_map_context, + context, + &response_config_context, + )?; } // We return the deferred_response into our stream of response chunks diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index bde7e6a64b..6c8c7ee6fc 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -495,6 +495,47 @@ fn default_response_validation() -> bool { true } +/// Update the target context based on the context returned from the coprocessor. +/// This function handles both updates/inserts and deletions: +/// - Keys present in the returned context (with non-null values) are updated/inserted +/// - Keys that were sent to the coprocessor but are missing from the returned context are deleted +pub(crate) fn update_context_from_coprocessor( + target_context: &Context, + context_returned: Context, + context_config: &ContextConf, +) -> Result<(), BoxError> { + // Collect keys that are in the returned context + let mut keys_returned = HashSet::with_capacity(context_returned.len()); + + for (mut key, value) in context_returned.try_into_iter()? { + // Handle deprecated key names - convert back to actual key names + if let ContextConf::NewContextConf(NewContextConf::Deprecated) = context_config { + key = context_key_from_deprecated(key); + } + + keys_returned.insert(key.clone()); + target_context.insert_json_value(key, value); + } + + // Delete keys that were sent but are missing from the returned context + // If the context config is selective, only delete keys that are in the selective list + match context_config { + ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => { + target_context.retain(|key, _v| { + if keys_returned.contains(key) { + return true; + } else if context_keys.contains(key) { + return false; + } + true + }); + } + _ => target_context.retain(|key, _v| keys_returned.contains(key)), + } + + Ok(()) +} + fn record_coprocessor_duration(stage: PipelineStep, duration: Duration) { f64_histogram!( "apollo.router.operations.coprocessor.duration", @@ -1025,16 +1066,7 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { @@ -1099,14 +1131,11 @@ where }; if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &context_conf - { - key = context_key_from_deprecated(key); - } - generator_map_context.upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &generator_map_context, + context, + &context_conf, + )?; } // We return the final_bytes into our stream of response chunks @@ -1368,16 +1397,7 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index da8020cfa1..600217ac3a 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -413,16 +413,7 @@ where } if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config.context - { - key = context_key_from_deprecated(key); - } - response - .context - .upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor(&response.context, context, &response_config.context)?; } if let Some(headers) = co_processor_output.headers { @@ -499,14 +490,11 @@ where )?; if let Some(context) = co_processor_output.context { - for (mut key, value) in context.try_into_iter()? { - if let ContextConf::NewContextConf(NewContextConf::Deprecated) = - &response_config_context - { - key = context_key_from_deprecated(key); - } - generator_map_context.upsert_json_value(key, move |_current| value); - } + update_context_from_coprocessor( + &generator_map_context, + context, + &response_config_context, + )?; } // We return the deferred_response into our stream of response chunks diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 753155f77e..054269b985 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -3999,4 +3999,119 @@ mod tests { .is_ok() ); } + + // Tests for context key deletion functionality + + #[test] + fn test_update_context_from_coprocessor_deletes_missing_keys() { + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + target_context.insert("k2", "v2".to_string()).unwrap(); + target_context.insert("k3", "v3".to_string()).unwrap(); + + // Coprocessor returns context without k2 (deleted) + let returned_context = Context::new(); + returned_context + .insert("k1", "v1_updated".to_string()) + .unwrap(); + // k2 is missing (deleted) + returned_context.insert("k3", "v3".to_string()).unwrap(); + + // Update context + update_context_from_coprocessor( + &target_context, + returned_context, + &ContextConf::NewContextConf(NewContextConf::All), + ) + .unwrap(); + + // k1 should be updated + assert_eq!( + target_context.get_json_value("k1"), + Some(serde_json_bytes::json!("v1_updated")) + ); + // k2 should be deleted + assert!(!target_context.contains_key("k2")); + // k3 should remain + assert_eq!( + target_context.get_json_value("k3"), + Some(serde_json_bytes::json!("v3")) + ); + } + + #[test] + fn test_update_context_from_coprocessor_adds_new_keys() { + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + + // Coprocessor returns context with a new key + let returned_context = Context::new(); + returned_context + .insert("k1", "v1_updated".to_string()) + .unwrap(); + returned_context.insert("k2", "v2_new".to_string()).unwrap(); + + // Update context + update_context_from_coprocessor( + &target_context, + returned_context, + &ContextConf::NewContextConf(NewContextConf::All), + ) + .unwrap(); + + // k1 should be updated + assert_eq!( + target_context.get_json_value("k1"), + Some(serde_json_bytes::json!("v1_updated")) + ); + // k2 should be added + assert_eq!( + target_context.get_json_value("k2"), + Some(serde_json_bytes::json!("v2_new")) + ); + } + + #[test] + fn test_update_context_from_coprocessor_preserves_keys_not_sent() { + use std::collections::HashSet; + use std::sync::Arc; + + use crate::Context; + use crate::plugins::coprocessor::update_context_from_coprocessor; + + // Create a context with some keys + let target_context = Context::new(); + target_context.insert("k1", "v1".to_string()).unwrap(); + target_context + .insert("key_not_sent", "preserved_value".to_string()) + .unwrap(); + + // Coprocessor returns context without k1 (deleted) + let returned_context = Context::new(); + + // Use Selective config to only send "k1", not "key_not_sent" + let selective_keys: HashSet = ["k1".to_string()].into(); + let context_config = + ContextConf::NewContextConf(NewContextConf::Selective(Arc::new(selective_keys))); + + // Update context + update_context_from_coprocessor(&target_context, returned_context, &context_config) + .unwrap(); + + // k1 should be deleted (was sent but missing from returned context) + assert!(!target_context.contains_key("k1")); + // key_not_sent should be preserved (wasn't sent to coprocessor) + assert_eq!( + target_context.get_json_value("key_not_sent"), + Some(serde_json_bytes::json!("preserved_value")) + ); + } } diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 7bda3487a5..2d5f8516b4 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -489,3 +489,107 @@ mod on_graphql_error_selector { Ok(()) } } + +#[tokio::test(flavor = "multi_thread")] +async fn test_coprocessor_context_key_deletion() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mock_server = wiremock::MockServer::start().await; + let coprocessor_address = mock_server.uri(); + + // Track the context keys received in each stage + let router_response_context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let router_response_context_clone = router_response_context.clone(); + + // Handle all coprocessor stages, but modify SubgraphResponse and track RouterResponse + Mock::given(method("POST")) + .and(path("/")) + .respond_with(move |req: &wiremock::Request| { + let body = req.body_json::().expect("body"); + let stage = body.get("stage").and_then(|s| s.as_str()).unwrap_or(""); + + let mut response = body.clone(); + + // Ensure Request stages have a control field + if stage.ends_with("Request") + && !response.as_object().unwrap().contains_key("control") + && let Some(obj) = response.as_object_mut() + { + obj.insert("control".to_string(), serde_json::json!("continue")); + } + + if stage == "RouterRequest" { + // Add a context entry to the router request + response + .as_object_mut() + .expect("response was not an object") + .entry("context") + .or_insert_with(|| serde_json::Value::Object(Default::default())) + .as_object_mut() + .expect("context was not an object") + .entry("entries") + .or_insert_with(|| serde_json::Value::Object(Default::default())) + .as_object_mut() + .expect("entries was not an object") + .insert("k1".to_string(), serde_json::json!("v1")); + } else if stage == "SubgraphResponse" { + // Return context without "k1" (deleted) + response + .as_object_mut() + .expect("response was not an object") + .get_mut("context") + .expect("context was not found") + .as_object_mut() + .expect("context was not an object") + .get_mut("entries") + .expect("entries was not found") + .as_object_mut() + .expect("entries was not an object") + .remove("k1"); + } else if stage == "RouterResponse" { + // Track the context received in RouterResponse + let context = body.get("context").and_then(|c| c.get("entries")); + if let Some(ctx) = context { + *router_response_context_clone.lock().unwrap() = Some(ctx.clone()); + } + } + + // For all other stages, just pass through + ResponseTemplate::new(200).set_body_json(response) + }) + .mount(&mock_server) + .await; + + let mut router = IntegrationTest::builder() + .config( + include_str!("fixtures/coprocessor_context.router.yaml") + .replace("", &coprocessor_address), + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // Execute a query that will trigger both SubgraphResponse and RouterResponse stages + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + + // Verify that RouterResponse does NOT have "k1" (it was deleted in SubgraphResponse) + assert!( + !router_response_context + .lock() + .unwrap() + .as_ref() + .expect("router response context was None") + .as_object() + .expect("router response context was not an object") + .contains_key("k1") + ); + + router.graceful_shutdown().await; + + Ok(()) +} diff --git a/apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml b/apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml new file mode 100644 index 0000000000..368604b57a --- /dev/null +++ b/apollo-router/tests/integration/fixtures/coprocessor_context.router.yaml @@ -0,0 +1,33 @@ +# This coprocessor url will be updated to a test-scoped mock server +# This config enables context for stages that need to test context propagation +coprocessor: + url: "" + router: + request: + body: true + context: all + response: + body: true + context: all + supergraph: + request: + body: true + context: all + response: + body: true + context: all + subgraph: + all: + request: + body: true + context: all + response: + body: true + context: all + execution: + request: + body: true + context: all + response: + body: true + context: all