diff --git a/rust/agama-cli/src/progress.rs b/rust/agama-cli/src/progress.rs index 085d10dd8d..7752d35824 100644 --- a/rust/agama-cli/src/progress.rs +++ b/rust/agama-cli/src/progress.rs @@ -26,8 +26,8 @@ use console::style; use indicatif::{ProgressBar, ProgressStyle}; use std::time::Duration; -const MANAGER_SERVICE: &str = "org.opensuse.Agama.Manager1"; -const SOFTWARE_SERVICE: &str = "org.opensuse.Agama.Software1"; +const MANAGER_PROGRESS_OBJECT_PATH: &str = "/org/opensuse/Agama/Manager1"; +const SOFTWARE_PROGRESS_OBJECT_PATH: &str = "/org/opensuse/Agama/Software1"; /// Displays the progress on the terminal. pub struct ProgressMonitor { @@ -77,14 +77,14 @@ impl ProgressMonitor { /// /// It returns true if the monitor should continue. async fn update(&mut self, status: MonitorStatus) -> bool { - if status.progress.get(MANAGER_SERVICE).is_none() && self.running { + if status.progress.get(MANAGER_PROGRESS_OBJECT_PATH).is_none() && self.running { self.finish(); if self.stop_on_idle { return false; } } - if let Some(progress) = status.progress.get(MANAGER_SERVICE) { + if let Some(progress) = status.progress.get(MANAGER_PROGRESS_OBJECT_PATH) { self.running = true; if self.current_step != progress.current_step { self.update_main(&progress).await; @@ -92,7 +92,7 @@ impl ProgressMonitor { } } - match status.progress.get(SOFTWARE_SERVICE) { + match status.progress.get(SOFTWARE_PROGRESS_OBJECT_PATH) { Some(progress) => self.update_bar(progress), None => self.remove_bar(), } diff --git a/rust/agama-lib/src/http/event.rs b/rust/agama-lib/src/http/event.rs index 2bc8bee295..9fdabb8b1c 100644 --- a/rust/agama-lib/src/http/event.rs +++ b/rust/agama-lib/src/http/event.rs @@ -49,8 +49,8 @@ pub enum Event { DevicesDirty { dirty: bool, }, - Progress { - service: String, + ProgressChanged { + path: String, #[serde(flatten)] progress: Progress, }, diff --git a/rust/agama-lib/src/monitor.rs b/rust/agama-lib/src/monitor.rs index df4e99e94f..c56ecd7075 100644 --- a/rust/agama-lib/src/monitor.rs +++ b/rust/agama-lib/src/monitor.rs @@ -60,8 +60,8 @@ use crate::{ progress::Progress, }; -const MANAGER_SERVICE: &str = "org.opensuse.Agama.Manager1"; -// const SOFTWARE_SERVICE: &str = "org.opensuse.Agama.Software1"; +const MANAGER_PROGRESS_OBJECT_PATH: &str = "/org/opensuse/Agama/Manager1"; +const SOFTWARE_PROGRESS_OBJECT_PATH: &str = "/org/opensuse/Agama/Software1"; #[derive(thiserror::Error, Debug)] pub enum MonitorError { @@ -82,7 +82,7 @@ pub struct MonitorStatus { /// /// FIXME: do not hold the full status (some elements are not updated) pub installer_status: InstallerStatus, - /// Progress for each service using the name as the key. If the progress is + /// Progress for each service using the D-Bus object path as the key. If the progress is /// finished, the entry is removed from the map. pub progress: HashMap, } @@ -92,13 +92,13 @@ impl MonitorStatus { /// /// The entry is removed if the progress is finished. /// - /// * `service`: service name. + /// * `service`: D-Bus object path. /// * `progress`: updated progress. - fn update_progress(&mut self, service: String, progress: Progress) { + fn update_progress(&mut self, path: String, progress: Progress) { if progress.finished { - _ = self.progress.remove_entry(&service); + _ = self.progress.remove_entry(&path); } else { - _ = self.progress.insert(service, progress); + _ = self.progress.insert(path, progress); } } @@ -224,11 +224,11 @@ impl Monitor { /// * `event`: Agama event. fn handle_event(&mut self, event: Event) { match event { - Event::Progress { service, progress } => { - self.status.update_progress(service, progress); + Event::ProgressChanged { path, progress } => { + self.status.update_progress(path, progress); } Event::ServiceStatusChanged { service, status } => { - if service.as_str() == MANAGER_SERVICE { + if service.as_str() == MANAGER_PROGRESS_OBJECT_PATH { self.status.set_is_busy(status == 1); } } @@ -258,26 +258,32 @@ impl MonitorStatusReader { ..Default::default() }; - self.add_service_progress(&mut status, MANAGER_SERVICE, "/manager/progress") - .await?; - // FIXME: do not read the software status yet because it might block - // the progress. Enable this line when the software service does not block - // self.add_service_progress(&mut status, SOFTWARE_SERVICE, "/software/progress") - // .await?; + self.add_service_progress( + &mut status, + MANAGER_PROGRESS_OBJECT_PATH, + "/manager/progress", + ) + .await?; + self.add_service_progress( + &mut status, + SOFTWARE_PROGRESS_OBJECT_PATH, + "/software/progress", + ) + .await?; Ok(status) } async fn add_service_progress( &self, status: &mut MonitorStatus, - service: &str, + dbus_path: &str, path: &str, ) -> Result<(), MonitorError> { let progress: Progress = self.http.get(path).await?; if progress.finished { return Ok(()); } - status.progress.insert(service.to_string(), progress); + status.progress.insert(dbus_path.to_string(), progress); Ok(()) } } diff --git a/rust/agama-lib/src/progress.rs b/rust/agama-lib/src/progress.rs index e3c0eaadf1..b4a8167996 100644 --- a/rust/agama-lib/src/progress.rs +++ b/rust/agama-lib/src/progress.rs @@ -63,3 +63,13 @@ impl Progress { }) } } + +/// Information about the current progress sequence. +#[derive(Clone, Debug, Default, Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ProgressSequence { + /// Sequence steps if known in advance + pub steps: Vec, + #[serde(flatten)] + pub progress: Progress, +} diff --git a/rust/agama-lib/src/proxies.rs b/rust/agama-lib/src/proxies.rs index 8b6d049408..7374bb057e 100644 --- a/rust/agama-lib/src/proxies.rs +++ b/rust/agama-lib/src/proxies.rs @@ -19,7 +19,7 @@ // find current contact information at www.suse.com. mod progress; -pub use progress::ProgressProxy; +pub use progress::{ProgressChanged, ProgressChangedArgs, ProgressChangedStream, ProgressProxy}; mod service_status; pub use service_status::ServiceStatusProxy; diff --git a/rust/agama-lib/src/proxies/progress.rs b/rust/agama-lib/src/proxies/progress.rs index 11d7236368..c39d6c697f 100644 --- a/rust/agama-lib/src/proxies/progress.rs +++ b/rust/agama-lib/src/proxies/progress.rs @@ -22,6 +22,16 @@ use zbus::proxy; #[proxy(interface = "org.opensuse.Agama1.Progress", assume_defaults = true)] pub trait Progress { + /// ProgressChanged signal + #[zbus(signal)] + fn progress_changed( + &self, + total_steps: u32, + current_step: (u32, &str), + finished: bool, + steps: Vec<&str>, + ) -> zbus::Result<()>; + /// CurrentStep property #[zbus(property)] fn current_step(&self) -> zbus::Result<(u32, String)>; diff --git a/rust/agama-server/src/error.rs b/rust/agama-server/src/error.rs index 5cba83437e..f9328d8f0d 100644 --- a/rust/agama-server/src/error.rs +++ b/rust/agama-server/src/error.rs @@ -26,7 +26,11 @@ use axum::{ }; use serde_json::json; -use crate::{l10n::LocaleError, questions::QuestionsError, web::common::IssuesServiceError}; +use crate::{ + l10n::LocaleError, + questions::QuestionsError, + web::common::{IssuesServiceError, ProgressServiceError}, +}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -42,6 +46,8 @@ pub enum Error { Locale(#[from] LocaleError), #[error("Issues service error: {0}")] Issues(#[from] IssuesServiceError), + #[error("Progress service error: {0}")] + Progress(#[from] ProgressServiceError), } // This would be nice, but using it for a return type diff --git a/rust/agama-server/src/manager/web.rs b/rust/agama-server/src/manager/web.rs index 5a093dabe3..e5bbf4e3ca 100644 --- a/rust/agama-server/src/manager/web.rs +++ b/rust/agama-server/src/manager/web.rs @@ -31,6 +31,7 @@ use agama_lib::{ manager::{FinishMethod, InstallationPhase, InstallerStatus, ManagerClient}, proxies::Manager1Proxy, }; +use anyhow::Context; use axum::{ body::Body, extract::State, @@ -45,7 +46,7 @@ use tokio_util::io::ReaderStream; use crate::{ error::Error, - web::common::{progress_router, service_status_router}, + web::common::{service_status_router, ProgressClient, ProgressRouterBuilder}, }; use agama_lib::http::Event; @@ -85,12 +86,19 @@ pub async fn manager_stream( } /// Sets up and returns the axum service for the manager module -pub async fn manager_service(dbus: zbus::Connection) -> Result { +pub async fn manager_service( + dbus: zbus::Connection, + progress: ProgressClient, +) -> Result { const DBUS_SERVICE: &str = "org.opensuse.Agama.Manager1"; const DBUS_PATH: &str = "/org/opensuse/Agama/Manager1"; let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; - let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + // FIXME: use anyhow temporarily until we adapt all these methods to return + // the crate::error::Error instead of ServiceError. + let progress_router = ProgressRouterBuilder::new(DBUS_SERVICE, DBUS_PATH, progress) + .build() + .context("Could not build the progress router")?; let manager = ManagerClient::new(dbus.clone()).await?; let state = ManagerState { manager, dbus }; Ok(Router::new() diff --git a/rust/agama-server/src/software/web.rs b/rust/agama-server/src/software/web.rs index 80549b6db8..709b49ee01 100644 --- a/rust/agama-server/src/software/web.rs +++ b/rust/agama-server/src/software/web.rs @@ -29,7 +29,8 @@ use crate::{ error::Error, web::{ common::{ - progress_router, service_status_router, EventStreams, IssuesClient, IssuesRouterBuilder, + service_status_router, EventStreams, IssuesClient, IssuesRouterBuilder, ProgressClient, + ProgressRouterBuilder, }, EventsReceiver, }, @@ -238,13 +239,13 @@ pub async fn software_service( dbus: zbus::Connection, events: EventsReceiver, issues: IssuesClient, + progress: ProgressClient, ) -> Result { const DBUS_SERVICE: &str = "org.opensuse.Agama.Software1"; const DBUS_PATH: &str = "/org/opensuse/Agama/Software1"; const DBUS_PRODUCT_PATH: &str = "/org/opensuse/Agama/Software1/Product"; let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; - let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; // FIXME: use anyhow temporarily until we adapt all these methods to return // the crate::error::Error instead of ServiceError. @@ -254,6 +255,9 @@ pub async fn software_service( let product_issues = IssuesRouterBuilder::new(DBUS_SERVICE, DBUS_PRODUCT_PATH, issues) .build() .context("Could not build an issues router")?; + let progress_router = ProgressRouterBuilder::new(DBUS_SERVICE, DBUS_PATH, progress) + .build() + .context("Could not build the progress router")?; let mut licenses_repo = LicensesRepo::default(); if let Err(error) = licenses_repo.read() { diff --git a/rust/agama-server/src/storage/web.rs b/rust/agama-server/src/storage/web.rs index 6af7d0c9d0..e593e7ef9e 100644 --- a/rust/agama-server/src/storage/web.rs +++ b/rust/agama-server/src/storage/web.rs @@ -56,8 +56,8 @@ use crate::{ iscsi::iscsi_stream, }, web::common::{ - jobs_service, progress_router, service_status_router, EventStreams, IssuesClient, - IssuesRouterBuilder, + jobs_service, service_status_router, EventStreams, IssuesClient, IssuesRouterBuilder, + ProgressClient, ProgressRouterBuilder, }, }; use agama_lib::http::Event; @@ -101,16 +101,21 @@ struct StorageState<'a> { pub async fn storage_service( dbus: zbus::Connection, issues: IssuesClient, + progress: ProgressClient, ) -> Result { const DBUS_SERVICE: &str = "org.opensuse.Agama.Storage1"; const DBUS_PATH: &str = "/org/opensuse/Agama/Storage1"; const DBUS_DESTINATION: &str = "org.opensuse.Agama.Storage1"; let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; - let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + // FIXME: use anyhow temporarily until we adapt all these methods to return + // the crate::error::Error instead of ServiceError. let issues_router = IssuesRouterBuilder::new(DBUS_SERVICE, DBUS_PATH, issues.clone()) .build() .context("Could not build an issues router")?; + let progress_router = ProgressRouterBuilder::new(DBUS_SERVICE, DBUS_PATH, progress) + .build() + .context("Could not build the progress router")?; let iscsi_router = storage_iscsi_service(&dbus).await?; let dasd_router = dasd_service(&dbus).await?; let zfcp_router = zfcp_service(&dbus).await?; diff --git a/rust/agama-server/src/web.rs b/rust/agama-server/src/web.rs index eb3e97a212..da4e1f301b 100644 --- a/rust/agama-server/src/web.rs +++ b/rust/agama-server/src/web.rs @@ -39,7 +39,7 @@ use crate::{ software::web::{software_service, software_streams}, storage::web::{iscsi::iscsi_service, storage_service, storage_streams}, users::web::{users_service, users_streams}, - web::common::{jobs_stream, progress_stream, service_status_stream}, + web::common::{jobs_stream, service_status_stream}, }; use axum::Router; @@ -54,7 +54,7 @@ mod state; mod ws; use agama_lib::{connection, error::ServiceError, http::Event}; -use common::IssuesService; +use common::{IssuesService, ProgressService}; pub use config::ServiceConfig; pub use event::{EventsReceiver, EventsSender}; pub use service::MainServiceBuilder; @@ -81,18 +81,28 @@ where .expect("Could not connect to NetworkManager to read the configuration"); let issues = IssuesService::start(dbus.clone(), events.clone()).await; + let progress = ProgressService::start(dbus.clone(), events.clone()).await; let router = MainServiceBuilder::new(events.clone(), web_ui_dir) .add_service("/l10n", l10n_service(dbus.clone(), events.clone()).await?) - .add_service("/manager", manager_service(dbus.clone()).await?) + .add_service( + "/manager", + manager_service(dbus.clone(), progress.clone()).await?, + ) .add_service("/security", security_service(dbus.clone()).await?) .add_service( "/software", - software_service(dbus.clone(), events.subscribe(), issues.clone()).await?, + software_service( + dbus.clone(), + events.subscribe(), + issues.clone(), + progress.clone(), + ) + .await?, ) .add_service( "/storage", - storage_service(dbus.clone(), issues.clone()).await?, + storage_service(dbus.clone(), issues.clone(), progress).await?, ) .add_service("/iscsi", iscsi_service(dbus.clone(), issues.clone()).await?) .add_service("/bootloader", bootloader_service(dbus.clone()).await?) @@ -137,15 +147,6 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res ) .await?, ); - stream.insert( - "manager-progress", - progress_stream( - dbus.clone(), - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await?, - ); for (id, user_stream) in users_streams(dbus.clone()).await? { stream.insert(id, user_stream); } @@ -164,15 +165,6 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res ) .await?, ); - stream.insert( - "storage-progress", - progress_stream( - dbus.clone(), - "org.opensuse.Agama.Storage1", - "/org/opensuse/Agama/Storage1", - ) - .await?, - ); stream.insert( "storage-jobs", jobs_stream( @@ -192,15 +184,6 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res ) .await?, ); - stream.insert( - "software-progress", - progress_stream( - dbus.clone(), - "org.opensuse.Agama.Software1", - "/org/opensuse/Agama/Software1", - ) - .await?, - ); stream.insert("questions", questions_stream(dbus.clone()).await?); tokio::pin!(stream); diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index 395f8eca1c..2a28120dde 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -20,18 +20,12 @@ //! This module defines functions to be used accross all services. -use std::{pin::Pin, task::Poll}; +use std::pin::Pin; -use agama_lib::{ - error::ServiceError, - progress::Progress, - proxies::{ProgressProxy, ServiceStatusProxy}, -}; +use agama_lib::{error::ServiceError, proxies::ServiceStatusProxy}; use axum::{extract::State, routing::get, Json, Router}; -use pin_project::pin_project; use serde::Serialize; use tokio_stream::{Stream, StreamExt}; -use zbus::proxy::PropertyStream; use crate::error::Error; @@ -39,6 +33,8 @@ mod jobs; pub use jobs::{jobs_service, jobs_stream}; mod issues; pub use issues::{IssuesClient, IssuesRouterBuilder, IssuesService, IssuesServiceError}; +mod progress; +pub use progress::{ProgressClient, ProgressRouterBuilder, ProgressService, ProgressServiceError}; use super::Event; @@ -143,130 +139,3 @@ async fn build_service_status_proxy<'a>( .await?; Ok(proxy) } - -/// Builds a router to the `org.opensuse.Agama1.Progress` -/// interface of the given D-Bus object. -/// -/// ```no_run -/// # use axum::{extract::State, routing::get, Json, Router}; -/// # use agama_lib::connection; -/// # use agama_server::web::common::progress_router; -/// # use tokio_test; -/// -/// # tokio_test::block_on(async { -/// async fn hello(state: State) {}; -/// -/// #[derive(Clone)] -/// struct HelloWorldState {}; -/// -/// let dbus = connection().await.unwrap(); -/// let progress_router = progress_router( -/// &dbus, "org.opensuse.HelloWorld", "/org/opensuse/hello" -/// ).await.unwrap(); -/// let router: Router = Router::new() -/// .route("/hello", get(hello)) -/// .merge(progress_router) -/// .with_state(HelloWorldState {}); -/// }); -/// ``` -/// -/// * `dbus`: D-Bus connection. -/// * `destination`: D-Bus service name. -/// * `path`: D-Bus object path. -pub async fn progress_router( - dbus: &zbus::Connection, - destination: &str, - path: &str, -) -> Result, ServiceError> { - let proxy = build_progress_proxy(dbus, destination, path).await?; - let state = ProgressState { proxy }; - Ok(Router::new() - .route("/progress", get(progress)) - .with_state(state)) -} - -#[derive(Clone)] -struct ProgressState<'a> { - proxy: ProgressProxy<'a>, -} - -/// Information about the current progress sequence. -#[derive(Clone, Default, Serialize, utoipa::ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ProgressSequence { - /// Sequence steps if known in advance - steps: Vec, - #[serde(flatten)] - progress: Progress, -} - -async fn progress(State(state): State>) -> Result, Error> { - let proxy = state.proxy; - let progress = Progress::from_proxy(&proxy).await?; - let steps = proxy.steps().await?; - let sequence = ProgressSequence { steps, progress }; - Ok(Json(sequence)) -} - -#[pin_project] -pub struct ProgressStream<'a> { - #[pin] - inner: PropertyStream<'a, (u32, String)>, - proxy: ProgressProxy<'a>, -} - -pub async fn progress_stream<'a>( - dbus: zbus::Connection, - destination: &'static str, - path: &'static str, -) -> Result + Send>>, zbus::Error> { - let proxy = build_progress_proxy(&dbus, destination, path).await?; - Ok(Box::pin(ProgressStream::new(proxy).await)) -} - -impl<'a> ProgressStream<'a> { - pub async fn new(proxy: ProgressProxy<'a>) -> Self { - let stream = proxy.receive_current_step_changed().await; - ProgressStream { - inner: stream, - proxy, - } - } -} - -impl Stream for ProgressStream<'_> { - type Item = Event; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let pinned = self.project(); - match pinned.inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_change) => match Progress::from_cached_proxy(pinned.proxy) { - Some(progress) => { - let event = Event::Progress { - progress, - service: pinned.proxy.inner().destination().to_string(), - }; - Poll::Ready(Some(event)) - } - _ => Poll::Pending, - }, - } - } -} - -async fn build_progress_proxy<'a>( - dbus: &zbus::Connection, - destination: &str, - path: &str, -) -> Result, zbus::Error> { - let proxy = ProgressProxy::builder(dbus) - .destination(destination.to_string())? - .path(path.to_string())? - .build() - .await?; - Ok(proxy) -} diff --git a/rust/agama-server/src/web/common/issues.rs b/rust/agama-server/src/web/common/issues.rs index d4ea71f0a6..5e9887805f 100644 --- a/rust/agama-server/src/web/common/issues.rs +++ b/rust/agama-server/src/web/common/issues.rs @@ -24,16 +24,14 @@ //! //! * Querying the issues via D-Bus and keeping them in a cache. //! * Listening to D-Bus signals to keep the cache up-to-date. -//! * Emitting `IssuesChanged` events, replacing -//! [issues_stream](crate::web::common::issues_stream). +//! * Emitting `IssuesChanged` events. //! //! The following components are included: //! //! * [IssuesService] that runs on a separate task to hold the status. //! * [IssuesClient] that allows querying the [IssuesService] server about the //! issues. -//! * [IssuesRouter] which allows building a router, replacing -//! [issues_router](crate::web::common::issues_router). +//! * [IssuesRouterBuilder] which allows building a router. //! //! At this point, it only handles the issues that are exposed through D-Bus. @@ -58,8 +56,8 @@ pub enum IssuesServiceError { SendIssues, #[error("Could not get an answer from the service: {0}")] RecvIssues(#[from] oneshot::error::RecvError), - #[error("Could not set the command")] - SendCommand, + #[error("Could not set the command: {0}")] + SendCommand(#[from] mpsc::error::SendError), #[error("Error parsing issues from D-Bus: {0}")] InvalidIssue(#[from] zbus::zvariant::Error), #[error("Error reading the issues: {0}")] @@ -71,13 +69,13 @@ pub enum IssuesServiceError { } #[derive(Debug)] -enum IssuesCommand { +pub enum IssuesCommand { Get(String, String, oneshot::Sender>), } /// Implements a Tokio task that holds the issues for each service. pub struct IssuesService { - issues: HashMap>, + cache: HashMap>, commands: mpsc::Receiver, events: EventsSender, dbus: zbus::Connection, @@ -93,19 +91,23 @@ impl IssuesService { pub async fn start(dbus: zbus::Connection, events: EventsSender) -> IssuesClient { let (tx, rx) = mpsc::channel(4); let mut service = IssuesService { - issues: HashMap::new(), + cache: HashMap::new(), dbus, events, commands: rx, }; - tokio::spawn(async move { service.run().await }); + tokio::spawn(async move { + if let Err(e) = service.run().await { + tracing::error!("Could not start the issues service: {e:?}") + } + }); IssuesClient(tx) } /// Main loop of the service. - async fn run(&mut self) { - let mut messages = build_properties_changed_stream(&self.dbus).await.unwrap(); + async fn run(&mut self) -> IssuesServiceResult<()> { + let mut messages = build_properties_changed_stream(&self.dbus).await?; loop { tokio::select! { Some(cmd) = self.commands.recv() => { @@ -168,7 +170,7 @@ impl IssuesService { .map(Issue::try_from) .collect::, _>>()?; - self.issues.insert(path.to_string(), issues.clone()); + self.cache.insert(path.to_string(), issues.clone()); let event = Event::IssuesChanged { path: path.to_string(), @@ -187,7 +189,7 @@ impl IssuesService { /// * `path`: path of the D-Bus object implementing the /// "org.opensuse.Agama1.Issues" interface. async fn get(&mut self, service: &str, path: &str) -> IssuesServiceResult> { - if let Some(issues) = self.issues.get(path) { + if let Some(issues) = self.cache.get(path) { return Ok(issues.clone()); } @@ -212,7 +214,7 @@ impl IssuesService { .map(Issue::try_from) .collect::, _>>()?; - self.issues.insert(path.to_string(), issues.clone()); + self.cache.insert(path.to_string(), issues.clone()); Ok(issues) } } @@ -234,8 +236,7 @@ impl IssuesClient { path.to_string(), tx, )) - .await - .map_err(|_| IssuesServiceError::SendCommand)?; + .await?; Ok(rx.await?) } } @@ -279,7 +280,7 @@ impl IssuesRouterBuilder { async fn issues( State(state): State, ) -> Result>, crate::error::Error> { - let issues = state.client.get(&state.service, &state.path).await.unwrap(); + let issues = state.client.get(&state.service, &state.path).await?; Ok(Json(issues)) } } diff --git a/rust/agama-server/src/web/common/progress.rs b/rust/agama-server/src/web/common/progress.rs new file mode 100644 index 0000000000..8be6d11a6d --- /dev/null +++ b/rust/agama-server/src/web/common/progress.rs @@ -0,0 +1,297 @@ +// Copyright (c) [2025] SUSE LLC +// +// All Rights Reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation; either version 2 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, contact SUSE LLC. +// +// To contact SUSE LLC about this file by physical or electronic mail, you may +// find current contact information at www.suse.com. + +//! Defines a service that keep tracks of the Agama progress. +//! +//! It is responsible for: +//! +//! * Querying the progress via D-Bus and keeping them in a cache. +//! * Listening to D-Bus signals to keep the cache up-to-date. +//! * Emitting `ProgressChanged` events. +//! +//! The following components are included: +//! +//! * [ProgressService] that runs on a separate task to hold the status. +//! * [ProgressClient] that allows querying the [ProgressService] server about the +//! progress. +//! * [ProgressRouterBuilder] which allows building a router. +//! +//! At this point, it only handles the progress that are exposed through D-Bus. + +use crate::web::EventsSender; +use agama_lib::{ + http::Event, + progress::{Progress, ProgressSequence}, + proxies::{ProgressChanged, ProgressProxy}, +}; +use axum::{extract::State, routing::get, Json, Router}; +use std::collections::HashMap; +use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio_stream::StreamExt; +use zbus::{message::Type as MessageType, MatchRule, MessageStream}; + +type ProgressServiceResult = Result; + +#[derive(Debug, thiserror::Error)] +pub enum ProgressServiceError { + #[error("Could not return the progress")] + SendProgress, + #[error("Could not get an answer from the service: {0}")] + RecvProgress(#[from] oneshot::error::RecvError), + #[error("Could not set the command: {0}")] + SendCommand(#[from] mpsc::error::SendError), + #[error("Error parsing progress from D-Bus: {0}")] + InvalidProgress(#[from] zbus::zvariant::Error), + #[error("Error reading the progress: {0}")] + DBus(#[from] zbus::Error), + #[error("Invalid D-Bus name: {0}")] + DBusName(#[from] zbus::names::Error), + #[error("Could not send the event: {0}")] + SendEvent(#[from] broadcast::error::SendError), +} + +#[derive(Debug)] +pub enum ProgressCommand { + Get(String, String, oneshot::Sender), +} + +/// Implements a Tokio task that holds the progress for each service. +pub struct ProgressService { + cache: HashMap, + commands: mpsc::Receiver, + events: EventsSender, + dbus: zbus::Connection, +} + +impl ProgressService { + /// Sets up and starts the service as a Tokio task. + /// + /// Once it is started, the service waits for: + /// + /// * Commands from a client ([ProgressClient]). + /// * Relevant events from D-Bus. + pub async fn start(dbus: zbus::Connection, events: EventsSender) -> ProgressClient { + let (tx, rx) = mpsc::channel(4); + let mut service = ProgressService { + cache: HashMap::new(), + dbus, + events, + commands: rx, + }; + + tokio::spawn(async move { + if let Err(e) = service.run().await { + tracing::error!("Could not start the progress service: {e:?}") + } + }); + ProgressClient(tx) + } + + /// Main loop of the service. + async fn run(&mut self) -> ProgressServiceResult<()> { + let mut messages = build_progress_changed_stream(&self.dbus).await?; + loop { + tokio::select! { + Some(cmd) = self.commands.recv() => { + if let Err(e) = self.handle_command(cmd).await { + tracing::error!("{e}"); + } + } + + Some(Ok(message)) = messages.next() => { + if let Some(changed) = ProgressChanged::from_message(message) { + if let Err(e) = self.handle_progress_changed(changed).await { + tracing::error!("ProgressService: could not handle change: {:?}", e); + } + } + } + } + } + } + + /// Handles commands from the client. + async fn handle_command(&mut self, command: ProgressCommand) -> ProgressServiceResult<()> { + match command { + ProgressCommand::Get(service, path, tx) => { + let progress = self.get(&service, &path).await?; + tx.send(progress) + .map_err(|_| ProgressServiceError::SendProgress)?; + } + } + + Ok(()) + } + + /// Handles ProgressChanged events. + /// + /// It reports an error if something went work. If the message was processed or skipped + /// it returns Ok(()). + async fn handle_progress_changed( + &mut self, + message: ProgressChanged, + ) -> ProgressServiceResult<()> { + let args = message.args()?; + let inner = message.message(); + let header = inner.header(); + + // Given that it is a ProcessChanged, it should not happen. + let Some(path) = header.path() else { + tracing::warn!("Found a ProgressChanged signal without a path"); + return Ok(()); + }; + + let (current_step, current_title) = args.current_step(); + let progress = Progress { + current_title: current_title.to_string(), + current_step: current_step.clone(), + max_steps: args.total_steps, + finished: args.finished, + }; + let sequence = ProgressSequence { + steps: args.steps().iter().map(ToString::to_string).collect(), + progress: progress.clone(), + }; + self.cache.insert(path.to_string(), sequence.clone()); + + let event = Event::ProgressChanged { + path: path.to_string(), + progress, + }; + self.events.send(event)?; + Ok(()) + } + + /// Gets the progress for a given D-Bus service and path. + /// + /// This method uses a cache to store the values. If the value is not in the cache, + /// it asks the D-Bus service about the progress (and cache them). + /// + /// * `service`: D-Bus service to connect to. + /// * `path`: path of the D-Bus object implementing the + /// "org.opensuse.Agama1.Progress" interface. + async fn get(&mut self, service: &str, path: &str) -> ProgressServiceResult { + if let Some(sequence) = self.cache.get(path) { + return Ok(sequence.clone()); + } + + let proxy = ProgressProxy::builder(&self.dbus) + .destination(service)? + .path(path)? + .build() + .await?; + + let progress = Progress::from_proxy(&proxy).await?; + let steps = proxy.steps().await?; + let sequence = ProgressSequence { steps, progress }; + + self.cache.insert(path.to_string(), sequence.clone()); + Ok(sequence) + } +} + +/// It allows querying the [ProgressService]. +/// +/// It is cheap to clone the client and use it from several +/// places. +#[derive(Clone)] +pub struct ProgressClient(mpsc::Sender); + +impl ProgressClient { + /// Get the progress for the given D-Bus service and path. + pub async fn get(&self, service: &str, path: &str) -> ProgressServiceResult { + let (tx, rx) = oneshot::channel(); + self.0 + .send(ProgressCommand::Get( + service.to_string(), + path.to_string(), + tx, + )) + .await?; + Ok(rx.await?) + } +} + +/// It allows building an Axum router for the progress service. +pub struct ProgressRouterBuilder { + service: String, + path: String, + client: ProgressClient, +} + +impl ProgressRouterBuilder { + /// Creates a new builder. + /// + /// * `service`: D-Bus service to connect to. + /// * `path`: path of the D-Bus object implementing the + /// "org.opensuse.Agama1.Progress" interface. + /// * `client`: client to access the progress. + pub fn new(service: &str, path: &str, client: ProgressClient) -> Self { + ProgressRouterBuilder { + service: service.to_string(), + path: path.to_string(), + client, + } + } + + /// Builds the Axum router. + pub fn build(self) -> Result, crate::error::Error> { + let state = ProgressState { + service: self.service, + path: self.path, + client: self.client, + }; + + Ok(Router::new() + .route("/progress", get(Self::progress)) + .with_state(state)) + } + + /// Handler of the GET /progress endpoint. + async fn progress( + State(state): State, + ) -> Result, crate::error::Error> { + let progress = state.client.get(&state.service, &state.path).await?; + Ok(Json(progress)) + } +} + +/// State for the router. +#[derive(Clone)] +struct ProgressState { + service: String, + path: String, + client: ProgressClient, +} + +/// Returns a stream of properties changes. +/// +/// It listens for changes in several objects that are related to a network device. +pub async fn build_progress_changed_stream( + connection: &zbus::Connection, +) -> Result { + let rule = MatchRule::builder() + .msg_type(MessageType::Signal) + .interface("org.opensuse.Agama1.Progress")? + .member("ProgressChanged")? + .build(); + // The third parameter corresponds to the max_queue. We rely on the default (64). + let stream = MessageStream::for_match_rule(rule, connection, None).await?; + Ok(stream) +} diff --git a/rust/package/agama.changes b/rust/package/agama.changes index ec25de0ca0..71c20ad871 100644 --- a/rust/package/agama.changes +++ b/rust/package/agama.changes @@ -1,3 +1,9 @@ +------------------------------------------------------------------- +Fri May 23 10:17:32 UTC 2025 - Imobach Gonzalez Sosa + +- Cache progress reporting to avoid blocking the clients + (gh#agama-project/agama#2389). + ------------------------------------------------------------------- Thu May 22 17:19:10 UTC 2025 - Ancor Gonzalez Sosa diff --git a/service/lib/agama/dbus/interfaces/progress.rb b/service/lib/agama/dbus/interfaces/progress.rb index 67881a8194..e5e69d719a 100644 --- a/service/lib/agama/dbus/interfaces/progress.rb +++ b/service/lib/agama/dbus/interfaces/progress.rb @@ -92,10 +92,16 @@ def progress_properties def register_progress_callbacks progress_manager.on_change do dbus_properties_changed(PROGRESS_INTERFACE, progress_properties, []) + ProgressChanged( + progress_total_steps, progress_current_step, progress_finished, progress_steps + ) end progress_manager.on_finish do dbus_properties_changed(PROGRESS_INTERFACE, progress_properties, []) + ProgressChanged( + progress_total_steps, progress_current_step, progress_finished, progress_steps + ) end end @@ -106,6 +112,8 @@ def self.included(base) dbus_reader :progress_current_step, "(us)", dbus_name: "CurrentStep" dbus_reader :progress_finished, "b", dbus_name: "Finished" dbus_reader :progress_steps, "as", dbus_name: "Steps" + dbus_signal :ProgressChanged, + "total_steps:u, current_step:(us), finished:b, steps:as" end end end diff --git a/service/lib/agama/software/manager.rb b/service/lib/agama/software/manager.rb index a258d01d80..0512974710 100644 --- a/service/lib/agama/software/manager.rb +++ b/service/lib/agama/software/manager.rb @@ -145,16 +145,22 @@ def probe logger.info "Probing software" + common_steps = [ + _("Refreshing repositories metadata"), + _("Calculating the software proposal") + ] if repositories.empty? - start_progress_with_size(3) + start_progress_with_descriptions( + _("Initializing sources"), *common_steps + ) Yast::PackageCallbacks.InitPackageCallbacks(logger) - progress.step(_("Initializing sources")) { add_base_repos } + progress.step { add_base_repos } else - start_progress_with_size(2) + start_progress_with_size(*common_steps) end - progress.step(_("Refreshing repositories metadata")) { repositories.load } - progress.step(_("Calculating the software proposal")) { propose } + progress.step { repositories.load } + progress.step { propose } update_issues end diff --git a/service/package/rubygem-agama-yast.changes b/service/package/rubygem-agama-yast.changes index 7ab0930fe4..25a823b66c 100644 --- a/service/package/rubygem-agama-yast.changes +++ b/service/package/rubygem-agama-yast.changes @@ -1,3 +1,9 @@ +------------------------------------------------------------------- +Fri May 23 11:59:20 UTC 2025 - Imobach Gonzalez Sosa + +- Introduce a new ProgressChanged signal which should be used + instead of PropertiesChanged (gh#agama-project/agama#2389). + ------------------------------------------------------------------- Fri May 23 10:39:21 UTC 2025 - Josef Reidinger diff --git a/service/test/agama/dbus/interfaces/progress_test.rb b/service/test/agama/dbus/interfaces/progress_test.rb index aeda5776aa..e6dd6b1bd1 100644 --- a/service/test/agama/dbus/interfaces/progress_test.rb +++ b/service/test/agama/dbus/interfaces/progress_test.rb @@ -176,6 +176,8 @@ class Backend expect(subject).to receive(:dbus_properties_changed) .with(progress_interface, anything, anything) + expect(subject).to receive(:ProgressChanged) + .with(2, [1, "step 1"], false, []) progress.step("step 1") end @@ -186,6 +188,8 @@ class Backend expect(subject).to receive(:dbus_properties_changed) .with(progress_interface, anything, anything) + expect(subject).to receive(:ProgressChanged) + .with(2, [0, ""], true, []) progress.finish end diff --git a/web/src/queries/progress.ts b/web/src/queries/progress.ts index 4ea2701ef9..00a582e3a5 100644 --- a/web/src/queries/progress.ts +++ b/web/src/queries/progress.ts @@ -28,9 +28,9 @@ import { QueryHookOptions } from "~/types/queries"; import { fetchProgress } from "~/api/progress"; const servicesMap = { - "org.opensuse.Agama.Manager1": "manager", - "org.opensuse.Agama.Software1": "software", - "org.opensuse.Agama.Storage1": "storage", + "/org/opensuse/Agama/Manager1": "manager", + "/org/opensuse/Agama/Software1": "software", + "/org/opensuse/Agama/Storage1": "storage", }; /** @@ -76,10 +76,10 @@ const useProgressChanges = () => { if (!client) return; return client.onEvent((event) => { - if (event.type === "Progress") { - const service = servicesMap[event.service]; + if (event.type === "ProgressChanged") { + const service = servicesMap[event.path]; if (!service) { - console.warn("Unknown service", event.service); + console.warn("Unknown progress path", event.path); return; } diff --git a/web/src/types/progress.ts b/web/src/types/progress.ts index 69b808268d..d1ef53f6ca 100644 --- a/web/src/types/progress.ts +++ b/web/src/types/progress.ts @@ -26,7 +26,7 @@ type APIProgress = { currentTitle: string; finished: boolean; steps?: string[]; - service: string; + path: string; }; class Progress {