diff --git a/.github/workflows/ci-rust.yml b/.github/workflows/ci-rust.yml index d6802c9c13..45a8efe234 100644 --- a/.github/workflows/ci-rust.yml +++ b/.github/workflows/ci-rust.yml @@ -70,6 +70,7 @@ jobs: - name: Install required packages run: zypper --non-interactive install clang-devel + dbus-1-daemon jq libopenssl-3-devel openssl-3 diff --git a/rust/agama-lib/src/jobs.rs b/rust/agama-lib/src/jobs.rs new file mode 100644 index 0000000000..e7ef66ff85 --- /dev/null +++ b/rust/agama-lib/src/jobs.rs @@ -0,0 +1,36 @@ +//! This module implements support for the so-called Jobs. It is a concept hat represents running +//! an external command that may take some time, like formatting a DASD device. It is exposed via +//! D-Bus and, at this time, only the storage service makes use of it. + +use std::collections::HashMap; + +use serde::Serialize; +use zbus::zvariant::OwnedValue; + +use crate::{dbus::get_property, error::ServiceError}; + +pub mod client; + +/// Represents a job. +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Job { + /// Artificial job identifier. + pub id: String, + /// Whether the job is running. + pub running: bool, + /// Job exit code. + pub exit_code: u32, +} + +impl TryFrom<&HashMap> for Job { + type Error = ServiceError; + + fn try_from(value: &HashMap) -> Result { + Ok(Job { + running: get_property(value, "Running")?, + exit_code: get_property(value, "ExitCode")?, + ..Default::default() + }) + } +} diff --git a/rust/agama-lib/src/jobs/client.rs b/rust/agama-lib/src/jobs/client.rs new file mode 100644 index 0000000000..c7f02d5ad9 --- /dev/null +++ b/rust/agama-lib/src/jobs/client.rs @@ -0,0 +1,53 @@ +//! Implements a client to access Agama's Jobs API. + +use zbus::{fdo::ObjectManagerProxy, zvariant::OwnedObjectPath, Connection}; + +use crate::error::ServiceError; + +use super::Job; + +#[derive(Clone)] +pub struct JobsClient<'a> { + object_manager_proxy: ObjectManagerProxy<'a>, +} + +impl<'a> JobsClient<'a> { + pub async fn new( + connection: Connection, + destination: &'static str, + path: &'static str, + ) -> Result { + let object_manager_proxy = ObjectManagerProxy::builder(&connection) + .destination(destination)? + .path(path)? + .build() + .await?; + + Ok(Self { + object_manager_proxy, + }) + } + + pub async fn jobs(&self) -> Result, ServiceError> { + let managed_objects = self.object_manager_proxy.get_managed_objects().await?; + + let mut jobs = vec![]; + for (path, ifaces) in managed_objects { + let Some(properties) = ifaces.get("org.opensuse.Agama.Storage1.Job") else { + continue; + }; + + match Job::try_from(properties) { + Ok(mut job) => { + job.id = path.to_string(); + jobs.push((path, job)); + } + Err(error) => { + log::warn!("Not a valid job: {}", error); + } + } + } + + Ok(jobs) + } +} diff --git a/rust/agama-lib/src/lib.rs b/rust/agama-lib/src/lib.rs index 1bbc4c48c8..2a02f0cc63 100644 --- a/rust/agama-lib/src/lib.rs +++ b/rust/agama-lib/src/lib.rs @@ -27,6 +27,7 @@ pub mod auth; pub mod base_http_client; pub mod error; pub mod install_settings; +pub mod jobs; pub mod localization; pub mod manager; pub mod network; diff --git a/rust/agama-lib/src/proxies.rs b/rust/agama-lib/src/proxies.rs index 240327ab59..19157b2f10 100644 --- a/rust/agama-lib/src/proxies.rs +++ b/rust/agama-lib/src/proxies.rs @@ -192,3 +192,29 @@ trait Locale { /// SetLocale method fn set_locale(&self, locale: &str) -> zbus::Result<()>; } + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.Job", + default_service = "org.opensuse.Agama.Storage1", + default_path = "/org/opensuse/Agama/Storage1/jobs" +)] +trait Job { + #[dbus_proxy(property)] + fn running(&self) -> zbus::Result; + + #[dbus_proxy(property)] + fn exit_code(&self) -> zbus::Result; + + #[dbus_proxy(signal)] + fn finished(&self, exit_code: u32) -> zbus::Result<()>; +} + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.DASD.Format", + default_service = "org.opensuse.Agama.Storage1", + default_path = "/org/opensuse/Agama/Storage1/jobs/1" +)] +trait FormatJob { + #[dbus_proxy(property)] + fn summary(&self) -> zbus::Result>; +} diff --git a/rust/agama-lib/src/storage/client.rs b/rust/agama-lib/src/storage/client.rs index 9998cc61aa..37b53a6936 100644 --- a/rust/agama-lib/src/storage/client.rs +++ b/rust/agama-lib/src/storage/client.rs @@ -14,6 +14,7 @@ use zbus::fdo::ObjectManagerProxy; use zbus::names::{InterfaceName, OwnedInterfaceName}; use zbus::zvariant::{OwnedObjectPath, OwnedValue}; use zbus::Connection; +pub mod dasd; pub mod iscsi; type DBusObject = ( diff --git a/rust/agama-lib/src/storage/client/dasd.rs b/rust/agama-lib/src/storage/client/dasd.rs new file mode 100644 index 0000000000..d1ab9c61f0 --- /dev/null +++ b/rust/agama-lib/src/storage/client/dasd.rs @@ -0,0 +1,110 @@ +//! Implements a client to access Agama's D-Bus API related to DASD management. + +use zbus::{ + fdo::ObjectManagerProxy, + zvariant::{ObjectPath, OwnedObjectPath}, + Connection, +}; + +use crate::{ + error::ServiceError, + storage::{model::dasd::DASDDevice, proxies::DASDManagerProxy}, +}; + +/// Client to connect to Agama's D-Bus API for DASD management. +#[derive(Clone)] +pub struct DASDClient<'a> { + manager_proxy: DASDManagerProxy<'a>, + object_manager_proxy: ObjectManagerProxy<'a>, +} + +impl<'a> DASDClient<'a> { + pub async fn new(connection: Connection) -> Result, ServiceError> { + let manager_proxy = DASDManagerProxy::new(&connection).await?; + let object_manager_proxy = ObjectManagerProxy::builder(&connection) + .destination("org.opensuse.Agama.Storage1")? + .path("/org/opensuse/Agama/Storage1")? + .build() + .await?; + Ok(Self { + manager_proxy, + object_manager_proxy, + }) + } + + pub async fn supported(&self) -> Result { + let introspect = self.manager_proxy.introspect().await?; + // simply check if introspection contain given interface + Ok(introspect.contains("org.opensuse.Agama.Storage1.DASD.Manager")) + } + + pub async fn probe(&self) -> Result<(), ServiceError> { + Ok(self.manager_proxy.probe().await?) + } + + pub async fn devices(&self) -> Result, ServiceError> { + let managed_objects = self.object_manager_proxy.get_managed_objects().await?; + + let mut devices: Vec<(OwnedObjectPath, DASDDevice)> = vec![]; + for (path, ifaces) in managed_objects { + if let Some(properties) = ifaces.get("org.opensuse.Agama.Storage1.DASD.Device") { + match DASDDevice::try_from(properties) { + Ok(device) => { + devices.push((path, device)); + } + Err(error) => { + log::warn!("Not a valid DASD device: {}", error); + } + } + } + } + Ok(devices) + } + + pub async fn format(&self, ids: &[&str]) -> Result { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + let (exit_code, job_path) = self.manager_proxy.format(&references).await?; + if exit_code != 0 { + return Err(ServiceError::UnsuccessfulAction("DASD format".to_string())); + } + + Ok(job_path.to_string()) + } + + pub async fn enable(&self, ids: &[&str]) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.enable(&references).await?; + Ok(()) + } + + pub async fn disable(&self, ids: &[&str]) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.disable(&references).await?; + Ok(()) + } + + pub async fn set_diag(&self, ids: &[&str], diag: bool) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.set_diag(&references, diag).await?; + Ok(()) + } + + async fn find_devices(&self, ids: &[&str]) -> Result>, ServiceError> { + let devices = self.devices().await?; + let selected: Vec = devices + .into_iter() + .filter_map(|(path, device)| { + if ids.contains(&device.id.as_str()) { + Some(path.into_inner()) + } else { + None + } + }) + .collect(); + Ok(selected) + } +} diff --git a/rust/agama-lib/src/storage/model.rs b/rust/agama-lib/src/storage/model.rs index da49d35cc6..81c3b0f65e 100644 --- a/rust/agama-lib/src/storage/model.rs +++ b/rust/agama-lib/src/storage/model.rs @@ -5,6 +5,8 @@ use zbus::zvariant::{OwnedValue, Value}; use crate::dbus::{get_optional_property, get_property}; +pub mod dasd; + #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct DeviceSid(u32); diff --git a/rust/agama-lib/src/storage/model/dasd.rs b/rust/agama-lib/src/storage/model/dasd.rs new file mode 100644 index 0000000000..c40bf09804 --- /dev/null +++ b/rust/agama-lib/src/storage/model/dasd.rs @@ -0,0 +1,46 @@ +//! Implements a data model for DASD devices management. +use std::collections::HashMap; + +use serde::Serialize; +use zbus::zvariant::OwnedValue; + +use crate::{dbus::get_property, error::ServiceError}; + +/// Represents a DASD device (specific to s390x systems). +#[derive(Clone, Debug, Serialize, Default, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DASDDevice { + pub id: String, + pub enabled: bool, + pub device_name: String, + pub formatted: bool, + pub diag: bool, + pub status: String, + pub device_type: String, + pub access_type: String, + pub partition_info: String, +} +#[derive(Clone, Debug, Serialize, Default, utoipa::ToSchema)] +pub struct DASDFormatSummary { + pub total: u32, + pub step: u32, + pub done: bool, +} + +impl TryFrom<&HashMap> for DASDDevice { + type Error = ServiceError; + + fn try_from(value: &HashMap) -> Result { + Ok(DASDDevice { + id: get_property(value, "Id")?, + enabled: get_property(value, "Enabled")?, + device_name: get_property(value, "DeviceName")?, + formatted: get_property(value, "Formatted")?, + diag: get_property(value, "Diag")?, + status: get_property(value, "Status")?, + device_type: get_property(value, "Type")?, + access_type: get_property(value, "AccessType")?, + partition_info: get_property(value, "PartitionInfo")?, + }) + } +} diff --git a/rust/agama-lib/src/storage/proxies.rs b/rust/agama-lib/src/storage/proxies.rs index ebb9396ac7..c5d2de158b 100644 --- a/rust/agama-lib/src/storage/proxies.rs +++ b/rust/agama-lib/src/storage/proxies.rs @@ -156,3 +156,75 @@ trait Node { #[dbus_proxy(property)] fn target(&self) -> zbus::Result; } + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.DASD.Manager", + default_service = "org.opensuse.Agama.Storage1", + default_path = "/org/opensuse/Agama/Storage1" +)] +trait DASDManager { + /// Disable method + fn disable(&self, devices: &[&zbus::zvariant::ObjectPath<'_>]) -> zbus::Result; + + /// Enable method + fn enable(&self, devices: &[&zbus::zvariant::ObjectPath<'_>]) -> zbus::Result; + + /// Format method + fn format( + &self, + devices: &[&zbus::zvariant::ObjectPath<'_>], + ) -> zbus::Result<(u32, zbus::zvariant::OwnedObjectPath)>; + + /// Probe method + fn probe(&self) -> zbus::Result<()>; + + /// SetDiag method + fn set_diag( + &self, + devices: &[&zbus::zvariant::ObjectPath<'_>], + diag: bool, + ) -> zbus::Result; +} + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.DASD.Device", + default_service = "org.opensuse.Agama.Storage1", + assume_defaults = true +)] +trait DASDDevice { + /// AccessType property + #[dbus_proxy(property)] + fn access_type(&self) -> zbus::Result; + + /// DeviceName property + #[dbus_proxy(property)] + fn device_name(&self) -> zbus::Result; + + /// Diag property + #[dbus_proxy(property)] + fn diag(&self) -> zbus::Result; + + /// Enabled property + #[dbus_proxy(property)] + fn enabled(&self) -> zbus::Result; + + /// Formatted property + #[dbus_proxy(property)] + fn formatted(&self) -> zbus::Result; + + /// Id property + #[dbus_proxy(property)] + fn id(&self) -> zbus::Result; + + /// PartitionInfo property + #[dbus_proxy(property)] + fn partition_info(&self) -> zbus::Result; + + /// Status property + #[dbus_proxy(property)] + fn status(&self) -> zbus::Result; + + /// Type property + #[dbus_proxy(property)] + fn type_(&self) -> zbus::Result; +} diff --git a/rust/agama-server/src/storage/web.rs b/rust/agama-server/src/storage/web.rs index 4f0de7fab4..0fc935f346 100644 --- a/rust/agama-server/src/storage/web.rs +++ b/rust/agama-server/src/storage/web.rs @@ -21,13 +21,19 @@ use axum::{ use serde::{Deserialize, Serialize}; use tokio_stream::{Stream, StreamExt}; +pub mod dasd; pub mod iscsi; use crate::{ error::Error, - storage::web::iscsi::{iscsi_service, iscsi_stream}, + storage::web::{ + dasd::{dasd_service, dasd_stream}, + iscsi::{iscsi_service, iscsi_stream}, + }, web::{ - common::{issues_router, progress_router, service_status_router, EventStreams}, + common::{ + issues_router, jobs_service, progress_router, service_status_router, EventStreams, + }, Event, }, }; @@ -38,8 +44,10 @@ pub async fn storage_streams(dbus: zbus::Connection) -> Result { pub async fn storage_service(dbus: zbus::Connection) -> Result { const DBUS_SERVICE: &str = "org.opensuse.Agama.Storage1"; const DBUS_PATH: &str = "/org/opensuse/Agama/Storage1"; + const DBUS_DESTINATION: &str = "org.opensuse.Agama.Storage1"; let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let issues_router = issues_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let iscsi_router = iscsi_service(&dbus).await?; + let dasd_router = dasd_service(&dbus).await?; + let jobs_router = jobs_service(&dbus, DBUS_DESTINATION, DBUS_PATH).await?; let client = StorageClient::new(dbus.clone()).await?; let state = StorageState { client }; @@ -90,8 +101,10 @@ pub async fn storage_service(dbus: zbus::Connection) -> Result Result { + let stream: EventStreams = vec![ + ("dasd_devices", Box::pin(DASDDeviceStream::new(dbus).await?)), + ( + "format_jobs", + Box::pin(DASDFormatJobStream::new(dbus).await?), + ), + ]; + Ok(stream) +} + +#[derive(Clone)] +struct DASDState<'a> { + client: DASDClient<'a>, +} + +pub async fn dasd_service(dbus: &zbus::Connection) -> Result, ServiceError> { + let client = DASDClient::new(dbus.clone()).await?; + let state = DASDState { client }; + let router = Router::new() + .route("/supported", get(supported)) + .route("/devices", get(devices)) + .route("/probe", post(probe)) + .route("/format", post(format)) + .route("/enable", post(enable)) + .route("/disable", post(disable)) + .route("/diag", put(set_diag)) + .with_state(state); + Ok(router) +} + +/// Returns whether DASD technology is supported or not +#[utoipa::path( + get, + path="/supported", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "Returns whether DASD technology is supported") + ) +)] +async fn supported(State(state): State>) -> Result, Error> { + Ok(Json(state.client.supported().await?)) +} + +/// Returns the list of known DASD devices. +#[utoipa::path( + get, + path="/devices", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "List of DASD devices", body = Vec) + ) +)] +async fn devices(State(state): State>) -> Result>, Error> { + let devices = state + .client + .devices() + .await? + .into_iter() + .map(|(_path, device)| device) + .collect(); + Ok(Json(devices)) +} + +/// Find DASD devices in the system. +#[utoipa::path( + post, + path="/probe", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The probing process ran successfully") + ) +)] +async fn probe(State(state): State>) -> Result, Error> { + Ok(Json(state.client.probe().await?)) +} + +/// Formats a set of devices. +#[utoipa::path( + post, + path="/format", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The formatting process started. The id of format job is in response.") + ) +)] +async fn format( + State(state): State>, + Json(devices): Json, +) -> Result, Error> { + let path = state.client.format(&devices.as_references()).await?; + Ok(Json(path)) +} + +/// Enables a set of devices. +#[utoipa::path( + post, + path="/enable", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The DASD devices are enabled.") + ) +)] +async fn enable( + State(state): State>, + Json(devices): Json, +) -> Result, Error> { + state.client.enable(&devices.as_references()).await?; + Ok(Json(())) +} + +/// Disables a set of devices. +#[utoipa::path( + post, + path="/disable", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The DASD devices are disabled.") + ) +)] +async fn disable( + State(state): State>, + Json(devices): Json, +) -> Result, Error> { + state.client.disable(&devices.as_references()).await?; + Ok(Json(())) +} + +/// Sets the diag property for a set of devices. +#[utoipa::path( + put, + path="/diag", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The DIAG properties are set.") + ) + )] +async fn set_diag( + State(state): State>, + Json(params): Json, +) -> Result, Error> { + state + .client + .set_diag(¶ms.devices.as_references(), params.diag) + .await?; + Ok(Json(())) +} + +#[derive(Deserialize)] +struct SetDiagParams { + #[serde(flatten)] + pub devices: DevicesList, + pub diag: bool, +} + +#[derive(Deserialize)] +struct DevicesList { + devices: Vec, +} + +impl DevicesList { + pub fn as_references(&self) -> Vec<&str> { + self.devices.iter().map(AsRef::as_ref).collect() + } +} diff --git a/rust/agama-server/src/storage/web/dasd/stream.rs b/rust/agama-server/src/storage/web/dasd/stream.rs new file mode 100644 index 0000000000..e41f5825ad --- /dev/null +++ b/rust/agama-server/src/storage/web/dasd/stream.rs @@ -0,0 +1,267 @@ +// FIXME: the code is pretty similar to iscsi::stream. Refactor the stream to reduce the repetition. + +use std::{collections::HashMap, sync::Arc, task::Poll}; + +use agama_lib::{ + dbus::get_optional_property, + error::ServiceError, + property_from_dbus, + storage::{ + client::dasd::DASDClient, + model::dasd::{DASDDevice, DASDFormatSummary}, + }, +}; +use futures_util::{ready, Stream}; +use pin_project::pin_project; +use thiserror::Error; +use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; +use zbus::{ + fdo::{PropertiesChanged, PropertiesChangedArgs}, + zvariant::{self, ObjectPath, OwnedObjectPath, OwnedValue}, + MatchRule, Message, MessageStream, MessageType, +}; + +use crate::{ + dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}, + web::Event, +}; + +#[derive(Debug, Error)] +enum DASDDeviceStreamError { + #[error("Service error: {0}")] + Service(#[from] ServiceError), + #[error("Unknown DASD device: {0}")] + UnknownDevice(OwnedObjectPath), +} + +/// This stream listens for changes in the collection of DASD devices and emits +/// the updated objects. +/// +/// It relies on the [DBusObjectChangesStream] stream and uses a cache to avoid holding a bunch of +/// proxy objects. +#[pin_project] +pub struct DASDDeviceStream { + dbus: zbus::Connection, + cache: ObjectsCache, + #[pin] + inner: UnboundedReceiverStream, +} + +impl DASDDeviceStream { + /// Creates a new stream + /// + /// * `dbus`: D-Bus connection to listen on. + pub async fn new(dbus: &zbus::Connection) -> Result { + const MANAGER_PATH: &str = "/org/opensuse/Agama/Storage1"; + const NAMESPACE: &str = "/org/opensuse/Agama/Storage1/dasds"; + + let (tx, rx) = unbounded_channel(); + let mut stream = DBusObjectChangesStream::new( + dbus, + &ObjectPath::from_str_unchecked(MANAGER_PATH), + &ObjectPath::from_str_unchecked(NAMESPACE), + "org.opensuse.Agama.Storage1.DASD.Device", + ) + .await?; + + tokio::spawn(async move { + while let Some(change) = stream.next().await { + let _ = tx.send(change); + } + }); + let rx = UnboundedReceiverStream::new(rx); + + let mut cache: ObjectsCache = Default::default(); + let client = DASDClient::new(dbus.clone()).await?; + for (path, device) in client.devices().await? { + cache.add(path.into(), device); + } + + Ok(Self { + dbus: dbus.clone(), + cache, + inner: rx, + }) + } + + fn update_device<'a>( + cache: &'a mut ObjectsCache, + path: &OwnedObjectPath, + values: &HashMap, + ) -> Result<&'a DASDDevice, ServiceError> { + let device = cache.find_or_create(path); + property_from_dbus!(device, id, "Id", values, str); + property_from_dbus!(device, enabled, "Enabled", values, bool); + property_from_dbus!(device, device_name, "DeviceName", values, str); + property_from_dbus!(device, formatted, "Formatted", values, bool); + property_from_dbus!(device, diag, "Diag", values, bool); + property_from_dbus!(device, status, "Status", values, str); + property_from_dbus!(device, device_type, "Type", values, str); + property_from_dbus!(device, access_type, "AccessType", values, str); + property_from_dbus!(device, partition_info, "PartitionInfo", values, str); + Ok(device) + } + + fn remove_device( + cache: &mut ObjectsCache, + path: &OwnedObjectPath, + ) -> Result { + cache + .remove(path) + .ok_or_else(|| DASDDeviceStreamError::UnknownDevice(path.clone())) + } + + fn handle_change( + cache: &mut ObjectsCache, + change: &DBusObjectChange, + ) -> Result { + match change { + DBusObjectChange::Added(path, values) => { + let device = Self::update_device(cache, path, values)?; + Ok(Event::DASDDeviceAdded { + device: device.clone(), + }) + } + DBusObjectChange::Changed(path, updated) => { + let device = Self::update_device(cache, path, updated)?; + Ok(Event::DASDDeviceChanged { + device: device.clone(), + }) + } + DBusObjectChange::Removed(path) => { + let device = Self::remove_device(cache, path)?; + Ok(Event::DASDDeviceRemoved { device }) + } + } + } +} + +impl Stream for DASDDeviceStream { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut pinned = self.project(); + + Poll::Ready(loop { + let change = ready!(pinned.inner.as_mut().poll_next(cx)); + let next_value = match change { + Some(change) => { + if let Ok(event) = Self::handle_change(pinned.cache, &change) { + Some(event) + } else { + log::warn!("Could not process change {:?}", &change); + None + } + } + None => break None, + }; + if next_value.is_some() { + break next_value; + } + }) + } +} + +/// This stream listens for DASD progress changes and emits an [Event::DASDFormatJobChanged] event. +#[pin_project] +pub struct DASDFormatJobStream { + #[pin] + inner: MessageStream, +} + +impl DASDFormatJobStream { + pub async fn new(connection: &zbus::Connection) -> Result { + let rule = MatchRule::builder() + .msg_type(MessageType::Signal) + .path_namespace("/org/opensuse/Agama/Storage1/jobs")? + .interface("org.freedesktop.DBus.Properties")? + .member("PropertiesChanged")? + .build(); + let inner = MessageStream::for_match_rule(rule, connection, None).await?; + Ok(Self { inner }) + } + + fn handle_change(message: Result, zbus::Error>) -> Option { + let Ok(message) = message else { + return None; + }; + let properties = PropertiesChanged::from_message(message)?; + let args = properties.args().ok()?; + + if args.interface_name.as_str() != "org.opensuse.Agama.Storage1.DASD.Format" { + return None; + } + + let id = properties.path()?.to_string(); + let event = Self::to_event(id, &args); + if event.is_none() { + log::warn!("Could not decode the DASDFormatJobChanged event"); + } + event + } + + fn to_event(path: String, properties_changed: &PropertiesChangedArgs) -> Option { + let dict = properties_changed + .changed_properties() + .get("Summary")? + .downcast_ref::()?; + + // the key is the D-Bus path of the DASD device and the value is the progress + // of the related formatting process + let map = >>::try_from(dict.clone()).ok()?; + let mut format_summary = HashMap::new(); + + for (dasd_id, summary) in map { + let summary_values = summary.downcast_ref::()?; + let fields = summary_values.fields(); + let total: &u32 = fields.get(0)?.downcast_ref()?; + let step: &u32 = fields.get(1)?.downcast_ref()?; + let done: &bool = fields.get(2)?.downcast_ref()?; + format_summary.insert( + dasd_id.to_string(), + DASDFormatSummary { + total: *total, + step: *step, + done: *done, + }, + ); + } + + Some(Event::DASDFormatJobChanged { + job_id: path.to_string(), + summary: format_summary, + }) + } +} + +impl Stream for DASDFormatJobStream { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut pinned = self.project(); + + Poll::Ready(loop { + let item = ready!(pinned.inner.as_mut().poll_next(cx)); + let next_value = match item { + Some(change) => { + if let Some(event) = Self::handle_change(change) { + Some(event) + } else { + None + } + } + None => break None, + }; + if next_value.is_some() { + break next_value; + } + }) + } +} diff --git a/rust/agama-server/src/web.rs b/rust/agama-server/src/web.rs index 3025f17b9b..07926d98e4 100644 --- a/rust/agama-server/src/web.rs +++ b/rust/agama-server/src/web.rs @@ -13,7 +13,7 @@ use crate::{ software::web::{software_service, software_streams}, storage::web::{storage_service, storage_streams}, users::web::{users_service, users_streams}, - web::common::{issues_stream, progress_stream, service_status_stream}, + web::common::{issues_stream, jobs_stream, progress_stream, service_status_stream}, }; use axum::Router; @@ -141,6 +141,16 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res ) .await?, ); + stream.insert( + "storage-jobs", + jobs_stream( + dbus.clone(), + "org.opensuse.Agama.Storage1", + "/org/opensuse/Agama/Storage1", + "/org/opensuse/Agama/Storage1/jobs", + ) + .await?, + ); stream.insert( "software-status", service_status_stream( diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index 814a811811..430fa50c06 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -15,6 +15,9 @@ use zbus::PropertyStream; use crate::error::Error; +mod jobs; +pub use jobs::{jobs_service, jobs_stream}; + use super::Event; pub type EventStreams = Vec<(&'static str, Pin + Send>>)>; diff --git a/rust/agama-server/src/web/common/jobs.rs b/rust/agama-server/src/web/common/jobs.rs new file mode 100644 index 0000000000..1e7484ebf2 --- /dev/null +++ b/rust/agama-server/src/web/common/jobs.rs @@ -0,0 +1,186 @@ +use std::{collections::HashMap, pin::Pin, task::Poll}; + +use agama_lib::{ + dbus::get_optional_property, + error::ServiceError, + jobs::{client::JobsClient, Job}, + property_from_dbus, +}; +use axum::{extract::State, routing::get, Json, Router}; +use futures_util::{ready, Stream}; +use pin_project::pin_project; +use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; +use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; + +use crate::{ + dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}, + error::Error, + web::Event, +}; + +/// Builds a router for the jobs objects. +pub async fn jobs_service( + dbus: &zbus::Connection, + destination: &'static str, + path: &'static str, +) -> Result, ServiceError> { + let client = JobsClient::new(dbus.clone(), destination, path).await?; + let state = JobsState { client }; + Ok(Router::new().route("/jobs", get(jobs)).with_state(state)) +} + +#[derive(Clone)] +struct JobsState<'a> { + client: JobsClient<'a>, +} + +async fn jobs(State(state): State>) -> Result>, Error> { + let jobs = state + .client + .jobs() + .await? + .into_iter() + .map(|(_path, job)| job) + .collect(); + Ok(Json(jobs)) +} + +/// Returns the stream of jobs-related events. +/// +/// The stream combines the following events: +/// +/// * Changes on the DASD devices collection. +/// +/// * `dbus`: D-Bus connection to use. +pub async fn jobs_stream( + dbus: zbus::Connection, + destination: &'static str, + manager: &'static str, + namespace: &'static str, +) -> Result + Send>>, Error> { + let stream = JobsStream::new(&dbus, destination, manager, namespace).await?; + Ok(Box::pin(stream)) +} + +#[pin_project] +pub struct JobsStream { + dbus: zbus::Connection, + cache: ObjectsCache, + #[pin] + inner: UnboundedReceiverStream, +} + +#[derive(Debug, thiserror::Error)] +enum JobsStreamError { + #[error("Service error: {0}")] + Service(#[from] ServiceError), + #[error("Unknown job: {0}")] + UnknownJob(OwnedObjectPath), +} + +impl JobsStream { + pub async fn new( + dbus: &zbus::Connection, + destination: &'static str, + manager: &'static str, + namespace: &'static str, + ) -> Result { + let (tx, rx) = unbounded_channel(); + let mut stream = DBusObjectChangesStream::new( + dbus, + &ObjectPath::from_static_str(manager)?, + &ObjectPath::from_static_str(namespace)?, + "org.opensuse.Agama.Storage1.Job", + ) + .await?; + + tokio::spawn(async move { + while let Some(change) = stream.next().await { + let _ = tx.send(change); + } + }); + let rx = UnboundedReceiverStream::new(rx); + + let mut cache: ObjectsCache = Default::default(); + let client = JobsClient::new(dbus.clone(), destination, manager).await?; + for (path, job) in client.jobs().await? { + cache.add(path, job); + } + + Ok(Self { + dbus: dbus.clone(), + cache, + inner: rx, + }) + } + + fn update_job<'a>( + cache: &'a mut ObjectsCache, + path: &OwnedObjectPath, + values: &HashMap, + ) -> Result<&'a Job, ServiceError> { + let job = cache.find_or_create(path); + property_from_dbus!(job, running, "Running", values, bool); + property_from_dbus!(job, exit_code, "ExitCode", values, u32); + Ok(job) + } + + fn remove_job( + cache: &mut ObjectsCache, + path: &OwnedObjectPath, + ) -> Result { + cache + .remove(path) + .ok_or_else(|| JobsStreamError::UnknownJob(path.clone())) + } + + fn handle_change( + cache: &mut ObjectsCache, + change: &DBusObjectChange, + ) -> Result { + match change { + DBusObjectChange::Added(path, values) => { + let job = Self::update_job(cache, path, values)?; + Ok(Event::JobAdded { job: job.clone() }) + } + DBusObjectChange::Changed(path, updated) => { + let job = Self::update_job(cache, path, updated)?; + Ok(Event::JobChanged { job: job.clone() }) + } + DBusObjectChange::Removed(path) => { + let job = Self::remove_job(cache, path)?; + Ok(Event::JobRemoved { job }) + } + } + } +} + +impl Stream for JobsStream { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut pinned = self.project(); + + Poll::Ready(loop { + let change = ready!(pinned.inner.as_mut().poll_next(cx)); + let next_value = match change { + Some(change) => { + if let Ok(event) = Self::handle_change(pinned.cache, &change) { + Some(event) + } else { + log::warn!("Could not process change {:?}", &change); + None + } + } + None => break None, + }; + if next_value.is_some() { + break next_value; + } + }) + } +} diff --git a/rust/agama-server/src/web/event.rs b/rust/agama-server/src/web/event.rs index 20075ce00a..c11a9ccc89 100644 --- a/rust/agama-server/src/web/event.rs +++ b/rust/agama-server/src/web/event.rs @@ -1,7 +1,13 @@ use crate::network::model::NetworkChange; use agama_lib::{ - localization::model::LocaleConfig, manager::InstallationPhase, - product::RegistrationRequirement, progress::Progress, software::SelectedBy, storage::ISCSINode, + jobs::Job, + localization::model::LocaleConfig, + manager::InstallationPhase, + product::RegistrationRequirement, + progress::Progress, + software::SelectedBy, + storage::model::dasd::{DASDDevice, DASDFormatSummary}, + storage::ISCSINode, users::FirstUser, }; use serde::Serialize; @@ -11,7 +17,7 @@ use tokio::sync::broadcast::{Receiver, Sender}; use super::common::Issue; #[derive(Clone, Debug, Serialize)] -#[serde(tag = "type")] +#[serde(rename_all = "camelCase", tag = "type")] pub enum Event { L10nConfigChanged(LocaleConfig), LocaleChanged { @@ -77,6 +83,28 @@ pub enum Event { name: Option, ibft: Option, }, + DASDDeviceAdded { + device: DASDDevice, + }, + DASDDeviceChanged { + device: DASDDevice, + }, + DASDDeviceRemoved { + device: DASDDevice, + }, + JobAdded { + job: Job, + }, + JobChanged { + job: Job, + }, + JobRemoved { + job: Job, + }, + DASDFormatJobChanged { + job_id: String, + summary: HashMap, + }, } pub type EventsSender = Sender;