Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ pub enum ApiCommands {
/// Report detail level: full (default), failure-only, or summary
#[arg(long, value_enum, default_value = "full")]
report_mode: ReportMode,

/// Number of retries for each test
#[arg(long, default_value = "2")]
n_retries: usize,
},
/// Generates RPC test snapshots from test dump files and a Forest database.
///
Expand Down Expand Up @@ -295,6 +299,7 @@ impl ApiCommands {
test_criteria_overrides,
report_dir,
report_mode,
n_retries,
} => {
let forest = Arc::new(rpc::Client::from_url(forest));
let lotus = Arc::new(rpc::Client::from_url(lotus));
Expand All @@ -314,6 +319,7 @@ impl ApiCommands {
&test_criteria_overrides,
report_dir,
report_mode,
n_retries,
)
.await?;
}
Expand Down
71 changes: 45 additions & 26 deletions src/tool/subcommands/api_cmd/api_compare_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ use chrono::Utc;
use cid::Cid;
use fil_actors_shared::fvm_ipld_bitfield::BitField;
use fil_actors_shared::v10::runtime::DomainSeparationTag;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt as _;
use fvm_ipld_blockstore::Blockstore;
use ipld_core::ipld::Ipld;
use itertools::Itertools as _;
Expand All @@ -68,6 +66,7 @@ use std::{
time::Duration,
};
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::debug;

const COLLECTION_SAMPLE_SIZE: usize = 5;
Expand Down Expand Up @@ -2932,11 +2931,12 @@ pub(super) async fn run_tests(
test_criteria_overrides: &[TestCriteriaOverride],
report_dir: Option<PathBuf>,
report_mode: ReportMode,
n_retries: usize,
) -> anyhow::Result<()> {
let forest = Into::<Arc<rpc::Client>>::into(forest);
let lotus = Into::<Arc<rpc::Client>>::into(lotus);
let semaphore = Arc::new(Semaphore::new(max_concurrent_requests));
let mut futures = FuturesUnordered::new();
let mut tasks = JoinSet::new();

let filter_list = if let Some(filter_file) = &filter_file {
FilterList::new_from_file(filter_file)?
Expand Down Expand Up @@ -2988,41 +2988,60 @@ pub(super) async fn run_tests(
}

// Acquire a permit from the semaphore before spawning a test
let permit = semaphore.clone().acquire_owned().await?;
let semaphore = semaphore.clone();
let forest = forest.clone();
let lotus = lotus.clone();
let future = tokio::spawn(async move {
let test_result = test.run(&forest, &lotus).await;
drop(permit); // Release the permit after test execution
(test, test_result)
let test_criteria_overrides = test_criteria_overrides.to_vec();
tasks.spawn(async move {
let mut n_retries_left = n_retries;
let mut backoff_secs = 2;
loop {
{
// Ignore the error since 'An acquire operation can only fail if the semaphore has been closed'
let _permit = semaphore.acquire().await;
let test_result = test.run(&forest, &lotus).await;
let success =
evaluate_test_success(&test_result, &test, &test_criteria_overrides);
if success || n_retries_left == 0 {
return (success, test, test_result);
}
// Release the semaphore before sleeping
}
// Sleep before each retry
tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
n_retries_left = n_retries_left.saturating_sub(1);
backoff_secs = backoff_secs.saturating_mul(2);
}
});

futures.push(future);
}

// If no tests to run after filtering, return early without saving/printing
if futures.is_empty() {
if tasks.is_empty() {
return Ok(());
}

while let Some(Ok((test, test_result))) = futures.next().await {
let method_name = test.request.method_name.clone();
let success = evaluate_test_success(&test_result, &test, test_criteria_overrides);
while let Some(result) = tasks.join_next().await {
match result {
Ok((success, test, test_result)) => {
let method_name = test.request.method_name.clone();

report_builder.track_test_result(
method_name.as_ref(),
success,
&test_result,
&test.request.params,
);
report_builder.track_test_result(
method_name.as_ref(),
success,
&test_result,
&test.request.params,
);

// Dump test data if configured
if let (Some(dump_dir), Some(test_dump)) = (&dump_dir, &test_result.test_dump) {
dump_test_data(dump_dir, success, test_dump)?;
}
// Dump test data if configured
if let (Some(dump_dir), Some(test_dump)) = (&dump_dir, &test_result.test_dump) {
dump_test_data(dump_dir, success, test_dump)?;
}

if !success && fail_fast {
break;
if !success && fail_fast {
break;
}
}
Err(e) => tracing::warn!("{e}"),
}
}

Expand Down
Loading