diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3247ae0173..0596b3c55a 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -380,6 +380,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-test", + "tracing", "zbus", ] diff --git a/rust/agama-s390/src/zfcp/monitor.rs b/rust/agama-s390/src/zfcp/monitor.rs index 31591dcb16..bccdfa2c5c 100644 --- a/rust/agama-s390/src/zfcp/monitor.rs +++ b/rust/agama-s390/src/zfcp/monitor.rs @@ -34,7 +34,6 @@ use agama_utils::{ issue, progress, }; use serde::Deserialize; -use serde_json; use tokio::sync::broadcast; use tokio_stream::StreamExt; use zbus::{fdo::PropertiesChanged, message, Connection, MatchRule, MessageStream}; @@ -50,6 +49,8 @@ pub enum Error { #[error(transparent)] DBus(#[from] zbus::Error), #[error(transparent)] + DBusConversion(#[from] zbus::zvariant::Error), + #[error(transparent)] Json(#[from] serde_json::Error), #[error(transparent)] Storage(#[from] storage::service::Error), @@ -154,8 +155,11 @@ impl Monitor { .send(Event::SystemChanged { scope: Scope::ZFCP })?; self.storage.cast(storage::message::Probe)?; } - if args.changed_properties().get("Issues").is_some() { - self.update_issues().await?; + if let Some(issues) = args.changed_properties().get("Issues").cloned() { + let issues: String = issues.try_into()?; + let issues = serde_json::from_str(issues.as_str())?; + self.issues + .cast(issue::message::Set::new(Scope::ZFCP, issues))?; } Ok(()) } diff --git a/rust/agama-storage-client/src/proxies.rs b/rust/agama-storage-client/src/proxies.rs index cf34c85137..3c29d427e6 100644 --- a/rust/agama-storage-client/src/proxies.rs +++ b/rust/agama-storage-client/src/proxies.rs @@ -18,7 +18,7 @@ // To contact SUSE LLC about this file by physical or electronic mail, you may // find current contact information at www.suse.com. -mod storage1; +pub mod storage1; pub use storage1::Storage1Proxy; mod bootloader; diff --git a/rust/agama-storage/Cargo.toml b/rust/agama-storage/Cargo.toml index 5ae080158f..c8df105183 100644 --- a/rust/agama-storage/Cargo.toml +++ b/rust/agama-storage/Cargo.toml @@ -14,6 +14,7 @@ tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread", "sync"] } tokio-stream = "0.1.16" serde = { version = "1.0.228" } serde_json = "1.0.140" +tracing = "0.1.41" [dev-dependencies] test-context = "0.4.1" diff --git a/rust/agama-storage/src/monitor.rs b/rust/agama-storage/src/monitor.rs index cd45d0bee4..0b22b3f8d2 100644 --- a/rust/agama-storage/src/monitor.rs +++ b/rust/agama-storage/src/monitor.rs @@ -18,7 +18,10 @@ // To contact SUSE LLC about this file by physical or electronic mail, you may // find current contact information at www.suse.com. -use crate::storage_client; +use crate::storage_client::{ + self, + proxies::{storage1, Storage1Proxy}, +}; use agama_utils::{ actor::Handler, api::{ @@ -28,57 +31,28 @@ use agama_utils::{ issue, progress, }; use serde::Deserialize; -use std::pin::Pin; use tokio::sync::broadcast; -use tokio_stream::{Stream, StreamExt, StreamMap}; -use zbus::{proxy, Connection}; +use tokio_stream::StreamExt; +use zbus::{fdo::PropertiesChanged, message, Connection, MatchRule, MessageStream}; #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("Wrong signal arguments")] - ProgressChangedArgs, - #[error("Wrong signal data")] - ProgressChangedData, - #[error(transparent)] - Issue(#[from] issue::service::Error), #[error(transparent)] Progress(#[from] progress::service::Error), #[error(transparent)] + Event(#[from] broadcast::error::SendError), + #[error(transparent)] + Issue(#[from] issue::service::Error), + #[error(transparent)] DBus(#[from] zbus::Error), #[error(transparent)] - Event(#[from] broadcast::error::SendError), + DBusConversion(#[from] zbus::zvariant::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), #[error(transparent)] StorageClient(#[from] storage_client::Error), } -#[proxy( - default_service = "org.opensuse.Agama.Storage1", - default_path = "/org/opensuse/Agama/Storage1", - interface = "org.opensuse.Agama.Storage1", - assume_defaults = true -)] -pub trait Storage1 { - #[zbus(signal)] - fn system_changed(&self, system: &str) -> zbus::Result<()>; - - #[zbus(signal)] - fn proposal_changed(&self, proposal: &str) -> zbus::Result<()>; - - #[zbus(signal)] - fn progress_changed(&self, progress: &str) -> zbus::Result<()>; - - #[zbus(signal)] - fn progress_finished(&self) -> zbus::Result<()>; -} - -#[derive(Debug)] -enum Signal { - SystemChanged(SystemChanged), - ProposalChanged(ProposalChanged), - ProgressChanged(ProgressChanged), - ProgressFinished(ProgressFinished), -} - #[derive(Debug, Deserialize)] struct ProgressData { pub size: usize, @@ -108,7 +82,7 @@ pub struct Monitor { } impl Monitor { - pub async fn new( + pub fn new( progress: Handler, issues: Handler, events: event::Sender, @@ -119,24 +93,38 @@ impl Monitor { progress, issues, events, - connection: connection.clone(), + connection, storage_client, } } async fn run(&self) -> Result<(), Error> { - let mut streams = StreamMap::new(); - streams.insert("SystemChanged", self.system_changed_stream().await?); - streams.insert("ProposalChanged", self.proposal_changed_stream().await?); - streams.insert("ProgressChanged", self.progress_changed_stream().await?); - streams.insert("ProgressFinished", self.progress_finished_stream().await?); - - tokio::pin!(streams); + let proxy = Storage1Proxy::new(&self.connection).await?; + let rule = MatchRule::builder() + .msg_type(message::Type::Signal) + .sender(proxy.inner().destination())? + .path(proxy.inner().path())? + .build(); + let mut stream = MessageStream::for_match_rule(rule, &self.connection, None).await?; self.update_issues().await?; - while let Some((_, signal)) = streams.next().await { - self.handle_signal(signal).await?; + while let Some(message) = stream.next().await { + let message = message?; + + if let Some(signal) = PropertiesChanged::from_message(message.clone()) { + self.handle_properties_changed(signal).await?; + continue; + } + if let Some(signal) = storage1::ProgressChanged::from_message(message.clone()) { + self.handle_progress_changed(signal).await?; + continue; + } + if let Some(signal) = storage1::ProgressFinished::from_message(message.clone()) { + self.handle_progress_finished(signal).await?; + continue; + } + tracing::warn!("Unmanaged storage signal: {message:?}"); } Ok(()) @@ -152,96 +140,48 @@ impl Monitor { Ok(()) } - async fn handle_signal(&self, signal: Signal) -> Result<(), Error> { - match signal { - Signal::SystemChanged(signal) => self.handle_system_changed(signal)?, - Signal::ProposalChanged(signal) => self.handle_proposal_changed(signal).await?, - Signal::ProgressChanged(signal) => self.handle_progress_changed(signal).await?, - Signal::ProgressFinished(signal) => self.handle_progress_finished(signal).await?, + async fn handle_properties_changed(&self, signal: PropertiesChanged) -> Result<(), Error> { + let args = signal.args()?; + if args.changed_properties().get("System").is_some() { + self.events.send(Event::SystemChanged { + scope: Scope::Storage, + })?; + } + if args.changed_properties().get("Proposal").is_some() { + self.events.send(Event::ProposalChanged { + scope: Scope::Storage, + })?; + } + if let Some(issues) = args.changed_properties().get("Issues").cloned() { + let issues: String = issues.try_into()?; + let issues = serde_json::from_str(issues.as_str())?; + self.issues + .cast(issue::message::Set::new(Scope::Storage, issues))?; } Ok(()) } - // TODO: add system info to the event. - fn handle_system_changed(&self, _signal: SystemChanged) -> Result<(), Error> { - self.events.send(Event::SystemChanged { - scope: Scope::Storage, - })?; - Ok(()) - } - - async fn handle_proposal_changed(&self, _signal: ProposalChanged) -> Result<(), Error> { - self.events.send(Event::ProposalChanged { - scope: Scope::Storage, - })?; - - self.update_issues().await - } - - async fn handle_progress_changed(&self, signal: ProgressChanged) -> Result<(), Error> { - let Ok(args) = signal.args() else { - return Err(Error::ProgressChangedArgs); - }; - let Ok(progress_data) = serde_json::from_str::(args.progress) else { - return Err(Error::ProgressChangedData); - }; + async fn handle_progress_changed( + &self, + signal: storage1::ProgressChanged, + ) -> Result<(), Error> { + let args = signal.args()?; + let progress_data = serde_json::from_str::(args.progress)?; self.progress .call(progress::message::SetProgress::new(progress_data.into())) .await?; - Ok(()) } - async fn handle_progress_finished(&self, _signal: ProgressFinished) -> Result<(), Error> { + async fn handle_progress_finished( + &self, + _signal: storage1::ProgressFinished, + ) -> Result<(), Error> { self.progress .call(progress::message::Finish::new(Scope::Storage)) .await?; Ok(()) } - - async fn system_changed_stream( - &self, - ) -> Result + Send>>, Error> { - let proxy = Storage1Proxy::new(&self.connection).await?; - let stream = proxy - .receive_system_changed() - .await? - .map(Signal::SystemChanged); - Ok(Box::pin(stream)) - } - - async fn proposal_changed_stream( - &self, - ) -> Result + Send>>, Error> { - let proxy = Storage1Proxy::new(&self.connection).await?; - let stream = proxy - .receive_proposal_changed() - .await? - .map(Signal::ProposalChanged); - Ok(Box::pin(stream)) - } - - async fn progress_changed_stream( - &self, - ) -> Result + Send>>, Error> { - let proxy = Storage1Proxy::new(&self.connection).await?; - let stream = proxy - .receive_progress_changed() - .await? - .map(Signal::ProgressChanged); - Ok(Box::pin(stream)) - } - - async fn progress_finished_stream( - &self, - ) -> Result + Send>>, Error> { - let proxy = Storage1Proxy::new(&self.connection).await?; - let stream = proxy - .receive_progress_finished() - .await? - .map(Signal::ProgressFinished); - Ok(Box::pin(stream)) - } } /// Spawns a Tokio task for the monitor. @@ -250,7 +190,7 @@ impl Monitor { pub fn spawn(monitor: Monitor) -> Result<(), Error> { tokio::spawn(async move { if let Err(e) = monitor.run().await { - println!("Error running the storage monitor: {e:?}"); + tracing::error!("Error running the storage monitor: {e:?}"); } }); Ok(()) diff --git a/rust/agama-storage/src/service.rs b/rust/agama-storage/src/service.rs index 864b1991dd..3aff1876ad 100644 --- a/rust/agama-storage/src/service.rs +++ b/rust/agama-storage/src/service.rs @@ -99,8 +99,7 @@ impl Starter { self.events, self.connection, storage_client, - ) - .await; + ); monitor::spawn(monitor)?; } Ok(handler) diff --git a/rust/package/agama.changes b/rust/package/agama.changes index a1285bbee0..a485d864e5 100644 --- a/rust/package/agama.changes +++ b/rust/package/agama.changes @@ -1,3 +1,9 @@ +------------------------------------------------------------------- +Tue Mar 17 07:09:37 UTC 2026 - José Iván López González + +- Ensure the list of storage issues is correctly updated + (gh#agama-project/agama#3291). + ------------------------------------------------------------------- Tue Mar 17 07:00:30 UTC 2026 - Josef Reidinger