diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index caa4cc4530..f576a0e9de 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -202,6 +202,15 @@ The error response will now contain the status code and status name. Example: `H By [@col](https://github.com/col) in https://github.com/apollographql/router/pull/2118 ## 🛠 Maintenance + + +### Refactor APQ ([PR #2129](https://github.com/apollographql/router/pull/2129)) + +Remove duplicated code. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2129 + + ## 📚 Documentation ### Docs: Update cors match regex example ([Issue #2151](https://github.com/apollographql/router/issues/2151)) diff --git a/apollo-router/src/services/layers/apq.rs b/apollo-router/src/services/layers/apq.rs index 60a2088471..6f8f309876 100644 --- a/apollo-router/src/services/layers/apq.rs +++ b/apollo-router/src/services/layers/apq.rs @@ -50,69 +50,9 @@ impl APQLayer { pub(crate) async fn apq_request( &self, - mut request: SupergraphRequest, + request: SupergraphRequest, ) -> Result { - let maybe_query_hash: Option<(String, Vec)> = request - .supergraph_request - .body() - .extensions - .get("persistedQuery") - .and_then(|value| serde_json_bytes::from_value::(value.clone()).ok()) - .and_then(|persisted_query| { - hex::decode(persisted_query.sha256hash.as_bytes()) - .ok() - .map(|decoded| (persisted_query.sha256hash, decoded)) - }); - - let body_query = request.supergraph_request.body().query.clone(); - - match (maybe_query_hash, body_query) { - (Some((query_hash, query_hash_bytes)), Some(query)) => { - if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) { - tracing::trace!("apq: cache insert"); - let _ = request.context.insert("persisted_query_hit", false); - self.cache.insert(format!("apq|{query_hash}"), query).await; - } else { - tracing::warn!("apq: graphql request doesn't match provided sha256Hash"); - } - Ok(request) - } - (Some((apq_hash, _)), _) => { - if let Ok(cached_query) = - self.cache.get(&format!("apq|{apq_hash}")).await.get().await - { - let _ = request.context.insert("persisted_query_hit", true); - tracing::trace!("apq: cache hit"); - request.supergraph_request.body_mut().query = Some(cached_query); - Ok(request) - } else { - tracing::trace!("apq: cache miss"); - let errors = vec![crate::error::Error { - message: "PersistedQueryNotFound".to_string(), - locations: Default::default(), - path: Default::default(), - extensions: serde_json_bytes::from_value(json!({ - "code": "PERSISTED_QUERY_NOT_FOUND", - "exception": { - "stacktrace": [ - "PersistedQueryNotFoundError: PersistedQueryNotFound", - ], - }, - })) - .unwrap(), - }]; - let res = SupergraphResponse::builder() - .data(Value::default()) - .errors(errors) - .context(request.context) - .build() - .expect("response is valid"); - - Err(res) - } - } - _ => Ok(request), - } + apq_request(&self.cache, request).await } } @@ -136,73 +76,12 @@ where fn layer(&self, service: S) -> Self::Service { let cache = self.cache.clone(); AsyncCheckpointService::new( - move |mut req| { + move |request| { let cache = cache.clone(); Box::pin(async move { - let maybe_query_hash: Option<(String, Vec)> = req - .supergraph_request - .body() - .extensions - .get("persistedQuery") - .and_then(|value| { - serde_json_bytes::from_value::(value.clone()).ok() - }) - .and_then(|persisted_query| { - hex::decode(persisted_query.sha256hash.as_bytes()) - .ok() - .map(|decoded| (persisted_query.sha256hash, decoded)) - }); - - let body_query = req.supergraph_request.body().query.clone(); - - match (maybe_query_hash, body_query) { - (Some((query_hash, query_hash_bytes)), Some(query)) => { - if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) { - tracing::trace!("apq: cache insert"); - let _ = req.context.insert("persisted_query_hit", false); - cache.insert(format!("apq|{query_hash}"), query).await; - } else { - tracing::warn!( - "apq: graphql request doesn't match provided sha256Hash" - ); - } - Ok(ControlFlow::Continue(req)) - } - (Some((apq_hash, _)), _) => { - if let Ok(cached_query) = - cache.get(&format!("apq|{apq_hash}")).await.get().await - { - let _ = req.context.insert("persisted_query_hit", true); - tracing::trace!("apq: cache hit"); - req.supergraph_request.body_mut().query = Some(cached_query); - Ok(ControlFlow::Continue(req)) - } else { - tracing::trace!("apq: cache miss"); - let errors = vec![crate::error::Error { - message: "PersistedQueryNotFound".to_string(), - locations: Default::default(), - path: Default::default(), - extensions: serde_json_bytes::from_value(json!({ - "code": "PERSISTED_QUERY_NOT_FOUND", - "exception": { - "stacktrace": [ - "PersistedQueryNotFoundError: PersistedQueryNotFound", - ], - }, - })) - .unwrap(), - }]; - let res = SupergraphResponse::builder() - .data(Value::default()) - .errors(errors) - .context(req.context) - .build() - .expect("response is valid"); - - Ok(ControlFlow::Break(res)) - } - } - _ => Ok(ControlFlow::Continue(req)), + match apq_request(&cache, request).await { + Ok(request) => Ok(ControlFlow::Continue(request)), + Err(response) => Ok(ControlFlow::Break(response)), } }) as BoxFuture< @@ -221,6 +100,71 @@ where } } +pub(crate) async fn apq_request( + cache: &DeduplicatingCache, + mut request: SupergraphRequest, +) -> Result { + let maybe_query_hash: Option<(String, Vec)> = request + .supergraph_request + .body() + .extensions + .get("persistedQuery") + .and_then(|value| serde_json_bytes::from_value::(value.clone()).ok()) + .and_then(|persisted_query| { + hex::decode(persisted_query.sha256hash.as_bytes()) + .ok() + .map(|decoded| (persisted_query.sha256hash, decoded)) + }); + + let body_query = request.supergraph_request.body().query.clone(); + + match (maybe_query_hash, body_query) { + (Some((query_hash, query_hash_bytes)), Some(query)) => { + if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) { + tracing::trace!("apq: cache insert"); + let _ = request.context.insert("persisted_query_hit", false); + cache.insert(format!("apq|{query_hash}"), query).await; + } else { + tracing::warn!("apq: graphql request doesn't match provided sha256Hash"); + } + Ok(request) + } + (Some((apq_hash, _)), _) => { + if let Ok(cached_query) = cache.get(&format!("apq|{apq_hash}")).await.get().await { + let _ = request.context.insert("persisted_query_hit", true); + tracing::trace!("apq: cache hit"); + request.supergraph_request.body_mut().query = Some(cached_query); + Ok(request) + } else { + tracing::trace!("apq: cache miss"); + let errors = vec![crate::error::Error { + message: "PersistedQueryNotFound".to_string(), + locations: Default::default(), + path: Default::default(), + extensions: serde_json_bytes::from_value(json!({ + "code": "PERSISTED_QUERY_NOT_FOUND", + "exception": { + "stacktrace": [ + "PersistedQueryNotFoundError: PersistedQueryNotFound", + ], + }, + })) + .unwrap(), + }]; + let res = SupergraphResponse::builder() + .data(Value::default()) + .errors(errors) + .context(request.context) + .build() + .expect("response is valid"); + + Err(res) + } + } + _ => Ok(request), + } +} + fn query_matches_hash(query: &str, hash: &[u8]) -> bool { let mut digest = Sha256::new(); digest.update(query.as_bytes());