diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 98bcb7c3e9..c18f758905 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -60,6 +60,7 @@ dependencies = [ "thiserror 2.0.17", "tokio", "tokio-test", + "tracing", ] [[package]] diff --git a/rust/agama-autoinstall/src/scripts.rs b/rust/agama-autoinstall/src/scripts.rs index 9c6e1bddd9..bdda0cdde3 100644 --- a/rust/agama-autoinstall/src/scripts.rs +++ b/rust/agama-autoinstall/src/scripts.rs @@ -19,16 +19,15 @@ // find current contact information at www.suse.com. use std::{ - fs::{self, create_dir_all, File}, + fs::{self, create_dir_all}, io::Write, os::unix::fs::OpenOptionsExt, path::{Path, PathBuf}, - process::Output, }; use agama_lib::http::BaseHTTPClient; use agama_transfer::Transfer; -use agama_utils::command::run_with_retry; +use agama_utils::command::{create_log_file, run_with_retry}; use anyhow::anyhow; use url::Url; @@ -73,9 +72,18 @@ impl ScriptsRunner { let path = self.path.join(&file_name); self.save_script(url, &path).await?; - let command = tokio::process::Command::new(&path); + let stdout_file = create_log_file(&path.with_extension("stdout"))?; + let stderr_file = create_log_file(&path.with_extension("stderr"))?; + + let mut command = tokio::process::Command::new(&path); + command.stdout(stdout_file).stderr(stderr_file); let output = run_with_retry(command).await?; - self.save_logs(&path, output)?; + + if let Some(code) = output.status.code() { + let mut file = create_log_file(&path.with_extension("exit"))?; + write!(&mut file, "{}", code)?; + } + Ok(()) } @@ -100,7 +108,13 @@ impl ScriptsRunner { } async fn save_script(&self, url: &str, path: &PathBuf) -> anyhow::Result<()> { - let mut file = Self::create_file(&path, 0o700)?; + let mut file = fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .mode(0o700) + .open(&path)?; + while let Err(error) = Transfer::get(url, &mut file, self.insecure) { eprintln!("Could not load the script from {url}: {error}"); if !self.should_retry(&url, &error.to_string()).await? { @@ -111,34 +125,6 @@ impl ScriptsRunner { Ok(()) } - fn save_logs(&self, path: &Path, output: Output) -> anyhow::Result<()> { - if !output.stdout.is_empty() { - let mut file = Self::create_file(&path.with_extension("stdout"), 0o600)?; - file.write_all(&output.stdout)?; - } - - if !output.stderr.is_empty() { - let mut file = Self::create_file(&path.with_extension("stderr"), 0o600)?; - file.write_all(&output.stderr)?; - } - - if let Some(code) = output.status.code() { - let mut file = Self::create_file(&path.with_extension("exit"), 0o600)?; - write!(&mut file, "{}", code)?; - } - - Ok(()) - } - - fn create_file(path: &Path, perms: u32) -> std::io::Result { - fs::OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .mode(perms) - .open(path) - } - async fn should_retry(&self, url: &str, error: &str) -> anyhow::Result { let msg = format!( r#" diff --git a/rust/agama-files/Cargo.toml b/rust/agama-files/Cargo.toml index 34dd7f83d3..ce812bd900 100644 --- a/rust/agama-files/Cargo.toml +++ b/rust/agama-files/Cargo.toml @@ -11,6 +11,7 @@ async-trait = "0.1.89" tempfile = "3.23.0" thiserror = "2.0.17" tokio = { version = "1.48.0", features = ["sync"] } +tracing = "0.1.41" [dev-dependencies] tokio-test = "0.4.4" diff --git a/rust/agama-files/src/lib.rs b/rust/agama-files/src/lib.rs index 5ed26f8c6d..941fce548f 100644 --- a/rust/agama-files/src/lib.rs +++ b/rust/agama-files/src/lib.rs @@ -24,6 +24,8 @@ pub mod service; pub use service::{Service, Starter}; pub mod message; +mod runner; +pub use runner::ScriptsRunner; #[cfg(test)] mod tests { @@ -33,6 +35,7 @@ mod tests { use agama_utils::{ actor::Handler, api::{ + event, files::{scripts::ScriptsGroup, Config}, Event, }, @@ -47,6 +50,7 @@ mod tests { struct Context { handler: Handler, tmp_dir: TempDir, + events_rx: event::Receiver, } impl AsyncTestContext for Context { @@ -62,20 +66,28 @@ mod tests { std::fs::copy("/usr/bin/install", tmp_dir.path().join("usr/bin/install")).unwrap(); // Set up the service - let (events_tx, _events_rx) = broadcast::channel::(16); + let (events_tx, events_rx) = broadcast::channel::(16); let issues = issue::Service::starter(events_tx.clone()).start(); let progress = progress::Service::starter(events_tx.clone()).start(); let questions = question::start(events_tx.clone()).await.unwrap(); - let software = - start_software_service(events_tx.clone(), issues, progress.clone(), questions) - .await; - let handler = Service::starter(events_tx.clone(), progress, software) + let software = start_software_service( + events_tx.clone(), + issues, + progress.clone(), + questions.clone(), + ) + .await; + let handler = Service::starter(progress, questions, software) .with_scripts_workdir(tmp_dir.path()) .with_install_dir(tmp_dir.path()) .start() .await .unwrap(); - Context { handler, tmp_dir } + Context { + handler, + tmp_dir, + events_rx, + } } } @@ -111,6 +123,12 @@ mod tests { .await .unwrap(); + // Wait until the scripts are executed. + while let Ok(event) = ctx.events_rx.recv().await { + if matches!(event, Event::ProgressFinished { scope: _ }) { + break; + } + } // Check that only the pre-script ran assert!(std::fs::exists(&test_file_1).unwrap()); assert!(!std::fs::exists(&test_file_2).unwrap()); diff --git a/rust/agama-files/src/runner.rs b/rust/agama-files/src/runner.rs new file mode 100644 index 0000000000..1657af4a25 --- /dev/null +++ b/rust/agama-files/src/runner.rs @@ -0,0 +1,384 @@ +// Copyright (c) [2024-2025] SUSE LLC +// +// All Rights Reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation; either version 2 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, contact SUSE LLC. +// +// To contact SUSE LLC about this file by physical or electronic mail, you may +// find current contact information at www.suse.com. + +use std::{ + fs::File, + io::{self, BufReader, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + process::ExitStatus, +}; + +use agama_utils::{ + actor::Handler, + api::{files::Script, question::QuestionSpec, Scope}, + command::{create_log_file, run_with_retry}, + progress, + question::{self, ask_question}, +}; +use tokio::process; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error(transparent)] + Io(#[from] std::io::Error), + #[error("The script failed")] + Script { status: ExitStatus, stderr: String }, + #[error(transparent)] + Question(#[from] question::AskError), +} + +/// Implements the logic to run a script. +/// +/// It takes care of running the script, reporting errors (and asking whether to retry) and write +/// the logs. +pub struct ScriptsRunner { + progress: Handler, + questions: Handler, + install_dir: PathBuf, + workdir: PathBuf, +} + +impl ScriptsRunner { + /// Creates a new runner. + /// + /// * `install_dir`: directory where the system is being installed. It is relevant for + /// chrooted scripts. + /// * `workdir`: scripts work directory. + /// * `progress`: handler to report the progress. + /// * `questions`: handler to interact with the user. + pub fn new>( + install_dir: P, + workdir: P, + progress: Handler, + questions: Handler, + ) -> Self { + Self { + progress, + questions, + install_dir: install_dir.as_ref().to_path_buf(), + workdir: workdir.as_ref().to_path_buf(), + } + } + + /// Runs the given scripts. + /// + /// It runs each script. If something goes wrong, it reports the problem to the user through + /// the questions mechanism. + /// + /// * `scripts`: scripts to run. + pub async fn run(&self, scripts: &[&Script]) -> Result<(), Error> { + self.start_progress(scripts); + + for script in scripts { + _ = self + .progress + .cast(progress::message::Next::new(Scope::Files)); + self.run_script(script).await?; + } + + _ = self + .progress + .cast(progress::message::Finish::new(Scope::Files)); + Ok(()) + } + + /// Runs the script. + /// + /// If the script fails, it asks the user whether it should try again. + async fn run_script(&self, script: &Script) -> Result<(), Error> { + loop { + let path = self + .workdir + .join(script.group().to_string()) + .join(script.name()); + + let Err(error) = self.run_command(&path, script.chroot()).await else { + return Ok(()); + }; + + if !self.should_retry(&script, error).await? { + return Ok(()); + } + } + } + + /// Asks the user whether it should try to run the script again. + async fn should_retry(&self, script: &Script, error: Error) -> Result { + let text = format!( + "Running the script '{}' failed. Do you want to try again?", + script.name() + ); + let mut question = QuestionSpec::new(&text, "scripts.retry").with_yes_no_actions(); + + if let Error::Script { status, stderr } = error { + let exit_status = status + .code() + .map(|c| c.to_string()) + .unwrap_or("unknown".to_string()); + question = question.with_data(&[ + ("name", script.name()), + ("stderr", &stderr), + ("exit_status", &exit_status), + ]); + } + + let answer = ask_question(&self.questions, question).await?; + return Ok(answer.action == "Yes"); + } + + /// Runs the script at the given path. + /// + /// * `path`: script's path. + /// * `chroot`: whether to run the script in a chroot. + async fn run_command>(&self, path: P, chroot: bool) -> Result<(), Error> { + const STDERR_SIZE: u64 = 512; + + let path = path.as_ref(); + let stdout_file = path.with_extension("stdout"); + let stderr_file = path.with_extension("stderr"); + + let mut command = if chroot { + let mut command = process::Command::new("chroot"); + command.args([&self.install_dir, path]); + command + } else { + process::Command::new(path) + }; + + command + .stdout(create_log_file(&stdout_file)?) + .stderr(create_log_file(&stderr_file)?); + + let output = run_with_retry(command) + .await + .inspect_err(|e| println!("Error executing the script: {e}"))?; + + if let Some(code) = output.status.code() { + let mut file = create_log_file(&path.with_extension("exit"))?; + write!(&mut file, "{}", code)?; + } + + if !output.status.success() { + let stderr = Self::read_n_last_bytes(&stderr_file, STDERR_SIZE)?; + return Err(Error::Script { + status: output.status, + stderr, + }); + } + + Ok(()) + } + + /// Ancillary function to start the progress. + fn start_progress(&self, scripts: &[&Script]) { + let messages: Vec<_> = scripts + .iter() + .map(|s| format!("Running user script '{}'", s.name())) + .collect(); + let steps: Vec<_> = messages.iter().map(|s| s.as_ref()).collect(); + let progress_action = progress::message::StartWithSteps::new(Scope::Files, &steps); + _ = self.progress.cast(progress_action); + } + + /// Reads the last n bytes of the file and returns them as a string. + fn read_n_last_bytes(path: &Path, n_bytes: u64) -> io::Result { + let mut file = File::open(path)?; + let file_size = file.metadata()?.len(); + let offset = file_size.saturating_sub(n_bytes); + file.seek(SeekFrom::Start(offset))?; + let bytes_to_read = (file_size - offset) as usize; + let mut buffer = Vec::with_capacity(bytes_to_read); + _ = file.read_to_end(&mut buffer)?; + let string = String::from_utf8_lossy(&buffer); + Ok(string.into_owned()) + } +} + +#[cfg(test)] +mod tests { + use agama_utils::{ + api::{ + event, + files::{BaseScript, FileSource, PostScript}, + question::Answer, + Event, + }, + question::test_utils::wait_for_question, + }; + use tempfile::TempDir; + use test_context::{test_context, AsyncTestContext}; + use tokio::sync::broadcast; + + use super::*; + + struct Context { + // runner: ScriptsRunner, + install_dir: PathBuf, + workdir: PathBuf, + progress: Handler, + questions: Handler, + events_rx: event::Receiver, + tmp_dir: TempDir, + } + + impl AsyncTestContext for Context { + async fn setup() -> Context { + let tmp_dir = TempDir::with_prefix("scripts-").expect("a temporary directory"); + + let (events_tx, events_rx) = broadcast::channel::(16); + let install_dir = tmp_dir.path().join("mnt"); + let workdir = tmp_dir.path().join("scripts"); + let questions = question::start(events_tx.clone()).await.unwrap(); + let progress = progress::Service::starter(events_tx.clone()).start(); + + Context { + events_rx, + install_dir, + workdir, + progress, + questions, + // runner, + tmp_dir, + } + } + } + + impl Context { + pub fn runner(&self) -> ScriptsRunner { + ScriptsRunner::new( + self.install_dir.clone(), + self.workdir.clone(), + self.progress.clone(), + self.questions.clone(), + ) + } + pub fn setup_script(&self, content: &str, chroot: bool) -> Script { + let base = BaseScript { + name: "test.sh".to_string(), + source: FileSource::Text { + content: content.to_string(), + }, + }; + let script = Script::Post(PostScript { + base, + chroot: Some(chroot), + }); + script + .write(&self.workdir) + .expect("Could not write the script"); + script + } + } + + #[test_context(Context)] + #[tokio::test] + async fn test_run_scripts_success(ctx: &mut Context) -> Result<(), Error> { + let file = ctx.tmp_dir.path().join("file-1.txt"); + let content = format!( + "#!/usr/bin/bash\necho hello\necho error >&2\ntouch {}", + file.display() + ); + let script = ctx.setup_script(&content, false); + let scripts = vec![&script]; + + let runner = ctx.runner(); + runner.run(&scripts).await.unwrap(); + + let path = &ctx.workdir.join("post").join("test.stdout"); + let body: Vec = std::fs::read(path).unwrap(); + let body = String::from_utf8(body).unwrap(); + assert_eq!("hello\n", body); + + let path = &ctx.workdir.join("post").join("test.stderr"); + let body: Vec = std::fs::read(path).unwrap(); + let body = String::from_utf8(body).unwrap(); + assert_eq!("error\n", body); + + let path = &ctx.workdir.join("post").join("test.exit"); + let body: Vec = std::fs::read(path).unwrap(); + let body = String::from_utf8(body).unwrap(); + assert_eq!("0", body); + + assert!(std::fs::exists(file).unwrap()); + Ok(()) + } + + #[test_context(Context)] + #[tokio::test] + async fn test_run_scripts_retry(ctx: &mut Context) -> Result<(), Error> { + let file = ctx.tmp_dir.path().join("file-1.txt"); + let content = format!( + "#!/usr/bin/bash\necho \"hello\"\necho \"line\" >>{}\nagama-unknown\n", + file.display() + ); + let script = ctx.setup_script(&content, false); + + let runner = ctx.runner(); + tokio::task::spawn(async move { + let scripts = vec![&script]; + _ = runner.run(&scripts).await; + }); + + // Retry + let id = wait_for_question(&mut ctx.events_rx) + .await + .expect("Did not receive a question"); + _ = ctx.questions.cast(question::message::Answer { + id, + answer: Answer::new("Yes"), + }); + + // Check the question content + let questions = ctx + .questions + .call(question::message::Get) + .await + .expect("Could not get the questions"); + let question = questions.first().unwrap(); + assert_eq!(question.spec.data.get("name"), Some(&"test.sh".to_string())); + assert_eq!( + question.spec.data.get("exit_status"), + Some(&"127".to_string()) + ); + let stderr = question.spec.data.get("stderr").unwrap(); + assert!(stderr.contains("agama-unknown")); + + // Do not retry + let id = wait_for_question(&mut ctx.events_rx) + .await + .expect("Did not receive a question"); + _ = ctx.questions.cast(question::message::Answer { + id, + answer: Answer::new("No"), + }); + + // Check the generated files + let path = &ctx.workdir.join("post").join("test.stderr"); + let body: Vec = std::fs::read(path).unwrap(); + let body = String::from_utf8(body).unwrap(); + assert!(body.contains("agama-unknown")); + + let body: Vec = std::fs::read(&file).unwrap(); + let body = String::from_utf8(body).unwrap(); + assert_eq!("line\nline\n", body); + + Ok(()) + } +} diff --git a/rust/agama-files/src/service.rs b/rust/agama-files/src/service.rs index 3be665e2b1..f12323eadf 100644 --- a/rust/agama-files/src/service.rs +++ b/rust/agama-files/src/service.rs @@ -18,23 +18,24 @@ // To contact SUSE LLC about this file by physical or electronic mail, you may // find current contact information at www.suse.com. -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; use agama_software::{self as software, Resolvable, ResolvableType}; use agama_utils::{ actor::{self, Actor, Handler, MessageHandler}, - api::{ - event, - files::{ - scripts::{self, ScriptsRepository}, - user_file, ScriptsConfig, UserFile, - }, + api::files::{ + scripts::{self, ScriptsRepository}, + user_file, ScriptsConfig, UserFile, }, - progress, + progress, question, }; use async_trait::async_trait; +use tokio::sync::Mutex; -use crate::message; +use crate::{message, ScriptsRunner}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -55,11 +56,11 @@ const DEFAULT_INSTALL_DIR: &str = "/mnt"; /// /// This structs allows to build a files service. pub struct Starter { - progress: Handler, - events: event::Sender, - software: Handler, scripts_workdir: PathBuf, install_dir: PathBuf, + software: Handler, + progress: Handler, + questions: Handler, } impl Starter { @@ -67,14 +68,14 @@ impl Starter { /// /// * `events`: channel to emit the [localization-specific events](crate::Event). pub fn new( - events: event::Sender, progress: Handler, + questions: Handler, software: Handler, ) -> Self { Self { - events, - progress, software, + progress, + questions, scripts_workdir: PathBuf::from(DEFAULT_SCRIPTS_DIR), install_dir: PathBuf::from(DEFAULT_INSTALL_DIR), } @@ -85,9 +86,9 @@ impl Starter { let scripts = ScriptsRepository::new(self.scripts_workdir); let service = Service { progress: self.progress, - events: self.events, + questions: self.questions, software: self.software, - scripts, + scripts: Arc::new(Mutex::new(scripts)), files: vec![], install_dir: self.install_dir, }; @@ -107,46 +108,53 @@ impl Starter { } pub struct Service { - progress: Handler, - events: event::Sender, software: Handler, - scripts: ScriptsRepository, + progress: Handler, + questions: Handler, + scripts: Arc>, files: Vec, install_dir: PathBuf, } impl Service { pub fn starter( - events: event::Sender, progress: Handler, + questions: Handler, software: Handler, ) -> Starter { - Starter::new(events, progress, software) + Starter::new(progress, questions, software) + } + + pub async fn clear_scripts(&mut self) -> Result<(), Error> { + let mut repo = self.scripts.lock().await; + repo.clear()?; + Ok(()) } pub async fn add_scripts(&mut self, config: ScriptsConfig) -> Result<(), Error> { + let mut repo = self.scripts.lock().await; if let Some(scripts) = config.pre { for pre in scripts { - self.scripts.add(pre.into())?; + repo.add(pre.into())?; } } if let Some(scripts) = config.post_partitioning { for post in scripts { - self.scripts.add(post.into())?; + repo.add(post.into())?; } } if let Some(scripts) = config.post { for post in scripts { - self.scripts.add(post.into())?; + repo.add(post.into())?; } } let mut packages = vec![]; if let Some(scripts) = config.init { for init in scripts { - self.scripts.add(init.into())?; + repo.add(init.into())?; } packages.push(Resolvable::new("agama-scripts", ResolvableType::Package)); } @@ -168,10 +176,9 @@ impl Actor for Service { #[async_trait] impl MessageHandler for Service { async fn handle(&mut self, message: message::SetConfig) -> Result<(), Error> { - self.scripts.clear()?; - let config = message.config.unwrap_or_default(); + self.clear_scripts().await?; if let Some(scripts) = config.scripts { self.add_scripts(scripts.clone()).await?; } @@ -185,7 +192,18 @@ impl MessageHandler for Service { #[async_trait] impl MessageHandler for Service { async fn handle(&mut self, message: message::RunScripts) -> Result<(), Error> { - self.scripts.run(message.group).await; + let scripts = self.scripts.clone(); + let install_dir = self.install_dir.clone(); + let progress = self.progress.clone(); + let questions = self.questions.clone(); + + tokio::task::spawn(async move { + let scripts = scripts.lock().await; + let workdir = scripts.workdir.clone(); + let to_run = scripts.by_group(message.group).clone(); + let runner = ScriptsRunner::new(install_dir, workdir, progress, questions); + runner.run(&to_run).await.unwrap(); + }); Ok(()) } } diff --git a/rust/agama-manager/src/service.rs b/rust/agama-manager/src/service.rs index bd272137ec..9efc67df58 100644 --- a/rust/agama-manager/src/service.rs +++ b/rust/agama-manager/src/service.rs @@ -195,7 +195,7 @@ impl Starter { let files = match self.files { Some(files) => files, None => { - files::Service::starter(self.events.clone(), progress.clone(), software.clone()) + files::Service::starter(progress.clone(), self.questions.clone(), software.clone()) .start() .await? } diff --git a/rust/agama-utils/Cargo.toml b/rust/agama-utils/Cargo.toml index 5f5e9e5558..8f39274b95 100644 --- a/rust/agama-utils/Cargo.toml +++ b/rust/agama-utils/Cargo.toml @@ -13,7 +13,7 @@ serde_json = "1.0.140" serde_with = "3.14.0" strum = { version = "0.27.2", features = ["derive"] } thiserror = "2.0.16" -tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread", "sync"] } +tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread", "process", "sync"] } tokio-stream = "0.1.17" utoipa = "5.3.1" zbus = "5.7.1" diff --git a/rust/agama-utils/src/api/files/scripts.rs b/rust/agama-utils/src/api/files/scripts.rs index b38129f17d..b3f038b19a 100644 --- a/rust/agama-utils/src/api/files/scripts.rs +++ b/rust/agama-utils/src/api/files/scripts.rs @@ -19,18 +19,14 @@ // find current contact information at www.suse.com. use std::{ - fs, io, + io, path::{Path, PathBuf}, }; -use crate::{ - api::files::{FileSource, FileSourceError, WithFileSource}, - command::run_with_retry, -}; +use crate::api::files::{FileSource, FileSourceError, WithFileSource}; use agama_transfer::Error as TransferError; use serde::{Deserialize, Serialize}; use strum::{EnumIter, IntoEnumIterator}; -use tokio::process; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -148,37 +144,11 @@ impl Script { } } - /// Runs the script in the given work directory. - /// - /// It saves the logs and the exit status of the execution. - /// - /// * `workdir`: where to run the script. - pub async fn run>(&self, workdir: P) -> Result<(), Error> { - let path = workdir - .as_ref() - .join(self.group().to_string()) - .join(self.name()); - let runner = match self { - Script::Pre(inner) => &inner.runner(), - Script::PostPartitioning(inner) => &inner.runner(), - Script::Post(inner) => &inner.runner(), - Script::Init(inner) => &inner.runner(), - }; - - let Some(runner) = runner else { - tracing::info!("No runner defined for script {:?}", &self); - return Ok(()); - }; - - runner.run(&path).await - } -} - -/// Trait to allow getting the runner for a script. -trait WithRunner { - /// Returns the runner for the script if any. - fn runner(&self) -> Option { - Some(ScriptRunner::default()) + pub fn chroot(&self) -> bool { + match self { + Script::Post(script) => script.chroot.unwrap_or(true), + _ => false, + } } } @@ -206,8 +176,6 @@ impl TryFrom