diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 7f346a19..61843b20 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -1,8 +1,6 @@ mod api; -mod results; -use crate::agent::api::AgentApi; -use crate::agent::results::ResultsUploader; +pub use crate::agent::api::AgentApi; use crate::config::Config; use crate::crates::Crate; use crate::db::{Database, QueryUtils}; @@ -149,7 +147,6 @@ fn run_heartbeat(url: &str, token: &str) { fn run_experiment( agent: &Agent, workspace: &Workspace, - db: &ResultsUploader, threads_count: usize, past_experiment: &mut Option, ) -> Result<(), (Option>, Error)> { @@ -173,9 +170,14 @@ fn run_experiment( } } - crate::runner::run_ex(&ex, workspace, db, threads_count, &agent.config, &|| { - agent.next_crate(&ex.name) - }) + crate::runner::run_ex( + &ex, + workspace, + &agent.api, + threads_count, + &agent.config, + &|| agent.next_crate(&ex.name), + ) .map_err(|err| (Some(Box::new(ex)), err))?; Ok(()) } @@ -188,7 +190,6 @@ pub fn run( workspace: &Workspace, ) -> Fallible<()> { let agent = Agent::new(url, token, caps)?; - let db = results::ResultsUploader::new(&agent.api); run_heartbeat(url, token); health_thread(); @@ -196,7 +197,7 @@ pub fn run( let mut past_experiment = None; loop { if let Err((ex, err)) = - run_experiment(&agent, workspace, &db, threads_count, &mut past_experiment) + run_experiment(&agent, workspace, threads_count, &mut past_experiment) { utils::report_failure(&err); if let Some(ex) = ex { diff --git a/src/agent/results.rs b/src/agent/results.rs deleted file mode 100644 index 16851914..00000000 --- a/src/agent/results.rs +++ /dev/null @@ -1,92 +0,0 @@ -use crate::agent::api::AgentApi; -use crate::crates::Crate; -use crate::experiments::Experiment; -use crate::prelude::*; -use crate::results::{EncodingType, TestResult, WriteResults}; -use crate::toolchain::Toolchain; -use rustwide::logging::{self, LogStorage}; -use std::collections::{hash_map::Entry::Occupied, HashMap}; -use std::sync::{Arc, Mutex}; - -#[derive(Clone)] -pub struct ResultsUploader<'a> { - api: &'a AgentApi, - versions: Arc>>, -} - -impl<'a> ResultsUploader<'a> { - pub fn new(api: &'a AgentApi) -> Self { - ResultsUploader { - api, - versions: Arc::new(Mutex::new(HashMap::new())), - } - } -} - -impl<'a> WriteResults for ResultsUploader<'a> { - fn get_result( - &self, - _ex: &Experiment, - _toolchain: &Toolchain, - _krate: &Crate, - ) -> Fallible> { - // TODO: not yet implemented - Ok(None) - } - - fn update_crate_version(&self, _ex: &Experiment, old: &Crate, new: &Crate) -> Fallible<()> { - self.versions - .lock() - .unwrap() - .insert(old.clone(), (new.clone(), false)); - Ok(()) - } - - fn record_result( - &self, - ex: &Experiment, - toolchain: &Toolchain, - krate: &Crate, - storage: &LogStorage, - _: EncodingType, - f: F, - ) -> Fallible - where - F: FnOnce() -> Fallible, - { - let result = logging::capture(storage, f)?; - let output = storage.to_string(); - - let mut updated = None; - let mut new_version = None; - // This is done to avoid locking versions if record_progress retries in loop - { - let mut versions = self.versions.lock().unwrap(); - if let Occupied(mut entry) = versions.entry(krate.clone()) { - let value = entry.get_mut(); - - if value.1 { - // delete entry if we already processed both toolchains - updated = Some(entry.remove().0); - } else { - updated = Some(value.0.clone()); - new_version = updated.as_ref(); - // mark we already sent the updated version to the server - value.1 = true; - } - }; - } - - info!("sending results to the crater server..."); - self.api.record_progress( - ex, - updated.as_ref().unwrap_or(krate), - toolchain, - output.as_bytes(), - &result, - new_version.map(|new| (krate, new)), - )?; - - Ok(result) - } -} diff --git a/src/results/db.rs b/src/results/db.rs index 2273e434..f32341f1 100644 --- a/src/results/db.rs +++ b/src/results/db.rs @@ -233,6 +233,24 @@ impl<'a> WriteResults for DatabaseDB<'a> { } } +impl crate::runner::RecordProgress for DatabaseDB<'_> { + fn record_progress( + &self, + ex: &Experiment, + krate: &Crate, + toolchain: &Toolchain, + log: &[u8], + result: &TestResult, + version: Option<(&Crate, &Crate)>, + ) -> Fallible<()> { + self.store_result(ex, krate, toolchain, result, log, EncodingType::Plain)?; + if let Some((old, new)) = version { + self.update_crate_version(ex, old, new)?; + } + Ok(()) + } +} + impl<'a> DeleteResults for DatabaseDB<'a> { fn delete_all_results(&self, ex: &Experiment) -> Fallible<()> { self.db diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 3b298e25..0b5196a2 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -7,11 +7,12 @@ use crate::config::Config; use crate::crates::Crate; use crate::experiments::{Experiment, Mode}; use crate::prelude::*; -use crate::results::{TestResult, WriteResults}; +use crate::results::TestResult; use crate::runner::worker::{DiskSpaceWatcher, Worker}; use rustwide::Workspace; use std::thread::scope; use std::time::Duration; +pub use worker::RecordProgress; const DISK_SPACE_WATCHER_INTERVAL: Duration = Duration::from_secs(30); const DISK_SPACE_WATCHER_THRESHOLD: f32 = 0.80; @@ -20,10 +21,10 @@ const DISK_SPACE_WATCHER_THRESHOLD: f32 = 0.80; #[error("overridden task result to {0}")] pub struct OverrideResult(TestResult); -pub fn run_ex( +pub fn run_ex( ex: &Experiment, workspace: &Workspace, - db: &DB, + api: &dyn RecordProgress, threads_count: usize, config: &Config, next_crate: &(dyn Fn() -> Fallible> + Send + Sync), @@ -82,7 +83,16 @@ pub fn run_ex( info!("running tasks in {} threads...", threads_count); let workers = (0..threads_count) - .map(|i| Worker::new(format!("worker-{i}"), workspace, ex, config, db, next_crate)) + .map(|i| { + Worker::new( + format!("worker-{i}"), + workspace, + ex, + config, + api, + next_crate, + ) + }) .collect::>(); let disk_watcher = DiskSpaceWatcher::new( diff --git a/src/runner/tasks.rs b/src/runner/tasks.rs index 97a4d56b..6af70f40 100644 --- a/src/runner/tasks.rs +++ b/src/runner/tasks.rs @@ -1,34 +1,30 @@ use crate::config::Config; -use crate::crates::{Crate, GitHubRepo}; +use crate::crates::Crate; use crate::experiments::Experiment; use crate::prelude::*; -use crate::results::{EncodingType, TestResult, WriteResults}; +use crate::results::TestResult; use crate::runner::test; -use crate::runner::test::detect_broken; use crate::toolchain::Toolchain; -use crate::utils; -use rustwide::{Build, BuildDirectory, Workspace}; +use rustwide::{Build, BuildDirectory}; use std::collections::HashMap; use std::sync::Mutex; -use rustwide::logging::{self, LogStorage}; +use rustwide::logging::LogStorage; use std::fmt; -pub(super) struct TaskCtx<'ctx, DB: WriteResults + 'ctx> { +pub(super) struct TaskCtx<'ctx> { pub(super) build_dir: &'ctx Mutex, pub(super) config: &'ctx Config, - pub(super) db: &'ctx DB, pub(super) experiment: &'ctx Experiment, pub(super) toolchain: &'ctx Toolchain, pub(super) krate: &'ctx Crate, pub(super) quiet: bool, } -impl<'ctx, DB: WriteResults + 'ctx> TaskCtx<'ctx, DB> { +impl<'ctx> TaskCtx<'ctx> { fn new( build_dir: &'ctx Mutex, config: &'ctx Config, - db: &'ctx DB, experiment: &'ctx Experiment, toolchain: &'ctx Toolchain, krate: &'ctx Crate, @@ -37,7 +33,6 @@ impl<'ctx, DB: WriteResults + 'ctx> TaskCtx<'ctx, DB> { TaskCtx { build_dir, config, - db, experiment, toolchain, krate, @@ -47,7 +42,6 @@ impl<'ctx, DB: WriteResults + 'ctx> TaskCtx<'ctx, DB> { } pub(super) enum TaskStep { - Prepare, BuildAndTest { tc: Toolchain, quiet: bool }, BuildOnly { tc: Toolchain, quiet: bool }, CheckOnly { tc: Toolchain, quiet: bool }, @@ -59,7 +53,6 @@ pub(super) enum TaskStep { impl fmt::Debug for TaskStep { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let (name, quiet, tc) = match *self { - TaskStep::Prepare => ("prepare", false, None), TaskStep::BuildAndTest { ref tc, quiet } => ("build and test", quiet, Some(tc)), TaskStep::BuildOnly { ref tc, quiet } => ("build", quiet, Some(tc)), TaskStep::CheckOnly { ref tc, quiet } => ("check", quiet, Some(tc)), @@ -91,46 +84,17 @@ impl fmt::Debug for Task { } impl Task { - pub(super) fn mark_as_failed( - &self, - ex: &Experiment, - db: &DB, - err: &failure::Error, - result: &TestResult, - storage: &LogStorage, - ) -> Fallible<()> { - match self.step { - TaskStep::Prepare => {} - TaskStep::BuildAndTest { ref tc, .. } - | TaskStep::BuildOnly { ref tc, .. } - | TaskStep::CheckOnly { ref tc, .. } - | TaskStep::Clippy { ref tc, .. } - | TaskStep::Rustdoc { ref tc, .. } - | TaskStep::UnstableFeatures { ref tc } => { - db.record_result(ex, tc, &self.krate, storage, EncodingType::Plain, || { - error!("this task or one of its parent failed!"); - utils::report_failure(err); - Ok(result.clone()) - })?; - } - } - - Ok(()) - } - - pub(super) fn run<'ctx, 's: 'ctx, DB: WriteResults>( + pub(super) fn run<'ctx, 's: 'ctx>( &'s self, config: &'ctx Config, - workspace: &Workspace, build_dir: &'ctx HashMap<&'ctx crate::toolchain::Toolchain, Mutex>, ex: &'ctx Experiment, - db: &'ctx DB, logs: &LogStorage, - ) -> Fallible<()> { + ) -> Fallible { let (build_dir, action, test, toolchain, quiet): ( _, _, - fn(&TaskCtx<_>, &Build, &_) -> _, + fn(&TaskCtx, &Build, &_) -> _, _, _, ) = match self.step { @@ -160,71 +124,9 @@ impl Task { tc, false, ), - TaskStep::Prepare => { - logging::capture(logs, || { - let rustwide_crate = self.krate.to_rustwide(); - for attempt in 1..=15 { - match detect_broken(rustwide_crate.fetch(workspace)) { - Ok(()) => break, - Err(e) => { - if logs.to_string().contains("No space left on device") { - if attempt == 15 { - // If we've failed 15 times, then - // just give up. It's been at least - // 45 seconds, which is enough that - // our disk space check should - // have run at least once in this - // time. If that's not helped, then - // maybe this git repository *is* - // actually too big. - // - // Ideally we'd have some kind of - // per-worker counter and if we hit - // this too often we'd replace the - // machine, but it's not very clear - // what "too often" means here. - return Err(e); - } else { - log::warn!( - "Retrying crate fetch in 3 seconds (attempt {})", - attempt - ); - std::thread::sleep(std::time::Duration::from_secs(3)); - } - } else { - return Err(e); - } - } - } - } - - if let Crate::GitHub(repo) = &self.krate { - if let Some(sha) = rustwide_crate.git_commit(workspace) { - let updated = GitHubRepo { - sha: Some(sha), - ..repo.clone() - }; - db.update_crate_version( - ex, - &Crate::GitHub(repo.clone()), - &Crate::GitHub(updated), - ) - .with_context(|_| { - format!("failed to record the sha of GitHub repo {}", repo.slug()) - })?; - } else { - bail!("unable to capture sha for {}", repo.slug()); - } - } - Ok(()) - })?; - return Ok(()); - } }; - let ctx = TaskCtx::new(build_dir, config, db, ex, toolchain, &self.krate, quiet); - test::run_test(action, &ctx, test, logs)?; - - Ok(()) + let ctx = TaskCtx::new(build_dir, config, ex, toolchain, &self.krate, quiet); + test::run_test(action, &ctx, test, logs) } } diff --git a/src/runner/test.rs b/src/runner/test.rs index 3edaa13a..d5b4f6f2 100644 --- a/src/runner/test.rs +++ b/src/runner/test.rs @@ -1,7 +1,7 @@ use crate::crates::Crate; use crate::prelude::*; use crate::results::DiagnosticCode; -use crate::results::{BrokenReason, EncodingType, FailureReason, TestResult, WriteResults}; +use crate::results::{BrokenReason, FailureReason, TestResult}; use crate::runner::tasks::TaskCtx; use crate::runner::OverrideResult; use cargo_metadata::diagnostic::DiagnosticLevel; @@ -109,8 +109,8 @@ fn get_local_packages(build_env: &Build) -> Fallible> { .collect()) } -fn run_cargo( - ctx: &TaskCtx, +fn run_cargo( + ctx: &TaskCtx, build_env: &Build, args: &[&str], check_errors: bool, @@ -248,59 +248,40 @@ fn run_cargo( } } -pub(super) fn run_test( +pub(super) fn run_test( action: &str, - ctx: &TaskCtx, - test_fn: fn(&TaskCtx, &Build, &[Package]) -> Fallible, + ctx: &TaskCtx, + test_fn: fn(&TaskCtx, &Build, &[Package]) -> Fallible, logs: &LogStorage, -) -> Fallible<()> { - if let Some(res) = ctx - .db - .get_result(ctx.experiment, ctx.toolchain, ctx.krate)? - { - info!("skipping crate {}. existing result: {}", ctx.krate, res); - } else { - ctx.db.record_result( - ctx.experiment, - ctx.toolchain, +) -> Fallible { + rustwide::logging::capture(logs, || { + info!( + "{} {} against {} for {}", + action, ctx.krate, - logs, - EncodingType::Plain, - || { - info!( - "{} {} against {} for {}", - action, - ctx.krate, - ctx.toolchain.to_string(), - ctx.experiment.name - ); - let sandbox = SandboxBuilder::new() - .memory_limit(Some(ctx.config.sandbox.memory_limit.to_bytes())) - .enable_networking(false); - - let krate = &ctx.krate.to_rustwide(); - let mut build_dir = ctx.build_dir.lock().unwrap(); - let mut build = build_dir.build(ctx.toolchain, krate, sandbox); - - for patch in ctx.toolchain.patches.iter() { - build = build.patch_with_git(&patch.name, &patch.repo, &patch.branch); - } + ctx.toolchain.to_string(), + ctx.experiment.name + ); + let sandbox = SandboxBuilder::new() + .memory_limit(Some(ctx.config.sandbox.memory_limit.to_bytes())) + .enable_networking(false); - detect_broken(build.run(|build| { - let local_packages = get_local_packages(build)?; - test_fn(ctx, build, &local_packages) - })) - }, - )?; - } - Ok(()) + let krate = &ctx.krate.to_rustwide(); + let mut build_dir = ctx.build_dir.lock().unwrap(); + let mut build = build_dir.build(ctx.toolchain, krate, sandbox); + + for patch in ctx.toolchain.patches.iter() { + build = build.patch_with_git(&patch.name, &patch.repo, &patch.branch); + } + + detect_broken(build.run(|build| { + let local_packages = get_local_packages(build)?; + test_fn(ctx, build, &local_packages) + })) + }) } -fn build( - ctx: &TaskCtx, - build_env: &Build, - local_packages: &[Package], -) -> Fallible<()> { +fn build(ctx: &TaskCtx, build_env: &Build, local_packages: &[Package]) -> Fallible<()> { run_cargo( ctx, build_env, @@ -320,7 +301,7 @@ fn build( Ok(()) } -fn test(ctx: &TaskCtx, build_env: &Build) -> Fallible<()> { +fn test(ctx: &TaskCtx, build_env: &Build) -> Fallible<()> { run_cargo( ctx, build_env, @@ -331,8 +312,8 @@ fn test(ctx: &TaskCtx, build_env: &Build) -> Fallible<()> ) } -pub(super) fn test_build_and_test( - ctx: &TaskCtx, +pub(super) fn test_build_and_test( + ctx: &TaskCtx, build_env: &Build, local_packages_id: &[Package], ) -> Fallible { @@ -351,8 +332,8 @@ pub(super) fn test_build_and_test( }) } -pub(super) fn test_build_only( - ctx: &TaskCtx, +pub(super) fn test_build_only( + ctx: &TaskCtx, build_env: &Build, local_packages_id: &[Package], ) -> Fallible { @@ -363,8 +344,8 @@ pub(super) fn test_build_only( } } -pub(super) fn test_check_only( - ctx: &TaskCtx, +pub(super) fn test_check_only( + ctx: &TaskCtx, build_env: &Build, local_packages_id: &[Package], ) -> Fallible { @@ -388,8 +369,8 @@ pub(super) fn test_check_only( } } -pub(super) fn test_clippy_only( - ctx: &TaskCtx, +pub(super) fn test_clippy_only( + ctx: &TaskCtx, build_env: &Build, local_packages: &[Package], ) -> Fallible { @@ -413,8 +394,8 @@ pub(super) fn test_clippy_only( } } -pub(super) fn test_rustdoc( - ctx: &TaskCtx, +pub(super) fn test_rustdoc( + ctx: &TaskCtx, build_env: &Build, local_packages: &[Package], ) -> Fallible { diff --git a/src/runner/unstable_features.rs b/src/runner/unstable_features.rs index 273beac7..33e5743e 100644 --- a/src/runner/unstable_features.rs +++ b/src/runner/unstable_features.rs @@ -1,6 +1,5 @@ use crate::prelude::*; use crate::results::TestResult; -use crate::results::WriteResults; use crate::runner::tasks::TaskCtx; use cargo_metadata::Package; use rustwide::Build; @@ -8,8 +7,8 @@ use std::collections::HashSet; use std::path::Path; use walkdir::{DirEntry, WalkDir}; -pub(super) fn find_unstable_features( - _ctx: &TaskCtx, +pub(super) fn find_unstable_features( + _ctx: &TaskCtx, build: &Build, _local_packages_id: &[Package], ) -> Fallible { diff --git a/src/runner/worker.rs b/src/runner/worker.rs index 5e3e4152..8980cfd8 100644 --- a/src/runner/worker.rs +++ b/src/runner/worker.rs @@ -1,11 +1,14 @@ +use crate::agent::AgentApi; use crate::crates::Crate; use crate::experiments::{Experiment, Mode}; use crate::prelude::*; -use crate::results::{BrokenReason, TestResult, WriteResults}; +use crate::results::{BrokenReason, TestResult}; use crate::runner::tasks::{Task, TaskStep}; +use crate::runner::test::detect_broken; use crate::runner::OverrideResult; +use crate::toolchain::Toolchain; use crate::utils; -use rustwide::logging::LogStorage; +use rustwide::logging::{self, LogStorage}; use rustwide::{BuildDirectory, Workspace}; use std::collections::HashMap; use std::sync::Condvar; @@ -15,24 +18,50 @@ use std::sync::{ }; use std::time::Duration; -pub(super) struct Worker<'a, DB: WriteResults + Sync> { +pub trait RecordProgress: Send + Sync { + fn record_progress( + &self, + ex: &Experiment, + krate: &Crate, + toolchain: &Toolchain, + log: &[u8], + result: &TestResult, + version: Option<(&Crate, &Crate)>, + ) -> Fallible<()>; +} + +impl RecordProgress for AgentApi { + fn record_progress( + &self, + ex: &Experiment, + krate: &Crate, + toolchain: &Toolchain, + log: &[u8], + result: &TestResult, + version: Option<(&Crate, &Crate)>, + ) -> Fallible<()> { + self.record_progress(ex, krate, toolchain, log, result, version) + } +} + +pub(super) struct Worker<'a> { name: String, workspace: &'a Workspace, build_dir: HashMap<&'a crate::toolchain::Toolchain, Mutex>, ex: &'a Experiment, config: &'a crate::config::Config, - db: &'a DB, + api: &'a dyn RecordProgress, target_dir_cleanup: AtomicBool, next_crate: &'a (dyn Fn() -> Fallible> + Send + Sync), } -impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { +impl<'a> Worker<'a> { pub(super) fn new( name: String, workspace: &'a Workspace, ex: &'a Experiment, config: &'a crate::config::Config, - db: &'a DB, + api: &'a dyn RecordProgress, next_crate: &'a (dyn Fn() -> Fallible> + Send + Sync), ) -> Self { let mut build_dir = HashMap::new(); @@ -51,7 +80,7 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { ex, config, next_crate, - db, + api, target_dir_cleanup: AtomicBool::new(false), } } @@ -64,23 +93,21 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { &self, task: &Task, storage: &LogStorage, - ) -> Result<(), (failure::Error, TestResult)> { + ) -> Result { info!("running task: {:?}", task); - let mut res = Ok(()); + let mut res = None; let max_attempts = 5; for run in 1..=max_attempts { // If we're running a task, we call ourselves healthy. crate::agent::set_healthy(); - res = task.run( - self.config, - self.workspace, - &self.build_dir, - self.ex, - self.db, - storage, - ); + match task.run(self.config, &self.build_dir, self.ex, storage) { + Ok(res) => return Ok(res), + Err(e) => { + res = Some(e); + } + } // We retry task failing on the second toolchain (i.e., regressions). In // the future we might expand this list further but for now this helps @@ -89,9 +116,8 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { // For now we make no distinction between build failures and test failures // here, but that may change if this proves too slow. let mut should_retry = false; - if res.is_err() && self.ex.toolchains.len() == 2 { + if self.ex.toolchains.len() == 2 { let toolchain = match &task.step { - TaskStep::Prepare => None, TaskStep::BuildAndTest { tc, .. } | TaskStep::BuildOnly { tc, .. } | TaskStep::CheckOnly { tc, .. } @@ -105,33 +131,32 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { } } } + if !should_retry { break; } log::info!("Retrying task {:?} [{run}/{max_attempts}]", task); } - if let Err(e) = res { - error!("task {:?} failed", task); - utils::report_failure(&e); + // Unreachable unless we failed to succeed above. + let e = res.unwrap(); + error!("task {:?} failed", task); + utils::report_failure(&e); - let mut result = if self.config.is_broken(&task.krate) { - TestResult::BrokenCrate(BrokenReason::Unknown) - } else { - TestResult::Error - }; + let mut result = if self.config.is_broken(&task.krate) { + TestResult::BrokenCrate(BrokenReason::Unknown) + } else { + TestResult::Error + }; - for err in e.iter_chain() { - if let Some(OverrideResult(res)) = err.downcast_ctx() { - result = res.clone(); - break; - } + for err in e.iter_chain() { + if let Some(OverrideResult(res)) = err.downcast_ctx() { + result = res.clone(); + break; } - - return Err((e, result)); } - Ok(()) + Err((e, result)) } pub(super) fn run(&self) -> Fallible<()> { @@ -152,16 +177,13 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { // If a skipped crate is somehow sent to the agent (for example, when a crate was // added to the experiment and *then* blacklisted) report the crate as skipped // instead of silently ignoring it. - if let Err(e) = self.db.record_result( + if let Err(e) = self.api.record_progress( self.ex, - tc, &krate, - &LogStorage::from(self.config), - crate::results::EncodingType::Plain, - || { - warn!("crate skipped"); - Ok(TestResult::Skipped) - }, + tc, + "crate skipped".as_bytes(), + &TestResult::Skipped, + None, ) { crate::utils::report_failure(&e); } @@ -169,29 +191,83 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { continue; } + let mut updated_version = None; let logs = LogStorage::from(self.config); - let prepare_task = Task { - krate: krate.clone(), - step: TaskStep::Prepare, - }; - if let Err((err, test_result)) = &self.run_task(&prepare_task, &logs) { - if let Err(e) = - prepare_task.mark_as_failed(self.ex, self.db, err, test_result, &logs) - { - crate::utils::report_failure(&e); + let prepare = logging::capture(&logs, || { + let rustwide_crate = krate.to_rustwide(); + for attempt in 1..=15 { + match detect_broken(rustwide_crate.fetch(self.workspace)) { + Ok(()) => break, + Err(e) => { + if logs.to_string().contains("No space left on device") { + if attempt == 15 { + // If we've failed 15 times, then + // just give up. It's been at least + // 45 seconds, which is enough that + // our disk space check should + // have run at least once in this + // time. If that's not helped, then + // maybe this git repository *is* + // actually too big. + // + // Ideally we'd have some kind of + // per-worker counter and if we hit + // this too often we'd replace the + // machine, but it's not very clear + // what "too often" means here. + return Err(e); + } else { + log::warn!( + "Retrying crate fetch in 3 seconds (attempt {})", + attempt + ); + std::thread::sleep(std::time::Duration::from_secs(3)); + } + } else { + return Err(e); + } + } + } } + + if let Crate::GitHub(repo) = &krate { + if let Some(sha) = rustwide_crate.git_commit(self.workspace) { + let updated = crate::crates::GitHubRepo { + sha: Some(sha), + ..repo.clone() + }; + updated_version = Some(Crate::GitHub(updated)); + } else { + bail!("unable to capture sha for {}", repo.slug()); + } + } + Ok(()) + }); + if let Err(err) = prepare { + let mut result = if self.config.is_broken(&krate) { + TestResult::BrokenCrate(BrokenReason::Unknown) + } else { + TestResult::Error + }; + for err in err.iter_chain() { + if let Some(OverrideResult(res)) = err.downcast_ctx() { + result = res.clone(); + break; + } + } + for tc in &self.ex.toolchains { - if let Err(e) = self.db.record_result( + if let Err(e) = self.api.record_progress( self.ex, - tc, &krate, - &LogStorage::from(self.config), - crate::results::EncodingType::Plain, - || { - error!("this task or one of its parent failed!"); - utils::report_failure(err); - Ok(test_result.clone()) - }, + tc, + format!( + "{}\n\nthis task or one of its parent failed: {:?}", + logs, err + ) + .as_bytes(), + &result, + updated_version.as_ref().map(|new| (&krate, new)), ) { crate::utils::report_failure(&e); } @@ -240,12 +316,15 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { // Fork logs off to distinct branch, so that each toolchain has its own log file, // while keeping the shared prepare step in common. let storage = logs.duplicate(); - if let Err((err, test_result)) = &self.run_task(&task, &storage) { - if let Err(e) = - task.mark_as_failed(self.ex, self.db, err, test_result, &storage) - { - crate::utils::report_failure(&e); - } + if let Err((err, test_result)) = self.run_task(&task, &storage) { + self.api.record_progress( + self.ex, + &task.krate, + tc, + format!("{}\n\n{:?}", storage, err).as_bytes(), + &test_result, + updated_version.as_ref().map(|new| (&krate, new)), + )?; } } } @@ -267,16 +346,16 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> { } } -pub(super) struct DiskSpaceWatcher<'a, DB: WriteResults + Sync> { +pub(super) struct DiskSpaceWatcher<'a> { interval: Duration, threshold: f32, - workers: &'a [Worker<'a, DB>], + workers: &'a [Worker<'a>], should_stop: Mutex, waiter: Condvar, } -impl<'a, DB: WriteResults + Sync> DiskSpaceWatcher<'a, DB> { - pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a, DB>]) -> Self { +impl<'a> DiskSpaceWatcher<'a> { + pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a>]) -> Self { DiskSpaceWatcher { interval, threshold,