From b9d970264b59201e13053844a277092164c8416a Mon Sep 17 00:00:00 2001
From: Blaise Bruer
Date: Fri, 28 Jun 2024 17:58:24 -0500
Subject: [PATCH] [Refactor] Moves worker logic back to SimpleScheduler

Worker logic should not be visible to StateManager just yet. In the future
this will likely change, but for this phase of the refactor, SimpleScheduler
should own all information about workers.

towards: #359
---
 .../src/operation_state_manager.rs           |   6 +-
 .../src/scheduler_state/state_manager.rs     | 182 +---------
 nativelink-scheduler/src/simple_scheduler.rs | 343 ++++++++++++++----
 .../tests/simple_scheduler_test.rs           |  15 +-
 4 files changed, 292 insertions(+), 254 deletions(-)

diff --git a/nativelink-scheduler/src/operation_state_manager.rs b/nativelink-scheduler/src/operation_state_manager.rs
index 7baa5f156..5435aca60 100644
--- a/nativelink-scheduler/src/operation_state_manager.rs
+++ b/nativelink-scheduler/src/operation_state_manager.rs
@@ -98,7 +98,7 @@ pub struct OrderBy {
 pub type ActionStateResultStream = Pin> + Send>>;
 
 #[async_trait]
-pub trait ClientStateManager {
+pub trait ClientStateManager: Sync + Send + 'static {
     /// Add a new action to the queue or joins an existing action.
     async fn add_action(
         &self,
@@ -113,7 +113,7 @@
 }
 
 #[async_trait]
-pub trait WorkerStateManager {
+pub trait WorkerStateManager: Sync + Send + 'static {
     /// Update that state of an operation.
     /// The worker must also send periodic updates even if the state
     /// did not change with a modified timestamp in order to prevent
@@ -127,7 +127,7 @@
 }
 
 #[async_trait]
-pub trait MatchingEngineStateManager {
+pub trait MatchingEngineStateManager: Sync + Send + 'static {
     /// Returns a stream of operations that match the filter.
     async fn filter_operations(
         &self,

diff --git a/nativelink-scheduler/src/scheduler_state/state_manager.rs b/nativelink-scheduler/src/scheduler_state/state_manager.rs
index 087fc5e93..554af61ec 100644
--- a/nativelink-scheduler/src/scheduler_state/state_manager.rs
+++ b/nativelink-scheduler/src/scheduler_state/state_manager.rs
@@ -21,10 +21,9 @@ use async_lock::Mutex;
 use async_trait::async_trait;
 use futures::stream;
 use hashbrown::{HashMap, HashSet};
-use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
+use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_util::action_messages::{
-    ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
-    OperationId, WorkerId,
+    ActionInfo, ActionInfoHashKey, ActionStage, ActionState, OperationId, WorkerId,
 };
 use tokio::sync::watch::error::SendError;
 use tokio::sync::{watch, Notify};
@@ -39,8 +38,6 @@ use crate::scheduler_state::client_action_state_result::ClientActionStateResult;
 use crate::scheduler_state::completed_action::CompletedAction;
 use crate::scheduler_state::matching_engine_action_state_result::MatchingEngineActionStateResult;
 use crate::scheduler_state::metrics::Metrics;
-use crate::scheduler_state::workers::Workers;
-use crate::worker::WorkerUpdate;
 
 #[repr(transparent)]
 pub(crate) struct StateManager {
@@ -52,22 +49,18 @@ impl StateManager {
     pub(crate) fn new(
         queued_actions_set: HashSet>,
         queued_actions: BTreeMap, AwaitedAction>,
-        workers: Workers,
         active_actions: HashMap, AwaitedAction>,
         recently_completed_actions: HashSet,
         metrics: Arc,
-        max_job_retries: usize,
         tasks_change_notify: Arc,
     ) -> Self {
         Self {
             inner: Mutex::new(StateManagerImpl {
                 queued_actions_set,
                 queued_actions,
-                workers,
                 active_actions,
                 recently_completed_actions,
                 metrics,
-                max_job_retries,
                 tasks_change_notify,
             }),
         }
@@ -102,10 +95,6 @@ pub(crate) struct StateManagerImpl {
     /// Important: `queued_actions_set` and `queued_actions` must be kept in sync.
     pub(crate) queued_actions: BTreeMap, AwaitedAction>,
 
-    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
-    /// order based on the allocation strategy.
-    pub(crate) workers: Workers,
-
     /// A map of all actions that are active. A hashmap is used to find actions that are active in
     /// O(1) time. The key is the `ActionInfo` struct. The value is the `AwaitedAction` struct.
     pub(crate) active_actions: HashMap, AwaitedAction>,
@@ -118,9 +107,6 @@ pub(crate) struct StateManagerImpl {
     pub(crate) metrics: Arc,
 
-    /// Default times a job can retry before failing.
-    pub(crate) max_job_retries: usize,
-
     /// Notify task<->worker matching engine that work needs to be done.
     pub(crate) tasks_change_notify: Arc,
 }
@@ -199,123 +185,6 @@ fn mutate_priority(action_info: &mut Arc, priority: i32) {
 }
 
 impl StateManagerImpl {
-    fn immediate_evict_worker(&mut self, worker_id: &WorkerId, err: Error) {
-        if let Some(mut worker) = self.workers.remove_worker(worker_id) {
-            self.metrics.workers_evicted.inc();
-            // We don't care if we fail to send message to worker, this is only a best attempt.
-            let _ = worker.notify_update(WorkerUpdate::Disconnect);
-            // We create a temporary Vec to avoid doubt about a possible code
-            // path touching the worker.running_action_infos elsewhere.
-            for action_info in worker.running_action_infos.drain() {
-                self.metrics.workers_evicted_with_running_action.inc();
-                self.retry_action(&action_info, worker_id, err.clone());
-            }
-            // Note: Calling this multiple times is very cheap, it'll only trigger `do_try_match` once.
-            self.tasks_change_notify.notify_one();
-        }
-    }
-
-    fn retry_action(&mut self, action_info: &Arc, worker_id: &WorkerId, err: Error) {
-        match self.active_actions.remove(action_info) {
-            Some(running_action) => {
-                let mut awaited_action = running_action;
-                let send_result = if awaited_action.attempts >= self.max_job_retries {
-                    self.metrics.retry_action_max_attempts_reached.inc();
-                    Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Completed(ActionResult {
-                        execution_metadata: ExecutionMetadata {
-                            worker: format!("{worker_id}"),
-                            ..ExecutionMetadata::default()
-                        },
-                        error: Some(err.merge(make_err!(
-                            Code::Internal,
-                            "Job cancelled because it attempted to execute too many times and failed"
-                        ))),
-                        ..ActionResult::default()
-                    });
-                    awaited_action
-                        .notify_channel
-                        .send(awaited_action.current_state.clone())
-                    // Do not put the action back in the queue here, as this action attempted to run too many
-                    // times.
-                } else {
-                    self.metrics.retry_action.inc();
-                    Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Queued;
-                    let send_result = awaited_action
-                        .notify_channel
-                        .send(awaited_action.current_state.clone());
-                    self.queued_actions_set.insert(action_info.clone());
-                    self.queued_actions
-                        .insert(action_info.clone(), awaited_action);
-                    send_result
-                };
-
-                if send_result.is_err() {
-                    self.metrics.retry_action_no_more_listeners.inc();
-                    // Don't remove this task, instead we keep them around for a bit just in case
-                    // the client disconnected and will reconnect and ask for same job to be executed
-                    // again.
-                    event!(
-                        Level::WARN,
-                        ?action_info,
-                        ?worker_id,
-                        "Action has no more listeners during evict_worker()"
-                    );
-                }
-            }
-            None => {
-                self.metrics.retry_action_but_action_missing.inc();
-                event!(
-                    Level::ERROR,
-                    ?action_info,
-                    ?worker_id,
-                    "Worker stated it was running an action, but it was not in the active_actions"
-                );
-            }
-        }
-    }
-
-    /// Notifies the specified worker to run the given action and handles errors by evicting
-    /// the worker if the notification fails.
-    ///
-    /// # Note
-    ///
-    /// Intended utility function for matching engine.
-    ///
-    /// # Errors
-    ///
-    /// This function will return an error if the notification to the worker fails, and in that case,
-    /// the worker will be immediately evicted from the system.
-    ///
-    async fn worker_notify_run_action(
-        &mut self,
-        worker_id: WorkerId,
-        action_info: Arc,
-    ) -> Result<(), Error> {
-        if let Some(worker) = self.workers.workers.get_mut(&worker_id) {
-            let notify_worker_result =
-                worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));
-
-            if notify_worker_result.is_err() {
-                event!(
-                    Level::WARN,
-                    ?worker_id,
-                    ?action_info,
-                    ?notify_worker_result,
-                    "Worker command failed, removing worker",
-                );
-
-                let err = make_err!(
-                    Code::Internal,
-                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
-                );
-
-                self.immediate_evict_worker(&worker_id, err.clone());
-                return Err(err);
-            }
-        }
-        Ok(())
-    }
-
     /// Marks the specified action as active, assigns it to the given worker, and updates the
     /// action stage. This function removes the action from the queue, updates the action's state
     /// or error, and inserts it into the set of active actions.
@@ -404,7 +273,7 @@ impl StateManagerImpl {
                 ?action_info_hash_key,
                 ?worker_id,
                 "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker",
-            );
+            );
             return;
         };
         if running_action_worker_id == *worker_id {
@@ -428,20 +297,6 @@ impl StateManagerImpl {
         self.active_actions
             .insert(action_info.clone(), running_action);
 
-        // Clear this action from the current worker.
-        if let Some(worker) = self.workers.workers.get_mut(worker_id) {
-            let was_paused = !worker.can_accept_work();
-            // This unpauses, but since we're completing with an error, don't
-            // unpause unless all actions have completed.
-            worker.complete_action(&action_info);
-            // Only pause if there's an action still waiting that will unpause.
-            if (was_paused || due_to_backpressure) && worker.has_actions() {
-                worker.is_paused = true;
-            }
-        }
-
-        // Re-queue the action or fail on max attempts.
-        self.retry_action(&action_info, worker_id, err);
         self.tasks_change_notify.notify_one();
     }
 }
@@ -567,12 +422,10 @@ impl WorkerStateManager for StateManager {
                 ?action_stage,
                 "Worker sent error while updating action. Removing worker"
             );
-            let err = make_err!(
-                Code::Internal,
-                "Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
-            );
-            inner.immediate_evict_worker(&worker_id, err.clone());
-            return Err(err);
+            return Err(make_err!(
+                Code::Internal,
+                "Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
+            ));
         }
 
         let (action_info, mut running_action) = inner
@@ -585,15 +438,14 @@
         if running_action.worker_id != Some(worker_id) {
             inner.metrics.update_action_from_wrong_worker.inc();
             let err = match running_action.worker_id {
-                Some(running_action_worker_id) => make_err!(
-                    Code::Internal,
-                    "Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
-                ),
+                    Code::Internal,
+                    "Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
+                ),
                 None => make_err!(
-                    Code::Internal,
-                    "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
-                ),
+                    Code::Internal,
+                    "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
+                ),
             };
             event!(
                 Level::ERROR,
                 ?action_info_hash_key,
                 ?worker_id,
                 ?err,
                 "Got a result from a worker that should not be running the action, Removing worker",
             );
             // First put it back in our active_actions or we will drop the task.
             inner.active_actions.insert(action_info, running_action);
-            inner.immediate_evict_worker(&worker_id, err.clone());
             return Err(err);
         }
@@ -635,10 +486,6 @@ impl WorkerStateManager for StateManager {
             state: running_action.current_state,
         });
 
-        let worker = inner.workers.workers.get_mut(&worker_id).ok_or_else(|| {
-            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
-        })?;
-        worker.complete_action(&action_info);
         inner.tasks_change_notify.notify_one();
         Ok(())
     }
