From 967aef347878b388def5677dc211d3e3a7cfedd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Wed, 13 Mar 2024 14:43:35 +0000 Subject: [PATCH 1/7] Use consts in the manager_service function --- rust/agama-server/src/manager/web.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/rust/agama-server/src/manager/web.rs b/rust/agama-server/src/manager/web.rs index 86c3739537..fb5b1ebf73 100644 --- a/rust/agama-server/src/manager/web.rs +++ b/rust/agama-server/src/manager/web.rs @@ -127,12 +127,11 @@ pub async fn installation_phase_changed_stream( /// Sets up and returns the axum service for the manager module pub async fn manager_service(dbus: zbus::Connection) -> Result { - let status_route = service_status_router( - &dbus, - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await?; + const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Manager1"; + const DBUS_PATH: &'static str = "/org/opensuse/Agama/Manager1"; + + let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let manager = ManagerClient::new(dbus).await?; let state = ManagerState { manager }; Ok(Router::new() From 44cf3bf8c7e33fa0da792878a1750fa8264dc465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Wed, 13 Mar 2024 16:11:17 +0000 Subject: [PATCH 2/7] Add support to expose the Progress API --- rust/Cargo.lock | 9 +- rust/agama-lib/src/progress.rs | 13 +++ rust/agama-server/Cargo.toml | 1 + rust/agama-server/src/web/common.rs | 127 +++++++++++++++++++++++++++- rust/agama-server/src/web/event.rs | 27 ++++-- 5 files changed, 166 insertions(+), 11 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 87d2ca5210..1a7deead07 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -107,6 +107,7 @@ dependencies = [ "macaddr", "once_cell", "pam", + "pin-project", "rand", "regex", "serde", @@ -2276,18 +2277,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", diff --git a/rust/agama-lib/src/progress.rs b/rust/agama-lib/src/progress.rs index b79db61195..ab9bd5aa89 100644 --- a/rust/agama-lib/src/progress.rs +++ b/rust/agama-lib/src/progress.rs @@ -78,6 +78,19 @@ impl Progress { finished: finished?, }) } + + pub fn from_cached_proxy(proxy: &crate::proxies::ProgressProxy<'_>) -> Option { + let (current_step, current_title) = proxy.cached_current_step().ok()??; + let max_steps = proxy.cached_total_steps().ok()??; + let finished = proxy.cached_finished().ok()??; + + Some(Progress { + current_step, + current_title, + max_steps, + finished, + }) + } } /// Monitorizes and reports the progress of Agama's current operation. diff --git a/rust/agama-server/Cargo.toml b/rust/agama-server/Cargo.toml index d03ff59cc7..1e6cd6652f 100644 --- a/rust/agama-server/Cargo.toml +++ b/rust/agama-server/Cargo.toml @@ -48,6 +48,7 @@ chrono = { version = "0.4.34", default-features = false, features = [ ] } pam = "0.8.0" serde_with = "3.6.1" +pin-project = "1.1.5" [[bin]] name = "agama-dbus-server" diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index f11ffea274..c5dfccfbc9 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -1,9 +1,17 @@ //! This module defines functions to be used accross all services. -use agama_lib::{error::ServiceError, proxies::ServiceStatusProxy}; +use std::task::Poll; + +use agama_lib::{ + error::ServiceError, + progress::Progress, + proxies::{ProgressProxy, ServiceStatusProxy}, +}; use axum::{extract::State, routing::get, Json, Router}; +use pin_project::pin_project; use serde::Serialize; use tokio_stream::{Stream, StreamExt}; +use zbus::PropertyStream; use crate::error::Error; @@ -108,3 +116,120 @@ async fn build_service_status_proxy<'a>( .await?; Ok(proxy) } + +/// Builds a router to the `org.opensuse.Agama1.Progress` +/// interface of the given D-Bus object. +/// +/// ```no_run +/// # use axum::{extract::State, routing::get, Json, Router}; +/// # use agama_lib::connection; +/// # use agama_server::web::common::progress_router; +/// # use tokio_test; +/// +/// # tokio_test::block_on(async { +/// async fn hello(state: State) {}; +/// +/// #[derive(Clone)] +/// struct HelloWorldState {}; +/// +/// let dbus = connection().await.unwrap(); +/// let progress_router = progress_router( +/// &dbus, "org.opensuse.HelloWorld", "/org/opensuse/hello" +/// ).await.unwrap(); +/// let router: Router = Router::new() +/// .route("/hello", get(hello)) +/// .merge(progress) +/// .with_state(HelloWorldState {}); +/// }); +/// ``` +/// +/// * `dbus`: D-Bus connection. +/// * `destination`: D-Bus service name. +/// * `path`: D-Bus object path. +pub async fn progress_router( + dbus: &zbus::Connection, + destination: &str, + path: &str, +) -> Result, ServiceError> { + let proxy = build_progress_proxy(dbus, destination, path).await?; + let state = ProgressState { proxy }; + Ok(Router::new() + .route("/progress", get(progress)) + .with_state(state)) +} + +#[derive(Clone)] +struct ProgressState<'a> { + proxy: ProgressProxy<'a>, +} + +async fn progress(State(state): State>) -> Json { + let proxy = state.proxy; + let progress = Progress::from_proxy(&proxy).await.unwrap(); + Json(progress) +} + +#[pin_project] +pub struct ProgressStream<'a> { + #[pin] + inner: PropertyStream<'a, (u32, String)>, + proxy: ProgressProxy<'a>, +} + +pub async fn progress_stream<'a>( + dbus: zbus::Connection, + destination: &'static str, + path: &'static str, +) -> ProgressStream<'a> { + let proxy = build_progress_proxy(&dbus, destination, path) + .await + .unwrap(); + ProgressStream::new(proxy).await +} + +impl<'a> ProgressStream<'a> { + pub async fn new(proxy: ProgressProxy<'a>) -> Self { + let stream = proxy.receive_current_step_changed().await; + ProgressStream { + inner: stream, + proxy, + } + } +} + +impl<'a> Stream for ProgressStream<'a> { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let pinned = self.project(); + match pinned.inner.poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_change) => match Progress::from_cached_proxy(&pinned.proxy) { + Some(progress) => { + let event = Event::Progress { + progress, + service: pinned.proxy.destination().to_string(), + }; + Poll::Ready(Some(event)) + } + _ => Poll::Pending, + }, + } + } +} + +async fn build_progress_proxy<'a>( + dbus: &zbus::Connection, + destination: &str, + path: &str, +) -> Result, zbus::Error> { + let proxy = ProgressProxy::builder(&dbus) + .destination(destination.to_string())? + .path(path.to_string())? + .build() + .await?; + Ok(proxy) +} diff --git a/rust/agama-server/src/web/event.rs b/rust/agama-server/src/web/event.rs index a67317cdd0..045d4804ba 100644 --- a/rust/agama-server/src/web/event.rs +++ b/rust/agama-server/src/web/event.rs @@ -8,13 +8,28 @@ use tokio::sync::broadcast::{Receiver, Sender}; #[serde(tag = "type")] pub enum Event { L10nConfigChanged(LocaleConfig), - LocaleChanged { locale: String }, - Progress(Progress), - ProductChanged { id: String }, + LocaleChanged { + locale: String, + }, + Progress { + service: String, + #[serde(flatten)] + progress: Progress, + }, + ProductChanged { + id: String, + }, PatternsChanged(HashMap), - InstallationPhaseChanged { phase: InstallationPhase }, - BusyServicesChanged { services: Vec }, - ServiceStatusChanged { service: String, status: u32 }, + InstallationPhaseChanged { + phase: InstallationPhase, + }, + BusyServicesChanged { + services: Vec, + }, + ServiceStatusChanged { + service: String, + status: u32, + }, } pub type EventsSender = Sender; From 6a40c3e158d82c0041f915c798aaa1cbebb91bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Wed, 13 Mar 2024 16:12:00 +0000 Subject: [PATCH 3/7] Expose the Manager Progress over HTTP --- rust/agama-server/src/manager/web.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/rust/agama-server/src/manager/web.rs b/rust/agama-server/src/manager/web.rs index fb5b1ebf73..58dd72b3a0 100644 --- a/rust/agama-server/src/manager/web.rs +++ b/rust/agama-server/src/manager/web.rs @@ -25,7 +25,7 @@ use tokio_stream::{Stream, StreamExt}; use crate::{ error::Error, web::{ - common::{service_status_router, service_status_stream}, + common::{progress_router, progress_stream, service_status_router, service_status_stream}, Event, }, }; @@ -69,15 +69,23 @@ pub struct InstallerStatus { pub async fn manager_stream(dbus: zbus::Connection) -> Result, Error> { Ok(StreamExt::merge( StreamExt::merge( - busy_services_changed_stream(dbus.clone()).await?, - installation_phase_changed_stream(dbus.clone()).await?, + StreamExt::merge( + busy_services_changed_stream(dbus.clone()).await?, + installation_phase_changed_stream(dbus.clone()).await?, + ), + service_status_stream( + dbus.clone(), + "org.opensuse.Agama.Manager1", + "/org/opensuse/Agama/Manager1", + ) + .await?, ), - service_status_stream( + progress_stream( dbus, "org.opensuse.Agama.Manager1", "/org/opensuse/Agama/Manager1", ) - .await?, + .await, )) } @@ -139,7 +147,8 @@ pub async fn manager_service(dbus: zbus::Connection) -> Result Date: Wed, 13 Mar 2024 16:15:34 +0000 Subject: [PATCH 4/7] Drop the EventsProgressPresenter * It is replaced by the ProgressStream. --- rust/agama-server/src/web.rs | 13 ++------- rust/agama-server/src/web/progress.rs | 40 --------------------------- 2 files changed, 2 insertions(+), 51 deletions(-) delete mode 100644 rust/agama-server/src/web/progress.rs diff --git a/rust/agama-server/src/web.rs b/rust/agama-server/src/web.rs index 34fc364d2e..06d4cdd550 100644 --- a/rust/agama-server/src/web.rs +++ b/rust/agama-server/src/web.rs @@ -4,7 +4,6 @@ //! * Emit relevant events via websocket. //! * Serve the code for the web user interface (not implemented yet). -use self::progress::EventsProgressPresenter; use crate::{ error::Error, l10n::web::l10n_service, @@ -19,12 +18,11 @@ mod config; mod docs; mod event; mod http; -mod progress; mod service; mod state; mod ws; -use agama_lib::{connection, error::ServiceError, progress::ProgressMonitor}; +use agama_lib::{connection, error::ServiceError}; pub use auth::generate_token; pub use config::ServiceConfig; pub use docs::ApiDoc; @@ -63,14 +61,7 @@ where /// /// * `events`: channel to send the events to. pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> { - let presenter = EventsProgressPresenter::new(events.clone()); let connection = connection().await?; - let mut monitor = ProgressMonitor::new(connection.clone()).await?; - tokio::spawn(async move { - if let Err(error) = monitor.run(presenter).await { - eprintln!("Could not monitor the D-Bus server: {}", error); - } - }); tokio::spawn(run_events_monitor(connection, events.clone())); Ok(()) @@ -80,7 +71,7 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> { /// /// * `connection`: D-Bus connection. /// * `events`: channel to send the events to. -pub async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> { +async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> { let stream = StreamExt::merge( manager_stream(dbus.clone()).await?, software_stream(dbus).await?, diff --git a/rust/agama-server/src/web/progress.rs b/rust/agama-server/src/web/progress.rs deleted file mode 100644 index c892edd8ed..0000000000 --- a/rust/agama-server/src/web/progress.rs +++ /dev/null @@ -1,40 +0,0 @@ -//! Implements a mechanism to monitor track service progress. - -use super::event::{Event, EventsSender}; -use agama_lib::progress::{Progress, ProgressPresenter}; -use async_trait::async_trait; - -// let presenter = EventsProgressPresenter::new(socket); -// let mut monitor = ProgressMonitor::new(connection).await.unwrap(); -// _ = monitor.run(presenter).await; - -/// Experimental ProgressPresenter to emit progress events over a Events. -pub struct EventsProgressPresenter(EventsSender); - -impl EventsProgressPresenter { - pub fn new(events: EventsSender) -> Self { - Self(events) - } - - pub async fn report_progress(&mut self, progress: &Progress) { - _ = self.0.send(Event::Progress(progress.clone())) - // _ = self.events.send(Message::Text(payload)).await; - } -} - -#[async_trait] -impl ProgressPresenter for EventsProgressPresenter { - async fn start(&mut self, progress: &Progress) { - self.report_progress(progress).await; - } - - async fn update_main(&mut self, progress: &Progress) { - self.report_progress(progress).await; - } - - async fn update_detail(&mut self, progress: &Progress) { - self.report_progress(progress).await; - } - - async fn finish(&mut self) {} -} From dd88614ddbde57c6a3364da8e9e64342cf60b125 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Wed, 13 Mar 2024 16:18:35 +0000 Subject: [PATCH 5/7] Expose Software ServiceStatus and Progress APIs --- rust/agama-server/src/software/web.rs | 36 ++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/rust/agama-server/src/software/web.rs b/rust/agama-server/src/software/web.rs index dbbfb8bc19..cb12ec2491 100644 --- a/rust/agama-server/src/software/web.rs +++ b/rust/agama-server/src/software/web.rs @@ -5,7 +5,13 @@ //! * `software_service` which returns the Axum service. //! * `software_stream` which offers an stream that emits the software events coming from D-Bus. -use crate::{error::Error, web::Event}; +use crate::{ + error::Error, + web::{ + common::{progress_router, progress_stream, service_status_router, service_status_stream}, + Event, + }, +}; use agama_lib::{ error::ServiceError, product::{Product, ProductClient}, @@ -61,8 +67,24 @@ impl IntoResponse for SoftwareError { /// * `connection`: D-Bus connection to listen for events. pub async fn software_stream(dbus: zbus::Connection) -> Result, Error> { Ok(StreamExt::merge( - product_changed_stream(dbus.clone()).await?, - patterns_changed_stream(dbus.clone()).await?, + StreamExt::merge( + StreamExt::merge( + product_changed_stream(dbus.clone()).await?, + patterns_changed_stream(dbus.clone()).await?, + ), + service_status_stream( + dbus.clone(), + "org.opensuse.Agama.Software1", + "/org/opensuse/Agama/Software1", + ) + .await?, + ), + progress_stream( + dbus, + "org.opensuse.Agama.Software1", + "/org/opensuse/Agama/Software1", + ) + .await, )) } @@ -122,6 +144,12 @@ fn reason_to_selected_by( /// Sets up and returns the axum service for the software module. pub async fn software_service(dbus: zbus::Connection) -> Result { + const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Software1"; + const DBUS_PATH: &'static str = "/org/opensuse/Agama/Software1"; + + let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + let product = ProductClient::new(dbus.clone()).await?; let software = SoftwareClient::new(dbus).await?; let state = SoftwareState { product, software }; @@ -131,6 +159,8 @@ pub async fn software_service(dbus: zbus::Connection) -> Result Date: Thu, 14 Mar 2024 14:35:56 +0000 Subject: [PATCH 6/7] Avoid chaining StreamExt::merge calls --- rust/agama-server/src/manager/web.rs | 55 ++++----------------------- rust/agama-server/src/software/web.rs | 33 +++++----------- rust/agama-server/src/web.rs | 49 +++++++++++++++++++++--- rust/agama-server/src/web/common.rs | 10 ++--- 4 files changed, 66 insertions(+), 81 deletions(-) diff --git a/rust/agama-server/src/manager/web.rs b/rust/agama-server/src/manager/web.rs index 58dd72b3a0..8f6bc6b08c 100644 --- a/rust/agama-server/src/manager/web.rs +++ b/rust/agama-server/src/manager/web.rs @@ -5,6 +5,8 @@ //! * `manager_service` which returns the Axum service. //! * `manager_stream` which offers an stream that emits the manager events coming from D-Bus. +use std::pin::Pin; + use agama_lib::{ error::ServiceError, manager::{InstallationPhase, ManagerClient}, @@ -25,7 +27,7 @@ use tokio_stream::{Stream, StreamExt}; use crate::{ error::Error, web::{ - common::{progress_router, progress_stream, service_status_router, service_status_stream}, + common::{progress_router, service_status_router}, Event, }, }; @@ -63,55 +65,12 @@ pub struct InstallerStatus { /// Returns a stream that emits manager related events coming from D-Bus. /// -/// It emits the Event::BusyServicesChanged and Event::InstallationPhaseChanged events. +/// It emits the Event::InstallationPhaseChanged event. /// /// * `connection`: D-Bus connection to listen for events. -pub async fn manager_stream(dbus: zbus::Connection) -> Result, Error> { - Ok(StreamExt::merge( - StreamExt::merge( - StreamExt::merge( - busy_services_changed_stream(dbus.clone()).await?, - installation_phase_changed_stream(dbus.clone()).await?, - ), - service_status_stream( - dbus.clone(), - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await?, - ), - progress_stream( - dbus, - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await, - )) -} - -pub async fn busy_services_changed_stream( - dbus: zbus::Connection, -) -> Result, Error> { - let proxy = Manager1Proxy::new(&dbus).await?; - let stream = proxy - .receive_busy_services_changed() - .await - .then(|change| async move { - if let Ok(busy_services) = change.get().await { - Some(Event::BusyServicesChanged { - services: busy_services, - }) - } else { - None - } - }) - .filter_map(|e| e); - Ok(stream) -} - -pub async fn installation_phase_changed_stream( +pub async fn manager_stream( dbus: zbus::Connection, -) -> Result, Error> { +) -> Result + Send>>, Error> { let proxy = Manager1Proxy::new(&dbus).await?; let stream = proxy .receive_current_installation_phase_changed() @@ -130,7 +89,7 @@ pub async fn installation_phase_changed_stream( } }) .filter_map(|e| e); - Ok(stream) + Ok(Box::pin(stream)) } /// Sets up and returns the axum service for the manager module diff --git a/rust/agama-server/src/software/web.rs b/rust/agama-server/src/software/web.rs index cb12ec2491..de8c599352 100644 --- a/rust/agama-server/src/software/web.rs +++ b/rust/agama-server/src/software/web.rs @@ -8,7 +8,7 @@ use crate::{ error::Error, web::{ - common::{progress_router, progress_stream, service_status_router, service_status_stream}, + common::{progress_router, service_status_router}, Event, }, }; @@ -29,7 +29,7 @@ use axum::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::collections::HashMap; +use std::{collections::HashMap, pin::Pin}; use thiserror::Error; use tokio_stream::{Stream, StreamExt}; @@ -65,27 +65,14 @@ impl IntoResponse for SoftwareError { /// It emits the Event::ProductChanged and Event::PatternsChanged events. /// /// * `connection`: D-Bus connection to listen for events. -pub async fn software_stream(dbus: zbus::Connection) -> Result, Error> { - Ok(StreamExt::merge( - StreamExt::merge( - StreamExt::merge( - product_changed_stream(dbus.clone()).await?, - patterns_changed_stream(dbus.clone()).await?, - ), - service_status_stream( - dbus.clone(), - "org.opensuse.Agama.Software1", - "/org/opensuse/Agama/Software1", - ) - .await?, - ), - progress_stream( - dbus, - "org.opensuse.Agama.Software1", - "/org/opensuse/Agama/Software1", - ) - .await, - )) +pub async fn software_stream( + dbus: zbus::Connection, +) -> Result + Send>>, Error> { + let stream = StreamExt::merge( + product_changed_stream(dbus.clone()).await?, + patterns_changed_stream(dbus.clone()).await?, + ); + Ok(Box::pin(stream)) } async fn product_changed_stream( diff --git a/rust/agama-server/src/web.rs b/rust/agama-server/src/web.rs index 06d4cdd550..580f08b559 100644 --- a/rust/agama-server/src/web.rs +++ b/rust/agama-server/src/web.rs @@ -9,6 +9,7 @@ use crate::{ l10n::web::l10n_service, manager::web::{manager_service, manager_stream}, software::web::{software_service, software_stream}, + web::common::{progress_stream, service_status_stream}, }; use axum::Router; @@ -29,7 +30,7 @@ pub use docs::ApiDoc; pub use event::{Event, EventsReceiver, EventsSender}; pub use service::MainServiceBuilder; use std::path::Path; -use tokio_stream::StreamExt; +use tokio_stream::{StreamExt, StreamMap}; /// Returns a service that implements the web-based Agama API. /// @@ -72,13 +73,51 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> { /// * `connection`: D-Bus connection. /// * `events`: channel to send the events to. async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> { - let stream = StreamExt::merge( - manager_stream(dbus.clone()).await?, - software_stream(dbus).await?, + let mut stream = StreamMap::new(); + + stream.insert("manager", manager_stream(dbus.clone()).await?); + stream.insert( + "manager-status", + service_status_stream( + dbus.clone(), + "org.opensuse.Agama.Manager1", + "/org/opensuse/Agama/Manager1", + ) + .await?, + ); + stream.insert( + "manager-progress", + progress_stream( + dbus.clone(), + "org.opensuse.Agama.Manager1", + "/org/opensuse/Agama/Manager1", + ) + .await, + ); + + stream.insert("software", software_stream(dbus.clone()).await?); + stream.insert( + "software-status", + service_status_stream( + dbus.clone(), + "org.opensuse.Agama.Software1", + "/org/opensuse/Agama/Software1", + ) + .await?, + ); + stream.insert( + "software-progress", + progress_stream( + dbus.clone(), + "org.opensuse.Agama.Software1", + "/org/opensuse/Agama/Software1", + ) + .await, ); + tokio::pin!(stream); let e = events.clone(); - while let Some(event) = stream.next().await { + while let Some((_, event)) = stream.next().await { _ = e.send(event); } Ok(()) diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index c5dfccfbc9..44bb518e4d 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -1,6 +1,6 @@ //! This module defines functions to be used accross all services. -use std::task::Poll; +use std::{pin::Pin, task::Poll}; use agama_lib::{ error::ServiceError, @@ -85,7 +85,7 @@ pub async fn service_status_stream( dbus: zbus::Connection, destination: &'static str, path: &'static str, -) -> Result, Error> { +) -> Result + Send>>, Error> { let proxy = build_service_status_proxy(&dbus, destination, path).await?; let stream = proxy .receive_current_changed() @@ -101,7 +101,7 @@ pub async fn service_status_stream( } }) .filter_map(|e| e); - Ok(stream) + Ok(Box::pin(stream)) } async fn build_service_status_proxy<'a>( @@ -180,11 +180,11 @@ pub async fn progress_stream<'a>( dbus: zbus::Connection, destination: &'static str, path: &'static str, -) -> ProgressStream<'a> { +) -> Pin + Send>> { let proxy = build_progress_proxy(&dbus, destination, path) .await .unwrap(); - ProgressStream::new(proxy).await + Box::pin(ProgressStream::new(proxy).await) } impl<'a> ProgressStream<'a> { From 859617b717e31e51fcfcb960736284ddb99a594a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Thu, 14 Mar 2024 16:50:35 +0000 Subject: [PATCH 7/7] Add error handling to the web::common module --- rust/agama-server/src/web.rs | 4 +-- rust/agama-server/src/web/common.rs | 56 ++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/rust/agama-server/src/web.rs b/rust/agama-server/src/web.rs index 580f08b559..4d7c5c173c 100644 --- a/rust/agama-server/src/web.rs +++ b/rust/agama-server/src/web.rs @@ -92,7 +92,7 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res "org.opensuse.Agama.Manager1", "/org/opensuse/Agama/Manager1", ) - .await, + .await?, ); stream.insert("software", software_stream(dbus.clone()).await?); @@ -112,7 +112,7 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res "org.opensuse.Agama.Software1", "/org/opensuse/Agama/Software1", ) - .await, + .await?, ); tokio::pin!(stream); diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index 44bb518e4d..5c927bbe06 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -7,9 +7,17 @@ use agama_lib::{ progress::Progress, proxies::{ProgressProxy, ServiceStatusProxy}, }; -use axum::{extract::State, routing::get, Json, Router}; +use axum::{ + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; use pin_project::pin_project; use serde::Serialize; +use serde_json::json; +use thiserror::Error; use tokio_stream::{Stream, StreamExt}; use zbus::PropertyStream; @@ -75,6 +83,21 @@ struct ServiceStatus { current: u32, } +#[derive(Error, Debug)] +pub enum ServiceStatusError { + #[error("Service status error: {0}")] + Error(#[from] ServiceError), +} + +impl IntoResponse for ServiceStatusError { + fn into_response(self) -> Response { + let body = json!({ + "error": self.to_string() + }); + (StatusCode::BAD_REQUEST, Json(body)).into_response() + } +} + /// Builds a stream of the changes in the the `org.opensuse.Agama1.ServiceStatus` /// interface of the given D-Bus object. /// @@ -163,10 +186,27 @@ struct ProgressState<'a> { proxy: ProgressProxy<'a>, } -async fn progress(State(state): State>) -> Json { +#[derive(Error, Debug)] +pub enum ProgressError { + #[error("Progress error: {0}")] + Error(#[from] ServiceError), + #[error("D-Bus error: {0}")] + DBusError(#[from] zbus::Error), +} + +impl IntoResponse for ProgressError { + fn into_response(self) -> Response { + let body = json!({ + "error": self.to_string() + }); + (StatusCode::BAD_REQUEST, Json(body)).into_response() + } +} + +async fn progress(State(state): State>) -> Result, ProgressError> { let proxy = state.proxy; - let progress = Progress::from_proxy(&proxy).await.unwrap(); - Json(progress) + let progress = Progress::from_proxy(&proxy).await?; + Ok(Json(progress)) } #[pin_project] @@ -180,11 +220,9 @@ pub async fn progress_stream<'a>( dbus: zbus::Connection, destination: &'static str, path: &'static str, -) -> Pin + Send>> { - let proxy = build_progress_proxy(&dbus, destination, path) - .await - .unwrap(); - Box::pin(ProgressStream::new(proxy).await) +) -> Result + Send>>, zbus::Error> { + let proxy = build_progress_proxy(&dbus, destination, path).await?; + Ok(Box::pin(ProgressStream::new(proxy).await)) } impl<'a> ProgressStream<'a> {