diff --git a/src/agent/.gitignore b/src/agent/.gitignore index eb5a316cbd..41c40876dc 100644 --- a/src/agent/.gitignore +++ b/src/agent/.gitignore @@ -1 +1,2 @@ target +.agent-run diff --git a/src/agent/onefuzz-agent/src/agent.rs b/src/agent/onefuzz-agent/src/agent.rs index 8f4b556b64..d26ecb732a 100644 --- a/src/agent/onefuzz-agent/src/agent.rs +++ b/src/agent/onefuzz-agent/src/agent.rs @@ -28,6 +28,7 @@ pub struct Agent { previous_state: NodeState, last_poll_command: Result, PollCommandError>, managed: bool, + machine_id: uuid::Uuid, } impl Agent { @@ -40,6 +41,7 @@ impl Agent { worker_runner: Box, heartbeat: Option, managed: bool, + machine_id: uuid::Uuid, ) -> Self { let scheduler = Some(scheduler); let previous_state = NodeState::Init; @@ -56,6 +58,7 @@ impl Agent { previous_state, last_poll_command, managed, + machine_id, } } @@ -266,7 +269,7 @@ impl Agent { async fn done(&mut self, state: State) -> Result { debug!("agent done"); - set_done_lock().await?; + set_done_lock(self.machine_id).await?; let event = match state.cause() { DoneCause::SetupError { diff --git a/src/agent/onefuzz-agent/src/agent/tests.rs b/src/agent/onefuzz-agent/src/agent/tests.rs index a27c7365fc..d77d3e2e94 100644 --- a/src/agent/onefuzz-agent/src/agent/tests.rs +++ b/src/agent/onefuzz-agent/src/agent/tests.rs @@ -6,7 +6,6 @@ use uuid::Uuid; use crate::coordinator::double::*; use crate::reboot::double::*; -use crate::scheduler::*; use crate::setup::double::*; use crate::work::double::*; use crate::work::*; @@ -36,6 +35,7 @@ impl Fixture { worker_runner, None, true, + Uuid::new_v4(), ) } @@ -187,6 +187,9 @@ async fn test_emitted_state() { #[tokio::test] async fn test_emitted_state_failed_setup() { + // to prevent anyhow from capturing the stack trace when + // SetupRunnerDouble bails + std::env::set_var("RUST_BACKTRACE", "0"); let error_message = "Failed setup"; let mut agent = Agent { setup_runner: Box::new(SetupRunnerDouble { @@ -225,7 +228,7 @@ async fn test_emitted_state_failed_setup() { // TODO: at some point, the underlying tests should be updated to not write // this file in the first place. - tokio::fs::remove_file(crate::done::done_path().unwrap()) + tokio::fs::remove_file(crate::done::done_path(agent.machine_id).unwrap()) .await .unwrap(); } diff --git a/src/agent/onefuzz-agent/src/config.rs b/src/agent/onefuzz-agent/src/config.rs index 5d5f65a9ea..3e50ab7ae6 100644 --- a/src/agent/onefuzz-agent/src/config.rs +++ b/src/agent/onefuzz-agent/src/config.rs @@ -46,10 +46,18 @@ fn default_as_true() -> bool { true } +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +pub struct RawClientCredentials { + client_id: Uuid, + client_secret: String, + tenant: String, + multi_tenant_domain: Option, +} + // Temporary shim type to bridge the current service-provided config. #[derive(Clone, Debug, Deserialize, Eq, PartialEq)] struct RawStaticConfig { - pub client_credentials: Option, + pub client_credentials: Option, pub pool_name: String, @@ -76,7 +84,14 @@ impl StaticConfig { let config: RawStaticConfig = serde_json::from_slice(data)?; let credentials = match config.client_credentials { - Some(client) => client.into(), + Some(client) => ClientCredentials::new( + client.client_id, + client.client_secret, + config.onefuzz_url.to_string(), + client.tenant, + client.multi_tenant_domain, + ) + .into(), None => { // Remove trailing `/`, which is treated as a distinct resource. let resource = config @@ -193,6 +208,10 @@ pub struct DynamicConfig { impl DynamicConfig { pub async fn save(&self) -> Result<()> { let path = Self::save_path()?; + let dir = path + .parent() + .ok_or(anyhow!("invalid dynamic config path"))?; + fs::create_dir_all(dir).await?; let data = serde_json::to_vec(&self)?; fs::write(&path, &data) .await diff --git a/src/agent/onefuzz-agent/src/coordinator.rs b/src/agent/onefuzz-agent/src/coordinator.rs index fb466a1808..d940d46d8a 100644 --- a/src/agent/onefuzz-agent/src/coordinator.rs +++ b/src/agent/onefuzz-agent/src/coordinator.rs @@ -326,6 +326,10 @@ impl Coordinator { Ok(response) } + + pub fn get_machine_id(&self) -> Uuid { + self.registration.machine_id + } } #[cfg(test)] diff --git a/src/agent/onefuzz-agent/src/debug.rs b/src/agent/onefuzz-agent/src/debug.rs index a686dc03c8..82072e7825 100644 --- a/src/agent/onefuzz-agent/src/debug.rs +++ b/src/agent/onefuzz-agent/src/debug.rs @@ -180,8 +180,10 @@ fn debug_run_worker(opt: RunWorkerOpt) -> Result<()> { async fn run_worker(mut work_set: WorkSet) -> Result> { use crate::setup::SetupRunner; - - SetupRunner.run(&work_set).await?; + let mut setup_runner = SetupRunner { + machine_id: Uuid::new_v4(), + }; + setup_runner.run(&work_set).await?; let mut events = vec![]; let work_unit = work_set.work_units.pop().unwrap(); diff --git a/src/agent/onefuzz-agent/src/done.rs b/src/agent/onefuzz-agent/src/done.rs index 8c10fc9882..f13933fa5c 100644 --- a/src/agent/onefuzz-agent/src/done.rs +++ b/src/agent/onefuzz-agent/src/done.rs @@ -7,19 +7,30 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use onefuzz::fs::onefuzz_root; use tokio::fs; +use uuid::Uuid; -pub async fn set_done_lock() -> Result<()> { - let path = done_path()?; +pub async fn set_done_lock(machine_id: Uuid) -> Result<()> { + let path = done_path(machine_id)?; fs::write(&path, "") .await .with_context(|| format!("unable to write done lock: {}", path.display()))?; Ok(()) } -pub fn is_agent_done() -> Result { - Ok(metadata(done_path()?).is_ok()) +pub fn remove_done_lock(machine_id: Uuid) -> Result<()> { + let path = done_path(machine_id)?; + if path.exists() { + std::fs::remove_file(&path) + .with_context(|| format!("unable to remove done lock: {}", path.display()))?; + } + + Ok(()) +} + +pub fn is_agent_done(machine_id: Uuid) -> Result { + Ok(metadata(done_path(machine_id)?).is_ok()) } -pub fn done_path() -> Result { - Ok(onefuzz_root()?.join("supervisor-is-done")) +pub fn done_path(machine_id: Uuid) -> Result { + Ok(onefuzz_root()?.join(format!("supervisor-is-done-{}", machine_id))) } diff --git a/src/agent/onefuzz-agent/src/main.rs b/src/agent/onefuzz-agent/src/main.rs index 7123053b4e..12428db2ba 100644 --- a/src/agent/onefuzz-agent/src/main.rs +++ b/src/agent/onefuzz-agent/src/main.rs @@ -20,7 +20,8 @@ use std::path::PathBuf; use std::process::{Command, Stdio}; use anyhow::{Context, Result}; -use clap::Parser; +use clap::{ArgAction, Parser}; +use onefuzz::machine_id::MachineIdentity; use onefuzz::process::ExitStatus; use onefuzz_telemetry::{self as telemetry, EventData, Role}; use std::io::{self, Write}; @@ -59,6 +60,15 @@ struct RunOpt { /// the specified directory #[clap(short, long = "--redirect-output", parse(from_os_str))] redirect_output: Option, + + #[clap(long = "--machine_id")] + machine_id: Option, + + #[clap(long = "--machine_name")] + machine_name: Option, + + #[clap(long = "--reset_lock", takes_value = false, action = ArgAction::SetTrue )] + reset_node_lock: bool, } fn main() -> Result<()> { @@ -160,17 +170,10 @@ fn run(opt: RunOpt) -> Result<()> { if opt.redirect_output.is_some() { return redirect(opt); } - - if done::is_agent_done()? { - debug!( - "agent is done, remove lock ({}) to continue", - done::done_path()?.display() - ); - return Ok(()); - } - - // We can't send telemetry if this fails. + let opt_machine_id = opt.machine_id; + let opt_machine_name = opt.machine_name.clone(); let rt = tokio::runtime::Runtime::new()?; + let reset_lock = opt.reset_node_lock; let config = rt.block_on(load_config(opt)); // We can't send telemetry, because we couldn't get a telemetry key from the config. @@ -181,7 +184,26 @@ fn run(opt: RunOpt) -> Result<()> { let config = config?; - let result = rt.block_on(run_agent(config)); + let config = StaticConfig { + machine_identity: MachineIdentity { + machine_id: opt_machine_id.unwrap_or(config.machine_identity.machine_id), + machine_name: opt_machine_name.unwrap_or(config.machine_identity.machine_name), + ..config.machine_identity + }, + ..config + }; + + if reset_lock { + done::remove_done_lock(config.machine_identity.machine_id)?; + } else if done::is_agent_done(config.machine_identity.machine_id)? { + debug!( + "agent is done, remove lock ({}) to continue", + done::done_path(config.machine_identity.machine_id)?.display() + ); + return Ok(()); + } + + let result = rt.block_on(run_agent(config, reset_lock)); if let Err(err) = &result { error!("error running supervisor agent: {:?}", err); @@ -213,7 +235,7 @@ async fn check_existing_worksets(coordinator: &mut coordinator::Coordinator) -> // that is the case, mark each of the work units within the workset as // failed, then exit as a failure. - if let Some(work) = WorkSet::load_from_fs_context().await? { + if let Some(work) = WorkSet::load_from_fs_context(coordinator.get_machine_id()).await? { warn!("onefuzz-agent unexpectedly identified an existing workset on start"); let failure = match failure::read_failure() { Ok(value) => format!("onefuzz-agent failed: {}", value), @@ -251,17 +273,18 @@ async fn check_existing_worksets(coordinator: &mut coordinator::Coordinator) -> // force set done semaphore, as to not prevent the supervisor continuing // to report the workset as failed. - done::set_done_lock().await?; + let machine_id = coordinator.get_machine_id(); + done::set_done_lock(machine_id).await?; anyhow::bail!( "failed to start due to pre-existing workset config: {}", - WorkSet::context_path()?.display() + WorkSet::context_path(machine_id)?.display() ); } Ok(()) } -async fn run_agent(config: StaticConfig) -> Result<()> { +async fn run_agent(config: StaticConfig, reset_node: bool) -> Result<()> { telemetry::set_property(EventData::InstanceId(config.instance_id)); telemetry::set_property(EventData::MachineId(config.machine_identity.machine_id)); telemetry::set_property(EventData::Version(env!("ONEFUZZ_VERSION").to_string())); @@ -288,6 +311,10 @@ async fn run_agent(config: StaticConfig) -> Result<()> { let mut reboot = reboot::Reboot; let reboot_context = reboot.load_context().await?; + if reset_node { + WorkSet::remove_context(config.machine_identity.machine_id).await?; + } + if reboot_context.is_none() { check_existing_worksets(&mut coordinator).await?; } @@ -311,11 +338,14 @@ async fn run_agent(config: StaticConfig) -> Result<()> { Box::new(coordinator), Box::new(reboot), scheduler, - Box::new(setup::SetupRunner), + Box::new(setup::SetupRunner { + machine_id: config.machine_identity.machine_id, + }), Box::new(work_queue), - Box::new(worker::WorkerRunner::new(config.machine_identity)), + Box::new(worker::WorkerRunner::new(config.machine_identity.clone())), agent_heartbeat, config.managed, + config.machine_identity.machine_id, ); info!("running agent"); diff --git a/src/agent/onefuzz-agent/src/scheduler.rs b/src/agent/onefuzz-agent/src/scheduler.rs index e9b5fcc8ae..3fc8403f4f 100644 --- a/src/agent/onefuzz-agent/src/scheduler.rs +++ b/src/agent/onefuzz-agent/src/scheduler.rs @@ -13,6 +13,7 @@ use crate::setup::ISetupRunner; use crate::work::*; use crate::worker::*; +#[derive(Debug)] pub enum Scheduler { Free(State), SettingUp(State), @@ -97,24 +98,30 @@ impl Default for Scheduler { } } +#[derive(Debug)] pub struct Free; +#[derive(Debug)] pub struct SettingUp { work_set: WorkSet, } +#[derive(Debug)] pub struct PendingReboot { work_set: WorkSet, } +#[derive(Debug)] pub struct Ready { work_set: WorkSet, } +#[derive(Debug)] pub struct Busy { workers: Vec>, } +#[derive(Debug)] pub struct Done { cause: DoneCause, } @@ -138,6 +145,7 @@ impl Context for Ready {} impl Context for Busy {} impl Context for Done {} +#[derive(Debug)] pub struct State { ctx: C, } @@ -201,7 +209,7 @@ impl State { // No script was executed. } Err(err) => { - let error = err.to_string(); + let error = format!("{:?}", err); warn!("{}", error); let cause = DoneCause::SetupError { error, diff --git a/src/agent/onefuzz-agent/src/setup.rs b/src/agent/onefuzz-agent/src/setup.rs index 95af22f10e..fe9dc7bab8 100644 --- a/src/agent/onefuzz-agent/src/setup.rs +++ b/src/agent/onefuzz-agent/src/setup.rs @@ -11,6 +11,7 @@ use onefuzz::az_copy; use onefuzz::process::Output; use tokio::fs; use tokio::process::Command; +use uuid::Uuid; use crate::work::*; @@ -36,24 +37,22 @@ impl ISetupRunner for SetupRunner { } #[derive(Clone, Copy, Debug)] -pub struct SetupRunner; +pub struct SetupRunner { + pub machine_id: Uuid, +} impl SetupRunner { pub async fn run(&mut self, work_set: &WorkSet) -> Result { info!("running setup for work set"); - - work_set.save_context().await?; - + work_set.save_context(self.machine_id).await?; // Download the setup container. let setup_url = work_set.setup_url.url()?; let setup_dir = work_set.setup_dir()?; - // `azcopy sync` requires the local dir to exist. fs::create_dir_all(&setup_dir).await.with_context(|| { format!("unable to create setup container: {}", setup_dir.display()) })?; az_copy::sync(setup_url.to_string(), &setup_dir, false).await?; - debug!( "synced setup container from {} to {}", setup_url, @@ -65,7 +64,7 @@ impl SetupRunner { // Create setup container directory symlinks for tasks. for work_unit in &work_set.work_units { - create_setup_symlink(&setup_dir, work_unit).await?; + create_setup_symlink(&setup_dir, work_unit, self.machine_id).await?; } // Run setup script, if any. @@ -104,18 +103,29 @@ impl SetupRunner { } #[cfg(target_family = "windows")] -async fn create_setup_symlink(setup_dir: &Path, work_unit: &WorkUnit) -> Result<()> { +async fn create_setup_symlink( + setup_dir: &Path, + work_unit: &WorkUnit, + machine_id: Uuid, +) -> Result<()> { use std::os::windows::fs::symlink_dir; use tokio::task::spawn_blocking; - let working_dir = work_unit.working_dir()?; + let working_dir = work_unit.working_dir(machine_id)?; - fs::create_dir(&working_dir).await.with_context(|| { + let create_work_dir = fs::create_dir_all(&working_dir).await.with_context(|| { format!( "unable to create working directory: {}", working_dir.display() ) - })?; + }); + + if let Err(err) = create_work_dir { + if !working_dir.exists() { + return Err(err); + } + } + let task_setup_dir = working_dir.join("setup"); // Tokio does not ship async versions of the `std::fs::os` symlink @@ -135,10 +145,14 @@ async fn create_setup_symlink(setup_dir: &Path, work_unit: &WorkUnit) -> Result< } #[cfg(target_family = "unix")] -async fn create_setup_symlink(setup_dir: &Path, work_unit: &WorkUnit) -> Result<()> { +async fn create_setup_symlink( + setup_dir: &Path, + work_unit: &WorkUnit, + machine_id: Uuid, +) -> Result<()> { use tokio::fs::symlink; - let working_dir = work_unit.working_dir()?; + let working_dir = work_unit.working_dir(machine_id)?; tokio::fs::create_dir_all(&working_dir) .await diff --git a/src/agent/onefuzz-agent/src/work.rs b/src/agent/onefuzz-agent/src/work.rs index 2599a6f924..34b93af494 100644 --- a/src/agent/onefuzz-agent/src/work.rs +++ b/src/agent/onefuzz-agent/src/work.rs @@ -30,12 +30,12 @@ impl WorkSet { self.work_units.iter().map(|w| w.task_id).collect() } - pub fn context_path() -> Result { - Ok(onefuzz::fs::onefuzz_root()?.join("workset_context.json")) + pub fn context_path(machine_id: Uuid) -> Result { + Ok(onefuzz::fs::onefuzz_root()?.join(format!("workset_context-{}.json", machine_id))) } - pub async fn load_from_fs_context() -> Result> { - let path = Self::context_path()?; + pub async fn load_from_fs_context(machine_id: Uuid) -> Result> { + let path = Self::context_path(machine_id)?; info!("checking for workset context: {}", path.display()); @@ -57,8 +57,8 @@ impl WorkSet { Ok(Some(ctx)) } - pub async fn save_context(&self) -> Result<()> { - let path = Self::context_path()?; + pub async fn save_context(&self, machine_id: Uuid) -> Result<()> { + let path = Self::context_path(machine_id)?; info!("saving workset context: {}", path.display()); let data = serde_json::to_vec(&self)?; @@ -69,6 +69,19 @@ impl WorkSet { Ok(()) } + pub async fn remove_context(machine_id: Uuid) -> Result<()> { + let path = Self::context_path(machine_id)?; + info!("removing workset context: {}", path.display()); + + if path.exists() { + fs::remove_file(&path) + .await + .with_context(|| format!("unable to delete WorkSet context: {}", path.display()))?; + } + + Ok(()) + } + pub fn setup_dir(&self) -> Result { let setup_dir = self .setup_url @@ -93,12 +106,14 @@ pub struct WorkUnit { } impl WorkUnit { - pub fn working_dir(&self) -> Result { - Ok(onefuzz::fs::onefuzz_root()?.join(self.task_id.to_string())) + pub fn working_dir(&self, machine_id: Uuid) -> Result { + Ok(onefuzz::fs::onefuzz_root()? + .join(format!("{}", machine_id)) + .join(self.task_id.to_string())) } - pub fn config_path(&self) -> Result { - Ok(self.working_dir()?.join("config.json")) + pub fn config_path(&self, machine_id: Uuid) -> Result { + Ok(self.working_dir(machine_id)?.join("config.json")) } } diff --git a/src/agent/onefuzz-agent/src/worker.rs b/src/agent/onefuzz-agent/src/worker.rs index 0c3bce9653..65492874d3 100644 --- a/src/agent/onefuzz-agent/src/worker.rs +++ b/src/agent/onefuzz-agent/src/worker.rs @@ -37,6 +37,7 @@ pub enum WorkerEvent { }, } +#[derive(Debug)] pub enum Worker { Ready(State), Running(State), @@ -94,14 +95,17 @@ impl Worker { } } +#[derive(Debug)] pub struct Ready { setup_dir: PathBuf, } +#[derive(Debug)] pub struct Running { child: Box, } +#[derive(Debug)] pub struct Done { output: Output, } @@ -112,6 +116,7 @@ impl Context for Ready {} impl Context for Running {} impl Context for Done {} +#[derive(Debug)] pub struct State { ctx: C, work: WorkUnit, @@ -189,7 +194,7 @@ pub trait IWorkerRunner: Downcast { impl_downcast!(IWorkerRunner); -pub trait IWorkerChild: Downcast { +pub trait IWorkerChild: Downcast + std::fmt::Debug { fn try_wait(&mut self) -> Result>; fn kill(&mut self) -> Result<()>; @@ -210,7 +215,7 @@ impl WorkerRunner { #[async_trait] impl IWorkerRunner for WorkerRunner { async fn run(&mut self, setup_dir: &Path, work: &WorkUnit) -> Result> { - let working_dir = work.working_dir()?; + let working_dir = work.working_dir(self.machine_identity.machine_id)?; debug!("worker working dir = {}", working_dir.display()); @@ -232,7 +237,7 @@ impl IWorkerRunner for WorkerRunner { serde_json::to_value(&self.machine_identity)?, ); - let config_path = work.config_path()?; + let config_path = work.config_path(self.machine_identity.machine_id)?; fs::write(&config_path, serde_json::to_string(&config)?.as_bytes()) .await @@ -292,6 +297,7 @@ impl SuspendableChild for Child { } /// Child process with redirected output streams, tailed by two worker threads. +#[derive(Debug)] struct RedirectedChild { /// The child process. child: Child, @@ -318,6 +324,7 @@ impl RedirectedChild { } /// Worker threads that tail the redirected output streams of a running child process. +#[derive(Debug)] struct StreamReaderThreads { stderr: JoinHandle, stdout: JoinHandle,