@@ -689,9 +536,6 @@ impl MatchingEngineStateManager for StateManager {
         if let Some(action_info) = inner.queued_actions_set.get(&operation_id.unique_qualifier) {
             if let Some(worker_id) = worker_id {
                 let action_info = action_info.clone();
-                inner
-                    .worker_notify_run_action(worker_id, action_info.clone())
-                    .await?;
                 inner
                     .worker_set_as_active(action_info, worker_id, action_stage)
                     .await?;

diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs
index 33f9e0122..dd4f261e1 100644
--- a/nativelink-scheduler/src/simple_scheduler.rs
+++ b/nativelink-scheduler/src/simple_scheduler.rs
@@ -65,13 +65,16 @@ const DEFAULT_MAX_JOB_RETRIES: usize = 3;
 
 struct SimpleSchedulerImpl {
     /// The manager responsible for holding the state of actions and workers.
-    state_manager: StateManager,
+    state_manager: Arc,
     /// The duration that actions are kept in recently_completed_actions for.
     retain_completed_for: Duration,
     /// Timeout of how long to evict workers if no response in this given amount of time in seconds.
     worker_timeout_s: u64,
     /// Default times a job can retry before failing.
     max_job_retries: usize,
+    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
+    /// order based on the allocation strategy.
+    workers: Workers,
     metrics: Arc,
 }
 
@@ -216,12 +219,13 @@ impl SimpleSchedulerImpl {
     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
     fn immediate_evict_worker(
         inner_state: &mut MutexGuard<'_, StateManagerImpl>,
+        workers: &mut Workers,
         max_job_retries: usize,
         metrics: &Metrics,
         worker_id: &WorkerId,
         err: Error,
     ) {
-        if let Some(mut worker) = inner_state.workers.remove_worker(worker_id) {
+        if let Some(mut worker) = workers.remove_worker(worker_id) {
             metrics.workers_evicted.inc();
             // We don't care if we fail to send message to worker, this is only a best attempt.
             let _ = worker.notify_update(WorkerUpdate::Disconnect);
@@ -249,15 +253,75 @@ impl SimpleSchedulerImpl {
         worker_id: WorkerId,
         is_draining: bool,
     ) -> Result<(), Error> {
-        let mut inner_state = self.state_manager.inner.lock().await;
-        let worker = inner_state
+        let worker = self
             .workers
             .workers
             .get_mut(&worker_id)
             .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
         self.metrics.workers_drained.inc();
         worker.is_draining = is_draining;
-        inner_state.tasks_change_notify.notify_one();
+        self.state_manager
+            .inner
+            .lock()
+            .await
+            .tasks_change_notify
+            .notify_one();
         Ok(())
     }
+
+    /// Notifies the specified worker to run the given action and handles errors by evicting
+    /// the worker if the notification fails.
+    ///
+    /// # Note
+    ///
+    /// Intended utility function for matching engine.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if the notification to the worker fails, and in that case,
+    /// the worker will be immediately evicted from the system.
+    ///
+    async fn worker_notify_run_action(
+        &mut self,
+        worker_id: WorkerId,
+        action_info: Arc,
+    ) -> Result<(), Error> {
+        if let Some(worker) = self.workers.workers.get_mut(&worker_id) {
+            let notify_worker_result =
+                worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));
+
+            if notify_worker_result.is_err() {
+                event!(
+                    Level::WARN,
+                    ?worker_id,
+                    ?action_info,
+                    ?notify_worker_result,
+                    "Worker command failed, removing worker",
+                );
+
+                let err = make_err!(
+                    Code::Internal,
+                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
+                );
+
+                let max_job_retries = self.max_job_retries;
+                let metrics = self.metrics.clone();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = self.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                SimpleSchedulerImpl::immediate_evict_worker(
+                    &mut inner_state,
+                    &mut self.workers,
+                    max_job_retries,
+                    &metrics,
+                    &worker_id,
+                    err.clone(),
+                );
+                return Err(err);
+            }
+        }
         Ok(())
     }
 
@@ -320,13 +384,9 @@ impl SimpleSchedulerImpl {
                 continue;
             };
 
-            let maybe_worker_id: Option = {
-                let inner_state = self.state_manager.inner.lock().await;
-
-                inner_state
-                    .workers
-                    .find_worker_for_action(&action_info.platform_properties)
-            };
+            let maybe_worker_id = self
+                .workers
+                .find_worker_for_action(&action_info.platform_properties);
 
             let operation_id = state.id.clone();
             let ret = ::update_operation(
@@ -338,12 +398,43 @@ impl SimpleSchedulerImpl {
             .await;
 
             if let Err(e) = ret {
+                if let Some(worker_id) = maybe_worker_id {
+                    let max_job_retries = self.max_job_retries;
+                    let metrics = self.metrics.clone();
+                    // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                    // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                    // removed.
+                    let state_manager = self.state_manager.clone();
+                    let mut inner_state = state_manager.inner.lock().await;
+                    SimpleSchedulerImpl::immediate_evict_worker(
+                        &mut inner_state,
+                        &mut self.workers,
+                        max_job_retries,
+                        &metrics,
+                        &worker_id,
+                        e.clone(),
+                    );
+                }
+
                 event!(
                     Level::ERROR,
                     ?e,
                     "update operation failed for {}",
                     operation_id
                 );
+            } else if let Some(worker_id) = maybe_worker_id {
+                if let Err(err) = self
+                    .worker_notify_run_action(worker_id, action_info.clone())
+                    .await
+                {
+                    event!(
+                        Level::ERROR,
+                        ?err,
+                        ?worker_id,
+                        ?action_info,
+                        "failed to run worker_notify_run_action in SimpleSchedulerImpl::do_try_match"
+                    );
+                }
             }
         }
     }
@@ -359,14 +450,63 @@ impl SimpleSchedulerImpl {
         action_info_hash_key: ActionInfoHashKey,
         action_stage: Result,
     ) -> Result<(), Error> {
+        let worker = self.workers.workers.get_mut(worker_id).err_tip(|| {
+            format!("Worker {worker_id} does not exist in SimpleSchedulerImpl::update_action")
+        })?;
+        let action_info_res = worker
+            .running_action_infos
+            .get(&action_info_hash_key)
+            .err_tip(|| format!("Action {action_info_hash_key:?} should not be running on worker {worker_id} in SimpleSchedulerImpl::update_action"));
+        let action_info = match action_info_res {
+            Ok(action_info) => action_info.clone(),
+            Err(err) => {
+                let max_job_retries = self.max_job_retries;
+                let metrics = self.metrics.clone();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = self.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                SimpleSchedulerImpl::immediate_evict_worker(
+                    &mut inner_state,
+                    &mut self.workers,
+                    max_job_retries,
+                    &metrics,
+                    worker_id,
+                    err.clone(),
+                );
+                return Err(err);
+            }
+        };
+        let operation_id = OperationId::new(action_info_hash_key.clone());
+        let due_to_backpressure = action_stage
+            .as_ref()
+            .map_or_else(|e| e.code == Code::ResourceExhausted, |_| false);
         let update_operation_result = ::update_operation(
             &self.state_manager,
-            OperationId::new(action_info_hash_key.clone()),
+            operation_id.clone(),
             *worker_id,
-            action_stage,
+            action_stage.clone(),
         )
-        .await;
-        if let Err(e) = &update_operation_result {
+        .await
+        .err_tip(|| "in update_operation on SimpleSchedulerImpl::update_action");
+        if let Err(e) = update_operation_result {
+            let max_job_retries = self.max_job_retries;
+            let metrics = self.metrics.clone();
+            // TODO(allada) This is to get around rust borrow checker with double mut borrows
+            // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+            // removed.
+            let state_manager = self.state_manager.clone();
+            let mut inner_state = state_manager.inner.lock().await;
+            SimpleSchedulerImpl::immediate_evict_worker(
+                &mut inner_state,
+                &mut self.workers,
+                max_job_retries,
+                &metrics,
+                worker_id,
+                e.clone(),
+            );
+
             event!(
                 Level::ERROR,
                 ?action_info_hash_key,
                 ?worker_id,
                 ?e,
                 "Failed to update_operation on update_action"
             );
+            return Err(e);
+        }
+
+        match action_stage {
+            Ok(_) => worker.complete_action(&action_info),
+            Err(err) => {
+                // Clear this action from the current worker.
+                let was_paused = !worker.can_accept_work();
+                // This unpauses, but since we're completing with an error, don't
+                // unpause unless all actions have completed.
+                // Note: We need to run this before dealing with backpressure logic.
+                worker.complete_action(&action_info);
+                // Only pause if there's an action still waiting that will unpause.
+                if (was_paused || due_to_backpressure) && worker.has_actions() {
+                    worker.is_paused = true;
+                }
+
+                let max_job_retries = self.max_job_retries;
+                let metrics = self.metrics.clone();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = self.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                // Re-queue the action or fail on max attempts.
+                SimpleSchedulerImpl::retry_action(
+                    &mut inner_state,
+                    max_job_retries,
+                    &metrics,
+                    &action_info,
+                    worker_id,
+                    err,
+                );
+            }
         }
-        update_operation_result
+
+        Ok(())
     }
 }
 
@@ -436,16 +611,14 @@ impl SimpleScheduler {
         }
 
         let tasks_change_notify = Arc::new(Notify::new());
-        let state_manager = StateManager::new(
+        let state_manager = Arc::new(StateManager::new(
            HashSet::new(),
            BTreeMap::new(),
-            Workers::new(scheduler_cfg.allocation_strategy),
            HashMap::new(),
            HashSet::new(),
            Arc::new(SchedulerMetrics::default()),
-            max_job_retries,
            tasks_change_notify.clone(),
-        );
+        ));
         let metrics = Arc::new(Metrics::default());
         let metrics_for_do_try_match = metrics.clone();
         let inner = Arc::new(Mutex::new(SimpleSchedulerImpl {
@@ -454,6 +627,7 @@ impl SimpleScheduler {
             worker_timeout_s,
             max_job_retries,
             metrics: metrics.clone(),
+            workers: Workers::new(scheduler_cfg.allocation_strategy),
         }));
         let weak_inner = Arc::downgrade(&inner);
         Self {
@@ -493,8 +667,7 @@ impl SimpleScheduler {
     #[must_use]
     pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
         let inner_scheduler = self.get_inner_lock().await;
-        let inner_state = inner_scheduler.state_manager.inner.lock().await;
-        inner_state.workers.workers.contains(worker_id)
+        inner_scheduler.workers.workers.contains(worker_id)
     }
 
     /// A unit test function used to send the keep alive message to the worker from the server.
@@ -502,9 +675,8 @@ impl SimpleScheduler {
         &self,
         worker_id: &WorkerId,
     ) -> Result<(), Error> {
-        let inner_scheduler = self.get_inner_lock().await;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
-        let worker = inner_state
+        let mut inner_scheduler = self.get_inner_lock().await;
+        let worker = inner_scheduler
             .workers
             .workers
             .get_mut(worker_id)
@@ -594,17 +766,22 @@ impl WorkerScheduler for SimpleScheduler {
     async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
         let worker_id = worker.id;
-        let inner_scheduler = self.get_inner_lock().await;
+        let mut inner_scheduler = self.get_inner_lock().await;
         let max_job_retries = inner_scheduler.max_job_retries;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
+        // TODO(allada) This is to get around rust borrow checker with double mut borrows
+        // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+        // removed.
+        let state_manager = inner_scheduler.state_manager.clone();
+        let mut inner_state = state_manager.inner.lock().await;
         self.metrics.add_worker.wrap(move || {
-            let res = inner_state
+            let res = inner_scheduler
                 .workers
                 .add_worker(worker)
                 .err_tip(|| "Error while adding worker, removing from pool");
             if let Err(err) = &res {
                 SimpleSchedulerImpl::immediate_evict_worker(
                     &mut inner_state,
+                    &mut inner_scheduler.workers,
                     max_job_retries,
                     &self.metrics,
                     &worker_id,
@@ -634,68 +811,82 @@ impl WorkerScheduler for SimpleScheduler {
         worker_id: &WorkerId,
         timestamp: WorkerTimestamp,
     ) -> Result<(), Error> {
-        let inner_scheduler = self.get_inner_lock().await;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
-        inner_state
+        let mut inner_scheduler = self.get_inner_lock().await;
+        inner_scheduler
             .workers
             .refresh_lifetime(worker_id, timestamp)
             .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")
     }
 
     async fn remove_worker(&self, worker_id: WorkerId) {
-        let inner_scheduler = self.get_inner_lock().await;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
+        let mut inner_scheduler = self.get_inner_lock().await;
+        let max_job_retries = inner_scheduler.max_job_retries;
+        let metrics = inner_scheduler.metrics.clone();
+        // TODO(allada) This is to get around rust borrow checker with double mut borrows
+        // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+        // removed.
+        let state_manager = inner_scheduler.state_manager.clone();
+        let mut inner_state = state_manager.inner.lock().await;
         SimpleSchedulerImpl::immediate_evict_worker(
             &mut inner_state,
-            inner_scheduler.max_job_retries,
-            &inner_scheduler.metrics,
+            &mut inner_scheduler.workers,
+            max_job_retries,
+            &metrics,
             &worker_id,
             make_err!(Code::Internal, "Received request to remove worker"),
         );
     }
 
     async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
-        let inner_scheduler = self.get_inner_lock().await;
+        let mut inner_scheduler = self.get_inner_lock().await;
         let worker_timeout_s = inner_scheduler.worker_timeout_s;
         let max_job_retries = inner_scheduler.max_job_retries;
         let metrics = inner_scheduler.metrics.clone();
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
-        self.metrics.remove_timedout_workers.wrap(move || {
-            // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
-            // map most of the time.
-            let worker_ids_to_remove: Vec = inner_state
-                .workers
-                .workers
-                .iter()
-                .rev()
-                .map_while(|(worker_id, worker)| {
-                    if worker.last_update_timestamp <= now_timestamp - worker_timeout_s {
-                        Some(*worker_id)
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-            for worker_id in &worker_ids_to_remove {
-                event!(
-                    Level::WARN,
-                    ?worker_id,
-                    "Worker timed out, removing from pool"
-                );
-                SimpleSchedulerImpl::immediate_evict_worker(
-                    &mut inner_state,
-                    max_job_retries,
-                    &metrics,
-                    worker_id,
-                    make_err!(
-                        Code::Internal,
-                        "Worker {worker_id} timed out, removing from pool"
-                    ),
-                );
-            }
+        self.metrics
+            .remove_timedout_workers
+            .wrap(async move {
+                // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
+                // map most of the time.
+                let worker_ids_to_remove: Vec = inner_scheduler
+                    .workers
+                    .workers
+                    .iter()
+                    .rev()
+                    .map_while(|(worker_id, worker)| {
+                        if worker.last_update_timestamp <= now_timestamp - worker_timeout_s {
+                            Some(*worker_id)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = inner_scheduler.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                for worker_id in &worker_ids_to_remove {
+                    event!(
+                        Level::WARN,
+                        ?worker_id,
+                        "Worker timed out, removing from pool"
+                    );
+                    SimpleSchedulerImpl::immediate_evict_worker(
+                        &mut inner_state,
+                        &mut inner_scheduler.workers,
+                        max_job_retries,
+                        &metrics,
+                        worker_id,
+                        make_err!(
+                            Code::Internal,
+                            "Worker {worker_id} timed out, removing from pool"
+                        ),
+                    );
+                }
-            Ok(())
-        })
+                Ok(())
+            })
+            .await
     }
 
     async fn set_drain_worker(&self, worker_id: WorkerId, is_draining: bool) -> Result<(), Error> {
@@ -724,7 +915,7 @@ impl MetricsComponent for SimpleScheduler {
         );
         c.publish(
             "workers_total",
-            &inner_state.workers.workers.len(),
+            &inner_scheduler.workers.workers.len(),
             "The number workers active.",
         );
         c.publish(
@@ -753,7 +944,7 @@ impl MetricsComponent for SimpleScheduler {
             "The amount of times a job is allowed to retry from an internal error before it is dropped.",
         );
         let mut props = HashMap::<&String, u64>::new();
-        for (_worker_id, worker) in inner_state.workers.workers.iter() {
+        for (_worker_id, worker) in inner_scheduler.workers.workers.iter() {
            c.publish_with_labels(
                "workers",
                worker,
@@ -806,7 +997,7 @@ struct Metrics {
     existing_actions_found: CounterWithTime,
     existing_actions_not_found: CounterWithTime,
     clean_recently_completed_actions: CounterWithTime,
-    remove_timedout_workers: FuncCounterWrapper,
+    remove_timedout_workers: AsyncCounterWrapper,
     update_action: AsyncCounterWrapper,
     update_action_with_internal_error: CounterWithTime,
     update_action_with_internal_error_no_action: CounterWithTime,

diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs
index a7993831b..3e5afe8ab 100644
--- a/nativelink-scheduler/tests/simple_scheduler_test.rs
+++ b/nativelink-scheduler/tests/simple_scheduler_test.rs
@@ -1003,6 +1003,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> {
         // Other tests check full data. We only care if client thinks we are Executing.
         assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
     }
+    let _ = setup_new_worker(&scheduler, rogue_worker_id, PlatformProperties::default()).await?;
 
     let action_info_hash_key = ActionInfoHashKey {
         instance_name: INSTANCE_NAME.to_string(),
@@ -1043,8 +1044,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> {
         .await;
 
     {
-        const EXPECTED_ERR: &str =
-            "Got a result from a worker that should not be running the action";
+        const EXPECTED_ERR: &str = "should not be running on worker";
         // Our request should have sent an error back.
         assert!(
             update_action_result.is_err(),
@@ -1503,10 +1503,13 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> {
                     output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
                 },
                 server_logs: HashMap::default(),
-                error: Some(err.merge(make_err!(
-                    Code::Internal,
-                    "Job cancelled because it attempted to execute too many times and failed"
-                ))),
+                error: Some(
+                    err.append("in update_operation on SimpleSchedulerImpl::update_action")
+                        .merge(make_err!(
+                            Code::Internal,
+                            "Job cancelled because it attempted to execute too many times and failed"
+                        )),
+                ),
                 message: String::new(),
             }),
         };