From 1130c0b7cfed72782f33e15834454577c5afc99f Mon Sep 17 00:00:00 2001 From: Christoph Herzog Date: Thu, 29 Aug 2024 18:12:28 +0200 Subject: [PATCH] Reduce overhead of chunk timeouts in wasix package downloads --- Cargo.lock | 1 + lib/wasix/Cargo.toml | 1 + lib/wasix/src/http/reqwest.rs | 51 +++++++++++++++++++++++++++++++++-- 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e809c86d8b..b2bb7020c8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7147,6 +7147,7 @@ dependencies = [ "once_cell", "petgraph", "pin-project", + "pin-utils", "pretty_assertions", "rand", "rayon", diff --git a/lib/wasix/Cargo.toml b/lib/wasix/Cargo.toml index 3893a4f041e..6341663dc8c 100644 --- a/lib/wasix/Cargo.toml +++ b/lib/wasix/Cargo.toml @@ -103,6 +103,7 @@ ahash = "0.8.11" hyper-util = { version = "0.1.5", features = ["server", "server-graceful", "tokio", "service", "client"], optional = true } http-body-util = { version="0.1.1", optional = true } toml = "0.8" +pin-utils = "0.1.0" [target.'cfg(not(target_arch = "riscv64"))'.dependencies.reqwest] workspace = true diff --git a/lib/wasix/src/http/reqwest.rs b/lib/wasix/src/http/reqwest.rs index c3827bdafca..bece975b195 100644 --- a/lib/wasix/src/http/reqwest.rs +++ b/lib/wasix/src/http/reqwest.rs @@ -69,12 +69,59 @@ impl ReqwestHttpClient { let headers = std::mem::take(response.headers_mut()); let status = response.status(); + + // Download the body. let data = if let Some(timeout) = self.response_body_chunk_timeout { + // Download the body with a chunk timeout. + // The timeout prevents long stalls. + let mut stream = response.bytes_stream(); let mut buf = Vec::new(); - while let Some(chunk) = tokio::time::timeout(timeout, stream.try_next()).await?? { - buf.extend_from_slice(&chunk); + + // Creating tokio timeouts has overhead, so instead of a fresh + // timeout per chunk a shared timeout is used, and a chunk counter + // is kept. Only if no chunk was downloaded within the timeout a + // timeout error is raised. + 'OUTER: loop { + let timeout = tokio::time::sleep(timeout); + pin_utils::pin_mut!(timeout); + + let mut chunk_count = 0; + + loop { + tokio::select! { + // Biased because the timeout is secondary, + // and chunks should always have priority. + biased; + + res = stream.try_next() => { + match res { + Ok(Some(chunk)) => { + buf.extend_from_slice(&chunk); + chunk_count += 1; + } + Ok(None) => { + break 'OUTER; + } + Err(e) => { + return Err(e.into()); + } + } + } + + _ = &mut timeout => { + if chunk_count == 0 { + return Err(anyhow::anyhow!("Timeout while downloading response body")); + } else { + // Timeout, but chunks were still downloaded, so + // just continue with a fresh timeout. + continue 'OUTER; + } + } + } + } } + buf } else { response.bytes().await?.to_vec()