Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.
1 change: 1 addition & 0 deletions src/agent/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target
.agent-run
5 changes: 4 additions & 1 deletion src/agent/onefuzz-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct Agent {
previous_state: NodeState,
last_poll_command: Result<Option<NodeCommand>, PollCommandError>,
managed: bool,
machine_id: uuid::Uuid,
}

impl Agent {
Expand All @@ -40,6 +41,7 @@ impl Agent {
worker_runner: Box<dyn IWorkerRunner>,
heartbeat: Option<AgentHeartbeatClient>,
managed: bool,
machine_id: uuid::Uuid,
) -> Self {
let scheduler = Some(scheduler);
let previous_state = NodeState::Init;
Expand All @@ -56,6 +58,7 @@ impl Agent {
previous_state,
last_poll_command,
managed,
machine_id,
}
}

Expand Down Expand Up @@ -266,7 +269,7 @@ impl Agent {

async fn done(&mut self, state: State<Done>) -> Result<Scheduler> {
debug!("agent done");
set_done_lock().await?;
set_done_lock(self.machine_id).await?;

let event = match state.cause() {
DoneCause::SetupError {
Expand Down
7 changes: 5 additions & 2 deletions src/agent/onefuzz-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use uuid::Uuid;

use crate::coordinator::double::*;
use crate::reboot::double::*;
use crate::scheduler::*;
use crate::setup::double::*;
use crate::work::double::*;
use crate::work::*;
Expand Down Expand Up @@ -36,6 +35,7 @@ impl Fixture {
worker_runner,
None,
true,
Uuid::new_v4(),
)
}

Expand Down Expand Up @@ -187,6 +187,9 @@ async fn test_emitted_state() {

#[tokio::test]
async fn test_emitted_state_failed_setup() {
// to prevent anyhow from capturing the stack trace when
// SetupRunnerDouble bails
std::env::set_var("RUST_BACKTRACE", "0");
let error_message = "Failed setup";
let mut agent = Agent {
setup_runner: Box::new(SetupRunnerDouble {
Expand Down Expand Up @@ -225,7 +228,7 @@ async fn test_emitted_state_failed_setup() {

// TODO: at some point, the underlying tests should be updated to not write
// this file in the first place.
tokio::fs::remove_file(crate::done::done_path().unwrap())
tokio::fs::remove_file(crate::done::done_path(agent.machine_id).unwrap())
.await
.unwrap();
}
23 changes: 21 additions & 2 deletions src/agent/onefuzz-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ fn default_as_true() -> bool {
true
}

/// Client credentials in the shape they are deserialized from the static
/// config data.
///
/// When a `StaticConfig` is built, these fields are passed to
/// `ClientCredentials::new` together with the config's `onefuzz_url`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct RawClientCredentials {
// Forwarded unchanged as the first argument of `ClientCredentials::new`.
client_id: Uuid,
// Forwarded unchanged as the second argument of `ClientCredentials::new`.
client_secret: String,
// Forwarded unchanged to `ClientCredentials::new`.
tenant: String,
// Optional; forwarded unchanged to `ClientCredentials::new`.
multi_tenant_domain: Option<String>,
}

// Temporary shim type to bridge the current service-provided config.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct RawStaticConfig {
pub client_credentials: Option<ClientCredentials>,
pub client_credentials: Option<RawClientCredentials>,

pub pool_name: String,

Expand All @@ -76,7 +84,14 @@ impl StaticConfig {
let config: RawStaticConfig = serde_json::from_slice(data)?;

let credentials = match config.client_credentials {
Some(client) => client.into(),
Some(client) => ClientCredentials::new(
client.client_id,
client.client_secret,
config.onefuzz_url.to_string(),
client.tenant,
client.multi_tenant_domain,
)
.into(),
None => {
// Remove trailing `/`, which is treated as a distinct resource.
let resource = config
Expand Down Expand Up @@ -193,6 +208,10 @@ pub struct DynamicConfig {
impl DynamicConfig {
pub async fn save(&self) -> Result<()> {
let path = Self::save_path()?;
let dir = path
.parent()
.ok_or(anyhow!("invalid dynamic config path"))?;
fs::create_dir_all(dir).await?;
let data = serde_json::to_vec(&self)?;
fs::write(&path, &data)
.await
Expand Down
4 changes: 4 additions & 0 deletions src/agent/onefuzz-agent/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ impl Coordinator {

Ok(response)
}

/// Returns the machine ID stored in this coordinator's registration.
pub fn get_machine_id(&self) -> Uuid {
self.registration.machine_id
}
}

#[cfg(test)]
Expand Down
6 changes: 4 additions & 2 deletions src/agent/onefuzz-agent/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ fn debug_run_worker(opt: RunWorkerOpt) -> Result<()> {

async fn run_worker(mut work_set: WorkSet) -> Result<Vec<WorkerEvent>> {
use crate::setup::SetupRunner;

SetupRunner.run(&work_set).await?;
let mut setup_runner = SetupRunner {
machine_id: Uuid::new_v4(),
};
setup_runner.run(&work_set).await?;

let mut events = vec![];
let work_unit = work_set.work_units.pop().unwrap();
Expand Down
23 changes: 17 additions & 6 deletions src/agent/onefuzz-agent/src/done.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,30 @@ use std::path::PathBuf;
use anyhow::{Context, Result};
use onefuzz::fs::onefuzz_root;
use tokio::fs;
use uuid::Uuid;

pub async fn set_done_lock() -> Result<()> {
let path = done_path()?;
/// Writes an empty "done" lock file at the path returned by
/// `done_path(machine_id)`.
///
/// # Errors
///
/// Returns an error if the lock path cannot be resolved, or if the write
/// fails (the error is annotated with the lock path).
pub async fn set_done_lock(machine_id: Uuid) -> Result<()> {
    let lock_file = done_path(machine_id)?;
    let written = fs::write(&lock_file, "").await;
    written.with_context(|| format!("unable to write done lock: {}", lock_file.display()))
}

pub fn is_agent_done() -> Result<bool> {
Ok(metadata(done_path()?).is_ok())
pub fn remove_done_lock(machine_id: Uuid) -> Result<()> {
let path = done_path(machine_id)?;
if path.exists() {
std::fs::remove_file(&path)
.with_context(|| format!("unable to remove done lock: {}", path.display()))?;
}

Ok(())
}

/// Reports whether metadata can be read for the "done" lock file of
/// `machine_id`.
///
/// # Errors
///
/// Returns an error only if the lock path itself cannot be resolved; a
/// failed metadata lookup yields `Ok(false)`.
pub fn is_agent_done(machine_id: Uuid) -> Result<bool> {
    let lock_file = done_path(machine_id)?;
    let lock_present = metadata(lock_file).is_ok();
    Ok(lock_present)
}

pub fn done_path() -> Result<PathBuf> {
Ok(onefuzz_root()?.join("supervisor-is-done"))
/// Builds the path of the "done" lock file for `machine_id`:
/// `supervisor-is-done-<machine_id>` under the directory returned by
/// `onefuzz_root()`.
///
/// # Errors
///
/// Propagates any error from `onefuzz_root()`.
pub fn done_path(machine_id: Uuid) -> Result<PathBuf> {
    let root = onefuzz_root()?;
    let lock_name = format!("supervisor-is-done-{}", machine_id);
    Ok(root.join(lock_name))
}
66 changes: 48 additions & 18 deletions src/agent/onefuzz-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::path::PathBuf;
use std::process::{Command, Stdio};

use anyhow::{Context, Result};
use clap::Parser;
use clap::{ArgAction, Parser};
use onefuzz::machine_id::MachineIdentity;
use onefuzz::process::ExitStatus;
use onefuzz_telemetry::{self as telemetry, EventData, Role};
use std::io::{self, Write};
Expand Down Expand Up @@ -59,6 +60,15 @@ struct RunOpt {
/// the specified directory
#[clap(short, long = "--redirect-output", parse(from_os_str))]
redirect_output: Option<PathBuf>,

#[clap(long = "--machine_id")]
machine_id: Option<Uuid>,

#[clap(long = "--machine_name")]
machine_name: Option<String>,

#[clap(long = "--reset_lock", takes_value = false, action = ArgAction::SetTrue )]
reset_node_lock: bool,
}

fn main() -> Result<()> {
Expand Down Expand Up @@ -160,17 +170,10 @@ fn run(opt: RunOpt) -> Result<()> {
if opt.redirect_output.is_some() {
return redirect(opt);
}

if done::is_agent_done()? {
debug!(
"agent is done, remove lock ({}) to continue",
done::done_path()?.display()
);
return Ok(());
}

// We can't send telemetry if this fails.
let opt_machine_id = opt.machine_id;
let opt_machine_name = opt.machine_name.clone();
let rt = tokio::runtime::Runtime::new()?;
let reset_lock = opt.reset_node_lock;
let config = rt.block_on(load_config(opt));

// We can't send telemetry, because we couldn't get a telemetry key from the config.
Expand All @@ -181,7 +184,26 @@ fn run(opt: RunOpt) -> Result<()> {

let config = config?;

let result = rt.block_on(run_agent(config));
let config = StaticConfig {
machine_identity: MachineIdentity {
machine_id: opt_machine_id.unwrap_or(config.machine_identity.machine_id),
machine_name: opt_machine_name.unwrap_or(config.machine_identity.machine_name),
..config.machine_identity
},
..config
};

if reset_lock {
done::remove_done_lock(config.machine_identity.machine_id)?;
} else if done::is_agent_done(config.machine_identity.machine_id)? {
debug!(
"agent is done, remove lock ({}) to continue",
done::done_path(config.machine_identity.machine_id)?.display()
);
return Ok(());
}

let result = rt.block_on(run_agent(config, reset_lock));

if let Err(err) = &result {
error!("error running supervisor agent: {:?}", err);
Expand Down Expand Up @@ -213,7 +235,7 @@ async fn check_existing_worksets(coordinator: &mut coordinator::Coordinator) ->
// that is the case, mark each of the work units within the workset as
// failed, then exit as a failure.

if let Some(work) = WorkSet::load_from_fs_context().await? {
if let Some(work) = WorkSet::load_from_fs_context(coordinator.get_machine_id()).await? {
warn!("onefuzz-agent unexpectedly identified an existing workset on start");
let failure = match failure::read_failure() {
Ok(value) => format!("onefuzz-agent failed: {}", value),
Expand Down Expand Up @@ -251,17 +273,18 @@ async fn check_existing_worksets(coordinator: &mut coordinator::Coordinator) ->

// force set done semaphore, as to not prevent the supervisor continuing
// to report the workset as failed.
done::set_done_lock().await?;
let machine_id = coordinator.get_machine_id();
done::set_done_lock(machine_id).await?;
anyhow::bail!(
"failed to start due to pre-existing workset config: {}",
WorkSet::context_path()?.display()
WorkSet::context_path(machine_id)?.display()
);
}

Ok(())
}

async fn run_agent(config: StaticConfig) -> Result<()> {
async fn run_agent(config: StaticConfig, reset_node: bool) -> Result<()> {
telemetry::set_property(EventData::InstanceId(config.instance_id));
telemetry::set_property(EventData::MachineId(config.machine_identity.machine_id));
telemetry::set_property(EventData::Version(env!("ONEFUZZ_VERSION").to_string()));
Expand All @@ -288,6 +311,10 @@ async fn run_agent(config: StaticConfig) -> Result<()> {

let mut reboot = reboot::Reboot;
let reboot_context = reboot.load_context().await?;
if reset_node {
WorkSet::remove_context(config.machine_identity.machine_id).await?;
}

if reboot_context.is_none() {
check_existing_worksets(&mut coordinator).await?;
}
Expand All @@ -311,11 +338,14 @@ async fn run_agent(config: StaticConfig) -> Result<()> {
Box::new(coordinator),
Box::new(reboot),
scheduler,
Box::new(setup::SetupRunner),
Box::new(setup::SetupRunner {
machine_id: config.machine_identity.machine_id,
}),
Box::new(work_queue),
Box::new(worker::WorkerRunner::new(config.machine_identity)),
Box::new(worker::WorkerRunner::new(config.machine_identity.clone())),
agent_heartbeat,
config.managed,
config.machine_identity.machine_id,
);

info!("running agent");
Expand Down
10 changes: 9 additions & 1 deletion src/agent/onefuzz-agent/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::setup::ISetupRunner;
use crate::work::*;
use crate::worker::*;

#[derive(Debug)]
pub enum Scheduler {
Free(State<Free>),
SettingUp(State<SettingUp>),
Expand Down Expand Up @@ -97,24 +98,30 @@ impl Default for Scheduler {
}
}

/// Field-less context type for the `Scheduler::Free` variant
/// (`State<Free>`).
#[derive(Debug)]
pub struct Free;

/// Context type for the `Scheduler::SettingUp` variant
/// (`State<SettingUp>`).
#[derive(Debug)]
pub struct SettingUp {
// The work set carried by this state.
work_set: WorkSet,
}

/// Context type carrying a `WorkSet`.
///
/// NOTE(review): presumably the context for a "pending reboot" scheduler
/// state; the corresponding `Scheduler` variant is not visible here.
/// Confirm against the enum definition.
#[derive(Debug)]
pub struct PendingReboot {
// The work set carried by this state.
work_set: WorkSet,
}

/// Scheduler state context (implements `Context`) carrying a `WorkSet`.
#[derive(Debug)]
pub struct Ready {
// The work set carried by this state.
work_set: WorkSet,
}

/// Scheduler state context (implements `Context`) holding a list of
/// worker slots.
#[derive(Debug)]
pub struct Busy {
// Each entry is an optional `Worker`.
// NOTE(review): the meaning of a `None` slot is not visible here; confirm
// against the code that drives the workers.
workers: Vec<Option<Worker>>,
}

/// Scheduler state context (implements `Context`) recording why the
/// scheduler finished.
#[derive(Debug)]
pub struct Done {
// The reason for finishing, e.g. `DoneCause::SetupError`.
cause: DoneCause,
}
Expand All @@ -138,6 +145,7 @@ impl Context for Ready {}
impl Context for Busy {}
impl Context for Done {}

/// Generic scheduler state, parameterized by a context type `C` that
/// implements `Context` (e.g. `Ready`, `Busy`, `Done`).
#[derive(Debug)]
pub struct State<C: Context> {
// The context value for this state.
ctx: C,
}
Expand Down Expand Up @@ -201,7 +209,7 @@ impl State<SettingUp> {
// No script was executed.
}
Err(err) => {
let error = err.to_string();
let error = format!("{:?}", err);
warn!("{}", error);
let cause = DoneCause::SetupError {
error,
Expand Down
Loading