Skip to content

Commit

Permalink
Reduce overhead of chunk timeouts in wasix package downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
theduke committed Aug 29, 2024
1 parent b8e61d7 commit f187f77
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/wasix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ ahash = "0.8.11"
hyper-util = { version = "0.1.5", features = ["server", "server-graceful", "tokio", "service", "client"], optional = true }
http-body-util = { version="0.1.1", optional = true }
toml = "0.8"
pin-utils = "0.1.0"

[target.'cfg(not(target_arch = "riscv64"))'.dependencies.reqwest]
workspace = true
Expand Down
51 changes: 49 additions & 2 deletions lib/wasix/src/http/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,59 @@ impl ReqwestHttpClient {
let headers = std::mem::take(response.headers_mut());

let status = response.status();

// Download the body.
let data = if let Some(timeout) = self.response_body_chunk_timeout {
// Download the body with a chunk timeout.
// The timeout prevents long stalls.

let mut stream = response.bytes_stream();
let mut buf = Vec::new();
while let Some(chunk) = tokio::time::timeout(timeout, stream.try_next()).await?? {
buf.extend_from_slice(&chunk);

            // Creating tokio timeouts has overhead, so instead of a fresh
            // timeout per chunk, a single shared timeout is used together with
            // a chunk counter. A timeout error is raised only if no chunk at
            // all was downloaded within the timeout window.
'OUTER: loop {
let timeout = tokio::time::sleep(timeout);
pin_utils::pin_mut!(timeout);

let mut chunk_count = 0;

loop {
tokio::select! {
// Biased because the timeout is secondary,
// and chunks should always have priority.
biased;

res = stream.try_next() => {
match res {
Ok(Some(chunk)) => {
buf.extend_from_slice(&chunk);
chunk_count += 1;
}
Ok(None) => {
break 'OUTER;
}
Err(e) => {
return Err(e.into());
}
}
}

_ = &mut timeout => {
if chunk_count == 0 {
return Err(anyhow::anyhow!("Timeout while downloading response body"));
} else {
                            // The timer fired, but chunks were downloaded in the
                            // meantime, so restart the loop with a fresh timeout.
continue 'OUTER;
}
}
}
}
}

buf
} else {
response.bytes().await?.to_vec()
Expand Down

0 comments on commit f187f77

Please sign in to comment.