diff --git a/.config/forest.dic b/.config/forest.dic
index 2d1dfcb9a930..97ab337d0a7f 100644
--- a/.config/forest.dic
+++ b/.config/forest.dic
@@ -1,7 +1,8 @@
-75
+78
 Algorand/M
 API/M
 APIs
+aria2c
 args
 arities
 arity
@@ -81,6 +82,7 @@ libp2p
 liveness
 localhost
 mainnet
+MD5
 MDNS
 mempool
 Merkle
@@ -131,6 +133,7 @@ TOML
 trie
 truthy
 TTY
+UI
 uncompress
 unrepresentable
 untrusted
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f1a826cfbd3..fdcc6fd10e01 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,8 @@

 ### Added

+- [#3715](https://github.com/ChainSafe/forest/issues/3715): Implemented parallel HTTP downloads for snapshots, using 5 concurrent connections by default (configurable via `FOREST_DOWNLOAD_CONNECTIONS`). This brings snapshot download performance on par with a manual `aria2c -x5`.
+
 ### Changed

 ### Removed
diff --git a/docs/docs/users/knowledge_base/network_upgrades_state_migrations.md b/docs/docs/users/knowledge_base/network_upgrades_state_migrations.md
index 3516f66f9e4d..688acce331a8 100644
--- a/docs/docs/users/knowledge_base/network_upgrades_state_migrations.md
+++ b/docs/docs/users/knowledge_base/network_upgrades_state_migrations.md
@@ -37,10 +37,10 @@ Sample output:
 location: /archive/mainnet/latest/forest_snapshot_mainnet_2024-08-06_height_415650.forest.car.zst
 ```

-You see that the snapshot is past the upgrade epoch by ten epochs. You download the snapshot with `aria2c` because it's significantly faster than a raw `curl`.
+You see that the snapshot is past the upgrade epoch by ten epochs. You download the snapshot with the built-in tool, which is significantly faster than a raw `curl`.

 ```bash
-aria2c -x5 https://forest-archive.chainsafe.dev/latest/mainnet/
+forest-tool snapshot fetch --chain mainnet
 ```

 You start your node with `--import-snapshot` and enjoy the new, fancy NV23 features. Hooray!
diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md
index 3c399c85c3f5..3845475b2065 100644
--- a/docs/docs/users/reference/env_variables.md
+++ b/docs/docs/users/reference/env_variables.md
@@ -57,6 +57,7 @@ process.
 | `FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK` | 1 or true | false | 1 | Whether or not to backfill full tipsets from the p2p network |
 | `FOREST_STRICT_JSON` | 1 or true | false | 1 | Enable strict JSON validation to detect duplicate keys in RPC requests |
 | `FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH` | URL or file path | empty | `/var/tmp/forest_snapshot_calibnet.forest.car.zst` | Override snapshot path for `--auto-download-snapshot` |
+| `FOREST_DOWNLOAD_CONNECTIONS` | positive integer | 5 | 10 | Number of parallel HTTP connections for downloading snapshots |

 ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`
diff --git a/src/cli_shared/snapshot.rs b/src/cli_shared/snapshot.rs
index 5050e4f502dd..c2c776fea2b5 100644
--- a/src/cli_shared/snapshot.rs
+++ b/src/cli_shared/snapshot.rs
@@ -67,6 +67,8 @@ pub async fn fetch(
         .date_and_height_and_forest();
     let filename = filename(vendor, chain, date, height, forest_format);

+    tracing::info!("Downloading snapshot: {filename}");
+
     download_file_with_retry(
         &url,
         directory,
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index a836def9f13c..7ceb59a93788 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -718,14 +718,19 @@ async fn maybe_set_snapshot_path(
                 config.client.snapshot_path = Some(path.into());
             }
             _ => {
-                let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
-                config.client.snapshot_path = Some(url.to_string().into());
+                // Resolve the redirect URL to get the actual snapshot URL.
+                // This ensures all chunks download from the same snapshot even if
+                // a new snapshot is published during the download.
+                let (resolved_url, _num_bytes, filename) =
+                    crate::cli_shared::snapshot::peek(vendor, chain).await?;
+                tracing::info!("Downloading snapshot: {filename}");
+                config.client.snapshot_path = Some(resolved_url.to_string().into());
             }
         }
     }
     (true, false, false) => {
         // we need a snapshot, don't have one, and don't have permission to download one, so ask the user
-        let (url, num_bytes, _path) = crate::cli_shared::snapshot::peek(vendor, chain)
+        let (url, num_bytes, filename) = crate::cli_shared::snapshot::peek(vendor, chain)
             .await
             .context("couldn't get snapshot size")?;
         // dialoguer will double-print long lines, so manually print the first clause ourselves,
@@ -751,6 +756,7 @@ async fn maybe_set_snapshot_path(
                     "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
                 )
             }
+            tracing::info!("Downloading snapshot: {filename}");
             config.client.snapshot_path = Some(url.to_string().into());
         }
     };
diff --git a/src/lib.rs b/src/lib.rs
index 2cb0ca6e71a0..255554fe2d2a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -94,6 +94,7 @@ pub mod doctest_private {
             version::NetworkVersion,
         },
         utils::io::progress_log::WithProgress,
+        utils::net::{DownloadFileOption, download_to},
         utils::{encoding::blake2b_256, encoding::keccak_256, io::read_toml},
     };
 }
diff --git a/src/utils/net.rs b/src/utils/net.rs
index 31659038060c..240ea11a1c08 100644
--- a/src/utils/net.rs
+++ b/src/utils/net.rs
@@ -105,7 +105,10 @@ pub async fn reader(
         }
     };

-    Ok(tokio::io::BufReader::new(
+    // Use a larger buffer (512KB) for better throughput on large files
+    const DOWNLOAD_BUFFER_SIZE: usize = 512 * 1024;
+    Ok(tokio::io::BufReader::with_capacity(
+        DOWNLOAD_BUFFER_SIZE,
         WithProgress::wrap_sync_read_with_callback("Loading", stream, content_length, callback)
             .bytes(),
     ))
diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs
index 776d6dda43be..99441908e8c3 100644
--- a/src/utils/net/download_file.rs
+++ b/src/utils/net/download_file.rs
@@ -1,19 +1,91 @@
 // Copyright 2019-2026 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT

+//! File download utilities with parallel connection support.
+//!
+//! This module provides high-performance file downloads similar to `aria2c -x5`,
+//! using multiple parallel HTTP connections to download different parts of a file
+//! simultaneously.
+//!
+//! # Configuration
+//!
+//! The number of parallel connections can be configured via the
+//! `FOREST_DOWNLOAD_CONNECTIONS` environment variable (the default is 5).
+//!
+//! # Example
+//!
+//! ```no_run
+//! use forest::doctest_private::{download_to, DownloadFileOption};
+//! use url::Url;
+//! use std::path::Path;
+//!
+//! # async fn example() -> anyhow::Result<()> {
+//! let url = Url::parse("https://example.com/large-file.zst")?;
+//! let destination = Path::new("./large-file.zst");
+//!
+//! // Download with parallel connections (automatic for the `Resumable` option)
+//! download_to(&url, destination, DownloadFileOption::Resumable, None).await?;
+//! # Ok(())
+//! # }
+//! ```
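+//!
+//! If the server does not support HTTP range requests (i.e. it does not answer a
+//! `bytes=0-0` probe with `206 Partial Content`), the download transparently
+//! falls back to a single connection.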
+
 use crate::utils::{RetryArgs, net::global_http_client, retry};
-use anyhow::Context as _;
+use anyhow::{Context as _, ensure};
 use backon::{ExponentialBuilder, Retryable as _};
 use base64::{Engine, prelude::BASE64_STANDARD};
+use futures::stream::{self, StreamExt as _, TryStreamExt as _};
+use human_bytes::human_bytes;
+use humantime::format_duration;
 use md5::{Digest as _, Md5};
 use std::sync::Arc;
 use std::{
     ffi::OsStr,
     path::{Path, PathBuf},
-    time::Duration,
+    time::{Duration, Instant},
 };
+use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 use url::Url;

+/// Number of parallel connections to use for downloads (like the aria2c `-x` flag).
+/// Can be overridden with the `FOREST_DOWNLOAD_CONNECTIONS` environment variable.
+fn get_num_download_connections() -> usize {
+    std::env::var("FOREST_DOWNLOAD_CONNECTIONS")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(5) // Default to 5 like aria2c -x5
+}
+
+/// Generate a temporary download path with the `.frdownload` extension
+fn gen_tmp_download_path(dst_path: &Path) -> PathBuf {
+    const DOWNLOAD_EXTENSION: &str = "frdownload";
+    let mut path = dst_path.to_path_buf();
+    if let Some(ext) = path.extension() {
+        path.set_extension(format!(
+            "{}.{DOWNLOAD_EXTENSION}",
+            ext.to_str().unwrap_or_default()
+        ));
+    } else {
+        path.set_extension(DOWNLOAD_EXTENSION);
+    }
+    path
+}
+
+/// Call the user-provided callback with the progress percentage
+fn call_progress_callback(
+    callback: &Option<Arc<dyn Fn(String) + Send + Sync>>,
+    downloaded: u64,
+    total_size: u64,
+) {
+    if let Some(cb) = callback {
+        let progress_pct = if total_size > 0 {
+            ((downloaded as f64 / total_size as f64) * 100.0) as u8
+        } else {
+            0
+        };
+        cb(format!("{progress_pct}%"));
+    }
+}
+
 #[derive(Debug, Copy, Clone)]
 pub enum DownloadFileOption {
     NonResumable,
@@ -131,8 +203,238 @@ async fn get_content_md5_hash_from_url(url: Url) -> anyhow::Result<Option<Vec<u8>>> {
 }

-/// Download the file at `url`, returning the path to the downloaded file
-pub async fn download_http(
+/// Download the file at `url` using multiple parallel HTTP connections (like `aria2c -x`),
+/// returning the path to the downloaded file
+async fn download_http_parallel(
+    url: &Url,
+    directory: &Path,
+    filename: &str,
+    num_connections: usize,
+    callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
+) -> anyhow::Result<PathBuf> {
+    ensure!(
+        num_connections > 0,
+        "Number of connections must be greater than 0"
+    );
+    if !directory.is_dir() {
+        std::fs::create_dir_all(directory)?;
+    }
+    let dst_path = directory.join(filename);
+    let tmp_dst_path = gen_tmp_download_path(&dst_path);
+
+    let client = global_http_client();
+
+    // Check if the server supports range requests by attempting a small range request.
+    // We test with an actual range request (bytes=0-0) instead of checking the Accept-Ranges
+    // header because:
+    // 1. Some servers (especially CDNs with redirects) don't include Accept-Ranges in HEAD
+    // 2. This follows redirects automatically and tests the final endpoint
+    // 3. It's the same approach used by aria2c and other download managers
+    // 4. It only costs 1 byte of bandwidth to verify
+    let test_response = client
+        .get(url.clone())
+        .header(http::header::RANGE, "bytes=0-0")
+        .send()
+        .await?;
+
+    // The server supports ranges if it returns 206 Partial Content
+    let supports_ranges = test_response.status() == http::StatusCode::PARTIAL_CONTENT;
+
+    // Get the actual file size from Content-Range or Content-Length
+    let total_size = if supports_ranges {
+        // Parse the Content-Range header: "bytes 0-0/12345" -> 12345
+        test_response
+            .headers()
+            .get(http::header::CONTENT_RANGE)
+            .and_then(|v| v.to_str().ok())
+            .and_then(|s| s.split('/').nth(1))
+            .and_then(|s| s.parse::<u64>().ok())
+            .context("Failed to parse Content-Range header")?
+    } else {
+        // Fall back to Content-Length if ranges are not supported
+        test_response.content_length().unwrap_or(0)
+    };
+
+    if !supports_ranges || total_size == 0 {
+        tracing::info!(
+            %url,
+            status = %test_response.status(),
+            "Server doesn't support range requests, falling back to single connection"
+        );
+        return download_http_single(
+            url,
+            directory,
+            filename,
+            DownloadFileOption::Resumable,
+            callback,
+        )
+        .await;
+    }
+
+    // Create the file and allocate space
+    let file = tokio::fs::File::create(&tmp_dst_path)
+        .await
+        .context("couldn't create destination file")?;
+    file.set_len(total_size)
+        .await
+        .context("couldn't allocate file space")?;
+
+    // Prevent underflow when the file is smaller than the connection count:
+    // use at most as many connections as there are bytes
+    let effective_connections = (num_connections as u64).min(total_size.max(1));
+    let chunk_size = total_size / effective_connections;
+
+    tracing::debug!(
+        %url,
+        path = %dst_path.display(),
+        size = %total_size,
+        connections = %effective_connections,
+        "downloading with parallel connections"
+    );
+
+    // Progress tracking - log every 5 seconds like the forest::progress system
+    let bytes_downloaded = Arc::new(std::sync::atomic::AtomicU64::new(0));
+    let last_logged_bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
+    let last_logged_time = Arc::new(parking_lot::Mutex::new(Instant::now()));
+    let start_time = Instant::now();
+    const UPDATE_FREQUENCY: Duration = Duration::from_secs(5);
+
+    // Download chunks in parallel
+    let download_tasks = (0..effective_connections).map(|i| {
+        let client = client.clone();
+        let url = url.clone();
+        let tmp_path = tmp_dst_path.clone();
+        let bytes_downloaded = Arc::clone(&bytes_downloaded);
+        let last_logged_bytes = Arc::clone(&last_logged_bytes);
+        let last_logged_time = Arc::clone(&last_logged_time);
+        let callback = callback.clone();
+
+        let start = i * chunk_size;
+        let end = if i == effective_connections - 1 {
+            total_size - 1
+        } else {
+            ((i + 1) * chunk_size - 1).min(total_size - 1)
+        };
+
+        async move {
+            let range = format!("bytes={}-{}", start, end);
+            let expected_size = (end - start + 1) as usize;
+
+            // Retry logic for each chunk
+            let download_chunk = || async {
+                let response = client
+                    .get(url.clone())
+                    .header(http::header::RANGE, &range)
+                    .send()
+                    .await?;
+
+                if !response.status().is_success()
+                    && response.status() != http::StatusCode::PARTIAL_CONTENT
+                {
+                    anyhow::bail!("Failed to download chunk {}: {}", i, response.status());
+                }
+
+                // Open the file for writing this chunk
+                let mut file = tokio::fs::OpenOptions::new()
+                    .write(true)
+                    .open(&tmp_path)
+                    .await?;
+                file.seek(std::io::SeekFrom::Start(start)).await?;
+
+                // Stream bytes and update progress incrementally
+                let mut stream = response.bytes_stream();
+                let mut chunk_bytes_written = 0usize;
+
+                while let Some(chunk_result) = stream.try_next().await? {
+                    // Write this chunk of data
+                    file.write_all(&chunk_result).await?;
+                    chunk_bytes_written += chunk_result.len();
+
+                    // Update the global progress counter
+                    let downloaded = bytes_downloaded.fetch_add(
+                        chunk_result.len() as u64,
+                        std::sync::atomic::Ordering::Relaxed,
+                    ) + chunk_result.len() as u64;
+
+                    // Log progress every 5 seconds (forest::progress format)
+                    let now = Instant::now();
+                    let mut last_logged = last_logged_time.lock();
+                    if (now - *last_logged) > UPDATE_FREQUENCY {
+                        let last_bytes =
+                            last_logged_bytes.load(std::sync::atomic::Ordering::Relaxed);
+                        let elapsed_secs = (now - start_time).as_secs_f64();
+                        let seconds_since_last = (now - *last_logged).as_secs_f64().max(0.1);
+                        let speed = (downloaded - last_bytes) as f64 / seconds_since_last;
+                        let percent = if total_size > 0 {
+                            downloaded * 100 / total_size
+                        } else {
+                            0
+                        };
+
+                        tracing::info!(
+                            target: "forest::progress",
+                            "Loading {} / {}, {}%, {}/s, elapsed time: {}",
+                            human_bytes(downloaded as f64),
+                            human_bytes(total_size as f64),
+                            percent,
+                            human_bytes(speed),
+                            format_duration(Duration::from_secs(elapsed_secs as u64))
+                        );
+
+                        *last_logged = now;
+                        last_logged_bytes.store(downloaded, std::sync::atomic::Ordering::Relaxed);
+                    }
+
+                    // Also call the user callback if provided (for RPC state tracking)
+                    call_progress_callback(&callback, downloaded, total_size);
+                }
+
+                file.flush().await?;
+
+                // Verify we got the expected amount of data
+                if chunk_bytes_written != expected_size {
+                    anyhow::bail!(
+                        "Chunk {} size mismatch: expected {} bytes, got {}",
+                        i,
+                        expected_size,
+                        chunk_bytes_written
+                    );
+                }
+
+                Ok::<_, anyhow::Error>(())
+            };
+
+            download_chunk
+                .retry(ExponentialBuilder::default().with_max_times(5))
+                .await
+                .with_context(|| format!("Failed to download chunk {} after retries", i))
+        }
+    });
+
+    // Execute all downloads in parallel and collect the results
+    let results: Vec<_> = stream::iter(download_tasks)
+        .buffer_unordered(effective_connections as usize)
+        .collect()
+        .await;
+
+    // Check if any chunk failed
+    for (i, result) in results.into_iter().enumerate() {
+        result.with_context(|| format!("Chunk {} failed", i))?;
+    }
+
+    // Rename to the final destination
+    tokio::fs::rename(&tmp_dst_path, &dst_path)
+        .await
+        .context("couldn't rename file")?;
+
+    tracing::debug!("successfully downloaded file to {}", dst_path.display());
+    Ok(dst_path)
+}
+
+/// Download the file at `url` with a single HTTP connection, returning the path to the downloaded file
+async fn download_http_single(
     url: &Url,
     directory: &Path,
     filename: &str,
@@ -143,34 +445,74 @@ pub async fn download_http(
         std::fs::create_dir_all(directory)?;
     }
     let dst_path = directory.join(filename);
+    let tmp_dst_path = gen_tmp_download_path(&dst_path);
     let destination = dst_path.display();
-    tracing::info!(%url, %destination, "downloading snapshot");
+    tracing::info!(%url, %destination, "downloading with single connection");
     let mut reader = crate::utils::net::reader(url.as_str(), option, callback).await?;
-    let tmp_dst_path = {
-        // like `crdownload` for the chrome browser
-        const DOWNLOAD_EXTENSION: &str = "frdownload";
-        let mut path = dst_path.clone();
-        if let Some(ext) = path.extension() {
-            path.set_extension(format!(
-                "{}.{DOWNLOAD_EXTENSION}",
-                ext.to_str().unwrap_or_default()
-            ));
-        } else {
-            path.set_extension(DOWNLOAD_EXTENSION);
-        }
-        path
-    };
-    let mut tempfile = tokio::fs::File::create(&tmp_dst_path)
+    const WRITE_BUFFER_SIZE: usize = 1024 * 1024;
+    let file = tokio::fs::File::create(&tmp_dst_path)
         .await
        .context("couldn't create destination file")?;
+    let mut tempfile = tokio::io::BufWriter::with_capacity(WRITE_BUFFER_SIZE, file);
     tokio::io::copy(&mut reader, &mut tempfile)
         .await
         .context("couldn't download file")?;
-    std::fs::rename(&tmp_dst_path, &dst_path).context("couldn't rename file")?;
-
+    tempfile.flush().await.context("couldn't flush file")?;
+    tokio::fs::rename(&tmp_dst_path, &dst_path)
+        .await
+        .context("couldn't rename file")?;
     Ok(dst_path)
 }

+/// Download the file at `url` using the global HTTP client (via [`download_http_parallel`] or
+/// [`download_http_single`]), returning the path to the downloaded file.
+///
+/// Uses [`global_http_client`] for all HTTP requests.
+pub async fn download_http(
+    url: &Url,
+    directory: &Path,
+    filename: &str,
+    option: DownloadFileOption,
+    callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
+) -> anyhow::Result<PathBuf> {
+    // Use parallel downloads for the Resumable option, a single connection otherwise
+    match option {
+        DownloadFileOption::Resumable => {
+            let num_connections = get_num_download_connections();
+
+            // Try a parallel download, falling back to a single connection on error
+            match download_http_parallel(
+                url,
+                directory,
+                filename,
+                num_connections,
+                callback.clone(),
+            )
+            .await
+            {
+                Ok(path) => Ok(path),
+                Err(e) => {
+                    tracing::warn!(
+                        "Parallel download failed ({}), falling back to single connection",
+                        e
+                    );
+                    download_http_single(
+                        url,
+                        directory,
+                        filename,
+                        DownloadFileOption::Resumable,
+                        callback,
+                    )
+                    .await
+                }
+            }
+        }
+        DownloadFileOption::NonResumable => {
+            download_http_single(url, directory, filename, option, callback).await
+        }
+    }
+}
+
 pub async fn download_file_with_retry(
     url: &Url,
     directory: &Path,
@@ -217,40 +559,382 @@ pub async fn download_to(
 #[cfg(test)]
 mod test {
     use super::*;
+    use axum::{
+        Router,
+        body::Body,
+        extract::Request,
+        http::{StatusCode, header},
+        response::Response,
+        routing::get,
+    };
+    use std::net::SocketAddr;
+    use tokio::net::TcpListener;
+
+    /// Test file data with known MD5 hash
+    const TEST_FILE_CONTENT: &[u8] = b"ph'nglui mglw'nafh Cthulhu R'lyeh wgah'nagl fhtagn ph'nglui mglw'nafh Cthulhu R'lyeh wgah'nagl fhtagn ph'nglui mglw'nafh Cthulhu R'lyeh wgah'nagl fhtagn";
+
+    /// MD5 hash of `TEST_FILE_CONTENT` (binary)
+    fn test_file_md5() -> Vec<u8> {
+        let mut hasher = Md5::new();
+        hasher.update(TEST_FILE_CONTENT);
+        hasher.finalize().to_vec()
+    }
+
+    /// Test server that supports range requests
+    struct TestServer {
+        addr: SocketAddr,
+        shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
+    }
+
+    impl TestServer {
+        /// Start a new test server that serves `TEST_FILE_CONTENT` with range request support
+        async fn start() -> Self {
+            Self::start_with_content(TEST_FILE_CONTENT).await
+        }
+
+        /// Start a new test server with custom content
+        async fn start_with_content(content: &'static [u8]) -> Self {
+            let app = Router::new()
+                .route(
+                    "/test-file",
+                    get(move |req: Request| async move { handle_file_request(req, content).await }),
+                )
+                .route(
+                    "/test-file-no-ranges",
+                    get(move |_req: Request| async move {
+                        // Server that doesn't support range requests
+                        Response::builder()
+                            .status(StatusCode::OK)
+                            .header(header::CONTENT_TYPE, "application/octet-stream")
+                            .header(header::CONTENT_LENGTH, content.len())
+                            .body(Body::from(content))
+                            .unwrap()
+                    }),
+                )
+                .route(
+                    "/test-file-with-md5-etag",
+                    get(move |req: Request| async move {
+                        let mut response = handle_file_request(req, content).await;
+                        // Add the MD5 hash as an ETag (like filecoin-actors.chainsafe.dev)
+                        let mut hasher = Md5::new();
+                        hasher.update(content);
+                        let md5_hex = hex::encode(hasher.finalize());
+                        response
+                            .headers_mut()
+                            .insert(header::ETAG, format!("\"{md5_hex}\"").parse().unwrap());
+                        response
+                    }),
+                )
+                .route(
+                    "/test-file-with-ms-blob-md5",
+                    get(move |req: Request| async move {
+                        let mut response = handle_file_request(req, content).await;
+                        // Add the MD5 hash as x-ms-blob-content-md5 (like GitHub releases)
+                        let mut hasher = Md5::new();
+                        hasher.update(content);
+                        let md5 = hasher.finalize();
+                        let md5_base64 = BASE64_STANDARD.encode(md5);
+                        response
+                            .headers_mut()
+                            .insert("x-ms-blob-content-md5", md5_base64.parse().unwrap());
+                        response
+                    }),
+                );
+
+            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+            let addr = listener.local_addr().unwrap();
+
+            let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+
+            tokio::spawn(async move {
+                axum::serve(listener, app)
+                    .with_graceful_shutdown(async {
+                        shutdown_rx.await.ok();
+                    })
+                    .await
+                    .unwrap();
+            });
+
+            Self {
+                addr,
+                shutdown_tx: Some(shutdown_tx),
+            }
+        }
+
+        fn url(&self, path: &str) -> Url {
+            format!("http://{}{}", self.addr, path).parse().unwrap()
+        }
+    }
+
+    impl Drop for TestServer {
+        fn drop(&mut self) {
+            // Trigger graceful shutdown (best effort, ignore errors)
+            if let Some(tx) = self.shutdown_tx.take() {
+                let _ = tx.send(());
+            }
+        }
+    }
+
+    /// Handle file requests with range support
+    async fn handle_file_request(req: Request, content: &'static [u8]) -> Response {
+        let headers = req.headers();
+        let content_len = content.len() as u64;
+
+        // Check if this is a range request
+        if let Some(range_header) = headers.get(header::RANGE)
+            && let Ok(range_str) = range_header.to_str()
+        {
+            // Parse the range header: "bytes=0-0" or "bytes=100-200"
+            if let Some(range) = range_str.strip_prefix("bytes=") {
+                let parts: Vec<&str> = range.split('-').collect();
+                if parts.len() == 2 {
+                    let start: u64 = parts
+                        .first()
+                        .and_then(|s| s.parse::<u64>().ok())
+                        .unwrap_or(0);
+                    let end: u64 = parts
+                        .get(1)
+                        .filter(|s| !s.is_empty())
+                        .and_then(|s| s.parse::<u64>().ok())
+                        .unwrap_or(content_len.saturating_sub(1));
+
+                    // Handle the empty content case
+                    if content_len == 0 {
+                        return Response::builder()
+                            .status(StatusCode::RANGE_NOT_SATISFIABLE)
+                            .header(header::CONTENT_RANGE, format!("bytes */{}", content_len))
+                            .body(Body::empty())
+                            .unwrap();
+                    }
+
+                    let start = start.min(content_len - 1);
+                    let end = end.min(content_len - 1);
+
+                    if start <= end {
+                        // Use .get() instead of direct indexing to safely handle edge cases
+                        if let Some(range_content) = content.get(start as usize..=end as usize) {
+                            return Response::builder()
+                                .status(StatusCode::PARTIAL_CONTENT)
+                                .header(header::CONTENT_TYPE, "application/octet-stream")
+                                .header(header::CONTENT_LENGTH, range_content.len())
+                                .header(
+                                    header::CONTENT_RANGE,
+                                    format!("bytes {}-{}/{}", start, end, content_len),
+                                )
+                                .header(header::ACCEPT_RANGES, "bytes")
+                                .body(Body::from(range_content))
+                                .unwrap();
+                        } else {
+                            // Range is out of bounds
+                            return Response::builder()
+                                .status(StatusCode::RANGE_NOT_SATISFIABLE)
+                                .header(header::CONTENT_RANGE, format!("bytes */{}", content_len))
+                                .body(Body::empty())
+                                .unwrap();
+                        }
+                    }
+                }
+            }
+        }
+
+        // Return the full content
+        Response::builder()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .header(header::CONTENT_LENGTH, content_len)
+            .header(header::ACCEPT_RANGES, "bytes")
+            .body(Body::from(content))
+            .unwrap()
+    }

     #[tokio::test]
     async fn test_get_content_md5_hash_from_url_1() {
"https://filecoin-actors.chainsafe.dev/v15.0.0/builtin-actors-mainnet.car"; - let md5 = get_content_md5_hash_from_url(url.try_into().unwrap()) - .await - .unwrap() - .map(hex::encode); - assert_eq!(md5, Some("676b41e3dd1dc94430084bde84849701".into())) + let server = TestServer::start().await; + let url = server.url("/test-file-with-md5-etag"); + + // This will fail because 127.0.0.1 is not in HOSTS_WITH_MD5_ETAG + let md5 = get_content_md5_hash_from_url(url).await; + assert!( + md5.is_err(), + "Should fail for localhost (not in HOSTS_WITH_MD5_ETAG)" + ); } #[tokio::test] async fn test_get_content_md5_hash_from_url_2() { - let url = "https://github.com/filecoin-project/builtin-actors/releases/download/v15.0.0/builtin-actors-mainnet.car"; - let md5 = get_content_md5_hash_from_url(url.try_into().unwrap()) - .await - .unwrap() - .map(hex::encode); - assert_eq!(md5, Some("676b41e3dd1dc94430084bde84849701".into())) + let server = TestServer::start().await; + let url = server.url("/test-file-with-ms-blob-md5"); + + let md5 = get_content_md5_hash_from_url(url).await.unwrap(); + + assert_eq!(md5, Some(test_file_md5())); } #[tokio::test] async fn test_download_file_with_cache() { + let server = TestServer::start().await; let temp_dir = tempfile::tempdir().unwrap(); - let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap(); + let url = server.url("/test-file-with-ms-blob-md5"); + let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable) .await .unwrap(); assert!(!result.cache_hit); + let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable) .await .unwrap(); assert!(result.cache_hit); } + + #[tokio::test] + async fn test_parallel_download() { + let server = TestServer::start().await; + let temp_dir = tempfile::tempdir().unwrap(); + let url = server.url("/test-file"); + + let result = download_http_parallel( + &url, + temp_dir.path(), + "test-parallel.dat", + 3, // Use 3 connections for testing + None, + ) + .await + .unwrap(); + + assert!(result.exists()); + + // Verify the file is not corrupted by checking its MD5 + let downloaded_md5 = get_file_md5_hash(&result); + assert_eq!(downloaded_md5, Some(test_file_md5())); + } + + #[tokio::test] + async fn test_download_http_uses_parallel() { + let server = TestServer::start().await; + let temp_dir = tempfile::tempdir().unwrap(); + let url = server.url("/test-file"); + + // Test with Resumable option (should use parallel) + let result = download_http( + &url, + temp_dir.path(), + "test-resumable.dat", + DownloadFileOption::Resumable, + None, + ) + .await + .unwrap(); + + assert!(result.exists()); + + // Verify integrity + let downloaded_md5 = get_file_md5_hash(&result); + assert_eq!(downloaded_md5, Some(test_file_md5())); + } + + #[tokio::test] + async fn test_parallel_download_with_progress() { + let server = TestServer::start().await; + let temp_dir = tempfile::tempdir().unwrap(); + let url = server.url("/test-file"); + + // Track progress updates + let progress_updates = Arc::new(parking_lot::Mutex::new(Vec::new())); + let progress_updates_clone = Arc::clone(&progress_updates); + + let callback = Arc::new(move |msg: String| { + progress_updates_clone.lock().push(msg); + }); + + let result = download_http_parallel( + &url, + temp_dir.path(), + "test-progress.dat", + 3, + Some(callback), + ) + .await + .unwrap(); + + assert!(result.exists()); + + // Verify we 
+        // Verify we got progress updates
+        let updates = progress_updates.lock();
+        assert!(!updates.is_empty(), "Should have received progress updates");
+
+        // Verify that progress increases monotonically
+        let mut last_progress = 0;
+        for update in updates.iter() {
+            if let Some(progress_str) = update.strip_suffix('%')
+                && let Ok(progress) = progress_str.parse::<u8>()
+            {
+                assert!(
+                    progress >= last_progress,
+                    "Progress should increase: {} < {}",
+                    progress,
+                    last_progress
+                );
+                last_progress = progress;
+            }
+        }
+
+        // Should reach 100% for small test files
+        assert!(
+            last_progress >= 90,
+            "Should reach at least 90% progress, got {}",
+            last_progress
+        );
+
+        println!("Progress updates: {:?}", updates);
+    }
+
+    #[tokio::test]
+    async fn test_fallback_to_single_connection() {
+        let server = TestServer::start().await;
+        let temp_dir = tempfile::tempdir().unwrap();
+        // Use the endpoint that doesn't support range requests
+        let url = server.url("/test-file-no-ranges");
+
+        // Try to download in parallel (this should fall back to a single connection)
+        let result = download_http(
+            &url,
+            temp_dir.path(),
+            "test-fallback.dat",
+            DownloadFileOption::Resumable,
+            None,
+        )
+        .await
+        .unwrap();
+
+        assert!(result.exists());
+
+        // Verify the content is correct despite the fallback
+        let content = std::fs::read(&result).unwrap();
+        assert_eq!(content, TEST_FILE_CONTENT);
+    }
+
+    #[tokio::test]
+    async fn test_small_file_with_many_connections() {
+        // Test edge case: a file smaller than the connection count.
+        // This tests the underflow prevention when chunk_size would be 0.
+        let small_content: &[u8] = b"Hi!"; // 3 bytes
+        let server = TestServer::start_with_content(small_content).await;
+        let temp_dir = tempfile::tempdir().unwrap();
+        let url = server.url("/test-file");
+
+        // Try to download with more connections than bytes
+        let result = download_http_parallel(&url, temp_dir.path(), "tiny.dat", 5, None)
+            .await
+            .unwrap();
+
+        assert!(result.exists());
+
+        // Verify the content is correct
+        let downloaded = std::fs::read(&result).unwrap();
+        assert_eq!(downloaded, small_content);
+    }
 }
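
A quick way to exercise the new code path from the command line, using the `forest-tool` invocation from the updated docs above (the connection count of 10 is an arbitrary illustration, not a recommendation):

```bash
# Download a mainnet snapshot with 10 parallel HTTP connections instead of the
# default 5. If the server rejects the bytes=0-0 range probe, Forest falls back
# to a single connection automatically.
FOREST_DOWNLOAD_CONNECTIONS=10 forest-tool snapshot fetch --chain mainnet
```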