Skip to content

Commit 48c0b6b

Browse files
ardatankamilkisiela
authored andcommitted
feat(router): Subgraph Timeout Configuration
1 parent ca3d24a commit 48c0b6b

File tree

20 files changed

+535
-251
lines changed

20 files changed

+535
-251
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
default: minor
3+
---
4+
5+
# Breaking
6+
7+
Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format.
8+
9+
```diff
10+
traffic_shaping:
11+
- pool_idle_timeout_seconds: 30
12+
+ pool_idle_timeout: 30s
13+
```
14+
15+
#540 by @ardatan

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/router/src/pipeline/progressive_override.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
use std::collections::{BTreeMap, HashMap, HashSet};
22

33
use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig};
4-
use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails;
4+
use hive_router_plan_executor::{
5+
execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression,
6+
};
57
use hive_router_query_planner::{
68
graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR},
79
state::supergraph_state::SupergraphState,
810
};
911
use rand::Rng;
1012
use vrl::{
11-
compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue},
13+
compiler::Program as VrlProgram,
14+
compiler::TargetValue as VrlTargetValue,
1215
core::Value as VrlValue,
1316
prelude::{
1417
state::RuntimeState as VrlState, Context as VrlContext, ExpressionError,
1518
TimeZone as VrlTimeZone,
1619
},
17-
stdlib::all as vrl_build_functions,
1820
value::Secrets as VrlSecrets,
1921
};
2022

