Skip to content

Commit d3e97f3

Browse files
committed
Better impl
1 parent 5e8cef1 commit d3e97f3

File tree

4 files changed

+37
-33
lines changed

4 files changed

+37
-33
lines changed

lib/executor/src/execution/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
rewrites::FetchRewriteExt,
2020
},
2121
executors::{
22-
common::{HttpExecutionRequest, HttpExecutionResponse},
22+
common::{HttpExecutionResponse, SubgraphExecutionRequest},
2323
map::SubgraphExecutorMap,
2424
},
2525
headers::{
@@ -700,7 +700,7 @@ impl<'exec, 'req> Executor<'exec, 'req> {
700700
let variable_refs =
701701
select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
702702

703-
let mut subgraph_request = HttpExecutionRequest {
703+
let mut subgraph_request = SubgraphExecutionRequest {
704704
query: node.operation.document_str.as_str(),
705705
dedupe: self.dedupe_subgraph_requests,
706706
operation_name: node.operation_name.as_deref(),

lib/executor/src/executors/common.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use async_trait::async_trait;
44
use bytes::Bytes;
@@ -9,7 +9,8 @@ use sonic_rs::Value;
99
pub trait SubgraphExecutor {
1010
async fn execute<'a>(
1111
&self,
12-
execution_request: HttpExecutionRequest<'a>,
12+
execution_request: SubgraphExecutionRequest<'a>,
13+
timeout: Option<Duration>,
1314
) -> HttpExecutionResponse;
1415

1516
fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
@@ -26,7 +27,7 @@ pub type SubgraphExecutorBoxedArc = Arc<Box<SubgraphExecutorType>>;
2627

2728
pub type SubgraphRequestExtensions = HashMap<String, Value>;
2829

29-
pub struct HttpExecutionRequest<'a> {
30+
pub struct SubgraphExecutionRequest<'a> {
3031
pub query: &'a str,
3132
pub dedupe: bool,
3233
pub operation_name: Option<&'a str>,
@@ -37,7 +38,7 @@ pub struct HttpExecutionRequest<'a> {
3738
pub extensions: Option<SubgraphRequestExtensions>,
3839
}
3940

40-
impl HttpExecutionRequest<'_> {
41+
impl SubgraphExecutionRequest<'_> {
4142
pub fn add_request_extensions_field(&mut self, key: String, value: Value) {
4243
self.extensions
4344
.get_or_insert_with(HashMap::new)

lib/executor/src/executors/http.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use crate::executors::common::HttpExecutionResponse;
45
use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse};
56
use dashmap::DashMap;
7+
use futures::TryFutureExt;
68
use tokio::sync::OnceCell;
79

810
use async_trait::async_trait;
@@ -18,7 +20,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client};
1820
use tokio::sync::Semaphore;
1921
use tracing::debug;
2022

21-
use crate::executors::common::HttpExecutionRequest;
23+
use crate::executors::common::SubgraphExecutionRequest;
2224
use crate::executors::error::SubgraphExecutorError;
2325
use crate::response::graphql_error::GraphQLError;
2426
use crate::utils::consts::CLOSE_BRACE;
@@ -75,7 +77,7 @@ impl HTTPSubgraphExecutor {
7577

7678
fn build_request_body(
7779
&self,
78-
execution_request: &HttpExecutionRequest<'_>,
80+
execution_request: &SubgraphExecutionRequest<'_>,
7981
) -> Result<Vec<u8>, SubgraphExecutorError> {
8082
let mut body = Vec::with_capacity(4096);
8183
body.put(FIRST_QUOTE_STR);
@@ -136,6 +138,7 @@ impl HTTPSubgraphExecutor {
136138
&self,
137139
body: Vec<u8>,
138140
headers: HeaderMap,
141+
timeout: Option<Duration>,
139142
) -> Result<SharedResponse, SubgraphExecutorError> {
140143
let mut req = hyper::Request::builder()
141144
.method(http::Method::POST)
@@ -150,9 +153,22 @@ impl HTTPSubgraphExecutor {
150153

151154
debug!("making http request to {}", self.endpoint.to_string());
152155

153-
let res = self.http_client.request(req).await.map_err(|e| {
156+
let res_fut = self.http_client.request(req).map_err(|e| {
154157
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
155-
})?;
158+
});
159+
160+
let res = if let Some(timeout_duration) = timeout {
161+
tokio::time::timeout(timeout_duration, res_fut)
162+
.await
163+
.map_err(|_| {
164+
SubgraphExecutorError::RequestTimeout(
165+
self.endpoint.to_string(),
166+
timeout_duration.as_secs(),
167+
)
168+
})?
169+
} else {
170+
res_fut.await
171+
}?;
156172

157173
debug!(
158174
"http request to {} completed, status: {}",
@@ -211,7 +227,8 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
211227
#[tracing::instrument(skip_all, fields(subgraph_name = %self.subgraph_name))]
212228
async fn execute<'a>(
213229
&self,
214-
execution_request: HttpExecutionRequest<'a>,
230+
execution_request: SubgraphExecutionRequest<'a>,
231+
timeout: Option<Duration>,
215232
) -> HttpExecutionResponse {
216233
let body = match self.build_request_body(&execution_request) {
217234
Ok(body) => body,
@@ -233,7 +250,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
233250
// This unwrap is safe because the semaphore is never closed during the application's lifecycle.
234251
// `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
235252
let _permit = self.semaphore.acquire().await.unwrap();
236-
return match self._send_request(body, headers).await {
253+
return match self._send_request(body, headers, timeout).await {
237254
Ok(shared_response) => HttpExecutionResponse {
238255
body: shared_response.body,
239256
headers: shared_response.headers,
@@ -265,7 +282,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
265282
// This unwrap is safe because the semaphore is never closed during the application's lifecycle.
266283
// `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
267284
let _permit = self.semaphore.acquire().await.unwrap();
268-
self._send_request(body, headers).await
285+
self._send_request(body, headers, timeout).await
269286
};
270287
// It's important to remove the entry from the map before returning the result.
271288
// This ensures that once the OnceCell is set, no future requests can join it.

lib/executor/src/executors/map.rs

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use crate::{
2121
execution::client_request_details::ClientRequestDetails,
2222
executors::{
2323
common::{
24-
HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
24+
HttpExecutionResponse, SubgraphExecutionRequest, SubgraphExecutor,
25+
SubgraphExecutorBoxedArc,
2526
},
2627
dedupe::{ABuildHasher, SharedResponse},
2728
duration_or_prog::{compile_duration_expression, DurationOrProgram},
@@ -109,7 +110,7 @@ impl SubgraphExecutorMap {
109110
pub async fn execute<'a, 'req>(
110111
&self,
111112
subgraph_name: &str,
112-
execution_request: HttpExecutionRequest<'a>,
113+
execution_request: SubgraphExecutionRequest<'a>,
113114
client_request: &ClientRequestDetails<'a, 'req>,
114115
) -> HttpExecutionResponse {
115116
match self.get_or_create_executor(subgraph_name, client_request) {
@@ -119,30 +120,15 @@ impl SubgraphExecutorMap {
119120
.get(subgraph_name)
120121
.map(|t| resolve_duration_prog(t.value(), subgraph_name, client_request));
121122
match timeout {
122-
Some(Ok(dur)) => tokio::time::timeout(dur, executor.execute(execution_request))
123-
.await
124-
.unwrap_or_else(|_| {
125-
error!(
126-
"Request to subgraph '{}' timed out after {:?}",
127-
subgraph_name, dur,
128-
);
129-
self.internal_server_error_response(
130-
SubgraphExecutorError::RequestTimeout(
131-
subgraph_name.to_string(),
132-
dur.as_millis() as u64,
133-
)
134-
.into(),
135-
subgraph_name,
136-
)
137-
}),
123+
Some(Ok(dur)) => executor.execute(execution_request, Some(dur)).await,
138124
Some(Err(err)) => {
139125
error!(
140-
"Timeout expression resolution error for subgraph '{}': {}",
126+
"Failed to resolve timeout for subgraph '{}': {}",
141127
subgraph_name, err,
142128
);
143129
self.internal_server_error_response(err.into(), subgraph_name)
144130
}
145-
None => executor.execute(execution_request).await,
131+
None => executor.execute(execution_request, None).await,
146132
}
147133
}
148134
Err(err) => {

0 commit comments

Comments
 (0)