From adc946016616f99fe2642d34245c4ace2e55c821 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 30 Oct 2022 12:10:07 +0100 Subject: [PATCH 1/4] Retry failed PVF prepare jobs - Keep track of the time of failure and the number of failures. - If we get a request to prepare a PVF, retry if: - Enough time has elapsed since a failure, and - The number of failures is not greater than `NUM_PREPARE_RETRIES` Closes #4288 --- node/core/pvf/src/artifacts.rs | 18 ++++- node/core/pvf/src/host.rs | 127 +++++++++++++++++++++++++++------ 2 files changed, 122 insertions(+), 23 deletions(-) diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 49d1be75fed4..138e686c9078 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -103,10 +103,22 @@ pub enum ArtifactState { last_time_needed: SystemTime, }, /// A task to prepare this artifact is scheduled. - Preparing { waiting_for_response: Vec }, + Preparing { + /// List of result senders that are waiting for a response. + waiting_for_response: Vec, + /// The number of times this artifact has failed to prepare. + num_failures: u32, + }, /// The code couldn't be compiled due to an error. Such artifacts /// never reach the executor and stay in the host's memory. - FailedToProcess(PrepareError), + FailedToProcess { + /// Keep track of the last time that processing this artifact failed. + last_time_failed: SystemTime, + /// The number of times this artifact has failed to prepare. + num_failures: u32, + /// The prepare error. + error: PrepareError, + }, } /// A container of all known artifact ids and their states. @@ -150,7 +162,7 @@ impl Artifacts { // See the precondition. always!(self .artifacts - .insert(artifact_id, ArtifactState::Preparing { waiting_for_response }) + .insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 }) .is_none()); } diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 6670ea48d4ec..f5e6e69e3388 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -48,6 +48,13 @@ pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); // NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180); +/// The time period after which a failed preparation artifact is considered ready to be retried. Note that we will only +/// retry if another request comes in after this cooldown has passed. +pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60); + +/// The amount of times we will retry failed prepare jobs. +pub const NUM_PREPARE_RETRIES: u32 = 5; + /// An alias to not spell the type for the oneshot sender for the PVF execution result. pub(crate) type ResultSender = oneshot::Sender>; @@ -360,6 +367,8 @@ async fn run( Some(to_host) => to_host, }; + // If the artifact failed before, it could be re-scheduled for preparation here if + // the preparation failure cooldown has elapsed. break_if_fatal!(handle_to_host( &cache_path, &mut artifacts, @@ -376,9 +385,9 @@ async fn run( // Note that preparation always succeeds. // // That's because the error conditions are written into the artifact and will be - // reported at the time of the execution. It potentially, but not necessarily, - // can be scheduled as a result of this function call, in case there are pending - // executions. + // reported at the time of the execution. 
It potentially, but not necessarily, can + // be scheduled for execution as a result of this function call, in case there are + // pending executions. // // We could be eager in terms of reporting and plumb the result from the preparation // worker but we don't for the sake of simplicity. @@ -432,6 +441,8 @@ async fn handle_to_host( /// Handles PVF prechecking. /// /// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]). +/// +/// If the prepare job failed previously, we may retry it under certain conditions. async fn handle_precheck_pvf( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, @@ -446,11 +457,28 @@ async fn handle_precheck_pvf( *last_time_needed = SystemTime::now(); let _ = result_sender.send(Ok(())); }, - ArtifactState::Preparing { waiting_for_response } => + ArtifactState::Preparing { waiting_for_response, num_failures: _ } => waiting_for_response.push(result_sender), - ArtifactState::FailedToProcess(result) => { - let _ = result_sender.send(PrepareResult::Err(result.clone())); - }, + ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => + if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { + // If we are allowed to retry the failed prepare job, change the state to + // Preparing and re-queue this job. + *state = ArtifactState::Preparing { + waiting_for_response: vec![result_sender], + num_failures: *num_failures, + }; + send_prepare( + prepare_queue, + prepare::ToQueue::Enqueue { + priority: Priority::Normal, + pvf, + compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, + }, + ) + .await?; + } else { + let _ = result_sender.send(PrepareResult::Err(error.clone())); + }, } } else { artifacts.insert_preparing(artifact_id, vec![result_sender]); @@ -469,9 +497,13 @@ async fn handle_precheck_pvf( /// Handles PVF execution. /// -/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a -/// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout -/// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking. +/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a +/// preparation job, we coalesce the two preparation jobs. +/// +/// If the prepare job failed previously, we may retry it under certain conditions. +/// +/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`]) +/// than when prechecking. async fn handle_execute_pvf( cache_path: &Path, artifacts: &mut Artifacts, @@ -491,6 +523,7 @@ async fn handle_execute_pvf( ArtifactState::Prepared { last_time_needed } => { *last_time_needed = SystemTime::now(); + // This artifact has already been prepared, send it to the execute queue. send_execute( execute_queue, execute::ToQueue::Enqueue { @@ -502,11 +535,29 @@ async fn handle_execute_pvf( ) .await?; }, - ArtifactState::Preparing { waiting_for_response: _ } => { + ArtifactState::Preparing { .. } => { awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); }, - ArtifactState::FailedToProcess(error) => { - let _ = result_tx.send(Err(ValidationError::from(error.clone()))); + ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { + if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { + // If we are allowed to retry the failed prepare job, change the state to + // Preparing and re-queue this job. 
+ *state = ArtifactState::Preparing { + waiting_for_response: Vec::new(), + num_failures: *num_failures, + }; + send_prepare( + prepare_queue, + prepare::ToQueue::Enqueue { + priority, + pvf, + compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, + }, + ) + .await?; + } else { + let _ = result_tx.send(Err(ValidationError::from(error.clone()))); + } }, } } else { @@ -523,6 +574,7 @@ async fn handle_execute_pvf( ) .await?; + // Add an execution request that will wait to run after this prepare job has finished. awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); } @@ -543,10 +595,29 @@ async fn handle_heads_up( ArtifactState::Prepared { last_time_needed, .. } => { *last_time_needed = now; }, - ArtifactState::Preparing { waiting_for_response: _ } => { + ArtifactState::Preparing { .. } => { // The artifact is already being prepared, so we don't need to do anything. }, - ArtifactState::FailedToProcess(_) => {}, + ArtifactState::FailedToProcess { last_time_failed, num_failures, error: _ } => { + // TODO: Do we want to retry for heads-up requests? + if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { + // If we are allowed to retry the failed prepare job, change the state to + // Preparing and re-queue this job. + *state = ArtifactState::Preparing { + waiting_for_response: vec![], + num_failures: *num_failures, + }; + send_prepare( + prepare_queue, + prepare::ToQueue::Enqueue { + priority: Priority::Normal, + pvf: active_pvf, + compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, + }, + ) + .await?; + } + }, } } else { // It's not in the artifacts, so we need to enqueue a job to prepare it. @@ -596,20 +667,26 @@ async fn handle_prepare_done( never!("the artifact is already prepared: {:?}", artifact_id); return Ok(()) }, - Some(ArtifactState::FailedToProcess(_)) => { + Some(ArtifactState::FailedToProcess { .. }) => { // The reasoning is similar to the above, the artifact cannot be // processed at this point. never!("the artifact is already processed unsuccessfully: {:?}", artifact_id); return Ok(()) }, - Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state, + Some(state @ ArtifactState::Preparing { .. }) => state, }; - if let ArtifactState::Preparing { waiting_for_response } = state { + let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } = + state + { for result_sender in waiting_for_response.drain(..) { let _ = result_sender.send(result.clone()); } - } + num_failures + } else { + never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed"); + return Ok(()) + }; // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. @@ -641,7 +718,11 @@ async fn handle_prepare_done( *state = match result { Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() }, - Err(error) => ArtifactState::FailedToProcess(error.clone()), + Err(error) => ArtifactState::FailedToProcess { + last_time_failed: SystemTime::now(), + num_failures: *num_failures + 1, + error: error.clone(), + }, }; Ok(()) @@ -704,6 +785,12 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver) { } } +/// Check if the conditions to retry a prepare job have been met. +fn can_retry_prepare_after_failure(last_time_failed: SystemTime, num_failures: u32) -> bool { + SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && + num_failures <= NUM_PREPARE_RETRIES +} + /// A stream that yields a pulse continuously at a given interval. 
fn pulse_every(interval: std::time::Duration) -> impl futures::Stream { futures::stream::unfold(interval, { From ebd600c7d1625dea102216c70bbb4640bb509fa6 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Wed, 2 Nov 2022 11:23:48 +0100 Subject: [PATCH 2/4] Address review comments --- node/core/pvf/src/artifacts.rs | 2 +- node/core/pvf/src/host.rs | 55 ++++++++++++++++------------------ 2 files changed, 27 insertions(+), 30 deletions(-) diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 138e686c9078..0d764712ea05 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -116,7 +116,7 @@ pub enum ArtifactState { last_time_failed: SystemTime, /// The number of times this artifact has failed to prepare. num_failures: u32, - /// The prepare error. + /// The last error encountered for preparation. error: PrepareError, }, } diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index f5e6e69e3388..341464055188 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -22,6 +22,7 @@ use crate::{ artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, + error::PrepareError, execute, metrics::Metrics, prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, @@ -48,8 +49,8 @@ pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); // NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180); -/// The time period after which a failed preparation artifact is considered ready to be retried. Note that we will only -/// retry if another request comes in after this cooldown has passed. +/// The time period after which a failed preparation artifact is considered ready to be retried. +/// Note that we will only retry if another request comes in after this cooldown has passed. pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60); /// The amount of times we will retry failed prepare jobs. @@ -459,26 +460,11 @@ async fn handle_precheck_pvf( }, ArtifactState::Preparing { waiting_for_response, num_failures: _ } => waiting_for_response.push(result_sender), - ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => - if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { - // If we are allowed to retry the failed prepare job, change the state to - // Preparing and re-queue this job. - *state = ArtifactState::Preparing { - waiting_for_response: vec![result_sender], - num_failures: *num_failures, - }; - send_prepare( - prepare_queue, - prepare::ToQueue::Enqueue { - priority: Priority::Normal, - pvf, - compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, - }, - ) - .await?; - } else { - let _ = result_sender.send(PrepareResult::Err(error.clone())); - }, + ArtifactState::FailedToProcess { error, .. } => { + // Do not retry failed preparation if another pre-check request comes in. We do not retry pre-checking, + // anyway. 
+ let _ = result_sender.send(PrepareResult::Err(error.clone())); + }, } } else { artifacts.insert_preparing(artifact_id, vec![result_sender]); @@ -539,7 +525,7 @@ async fn handle_execute_pvf( awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { - if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { + if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { // If we are allowed to retry the failed prepare job, change the state to // Preparing and re-queue this job. *state = ArtifactState::Preparing { @@ -598,9 +584,8 @@ async fn handle_heads_up( ArtifactState::Preparing { .. } => { // The artifact is already being prepared, so we don't need to do anything. }, - ArtifactState::FailedToProcess { last_time_failed, num_failures, error: _ } => { - // TODO: Do we want to retry for heads-up requests? - if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { + ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { + if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { // If we are allowed to retry the failed prepare job, change the state to // Preparing and re-queue this job. *state = ArtifactState::Preparing { @@ -786,9 +771,21 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver) { } /// Check if the conditions to retry a prepare job have been met. -fn can_retry_prepare_after_failure(last_time_failed: SystemTime, num_failures: u32) -> bool { - SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && - num_failures <= NUM_PREPARE_RETRIES +fn can_retry_prepare_after_failure( + last_time_failed: SystemTime, + num_failures: u32, + error: &PrepareError, +) -> bool { + use PrepareError::*; + match error { + // Gracefully returned an error, so it will probably be reproducible. Don't retry. + Prevalidation(_) | Preparation(_) => false, + // Retry if the retry cooldown has elapsed and if we have already retried less than + // `NUM_PREPARE_RETRIES` times. + Panic(_) | TimedOut | DidNotMakeIt => + SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && + num_failures <= NUM_PREPARE_RETRIES, + } } /// A stream that yields a pulse continuously at a given interval. From 086b2fd60d9402d4ff5c2f238771e4fe25145b40 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 3 Nov 2022 15:45:10 +0100 Subject: [PATCH 3/4] Add tests for PVF preparation retries --- node/core/pvf/src/host.rs | 340 +++++++++++++++++++++++++++++++++----- 1 file changed, 303 insertions(+), 37 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 792464681d61..d41af8926bc3 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -67,6 +67,7 @@ pub(crate) type PrepareResultSender = oneshot::Sender; #[derive(Clone)] pub struct ValidationHost { to_host_tx: mpsc::Sender, + prepare_failure_cooldown: Duration, } impl ValidationHost { @@ -105,7 +106,14 @@ impl ValidationHost { result_tx: ResultSender, ) -> Result<(), String> { self.to_host_tx - .send(ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx }) + .send(ToHost::ExecutePvf(ExecutePvfInputs { + pvf, + execution_timeout, + prepare_failure_cooldown: self.prepare_failure_cooldown, + params, + priority, + result_tx, + })) .await .map_err(|_| "the inner loop hung up".to_string()) } @@ -118,27 +126,28 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. 
if it shut down. pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { self.to_host_tx - .send(ToHost::HeadsUp { active_pvfs }) + .send(ToHost::HeadsUp { + active_pvfs, + prepare_failure_cooldown: self.prepare_failure_cooldown, + }) .await .map_err(|_| "the inner loop hung up".to_string()) } } enum ToHost { - PrecheckPvf { - pvf: Pvf, - result_tx: PrepareResultSender, - }, - ExecutePvf { - pvf: Pvf, - execution_timeout: Duration, - params: Vec, - priority: Priority, - result_tx: ResultSender, - }, - HeadsUp { - active_pvfs: Vec, - }, + PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender }, + ExecutePvf(ExecutePvfInputs), + HeadsUp { active_pvfs: Vec, prepare_failure_cooldown: Duration }, +} + +struct ExecutePvfInputs { + pvf: Pvf, + execution_timeout: Duration, + prepare_failure_cooldown: Duration, + params: Vec, + priority: Priority, + result_tx: ResultSender, } /// Configuration for the validation host. @@ -193,7 +202,8 @@ impl Config { pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future) { let (to_host_tx, to_host_rx) = mpsc::channel(10); - let validation_host = ValidationHost { to_host_tx }; + let validation_host = + ValidationHost { to_host_tx, prepare_failure_cooldown: PREPARE_FAILURE_COOLDOWN }; let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool( metrics.clone(), @@ -417,23 +427,20 @@ async fn handle_to_host( ToHost::PrecheckPvf { pvf, result_tx } => { handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?; }, - ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => { + ToHost::ExecutePvf(inputs) => { handle_execute_pvf( cache_path, artifacts, prepare_queue, execute_queue, awaiting_prepare, - pvf, - execution_timeout, - params, - priority, - result_tx, + inputs, ) .await?; }, - ToHost::HeadsUp { active_pvfs } => { - handle_heads_up(artifacts, prepare_queue, active_pvfs).await?; + ToHost::HeadsUp { active_pvfs, prepare_failure_cooldown } => { + handle_heads_up(artifacts, prepare_queue, active_pvfs, prepare_failure_cooldown) + .await?; }, } @@ -497,12 +504,16 @@ async fn handle_execute_pvf( prepare_queue: &mut mpsc::Sender, execute_queue: &mut mpsc::Sender, awaiting_prepare: &mut AwaitingPrepare, - pvf: Pvf, - execution_timeout: Duration, - params: Vec, - priority: Priority, - result_tx: ResultSender, + inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { + let ExecutePvfInputs { + pvf, + execution_timeout, + prepare_failure_cooldown, + params, + priority, + result_tx, + } = inputs; let artifact_id = pvf.as_artifact_id(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { @@ -526,7 +537,12 @@ async fn handle_execute_pvf( awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { - if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { + if can_retry_prepare_after_failure( + *last_time_failed, + *num_failures, + error, + prepare_failure_cooldown, + ) { // If we are allowed to retry the failed prepare job, change the state to // Preparing and re-queue this job. 
*state = ArtifactState::Preparing { @@ -538,7 +554,7 @@ async fn handle_execute_pvf( prepare::ToQueue::Enqueue { priority, pvf, - compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, + preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }, ) .await?; @@ -572,6 +588,7 @@ async fn handle_heads_up( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, active_pvfs: Vec, + prepare_failure_cooldown: Duration, ) -> Result<(), Fatal> { let now = SystemTime::now(); @@ -586,7 +603,12 @@ async fn handle_heads_up( // The artifact is already being prepared, so we don't need to do anything. }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { - if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { + if can_retry_prepare_after_failure( + *last_time_failed, + *num_failures, + error, + prepare_failure_cooldown, + ) { // If we are allowed to retry the failed prepare job, change the state to // Preparing and re-queue this job. *state = ArtifactState::Preparing { @@ -598,7 +620,7 @@ async fn handle_heads_up( prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf, - compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, + preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }, ) .await?; @@ -776,6 +798,7 @@ fn can_retry_prepare_after_failure( last_time_failed: SystemTime, num_failures: u32, error: &PrepareError, + prepare_failure_cooldown: Duration, ) -> bool { use PrepareError::*; match error { @@ -784,7 +807,7 @@ fn can_retry_prepare_after_failure( // Retry if the retry cooldown has elapsed and if we have already retried less than // `NUM_PREPARE_RETRIES` times. Panic(_) | TimedOut | DidNotMakeIt => - SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && + SystemTime::now() >= last_time_failed + prepare_failure_cooldown && num_failures <= NUM_PREPARE_RETRIES, } } @@ -809,6 +832,8 @@ mod tests { const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); + const TEST_PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200); + #[async_std::test] async fn pulse_test() { let pulse = pulse_every(Duration::from_millis(100)); @@ -901,7 +926,7 @@ mod tests { fn host_handle(&mut self) -> ValidationHost { let to_host_tx = self.to_host_tx.take().unwrap(); - ValidationHost { to_host_tx } + ValidationHost { to_host_tx, prepare_failure_cooldown: TEST_PREPARE_FAILURE_COOLDOWN } } async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue { @@ -916,6 +941,25 @@ mod tests { .await } + async fn poll_ensure_to_prepare_queue_is_empty(&mut self) { + use futures_timer::Delay; + + let to_prepare_queue_rx = &mut self.to_prepare_queue_rx; + run_until( + &mut self.run, + async { + futures::select! { + _ = Delay::new(Duration::from_millis(500)).fuse() => (), + _ = to_prepare_queue_rx.next().fuse() => { + panic!("the prepare queue is supposed to be empty") + } + } + } + .boxed(), + ) + .await + } + async fn poll_ensure_to_execute_queue_is_empty(&mut self) { use futures_timer::Delay; @@ -926,7 +970,7 @@ mod tests { futures::select! { _ = Delay::new(Duration::from_millis(500)).fuse() => (), _ = to_execute_queue_rx.next().fuse() => { - panic!("the execute queue supposed to be empty") + panic!("the execute queue is supposed to be empty") } } } @@ -1250,6 +1294,228 @@ mod tests { } } + // Test that multiple prechecking requests do not trigger preparation retries if the first one + // failed. 
+ #[async_std::test] + async fn test_precheck_prepare_retry() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + // Submit a precheck request that fails. + let (result_tx, _result_rx) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + + // The queue received the prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + // Send a PrepareError. + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Err(PrepareError::TimedOut), + }) + .await + .unwrap(); + + // Submit another precheck request. + let (result_tx_2, _result_rx_2) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap(); + + // Assert the prepare queue is empty. + test.poll_ensure_to_prepare_queue_is_empty().await; + + // Pause for enough time to reset the cooldown for this failed prepare request. + futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + + // Submit another precheck request. + let (result_tx_3, _result_rx_3) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap(); + + // Assert the prepare queue is empty - we do not retry for precheck requests. + test.poll_ensure_to_prepare_queue_is_empty().await; + } + + // Test that multiple execution requests trigger preparation retries if the first one failed due + // to a potentially non-reproducible error. + #[async_std::test] + async fn test_execute_prepare_retry() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + // Submit a execute request that fails. + let (result_tx, _result_rx) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf".to_vec(), + Priority::Critical, + result_tx, + ) + .await + .unwrap(); + + // The queue received the prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + // Send a PrepareError. + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Err(PrepareError::TimedOut), + }) + .await + .unwrap(); + + // Submit another execute request. + let (result_tx_2, _result_rx_2) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf".to_vec(), + Priority::Critical, + result_tx_2, + ) + .await + .unwrap(); + + // Assert the prepare queue is empty. + test.poll_ensure_to_prepare_queue_is_empty().await; + + // Pause for enough time to reset the cooldown for this failed prepare request. + futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + + // Submit another execute request. + let (result_tx_3, _result_rx_3) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf".to_vec(), + Priority::Critical, + result_tx_3, + ) + .await + .unwrap(); + + // Assert the prepare queue contains the request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + } + + // Test that multiple execution requests don't trigger preparation retries if the first one + // failed due to reproducible error (e.g. Prevalidation). + #[async_std::test] + async fn test_execute_prepare_no_retry() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + // Submit a execute request that fails. 
+ let (result_tx, _result_rx) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf".to_vec(), + Priority::Critical, + result_tx, + ) + .await + .unwrap(); + + // The queue received the prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + // Send a PrepareError. + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Err(PrepareError::Prevalidation("reproducible error".into())), + }) + .await + .unwrap(); + + // Submit another execute request. + let (result_tx_2, _result_rx_2) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf".to_vec(), + Priority::Critical, + result_tx_2, + ) + .await + .unwrap(); + + // Assert the prepare queue is empty. + test.poll_ensure_to_prepare_queue_is_empty().await; + + // Pause for enough time to reset the cooldown for this failed prepare request. + futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + + // Submit another execute request. + let (result_tx_3, _result_rx_3) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf".to_vec(), + Priority::Critical, + result_tx_3, + ) + .await + .unwrap(); + + // Assert the prepare queue is empty - we do not retry for prevalidation errors. + test.poll_ensure_to_prepare_queue_is_empty().await; + } + + // Test that multiple heads-up requests trigger preparation retries if the first one failed. + #[async_std::test] + async fn test_heads_up_prepare_retry() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + // Submit a heads-up request that fails. + host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + + // The queue received the prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + // Send a PrepareError. + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Err(PrepareError::TimedOut), + }) + .await + .unwrap(); + + // Submit another heads-up request. + host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + + // Assert the prepare queue is empty. + test.poll_ensure_to_prepare_queue_is_empty().await; + + // Pause for enough time to reset the cooldown for this failed prepare request. + futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + + // Submit another heads-up request. + host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + + // Assert the prepare queue contains the request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. 
} + ); + } + #[async_std::test] async fn cancellation() { let mut test = Builder::default().build(); From 84e57646b12eef469347903e9477540772adb380 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 7 Nov 2022 07:56:11 -0500 Subject: [PATCH 4/4] Remove unnecessary extra `prepare_failure_cooldown` parameter --- node/core/pvf/src/host.rs | 61 +++++++++++---------------------------- 1 file changed, 17 insertions(+), 44 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index d41af8926bc3..5c29072da1c3 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -52,7 +52,10 @@ pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360); /// The time period after which a failed preparation artifact is considered ready to be retried. /// Note that we will only retry if another request comes in after this cooldown has passed. +#[cfg(not(test))] pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60); +#[cfg(test)] +pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200); /// The amount of times we will retry failed prepare jobs. pub const NUM_PREPARE_RETRIES: u32 = 5; @@ -67,7 +70,6 @@ pub(crate) type PrepareResultSender = oneshot::Sender; #[derive(Clone)] pub struct ValidationHost { to_host_tx: mpsc::Sender, - prepare_failure_cooldown: Duration, } impl ValidationHost { @@ -109,7 +111,6 @@ impl ValidationHost { .send(ToHost::ExecutePvf(ExecutePvfInputs { pvf, execution_timeout, - prepare_failure_cooldown: self.prepare_failure_cooldown, params, priority, result_tx, @@ -126,10 +127,7 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { self.to_host_tx - .send(ToHost::HeadsUp { - active_pvfs, - prepare_failure_cooldown: self.prepare_failure_cooldown, - }) + .send(ToHost::HeadsUp { active_pvfs }) .await .map_err(|_| "the inner loop hung up".to_string()) } @@ -138,13 +136,12 @@ impl ValidationHost { enum ToHost { PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender }, ExecutePvf(ExecutePvfInputs), - HeadsUp { active_pvfs: Vec, prepare_failure_cooldown: Duration }, + HeadsUp { active_pvfs: Vec }, } struct ExecutePvfInputs { pvf: Pvf, execution_timeout: Duration, - prepare_failure_cooldown: Duration, params: Vec, priority: Priority, result_tx: ResultSender, @@ -202,8 +199,7 @@ impl Config { pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future) { let (to_host_tx, to_host_rx) = mpsc::channel(10); - let validation_host = - ValidationHost { to_host_tx, prepare_failure_cooldown: PREPARE_FAILURE_COOLDOWN }; + let validation_host = ValidationHost { to_host_tx }; let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool( metrics.clone(), @@ -438,10 +434,8 @@ async fn handle_to_host( ) .await?; }, - ToHost::HeadsUp { active_pvfs, prepare_failure_cooldown } => { - handle_heads_up(artifacts, prepare_queue, active_pvfs, prepare_failure_cooldown) - .await?; - }, + ToHost::HeadsUp { active_pvfs } => + handle_heads_up(artifacts, prepare_queue, active_pvfs).await?, } Ok(()) @@ -506,14 +500,7 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { - pvf, - execution_timeout, - prepare_failure_cooldown, - params, - priority, - result_tx, - } = inputs; + let ExecutePvfInputs { pvf, execution_timeout, params, priority, result_tx } = inputs; let 
artifact_id = pvf.as_artifact_id(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { @@ -537,12 +524,7 @@ async fn handle_execute_pvf( awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { - if can_retry_prepare_after_failure( - *last_time_failed, - *num_failures, - error, - prepare_failure_cooldown, - ) { + if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { // If we are allowed to retry the failed prepare job, change the state to // Preparing and re-queue this job. *state = ArtifactState::Preparing { @@ -588,7 +570,6 @@ async fn handle_heads_up( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, active_pvfs: Vec, - prepare_failure_cooldown: Duration, ) -> Result<(), Fatal> { let now = SystemTime::now(); @@ -603,12 +584,7 @@ async fn handle_heads_up( // The artifact is already being prepared, so we don't need to do anything. }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { - if can_retry_prepare_after_failure( - *last_time_failed, - *num_failures, - error, - prepare_failure_cooldown, - ) { + if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { // If we are allowed to retry the failed prepare job, change the state to // Preparing and re-queue this job. *state = ArtifactState::Preparing { @@ -798,7 +774,6 @@ fn can_retry_prepare_after_failure( last_time_failed: SystemTime, num_failures: u32, error: &PrepareError, - prepare_failure_cooldown: Duration, ) -> bool { use PrepareError::*; match error { @@ -807,7 +782,7 @@ fn can_retry_prepare_after_failure( // Retry if the retry cooldown has elapsed and if we have already retried less than // `NUM_PREPARE_RETRIES` times. Panic(_) | TimedOut | DidNotMakeIt => - SystemTime::now() >= last_time_failed + prepare_failure_cooldown && + SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && num_failures <= NUM_PREPARE_RETRIES, } } @@ -832,8 +807,6 @@ mod tests { const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); - const TEST_PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200); - #[async_std::test] async fn pulse_test() { let pulse = pulse_every(Duration::from_millis(100)); @@ -926,7 +899,7 @@ mod tests { fn host_handle(&mut self) -> ValidationHost { let to_host_tx = self.to_host_tx.take().unwrap(); - ValidationHost { to_host_tx, prepare_failure_cooldown: TEST_PREPARE_FAILURE_COOLDOWN } + ValidationHost { to_host_tx } } async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue { @@ -1327,7 +1300,7 @@ mod tests { test.poll_ensure_to_prepare_queue_is_empty().await; // Pause for enough time to reset the cooldown for this failed prepare request. - futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another precheck request. let (result_tx_3, _result_rx_3) = oneshot::channel(); @@ -1386,7 +1359,7 @@ mod tests { test.poll_ensure_to_prepare_queue_is_empty().await; // Pause for enough time to reset the cooldown for this failed prepare request. - futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another execute request. let (result_tx_3, _result_rx_3) = oneshot::channel(); @@ -1456,7 +1429,7 @@ mod tests { test.poll_ensure_to_prepare_queue_is_empty().await; // Pause for enough time to reset the cooldown for this failed prepare request. 
- futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another execute request. let (result_tx_3, _result_rx_3) = oneshot::channel(); @@ -1504,7 +1477,7 @@ mod tests { test.poll_ensure_to_prepare_queue_is_empty().await; // Pause for enough time to reset the cooldown for this failed prepare request. - futures_timer::Delay::new(TEST_PREPARE_FAILURE_COOLDOWN).await; + futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another heads-up request. host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
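
The retry rule that this series converges on can be summarised outside the diff. The sketch below is illustrative only and is not part of the patch: it uses a simplified stand-in for the real `PrepareError` type and the two constants `PREPARE_FAILURE_COOLDOWN` and `NUM_PREPARE_RETRIES` from `host.rs` to show when a failed artifact becomes eligible for re-queueing. Errors returned gracefully by the preparation worker (`Prevalidation`, `Preparation`) are treated as reproducible and never retried, while possibly transient failures (`Panic`, `TimedOut`, `DidNotMakeIt`) are retried only after the cooldown has elapsed and while the failure count has not exceeded the limit.

    use std::time::{Duration, SystemTime};

    const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
    const NUM_PREPARE_RETRIES: u32 = 5;

    // Simplified stand-in for the real `PrepareError` defined in `node/core/pvf/src/error.rs`.
    enum PrepareError {
        Prevalidation(String),
        Preparation(String),
        Panic(String),
        TimedOut,
        DidNotMakeIt,
    }

    /// Returns true if a request arriving now should re-queue the failed prepare job.
    fn can_retry_prepare_after_failure(
        last_time_failed: SystemTime,
        num_failures: u32,
        error: &PrepareError,
    ) -> bool {
        use PrepareError::*;
        match error {
            // The worker returned an error gracefully, so the failure is most likely
            // reproducible: never retry.
            Prevalidation(_) | Preparation(_) => false,
            // Possibly transient failures: retry once the cooldown has elapsed, as long
            // as the retry budget has not been exhausted.
            Panic(_) | TimedOut | DidNotMakeIt =>
                SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
                    num_failures <= NUM_PREPARE_RETRIES,
        }
    }

    fn main() {
        let just_failed = SystemTime::now();
        // A timeout that happened a moment ago is still inside the cooldown window.
        assert!(!can_retry_prepare_after_failure(just_failed, 1, &PrepareError::TimedOut));

        let long_ago = just_failed - (PREPARE_FAILURE_COOLDOWN + Duration::from_secs(1));
        // A deterministic prevalidation error is never retried, even after the cooldown.
        assert!(!can_retry_prepare_after_failure(
            long_ago,
            1,
            &PrepareError::Prevalidation("bad code".into())
        ));
        // A timeout outside the cooldown window, with retries left, is eligible again.
        assert!(can_retry_prepare_after_failure(long_ago, 1, &PrepareError::TimedOut));
    }

This mirrors the behaviour exercised by the new tests: precheck requests never trigger a retry, execute and heads-up requests do so only for non-reproducible errors, and only once another request arrives after the cooldown.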