@@ -126,27 +128,20 @@ impl OverrideLabelsEvaluator {
126128
) -> Result<Self, OverrideLabelsCompileError> {
127129
let mut static_enabled_labels = HashSet::new();
128130
let mut expressions = HashMap::new();
129-
let vrl_functions = vrl_build_functions();
130131

131132
for (label, value) in override_labels_config.iter() {
132133
match value {
133134
LabelOverrideValue::Boolean(true) => {
134135
static_enabled_labels.insert(label.clone());
135136
}
136137
LabelOverrideValue::Expression { expression } => {
137-
let compilation_result =
138-
vrl_compile(expression, &vrl_functions).map_err(|diagnostics| {
139-
OverrideLabelsCompileError {
140-
label: label.clone(),
141-
error: diagnostics
142-
.errors()
143-
.into_iter()
144-
.map(|d| d.code.to_string() + ": " + &d.message)
145-
.collect::<Vec<_>>()
146-
.join(", "),
147-
}
148-
})?;
149-
expressions.insert(label.clone(), compilation_result.program);
138+
let program = compile_expression(expression, None).map_err(|err| {
139+
OverrideLabelsCompileError {
140+
label: label.clone(),
141+
error: err.to_string(),
142+
}
143+
})?;
144+
expressions.insert(label.clone(), program);
150145
}
151146
_ => {} // Skip false booleans
152147
}

docs/README.md

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
|[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.<br/>Default: `{}`<br/>||
1616
|[**query\_planner**](#query_planner)|`object`|Query planning configuration.<br/>Default: `{"allow_expose":false,"timeout":"10s"}`<br/>||
1717
|[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).<br/>||
18-
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.<br/>Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`<br/>||
18+
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.<br/>Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"},"max_connections_per_host":100}`<br/>||
1919

2020
**Additional Properties:** not allowed
2121
**Example**
@@ -107,9 +107,11 @@ query_planner:
107107
timeout: 10s
108108
supergraph: {}
109109
traffic_shaping:
110-
dedupe_enabled: true
110+
all:
111+
dedupe_enabled: true
112+
pool_idle_timeout: 50s
113+
request_timeout: 30s
111114
max_connections_per_host: 100
112-
pool_idle_timeout_seconds: 50
113115

114116
```
115117

@@ -1808,25 +1810,75 @@ Request timeout for the Hive Console CDN requests.
18081810
<a name="traffic_shaping"></a>
18091811
## traffic\_shaping: object
18101812

1811-
Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
1813+
Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
18121814

18131815

18141816
**Properties**
18151817

18161818
|Name|Type|Description|Required|
18171819
|----|----|-----------|--------|
1818-
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>Default: `true`<br/>||
1820+
|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.<br/>Default: `{"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"}`<br/>||
18191821
|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.<br/>Default: `100`<br/>Format: `"uint"`<br/>Minimum: `0`<br/>||
1820-
|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.<br/>Default: `50`<br/>Format: `"uint64"`<br/>Minimum: `0`<br/>||
1822+
|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.<br/>||
18211823

18221824
**Additional Properties:** not allowed
18231825
**Example**
18241826

18251827
```yaml
1826-
dedupe_enabled: true
1828+
all:
1829+
dedupe_enabled: true
1830+
pool_idle_timeout: 50s
1831+
request_timeout: 30s
18271832
max_connections_per_host: 100
1828-
pool_idle_timeout_seconds: 50
18291833
18301834
```
18311835

1836+
<a name="traffic_shapingall"></a>
1837+
### traffic\_shaping\.all: object
1838+
1839+
The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
1840+
1841+
1842+
**Properties**
1843+
1844+
|Name|Type|Description|Required|
1845+
|----|----|-----------|--------|
1846+
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>Default: `true`<br/>||
1847+
|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.<br/>Default: `"50s"`<br/>||
1848+
|**request\_timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> "10s"<br/> } else {<br/> "15s"<br/> }<br/>```<br/>Default: `"30s"`<br/>||
1849+
1850+
**Additional Properties:** not allowed
1851+
**Example**
1852+
1853+
```yaml
1854+
dedupe_enabled: true
1855+
pool_idle_timeout: 50s
1856+
request_timeout: 30s
1857+
1858+
```
1859+
1860+
<a name="traffic_shapingsubgraphs"></a>
1861+
### traffic\_shaping\.subgraphs: object
1862+
1863+
Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
1864+
1865+
1866+
**Additional Properties**
1867+
1868+
|Name|Type|Description|Required|
1869+
|----|----|-----------|--------|
1870+
|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`||yes|
1871+
1872+
<a name="traffic_shapingsubgraphsadditionalproperties"></a>
1873+
#### traffic\_shaping\.subgraphs\.additionalProperties: object
1874+
1875+
**Properties**
1876+
1877+
|Name|Type|Description|Required|
1878+
|----|----|-----------|--------|
1879+
|**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>|no|
1880+
|**pool\_idle\_timeout\_seconds**|`string`|Timeout for idle sockets being kept-alive.<br/>|yes|
1881+
|**request\_timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> "10s"<br/> } else {<br/> "15s"<br/> }<br/>```<br/>|no|
1882+
1883+
**Additional Properties:** not allowed
18321884

lib/executor/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@ hyper-util = { version = "0.1.16", features = [
4444
"http2",
4545
"tokio",
4646
] }
47+
humantime = "2.3.0"
4748
bytes = "1.10.1"
4849
itoa = "1.0.15"
4950
ryu = "1.0.20"
5051
indexmap = "2.10.0"
5152
bumpalo = "3.19.0"
53+
once_cell = "1.21.3"
5254

5355
[dev-dependencies]
5456
subgraphs = { path = "../../bench/subgraphs" }

lib/executor/src/execution/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
rewrites::FetchRewriteExt,
2020
},
2121
executors::{
22-
common::{HttpExecutionRequest, HttpExecutionResponse},
22+
common::{HttpExecutionResponse, SubgraphExecutionRequest},
2323
map::SubgraphExecutorMap,
2424
},
2525
headers::{
@@ -700,7 +700,7 @@ impl<'exec, 'req> Executor<'exec, 'req> {
700700
let variable_refs =
701701
select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
702702

703-
let mut subgraph_request = HttpExecutionRequest {
703+
let mut subgraph_request = SubgraphExecutionRequest {
704704
query: node.operation.document_str.as_str(),
705705
dedupe: self.dedupe_subgraph_requests,
706706
operation_name: node.operation_name.as_deref(),

lib/executor/src/executors/common.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use async_trait::async_trait;
44
use bytes::Bytes;
@@ -9,7 +9,8 @@ use sonic_rs::Value;
99
pub trait SubgraphExecutor {
1010
async fn execute<'a>(
1111
&self,
12-
execution_request: HttpExecutionRequest<'a>,
12+
execution_request: SubgraphExecutionRequest<'a>,
13+
timeout: Option<Duration>,
1314
) -> HttpExecutionResponse;
1415

1516
fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
@@ -26,7 +27,7 @@ pub type SubgraphExecutorBoxedArc = Arc<Box<SubgraphExecutorType>>;
2627

2728
pub type SubgraphRequestExtensions = HashMap<String, Value>;
2829

29-
pub struct HttpExecutionRequest<'a> {
30+
pub struct SubgraphExecutionRequest<'a> {
3031
pub query: &'a str,
3132
pub dedupe: bool,
3233
pub operation_name: Option<&'a str>,
@@ -37,7 +38,7 @@ pub struct HttpExecutionRequest<'a> {
3738
pub extensions: Option<SubgraphRequestExtensions>,
3839
}
3940

40-
impl HttpExecutionRequest<'_> {
41+
impl SubgraphExecutionRequest<'_> {
4142
pub fn add_request_extensions_field(&mut self, key: String, value: Value) {
4243
self.extensions
4344
.get_or_insert_with(HashMap::new)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use std::time::Duration;
2+
3+
use hive_router_config::traffic_shaping::DurationOrExpression;
4+
use vrl::{compiler::Program as VrlProgram, prelude::Function};
5+
6+
use crate::utils::expression::compile_expression;
7+
8+
pub enum DurationOrProgram {
9+
Duration(Duration),
10+
Program(Box<VrlProgram>),
11+
}
12+
13+
pub fn compile_duration_expression(
14+
duration_or_expr: &DurationOrExpression,
15+
fns: Option<&[Box<dyn Function>]>,
16+
) -> Result<DurationOrProgram, String> {
17+
match duration_or_expr {
18+
DurationOrExpression::Duration(dur) => Ok(DurationOrProgram::Duration(*dur)),
19+
DurationOrExpression::Expression { expression } => {
20+
let program = compile_expression(expression, fns)?;
21+
Ok(DurationOrProgram::Program(Box::new(program)))
22+
}
23+
}
24+
}

lib/executor/src/executors/error.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError};
1+
use vrl::prelude::ExpressionError;
22

33
use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions};
44

@@ -20,6 +20,12 @@ pub enum SubgraphExecutorError {
2020
RequestFailure(String, String),
2121
#[error("Failed to serialize variable \"{0}\": {1}")]
2222
VariablesSerializationFailure(String, String),
23+
#[error("Failed to compile VRL expression for timeout for subgraph '{0}'. Please check your VRL expression for syntax errors. Diagnostic: {1}")]
24+
RequestTimeoutExpressionBuild(String, String),
25+
#[error("Failed to resolve VRL expression for timeout for subgraph '{0}'. Runtime error: {1}")]
26+
TimeoutExpressionResolution(String, String),
27+
#[error("Request to subgraph \"{0}\" timed out after {1} milliseconds")]
28+
RequestTimeout(String, u64),
2329
}
2430

2531
impl From<SubgraphExecutorError> for GraphQLError {
@@ -34,21 +40,6 @@ impl From<SubgraphExecutorError> for GraphQLError {
3440
}
3541

3642
impl SubgraphExecutorError {
37-
pub fn new_endpoint_expression_build(
38-
subgraph_name: String,
39-
diagnostics: DiagnosticList,
40-
) -> Self {
41-
SubgraphExecutorError::EndpointExpressionBuild(
42-
subgraph_name,
43-
diagnostics
44-
.errors()
45-
.into_iter()
46-
.map(|d| d.code.to_string() + ": " + &d.message)
47-
.collect::<Vec<_>>()
48-
.join(", "),
49-
)
50-
}
51-
5243
pub fn new_endpoint_expression_resolution_failure(
5344
subgraph_name: String,
5445
error: ExpressionError,
@@ -76,6 +67,13 @@ impl SubgraphExecutorError {
7667
SubgraphExecutorError::VariablesSerializationFailure(_, _) => {
7768
"SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE"
7869
}
70+
SubgraphExecutorError::TimeoutExpressionResolution(_, _) => {
71+
"SUBGRAPH_TIMEOUT_EXPRESSION_RESOLUTION_FAILURE"
72+
}
73+
SubgraphExecutorError::RequestTimeout(_, _) => "SUBGRAPH_REQUEST_TIMEOUT",
74+
SubgraphExecutorError::RequestTimeoutExpressionBuild(_, _) => {
75+
"SUBGRAPH_TIMEOUT_EXPRESSION_BUILD_FAILURE"
76+
}
7977
}
8078
}
8179
}

0 commit comments

Comments
 (0)