Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions rust/agama-s390/src/zfcp/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use agama_utils::{
issue, progress,
};
use serde::Deserialize;
use serde_json;
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use zbus::{fdo::PropertiesChanged, message, Connection, MatchRule, MessageStream};
Expand All @@ -50,6 +49,8 @@ pub enum Error {
#[error(transparent)]
DBus(#[from] zbus::Error),
#[error(transparent)]
DBusConversion(#[from] zbus::zvariant::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
Storage(#[from] storage::service::Error),
Expand Down Expand Up @@ -154,8 +155,11 @@ impl Monitor {
.send(Event::SystemChanged { scope: Scope::ZFCP })?;
self.storage.cast(storage::message::Probe)?;
}
if args.changed_properties().get("Issues").is_some() {
self.update_issues().await?;
if let Some(issues) = args.changed_properties().get("Issues").cloned() {
let issues: String = issues.try_into()?;
let issues = serde_json::from_str(issues.as_str())?;
self.issues
.cast(issue::message::Set::new(Scope::ZFCP, issues))?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion rust/agama-storage-client/src/proxies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// To contact SUSE LLC about this file by physical or electronic mail, you may
// find current contact information at www.suse.com.

mod storage1;
pub mod storage1;
pub use storage1::Storage1Proxy;

mod bootloader;
Expand Down
1 change: 1 addition & 0 deletions rust/agama-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread", "sync"] }
tokio-stream = "0.1.16"
serde = { version = "1.0.228" }
serde_json = "1.0.140"
tracing = "0.1.41"

[dev-dependencies]
test-context = "0.4.1"
Expand Down
192 changes: 66 additions & 126 deletions rust/agama-storage/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
// To contact SUSE LLC about this file by physical or electronic mail, you may
// find current contact information at www.suse.com.

use crate::storage_client;
use crate::storage_client::{
self,
proxies::{storage1, Storage1Proxy},
};
use agama_utils::{
actor::Handler,
api::{
Expand All @@ -28,57 +31,28 @@ use agama_utils::{
issue, progress,
};
use serde::Deserialize;
use std::pin::Pin;
use tokio::sync::broadcast;
use tokio_stream::{Stream, StreamExt, StreamMap};
use zbus::{proxy, Connection};
use tokio_stream::StreamExt;
use zbus::{fdo::PropertiesChanged, message, Connection, MatchRule, MessageStream};

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Wrong signal arguments")]
ProgressChangedArgs,
#[error("Wrong signal data")]
ProgressChangedData,
#[error(transparent)]
Issue(#[from] issue::service::Error),
#[error(transparent)]
Progress(#[from] progress::service::Error),
#[error(transparent)]
Event(#[from] broadcast::error::SendError<Event>),
#[error(transparent)]
Issue(#[from] issue::service::Error),
#[error(transparent)]
DBus(#[from] zbus::Error),
#[error(transparent)]
Event(#[from] broadcast::error::SendError<Event>),
DBusConversion(#[from] zbus::zvariant::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
StorageClient(#[from] storage_client::Error),
}

#[proxy(
default_service = "org.opensuse.Agama.Storage1",
default_path = "/org/opensuse/Agama/Storage1",
interface = "org.opensuse.Agama.Storage1",
assume_defaults = true
)]
pub trait Storage1 {
#[zbus(signal)]
fn system_changed(&self, system: &str) -> zbus::Result<()>;

#[zbus(signal)]
fn proposal_changed(&self, proposal: &str) -> zbus::Result<()>;

#[zbus(signal)]
fn progress_changed(&self, progress: &str) -> zbus::Result<()>;

#[zbus(signal)]
fn progress_finished(&self) -> zbus::Result<()>;
}

#[derive(Debug)]
enum Signal {
SystemChanged(SystemChanged),
ProposalChanged(ProposalChanged),
ProgressChanged(ProgressChanged),
ProgressFinished(ProgressFinished),
}

#[derive(Debug, Deserialize)]
struct ProgressData {
pub size: usize,
Expand Down Expand Up @@ -108,7 +82,7 @@ pub struct Monitor {
}

impl Monitor {
pub async fn new(
pub fn new(
progress: Handler<progress::Service>,
issues: Handler<issue::Service>,
events: event::Sender,
Expand All @@ -119,24 +93,38 @@ impl Monitor {
progress,
issues,
events,
connection: connection.clone(),
connection,
storage_client,
}
}

async fn run(&self) -> Result<(), Error> {
let mut streams = StreamMap::new();
streams.insert("SystemChanged", self.system_changed_stream().await?);
streams.insert("ProposalChanged", self.proposal_changed_stream().await?);
streams.insert("ProgressChanged", self.progress_changed_stream().await?);
streams.insert("ProgressFinished", self.progress_finished_stream().await?);

tokio::pin!(streams);
let proxy = Storage1Proxy::new(&self.connection).await?;
let rule = MatchRule::builder()
.msg_type(message::Type::Signal)
.sender(proxy.inner().destination())?
.path(proxy.inner().path())?
.build();
let mut stream = MessageStream::for_match_rule(rule, &self.connection, None).await?;

self.update_issues().await?;

while let Some((_, signal)) = streams.next().await {
self.handle_signal(signal).await?;
while let Some(message) = stream.next().await {
let message = message?;

if let Some(signal) = PropertiesChanged::from_message(message.clone()) {
self.handle_properties_changed(signal).await?;
continue;
}
if let Some(signal) = storage1::ProgressChanged::from_message(message.clone()) {
self.handle_progress_changed(signal).await?;
continue;
}
if let Some(signal) = storage1::ProgressFinished::from_message(message.clone()) {
self.handle_progress_finished(signal).await?;
continue;
}
tracing::warn!("Unmanaged storage signal: {message:?}");
}

Ok(())
Expand All @@ -152,96 +140,48 @@ impl Monitor {
Ok(())
}

async fn handle_signal(&self, signal: Signal) -> Result<(), Error> {
match signal {
Signal::SystemChanged(signal) => self.handle_system_changed(signal)?,
Signal::ProposalChanged(signal) => self.handle_proposal_changed(signal).await?,
Signal::ProgressChanged(signal) => self.handle_progress_changed(signal).await?,
Signal::ProgressFinished(signal) => self.handle_progress_finished(signal).await?,
async fn handle_properties_changed(&self, signal: PropertiesChanged) -> Result<(), Error> {
let args = signal.args()?;
if args.changed_properties().get("System").is_some() {
self.events.send(Event::SystemChanged {
scope: Scope::Storage,
})?;
}
if args.changed_properties().get("Proposal").is_some() {
self.events.send(Event::ProposalChanged {
scope: Scope::Storage,
})?;
}
if let Some(issues) = args.changed_properties().get("Issues").cloned() {
let issues: String = issues.try_into()?;
let issues = serde_json::from_str(issues.as_str())?;
self.issues
.cast(issue::message::Set::new(Scope::Storage, issues))?;
}
Ok(())
}

// TODO: add system info to the event.
fn handle_system_changed(&self, _signal: SystemChanged) -> Result<(), Error> {
self.events.send(Event::SystemChanged {
scope: Scope::Storage,
})?;
Ok(())
}

async fn handle_proposal_changed(&self, _signal: ProposalChanged) -> Result<(), Error> {
self.events.send(Event::ProposalChanged {
scope: Scope::Storage,
})?;

self.update_issues().await
}

async fn handle_progress_changed(&self, signal: ProgressChanged) -> Result<(), Error> {
let Ok(args) = signal.args() else {
return Err(Error::ProgressChangedArgs);
};
let Ok(progress_data) = serde_json::from_str::<ProgressData>(args.progress) else {
return Err(Error::ProgressChangedData);
};
async fn handle_progress_changed(
&self,
signal: storage1::ProgressChanged,
) -> Result<(), Error> {
let args = signal.args()?;
let progress_data = serde_json::from_str::<ProgressData>(args.progress)?;
self.progress
.call(progress::message::SetProgress::new(progress_data.into()))
.await?;

Ok(())
}

async fn handle_progress_finished(&self, _signal: ProgressFinished) -> Result<(), Error> {
async fn handle_progress_finished(
&self,
_signal: storage1::ProgressFinished,
) -> Result<(), Error> {
self.progress
.call(progress::message::Finish::new(Scope::Storage))
.await?;
Ok(())
}

async fn system_changed_stream(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Signal> + Send>>, Error> {
let proxy = Storage1Proxy::new(&self.connection).await?;
let stream = proxy
.receive_system_changed()
.await?
.map(Signal::SystemChanged);
Ok(Box::pin(stream))
}

async fn proposal_changed_stream(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Signal> + Send>>, Error> {
let proxy = Storage1Proxy::new(&self.connection).await?;
let stream = proxy
.receive_proposal_changed()
.await?
.map(Signal::ProposalChanged);
Ok(Box::pin(stream))
}

async fn progress_changed_stream(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Signal> + Send>>, Error> {
let proxy = Storage1Proxy::new(&self.connection).await?;
let stream = proxy
.receive_progress_changed()
.await?
.map(Signal::ProgressChanged);
Ok(Box::pin(stream))
}

async fn progress_finished_stream(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Signal> + Send>>, Error> {
let proxy = Storage1Proxy::new(&self.connection).await?;
let stream = proxy
.receive_progress_finished()
.await?
.map(Signal::ProgressFinished);
Ok(Box::pin(stream))
}
}

/// Spawns a Tokio task for the monitor.
Expand All @@ -250,7 +190,7 @@ impl Monitor {
pub fn spawn(monitor: Monitor) -> Result<(), Error> {
tokio::spawn(async move {
if let Err(e) = monitor.run().await {
println!("Error running the storage monitor: {e:?}");
tracing::error!("Error running the storage monitor: {e:?}");
}
});
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions rust/agama-storage/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ impl Starter {
self.events,
self.connection,
storage_client,
)
.await;
);
monitor::spawn(monitor)?;
}
Ok(handler)
Expand Down
6 changes: 6 additions & 0 deletions rust/package/agama.changes
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
-------------------------------------------------------------------
Tue Mar 17 07:09:37 UTC 2026 - José Iván López González <jlopez@suse.com>

- Ensure the list of storage issues is correctly updated
(gh#agama-project/agama#3291).

-------------------------------------------------------------------
Tue Mar 17 07:00:30 UTC 2026 - Josef Reidinger <jreidinger@suse.com>

Expand Down
Loading