Skip to content

Commit 7d062f6

Browse files
committed
Introduce .default value as a fallback
1 parent c776475 commit 7d062f6

File tree

2 files changed

+90
-53
lines changed

2 files changed

+90
-53
lines changed

e2e/configs/timeout_per_subgraph_dynamic.router.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ traffic_shaping:
1414
if (.request.headers."x-timeout" == "short") {
1515
"10s"
1616
} else {
17-
"20s"
17+
.default
1818
}

lib/executor/src/executors/map.rs

Lines changed: 89 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub struct SubgraphExecutorMap {
5858
/// Mapping from subgraph name to VRL expression program for endpoint resolution.
5959
endpoint_expressions_by_subgraph: EndpointExpressionsBySubgraphMap,
6060
timeouts_by_subgraph: TimeoutsBySubgraph,
61+
global_timeout: DurationOrProgram,
6162
config: Arc<HiveRouterConfig>,
6263
client: Arc<HttpClient>,
6364
semaphores_by_origin: DashMap<String, Arc<Semaphore>>,
@@ -66,7 +67,7 @@ pub struct SubgraphExecutorMap {
6667
}
6768

6869
impl SubgraphExecutorMap {
69-
pub fn new(config: Arc<HiveRouterConfig>) -> Self {
70+
pub fn new(config: Arc<HiveRouterConfig>, global_timeout: DurationOrProgram) -> Self {
7071
let https = HttpsConnector::new();
7172
let client: HttpClient = Client::builder(TokioExecutor::new())
7273
.pool_timer(TokioTimer::new())
@@ -86,14 +87,20 @@ impl SubgraphExecutorMap {
8687
max_connections_per_host,
8788
in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())),
8889
timeouts_by_subgraph: Default::default(),
90+
global_timeout,
8991
}
9092
}
9193

9294
pub fn from_http_endpoint_map(
9395
subgraph_endpoint_map: HashMap<SubgraphName, SubgraphEndpoint>,
9496
config: Arc<HiveRouterConfig>,
9597
) -> Result<Self, SubgraphExecutorError> {
96-
let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone());
98+
let global_timeout =
99+
compile_duration_expression(&config.traffic_shaping.all.request_timeout, None)
100+
.map_err(|err| {
101+
SubgraphExecutorError::RequestTimeoutExpressionBuild("global".to_string(), err)
102+
})?;
103+
let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone(), global_timeout);
97104

