Skip to content

Commit a0aa887

Browse files
committed
e2e tests, fix config and adjust naming
1 parent 2fe644d commit a0aa887

File tree

7 files changed

+178
-14
lines changed

7 files changed

+178
-14
lines changed

docs/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,7 +1871,7 @@ Optional per-subgraph configurations that will override the default configuratio
18711871

18721872
|Name|Type|Description|Required|
18731873
|----|----|-----------|--------|
1874-
|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`||yes|
1874+
|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`|||
18751875

18761876
<a name="traffic_shapingsubgraphsadditionalproperties"></a>
18771877
#### traffic\_shaping\.subgraphs\.additionalProperties: object
@@ -1880,9 +1880,9 @@ Optional per-subgraph configurations that will override the default configuratio
18801880

18811881
|Name|Type|Description|Required|
18821882
|----|----|-----------|--------|
1883-
|**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>|no|
1884-
|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.<br/>|yes|
1885-
|**request\_timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> "10s"<br/> } else {<br/> "15s"<br/> }<br/>```<br/>|no|
1883+
|**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>||
1884+
|**pool\_idle\_timeout**|`string`, `null`|Timeout for idle sockets being kept-alive.<br/>||
1885+
|**request\_timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> "10s"<br/> } else {<br/> "15s"<br/> }<br/>```<br/>||
18861886

