Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Ensure that coprocessor context keys/values removed in a stage do not re-appear in later stages ([PR #8679](https://github.com/apollographql/router/pull/8679))

Ensure that coprocessor keys that are deleted in a previous stage do not re-appear in later stages

By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8679
8 changes: 8 additions & 0 deletions apollo-router/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ impl Context {
}
}

pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) {
self.entries.retain(|k, v| f(k, v));
}

pub(crate) fn len(&self) -> usize {
self.entries.len()
}

/// Read only access to the executable document for internal router plugins.
pub(crate) fn executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
self.extensions()
Expand Down
24 changes: 6 additions & 18 deletions apollo-router/src/plugins/coprocessor/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,16 +409,7 @@ where
}

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&response_config.context
{
key = context_key_from_deprecated(key);
}
response
.context
.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
}

if let Some(headers) = co_processor_output.headers {
Expand Down Expand Up @@ -487,14 +478,11 @@ where
)?;

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&response_config_context
{
key = context_key_from_deprecated(key);
}
generator_map_context.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(
&generator_map_context,
context,
&response_config_context,
)?;
}

// We return the deferred_response into our stream of response chunks
Expand Down
76 changes: 48 additions & 28 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,47 @@ fn default_response_validation() -> bool {
true
}

/// Update the target context based on the context returned from the coprocessor.
/// This function handles both updates/inserts and deletions:
/// - Keys present in the returned context (with non-null values) are updated/inserted
/// - Keys that were sent to the coprocessor but are missing from the returned context are deleted
pub(crate) fn update_context_from_coprocessor(
target_context: &Context,
context_returned: Context,
context_config: &ContextConf,
) -> Result<(), BoxError> {
// Collect keys that are in the returned context
let mut keys_returned = HashSet::with_capacity(context_returned.len());

for (mut key, value) in context_returned.try_into_iter()? {
// Handle deprecated key names - convert back to actual key names
if let ContextConf::NewContextConf(NewContextConf::Deprecated) = context_config {
key = context_key_from_deprecated(key);
}

keys_returned.insert(key.clone());
target_context.insert_json_value(key, value);
}

// Delete keys that were sent but are missing from the returned context
// If the context config is selective, only delete keys that are in the selective list
match context_config {
ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => {
target_context.retain(|key, _v| {
if keys_returned.contains(key) {
return true;
} else if context_keys.contains(key) {
return false;
}
true
});
}
_ => target_context.retain(|key, _v| keys_returned.contains(key)),
}

Ok(())
}

fn record_coprocessor_duration(stage: PipelineStep, duration: Duration) {
f64_histogram!(
"apollo.router.operations.coprocessor.duration",
Expand Down Expand Up @@ -1025,16 +1066,7 @@ where
}

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&response_config.context
{
key = context_key_from_deprecated(key);
}
response
.context
.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
}

if let Some(headers) = co_processor_output.headers {
Expand Down Expand Up @@ -1099,14 +1131,11 @@ where
};

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&context_conf
{
key = context_key_from_deprecated(key);
}
generator_map_context.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(
&generator_map_context,
context,
&context_conf,
)?;
}

// We return the final_bytes into our stream of response chunks
Expand Down Expand Up @@ -1368,16 +1397,7 @@ where
}

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&response_config.context
{
key = context_key_from_deprecated(key);
}
response
.context
.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
}

if let Some(headers) = co_processor_output.headers {
Expand Down
24 changes: 6 additions & 18 deletions apollo-router/src/plugins/coprocessor/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,7 @@ where
}

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&response_config.context
{
key = context_key_from_deprecated(key);
}
response
.context
.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
}

if let Some(headers) = co_processor_output.headers {
Expand Down Expand Up @@ -499,14 +490,11 @@ where
)?;

if let Some(context) = co_processor_output.context {
for (mut key, value) in context.try_into_iter()? {
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
&response_config_context
{
key = context_key_from_deprecated(key);
}
generator_map_context.upsert_json_value(key, move |_current| value);
}
update_context_from_coprocessor(
&generator_map_context,
context,
&response_config_context,
)?;
}

// We return the deferred_response into our stream of response chunks
Expand Down
115 changes: 115 additions & 0 deletions apollo-router/src/plugins/coprocessor/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3999,4 +3999,119 @@ mod tests {
.is_ok()
);
}

// Tests for context key deletion functionality

#[test]
fn test_update_context_from_coprocessor_deletes_missing_keys() {
use crate::Context;
use crate::plugins::coprocessor::update_context_from_coprocessor;

// Create a context with some keys
let target_context = Context::new();
target_context.insert("k1", "v1".to_string()).unwrap();
target_context.insert("k2", "v2".to_string()).unwrap();
target_context.insert("k3", "v3".to_string()).unwrap();

// Coprocessor returns context without k2 (deleted)
let returned_context = Context::new();
returned_context
.insert("k1", "v1_updated".to_string())
.unwrap();
// k2 is missing (deleted)
returned_context.insert("k3", "v3".to_string()).unwrap();

// Update context
update_context_from_coprocessor(
&target_context,
returned_context,
&ContextConf::NewContextConf(NewContextConf::All),
)
.unwrap();

// k1 should be updated
assert_eq!(
target_context.get_json_value("k1"),
Some(serde_json_bytes::json!("v1_updated"))
);
// k2 should be deleted
assert!(!target_context.contains_key("k2"));
// k3 should remain
assert_eq!(
target_context.get_json_value("k3"),
Some(serde_json_bytes::json!("v3"))
);
}

#[test]
fn test_update_context_from_coprocessor_adds_new_keys() {
use crate::Context;
use crate::plugins::coprocessor::update_context_from_coprocessor;

// Create a context with some keys
let target_context = Context::new();
target_context.insert("k1", "v1".to_string()).unwrap();

// Coprocessor returns context with a new key
let returned_context = Context::new();
returned_context
.insert("k1", "v1_updated".to_string())
.unwrap();
returned_context.insert("k2", "v2_new".to_string()).unwrap();

// Update context
update_context_from_coprocessor(
&target_context,
returned_context,
&ContextConf::NewContextConf(NewContextConf::All),
)
.unwrap();

// k1 should be updated
assert_eq!(
target_context.get_json_value("k1"),
Some(serde_json_bytes::json!("v1_updated"))
);
// k2 should be added
assert_eq!(
target_context.get_json_value("k2"),
Some(serde_json_bytes::json!("v2_new"))
);
}

#[test]
fn test_update_context_from_coprocessor_preserves_keys_not_sent() {
use std::collections::HashSet;
use std::sync::Arc;

use crate::Context;
use crate::plugins::coprocessor::update_context_from_coprocessor;

// Create a context with some keys
let target_context = Context::new();
target_context.insert("k1", "v1".to_string()).unwrap();
target_context
.insert("key_not_sent", "preserved_value".to_string())
.unwrap();

// Coprocessor returns context without k1 (deleted)
let returned_context = Context::new();

// Use Selective config to only send "k1", not "key_not_sent"
let selective_keys: HashSet<String> = ["k1".to_string()].into();
let context_config =
ContextConf::NewContextConf(NewContextConf::Selective(Arc::new(selective_keys)));

// Update context
update_context_from_coprocessor(&target_context, returned_context, &context_config)
.unwrap();

// k1 should be deleted (was sent but missing from returned context)
assert!(!target_context.contains_key("k1"));
// key_not_sent should be preserved (wasn't sent to coprocessor)
assert_eq!(
target_context.get_json_value("key_not_sent"),
Some(serde_json_bytes::json!("preserved_value"))
);
}
}
Loading