diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs index c837355e91d6..be9d42cd9fb8 100644 --- a/src/tool/subcommands/api_cmd.rs +++ b/src/tool/subcommands/api_cmd.rs @@ -152,6 +152,10 @@ pub enum ApiCommands { /// Report detail level: full (default), failure-only, or summary #[arg(long, value_enum, default_value = "full")] report_mode: ReportMode, + + /// Number of retries for each test + #[arg(long, default_value = "2")] + n_retries: usize, }, /// Generates RPC test snapshots from test dump files and a Forest database. /// @@ -295,6 +299,7 @@ impl ApiCommands { test_criteria_overrides, report_dir, report_mode, + n_retries, } => { let forest = Arc::new(rpc::Client::from_url(forest)); let lotus = Arc::new(rpc::Client::from_url(lotus)); @@ -314,6 +319,7 @@ impl ApiCommands { &test_criteria_overrides, report_dir, report_mode, + n_retries, ) .await?; } diff --git a/src/tool/subcommands/api_cmd/api_compare_tests.rs b/src/tool/subcommands/api_cmd/api_compare_tests.rs index 3c26e0f8ee82..79fca715e3a8 100644 --- a/src/tool/subcommands/api_cmd/api_compare_tests.rs +++ b/src/tool/subcommands/api_cmd/api_compare_tests.rs @@ -45,8 +45,6 @@ use chrono::Utc; use cid::Cid; use fil_actors_shared::fvm_ipld_bitfield::BitField; use fil_actors_shared::v10::runtime::DomainSeparationTag; -use futures::stream::FuturesUnordered; -use futures::stream::StreamExt as _; use fvm_ipld_blockstore::Blockstore; use ipld_core::ipld::Ipld; use itertools::Itertools as _; @@ -68,6 +66,7 @@ use std::{ time::Duration, }; use tokio::sync::Semaphore; +use tokio::task::JoinSet; use tracing::debug; const COLLECTION_SAMPLE_SIZE: usize = 5; @@ -2932,11 +2931,12 @@ pub(super) async fn run_tests( test_criteria_overrides: &[TestCriteriaOverride], report_dir: Option, report_mode: ReportMode, + n_retries: usize, ) -> anyhow::Result<()> { let forest = Into::>::into(forest); let lotus = Into::>::into(lotus); let semaphore = Arc::new(Semaphore::new(max_concurrent_requests)); - let mut futures = FuturesUnordered::new(); + let mut tasks = JoinSet::new(); let filter_list = if let Some(filter_file) = &filter_file { FilterList::new_from_file(filter_file)? @@ -2988,41 +2988,60 @@ pub(super) async fn run_tests( } // Acquire a permit from the semaphore before spawning a test - let permit = semaphore.clone().acquire_owned().await?; + let semaphore = semaphore.clone(); let forest = forest.clone(); let lotus = lotus.clone(); - let future = tokio::spawn(async move { - let test_result = test.run(&forest, &lotus).await; - drop(permit); // Release the permit after test execution - (test, test_result) + let test_criteria_overrides = test_criteria_overrides.to_vec(); + tasks.spawn(async move { + let mut n_retries_left = n_retries; + let mut backoff_secs = 2; + loop { + { + // Ignore the error since 'An acquire operation can only fail if the semaphore has been closed' + let _permit = semaphore.acquire().await; + let test_result = test.run(&forest, &lotus).await; + let success = + evaluate_test_success(&test_result, &test, &test_criteria_overrides); + if success || n_retries_left == 0 { + return (success, test, test_result); + } + // Release the semaphore before sleeping + } + // Sleep before each retry + tokio::time::sleep(Duration::from_secs(backoff_secs)).await; + n_retries_left = n_retries_left.saturating_sub(1); + backoff_secs = backoff_secs.saturating_mul(2); + } }); - - futures.push(future); } // If no tests to run after filtering, return early without saving/printing - if futures.is_empty() { + if tasks.is_empty() { return Ok(()); } - while let Some(Ok((test, test_result))) = futures.next().await { - let method_name = test.request.method_name.clone(); - let success = evaluate_test_success(&test_result, &test, test_criteria_overrides); + while let Some(result) = tasks.join_next().await { + match result { + Ok((success, test, test_result)) => { + let method_name = test.request.method_name.clone(); - report_builder.track_test_result( - method_name.as_ref(), - success, - &test_result, - &test.request.params, - ); + report_builder.track_test_result( + method_name.as_ref(), + success, + &test_result, + &test.request.params, + ); - // Dump test data if configured - if let (Some(dump_dir), Some(test_dump)) = (&dump_dir, &test_result.test_dump) { - dump_test_data(dump_dir, success, test_dump)?; - } + // Dump test data if configured + if let (Some(dump_dir), Some(test_dump)) = (&dump_dir, &test_result.test_dump) { + dump_test_data(dump_dir, success, test_dump)?; + } - if !success && fail_fast { - break; + if !success && fail_fast { + break; + } + } + Err(e) => tracing::warn!("{e}"), } }