18871887
**Additional Properties:** not allowed
18881888

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# yaml-language-server: $schema=../../router-config.schema.json
2+
supergraph:
3+
source: file
4+
path: ../supergraph.graphql
5+
traffic_shaping:
6+
all:
7+
request_timeout: 2s
8+
# Disable deduplication to better hunt for deadlocks in tests
9+
dedupe_enabled: false
10+
subgraphs:
11+
accounts:
12+
request_timeout:
13+
expression: |
14+
if (.request.headers["x-timeout"] == "short") {
15+
"10s"
16+
} else {
17+
"15s"
18+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# yaml-language-server: $schema=../../router-config.schema.json
2+
supergraph:
3+
source: file
4+
path: ../supergraph.graphql
5+
traffic_shaping:
6+
all:
7+
request_timeout: 2s
8+
# Disable deduplication to better hunt for deadlocks in tests
9+
dedupe_enabled: false
10+
subgraphs:
11+
accounts:
12+
request_timeout: 5s

e2e/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@ mod probes;
1616
mod supergraph;
1717
#[cfg(test)]
1818
mod testkit;
19+
#[cfg(test)]
20+
mod timeout_per_subgraph;

e2e/src/timeout_per_subgraph.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#[cfg(test)]
2+
mod override_subgraph_urls_e2e_tests {
3+
use ntex::web::test;
4+
use sonic_rs::{from_slice, json, Value};
5+
6+
use crate::testkit::{
7+
init_graphql_request, init_router_from_config_file, wait_for_readiness, SubgraphsServer,
8+
};
9+
10+
#[ntex::test]
11+
async fn should_not_deadlock_when_overriding_subgraph_timeout_statically() {
12+
let _subgraphs_server = SubgraphsServer::start().await;
13+
let app = init_router_from_config_file("configs/timeout_per_subgraph_static.router.yaml")
14+
.await
15+
.unwrap();
16+
wait_for_readiness(&app.app).await;
17+
18+
let req1 = init_graphql_request("{ users { id } }", None);
19+
let req2 = init_graphql_request("{ users { id } }", None);
20+
let req3 = init_graphql_request("{ users { id } }", None);
21+
let req4 = init_graphql_request("{ users { id } }", None);
22+
23+
let (resp1, resp2, resp3, resp4) = tokio::join!(
24+
test::call_service(&app.app, req1.to_request()),
25+
test::call_service(&app.app, req2.to_request()),
26+
test::call_service(&app.app, req3.to_request()),
27+
test::call_service(&app.app, req4.to_request())
28+
);
29+
30+
assert!(resp1.status().is_success(), "Expected 200 OK");
31+
assert!(resp2.status().is_success(), "Expected 200 OK");
32+
assert!(resp3.status().is_success(), "Expected 200 OK");
33+
assert!(resp4.status().is_success(), "Expected 200 OK");
34+
35+
let expected_json = json!({
36+
"data": {
37+
"users": [
38+
{
39+
"id": "1"
40+
},
41+
{
42+
"id": "2"
43+
},
44+
{
45+
"id": "3"
46+
},
47+
{
48+
"id": "4"
49+
},
50+
{
51+
"id": "5"
52+
},
53+
{
54+
"id": "6"
55+
}
56+
]
57+
}
58+
});
59+
60+
for resp in [resp1, resp2, resp3, resp4] {
61+
let body = test::read_body(resp).await;
62+
let json_body: Value = from_slice(&body).unwrap();
63+
assert_eq!(json_body, expected_json);
64+
}
65+
}
66+
67+
#[ntex::test]
68+
async fn should_not_deadlock_when_overriding_subgraph_timeout_dynamically() {
69+
let _subgraphs_server = SubgraphsServer::start().await;
70+
let app = init_router_from_config_file("configs/timeout_per_subgraph_dynamic.router.yaml")
71+
.await
72+
.unwrap();
73+
wait_for_readiness(&app.app).await;
74+
75+
// We want to ensure that concurrent requests with different timeout settings
76+
// do not cause deadlocks. We are not testing the actual timeout duration here.
77+
let req1 = init_graphql_request("{ users { id } }", None).header("x-timeout", "short");
78+
let req2 = init_graphql_request("{ users { id } }", None).header("x-timeout", "long");
79+
let req3 = init_graphql_request("{ users { id } }", None).header("x-timeout", "short");
80+
let req4 = init_graphql_request("{ users { id } }", None).header("x-timeout", "long");
81+
82+
let (resp1, resp2, resp3, resp4) = tokio::join!(
83+
test::call_service(&app.app, req1.to_request()),
84+
test::call_service(&app.app, req2.to_request()),
85+
test::call_service(&app.app, req3.to_request()),
86+
test::call_service(&app.app, req4.to_request())
87+
);
88+
89+
assert!(resp1.status().is_success(), "Expected 200 OK");
90+
assert!(resp2.status().is_success(), "Expected 200 OK");
91+
assert!(resp3.status().is_success(), "Expected 200 OK");
92+
assert!(resp4.status().is_success(), "Expected 200 OK");
93+
94+
let expected_json = json!({
95+
"data": {
96+
"users": [
97+
{
98+
"id": "1"
99+
},
100+
{
101+
"id": "2"
102+
},
103+
{
104+
"id": "3"
105+
},
106+
{
107+
"id": "4"
108+
},
109+
{
110+
"id": "5"
111+
},
112+
{
113+
"id": "6"
114+
}
115+
]
116+
}
117+
});
118+
119+
for resp in [resp1, resp2, resp3, resp4] {
120+
let body = test::read_body(resp).await;
121+
let json_body: Value = from_slice(&body).unwrap();
122+
assert_eq!(json_body, expected_json);
123+
}
124+
}
125+
}

lib/executor/src/executors/map.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type SubgraphEndpoint = String;
4141
type ExecutorsBySubgraphMap =
4242
DashMap<SubgraphName, DashMap<SubgraphEndpoint, SubgraphExecutorBoxedArc>>;
4343
type EndpointsBySubgraphMap = DashMap<SubgraphName, SubgraphEndpoint>;
44-
type ExpressionsBySubgraphMap = HashMap<SubgraphName, VrlProgram>;
44+
type EndpointExpressionsBySubgraphMap = HashMap<SubgraphName, VrlProgram>;
4545
type TimeoutsBySubgraph = DashMap<SubgraphName, DurationOrProgram>;
4646

4747
struct ResolvedSubgraphConfig<'a> {
@@ -55,8 +55,8 @@ pub struct SubgraphExecutorMap {
5555
/// Mapping from subgraph name to endpoint for quick lookup
5656
/// based on supergrah sdl and static overrides from router's config.
5757
static_endpoints_by_subgraph: EndpointsBySubgraphMap,
58-
/// Mapping from subgraph name to VRL expression program
59-
expressions_by_subgraph: ExpressionsBySubgraphMap,
58+
/// Mapping from subgraph name to VRL expression program for endpoint resolution.
59+
endpoint_expressions_by_subgraph: EndpointExpressionsBySubgraphMap,
6060
timeouts_by_subgraph: TimeoutsBySubgraph,
6161
config: Arc<HiveRouterConfig>,
6262
client: Arc<HttpClient>,
@@ -79,7 +79,7 @@ impl SubgraphExecutorMap {
7979
SubgraphExecutorMap {
8080
executors_by_subgraph: Default::default(),
8181
static_endpoints_by_subgraph: Default::default(),
82-
expressions_by_subgraph: Default::default(),
82+
endpoint_expressions_by_subgraph: Default::default(),
8383
config,
8484
client: Arc::new(client),
8585
semaphores_by_origin: Default::default(),
@@ -103,7 +103,8 @@ impl SubgraphExecutorMap {
103103
let endpoint_str = match endpoint_str {
104104
Some(UrlOrExpression::Url(url)) => url,
105105
Some(UrlOrExpression::Expression { expression }) => {
106-
subgraph_executor_map.register_expression(&subgraph_name, expression)?;
106+
subgraph_executor_map
107+
.register_endpoint_expression(&subgraph_name, expression)?;
107108
&original_endpoint_str
108109
}
109110
None => &original_endpoint_str,
@@ -202,7 +203,7 @@ impl SubgraphExecutorMap {
202203
subgraph_name: &str,
203204
client_request: &ClientRequestDetails<'_, '_>,
204205
) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
205-
if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) {
206+
if let Some(expression) = self.endpoint_expressions_by_subgraph.get(subgraph_name) {
206207
let original_url_value = VrlValue::Bytes(Bytes::from(
207208
self.static_endpoints_by_subgraph
208209
.get(subgraph_name)
@@ -274,15 +275,15 @@ impl SubgraphExecutorMap {
274275
/// Registers a VRL expression for the given subgraph name.
275276
/// The expression can later be used to resolve the endpoint URL cheaply,
276277
/// without needing to recompile it every time.
277-
fn register_expression(
278+
fn register_endpoint_expression(
278279
&mut self,
279280
subgraph_name: &str,
280281
expression: &str,
281282
) -> Result<(), SubgraphExecutorError> {
282283
let program = compile_expression(expression, None).map_err(|err| {
283284
SubgraphExecutorError::EndpointExpressionBuild(subgraph_name.to_string(), err)
284285
})?;
285-
self.expressions_by_subgraph
286+
self.endpoint_expressions_by_subgraph
286287
.insert(subgraph_name.to_string(), program);
287288

288289
Ok(())

lib/router-config/src/traffic_shaping.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@ pub struct TrafficShapingExecutorSubgraphConfig {
4545
/// Timeout for idle sockets being kept-alive.
4646
#[serde(
4747
deserialize_with = "humantime_serde::deserialize",
48-
serialize_with = "humantime_serde::serialize"
48+
serialize_with = "humantime_serde::serialize",
49+
skip_serializing_if = "Option::is_none",
50+
default = "default_subgraph_pool_idle_timeout"
4951
)]
50-
#[schemars(with = "String")]
52+
#[schemars(with = "Option<String>")]
5153
pub pool_idle_timeout: Option<Duration>,
5254

5355
/// Enables/disables request deduplication to subgraphs.
@@ -118,6 +120,10 @@ pub struct TrafficShapingExecutorGlobalConfig {
118120
pub request_timeout: DurationOrExpression,
119121
}
120122

123+
fn default_subgraph_pool_idle_timeout() -> Option<Duration> {
124+
None
125+
}
126+
121127
fn default_request_timeout() -> DurationOrExpression {
122128
DurationOrExpression::Duration(Duration::from_secs(30))
123129
}

0 commit comments

Comments
 (0)