98105
for (subgraph_name, original_endpoint_str) in subgraph_endpoint_map.into_iter() {
99106
let endpoint_str = config
@@ -133,10 +140,14 @@ impl SubgraphExecutorMap {
133140
) -> HttpExecutionResponse {
134141
match self.get_or_create_executor(subgraph_name, client_request) {
135142
Ok(Some(executor)) => {
136-
let timeout = self
137-
.timeouts_by_subgraph
138-
.get(subgraph_name)
139-
.map(|t| resolve_duration_prog(t.value(), subgraph_name, client_request));
143+
let timeout = self.timeouts_by_subgraph.get(subgraph_name).map(|t| {
144+
resolve_duration_prog(
145+
t.value(),
146+
subgraph_name,
147+
client_request,
148+
&self.global_timeout,
149+
)
150+
});
140151
match timeout {
141152
Some(Ok(dur)) => executor.execute(execution_request, Some(dur)).await,
142153
Some(Err(err)) => {
@@ -411,66 +422,92 @@ impl SubgraphExecutorMap {
411422
}
412423
}
413424

414-
fn resolve_duration_prog(
425+
/// Converts a VRL value to a Duration
426+
fn vrl_value_to_duration(value: VrlValue, name: &str) -> Result<Duration, SubgraphExecutorError> {
427+
match value {
428+
VrlValue::Integer(i) => {
429+
if i < 0 {
430+
return Err(SubgraphExecutorError::TimeoutExpressionResolution(
431+
name.to_string(),
432+
"Timeout expression resolved to a negative integer".to_string(),
433+
));
434+
}
435+
Ok(std::time::Duration::from_millis(i as u64))
436+
}
437+
VrlValue::Bytes(b) => {
438+
let s = std::str::from_utf8(&b).map_err(|e| {
439+
SubgraphExecutorError::TimeoutExpressionResolution(
440+
name.to_string(),
441+
format!("Failed to parse duration string from bytes: {}", e),
442+
)
443+
})?;
444+
Ok(humantime::parse_duration(s).map_err(|e| {
445+
SubgraphExecutorError::TimeoutExpressionResolution(
446+
name.to_string(),
447+
format!("Failed to parse duration string '{}': {}", s, e),
448+
)
449+
})?)
450+
}
451+
other => Err(SubgraphExecutorError::TimeoutExpressionResolution(
452+
name.to_string(),
453+
format!(
454+
"Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer (ms) or a human-readable duration string.",
455+
other.kind()
456+
),
457+
)),
458+
}
459+
}
460+
461+
/// Resolves a global timeout DurationOrProgram to a concrete Duration.
462+
/// Global timeout programs only receive request context, no default value.
463+
fn resolve_global_timeout(
415464
duration_or_program: &DurationOrProgram,
416-
subgraph_name: &str,
417465
client_request: &ClientRequestDetails<'_, '_>,
418466
) -> Result<Duration, SubgraphExecutorError> {
419467
match duration_or_program {
420468
DurationOrProgram::Duration(dur) => Ok(*dur),
421469
DurationOrProgram::Program(program) => {
422470
let value =
423471
VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())]));
472+
let result = execute_expression_with_value(program, value).map_err(|err| {
473+
SubgraphExecutorError::TimeoutExpressionResolution(
474+
"global".to_string(),
475+
err.to_string(),
476+
)
477+
})?;
478+
vrl_value_to_duration(result, "global")
479+
}
480+
}
481+
}
482+
483+
/// Resolves a subgraph-specific timeout DurationOrProgram to a concrete Duration.
484+
/// Subgraph timeout programs receive request context and the resolved global timeout as default.
485+
fn resolve_duration_prog(
486+
duration_or_program: &DurationOrProgram,
487+
subgraph_name: &str,
488+
client_request: &ClientRequestDetails<'_, '_>,
489+
global_timeout: &DurationOrProgram,
490+
) -> Result<Duration, SubgraphExecutorError> {
491+
match duration_or_program {
492+
DurationOrProgram::Duration(dur) => Ok(*dur),
493+
DurationOrProgram::Program(program) => {
494+
// First resolve the global timeout to a concrete duration
495+
let global_timeout_duration = resolve_global_timeout(global_timeout, client_request)?;
496+
497+
let value = VrlValue::Object(BTreeMap::from([
498+
("request".into(), client_request.into()),
499+
(
500+
"default".into(),
501+
VrlValue::Integer(global_timeout_duration.as_millis() as i64),
502+
),
503+
]));
424504
let result = execute_expression_with_value(program, value).map_err(|err| {
425505
SubgraphExecutorError::TimeoutExpressionResolution(
426506
subgraph_name.to_string(),
427507
err.to_string(),
428508
)
429509
})?;
430-
match result {
431-
VrlValue::Integer(i) => {
432-
if i < 0 {
433-
return Err(SubgraphExecutorError::TimeoutExpressionResolution(
434-
subgraph_name.to_string(),
435-
"Timeout expression resolved to a negative integer".to_string(),
436-
));
437-
}
438-
Ok(std::time::Duration::from_millis(i as u64))
439-
}
440-
VrlValue::Float(f) => {
441-
let f = f.into_inner();
442-
if f < 0.0 {
443-
return Err(SubgraphExecutorError::TimeoutExpressionResolution(
444-
subgraph_name.to_string(),
445-
"Timeout expression resolved to a negative float".to_string(),
446-
));
447-
}
448-
Ok(std::time::Duration::from_millis(f as u64))
449-
}
450-
VrlValue::Bytes(b) => {
451-
let s = std::str::from_utf8(&b).map_err(|e| {
452-
SubgraphExecutorError::TimeoutExpressionResolution(
453-
subgraph_name.to_string(),
454-
format!("Failed to parse duration string from bytes: {}", e),
455-
)
456-
})?;
457-
Ok(humantime::parse_duration(s).map_err(|e| {
458-
SubgraphExecutorError::TimeoutExpressionResolution(
459-
subgraph_name.to_string(),
460-
format!("Failed to parse duration string '{}': {}", s, e),
461-
)
462-
})?)
463-
}
464-
other => {
465-
Err(SubgraphExecutorError::TimeoutExpressionResolution(
466-
subgraph_name.to_string(),
467-
format!(
468-
"Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string.",
469-
other.kind()
470-
),
471-
))
472-
}
473-
}
510+
vrl_value_to_duration(result, subgraph_name)
474511
}
475512
}
476513
}

0 commit comments

Comments
 (0)