diff --git a/rust/agama-lib/src/http.rs b/rust/agama-lib/src/http.rs index eb560a6d2d..b3cdc051dc 100644 --- a/rust/agama-lib/src/http.rs +++ b/rust/agama-lib/src/http.rs @@ -21,8 +21,5 @@ mod base_http_client; pub use base_http_client::{BaseHTTPClient, BaseHTTPClientError}; -pub mod event; -pub use event::{EventPayload, OldEvent}; - mod websocket; pub use websocket::{WebSocketClient, WebSocketError}; diff --git a/rust/agama-lib/src/http/event.rs b/rust/agama-lib/src/http/event.rs deleted file mode 100644 index 378e0ea320..0000000000 --- a/rust/agama-lib/src/http/event.rs +++ /dev/null @@ -1,239 +0,0 @@ -// Copyright (c) [2025] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -use crate::{ - auth::ClientId, - jobs::Job, - manager::InstallationPhase, - network::model::NetworkChange, - progress::Progress, - storage::{ - model::{ - dasd::{DASDDevice, DASDFormatSummary}, - zfcp::{ZFCPController, ZFCPDisk}, - }, - ISCSINode, - }, - users::{FirstUser, RootUser}, -}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use tokio::sync::broadcast; - -pub type OldSender = broadcast::Sender; -pub type OldReceiver = broadcast::Receiver; - -/// Agama event. -/// -/// It represents an event that occurs in Agama. -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OldEvent { - /// The identifier of the client which caused the event. - #[serde(skip_serializing_if = "Option::is_none")] - pub client_id: Option, - /// Event payload. - #[serde(flatten)] - pub payload: EventPayload, -} - -impl OldEvent { - /// Creates a new event. - /// - /// * `payload`: event payload. - pub fn new(payload: EventPayload) -> Self { - OldEvent { - client_id: None, - payload, - } - } - - /// Creates a new event with a client ID. - /// - /// * `payload`: event payload. - /// * `client_id`: client ID. - pub fn new_with_client_id(payload: EventPayload, client_id: &ClientId) -> Self { - OldEvent { - client_id: Some(client_id.clone()), - payload, - } - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(tag = "type")] -pub enum EventPayload { - ClientConnected, - LocaleChanged { - locale: String, - }, - DevicesDirty { - dirty: bool, - }, - ProgressChanged { - path: String, - #[serde(flatten)] - progress: Progress, - }, - ProductChanged { - id: String, - }, - RegistrationChanged, - FirstUserChanged(FirstUser), - RootUserChanged(RootUser), - NetworkChange { - #[serde(flatten)] - change: NetworkChange, - }, - StorageChanged, - QuestionsChanged, - InstallationPhaseChanged { - phase: InstallationPhase, - }, - ServiceStatusChanged { - service: String, - status: u32, - }, - ValidationChanged { - service: String, - path: String, - errors: Vec, - }, - ISCSINodeAdded { - node: ISCSINode, - }, - ISCSINodeChanged { - node: ISCSINode, - }, - ISCSINodeRemoved { - node: ISCSINode, - }, - ISCSIInitiatorChanged { - name: Option, - ibft: Option, - }, - DASDDeviceAdded { - device: DASDDevice, - }, - DASDDeviceChanged { - device: DASDDevice, - }, - DASDDeviceRemoved { - device: DASDDevice, - }, - JobAdded { - job: Job, - }, - JobChanged { - job: Job, - }, - JobRemoved { - job: Job, - }, - DASDFormatJobChanged { - #[serde(rename = "jobId")] - job_id: String, - summary: HashMap, - }, - ZFCPDiskAdded { - device: ZFCPDisk, - }, - ZFCPDiskChanged { - device: ZFCPDisk, - }, - ZFCPDiskRemoved { - device: ZFCPDisk, - }, - ZFCPControllerAdded { - device: ZFCPController, - }, - ZFCPControllerChanged { - device: ZFCPController, - }, - ZFCPControllerRemoved { - device: ZFCPController, - }, -} - -/// Makes it easier to create an event, reducing the boilerplate. -/// -/// # Event without additional data -/// -/// ``` -/// # use agama_lib::{event, http::EventPayload}; -/// let my_event = event!(ClientConnected); -/// assert!(matches!(my_event.payload, EventPayload::ClientConnected)); -/// assert!(my_event.client_id.is_none()); -/// ``` -/// -/// # Event with some additional data -/// -/// ``` -/// # use agama_lib::{event, http::EventPayload}; -/// let my_event = event!(LocaleChanged { locale: "es_ES".to_string() }); -/// assert!(matches!( -/// my_event.payload, -/// EventPayload::LocaleChanged { locale: _ } -/// )); -/// ``` -/// -/// # Adding the client ID -/// -/// ``` -/// # use agama_lib::{auth::ClientId, event, http::EventPayload}; -/// let client_id = ClientId::new(); -/// let my_event = event!(ClientConnected, &client_id); -/// assert!(matches!(my_event.payload, EventPayload::ClientConnected)); -/// assert!(my_event.client_id.is_some()); -/// ``` -/// -/// # Add the client ID to a complex event -/// -/// ``` -/// # use agama_lib::{auth::ClientId, event, http::EventPayload}; -/// let client_id = ClientId::new(); -/// let my_event = event!(LocaleChanged { locale: "es_ES".to_string() }, &client_id); -/// assert!(matches!( -/// my_event.payload, -/// EventPayload::LocaleChanged { locale: _ } -/// )); -/// assert!(my_event.client_id.is_some()); -/// ``` -#[macro_export] -macro_rules! event { - ($variant:ident) => { - agama_lib::http::OldEvent::new(agama_lib::http::EventPayload::$variant) - }; - ($variant:ident, $client:expr) => { - agama_lib::http::OldEvent::new_with_client_id( - agama_lib::http::EventPayload::$variant, - $client, - ) - }; - ($variant:ident $inner:tt, $client:expr) => { - agama_lib::http::OldEvent::new_with_client_id( - agama_lib::http::EventPayload::$variant $inner, - $client - ) - }; - ($variant:ident $inner:tt) => { - agama_lib::http::OldEvent::new(agama_lib::http::EventPayload::$variant $inner) - }; -} diff --git a/rust/agama-server/src/agama-web-server.rs b/rust/agama-server/src/agama-web-server.rs index f9717062ba..fd149ba0eb 100644 --- a/rust/agama-server/src/agama-web-server.rs +++ b/rust/agama-server/src/agama-web-server.rs @@ -29,7 +29,7 @@ use agama_lib::{auth::AuthToken, connection_to}; use agama_server::{ cert::Certificate, logs::init_logging, - web::{self, run_monitor}, + web::{self}, }; use agama_utils::api::event::Receiver; use anyhow::Context; @@ -320,9 +320,6 @@ async fn serve_command(args: ServeArgs) -> anyhow::Result<()> { _ = l10n_helpers::init_locale(); init_logging().context("Could not initialize the logger")?; - let (tx, _) = channel(16); - run_monitor(tx.clone()).await?; - let (events_tx, events_rx) = channel(16); monitor_events_channel(events_rx); @@ -335,7 +332,7 @@ async fn serve_command(args: ServeArgs) -> anyhow::Result<()> { .web_ui_dir .clone() .unwrap_or_else(|| PathBuf::from(DEFAULT_WEB_UI_DIR)); - let service = web::service(config, events_tx, tx, dbus, web_ui_dir).await?; + let service = web::service(config, events_tx, dbus, web_ui_dir).await?; // TODO: Move elsewhere? Use a singleton? (It would be nice to use the same // generated self-signed certificate on both ports.) let ssl_acceptor = if let Ok(ssl_acceptor) = ssl_acceptor(&args.to_certificate()?) { diff --git a/rust/agama-server/src/error.rs b/rust/agama-server/src/error.rs index 1cf90a6890..27d0df6f53 100644 --- a/rust/agama-server/src/error.rs +++ b/rust/agama-server/src/error.rs @@ -26,7 +26,7 @@ use axum::{ }; use serde_json::json; -use crate::{users::password::PasswordCheckerError, web::common::ProgressServiceError}; +use crate::users::password::PasswordCheckerError; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -36,8 +36,6 @@ pub enum Error { Anyhow(String), #[error("Agama service error: {0}")] Service(#[from] ServiceError), - #[error("Progress service error: {0}")] - Progress(#[from] ProgressServiceError), #[error("Could not check the password")] PasswordCheck(#[from] PasswordCheckerError), } diff --git a/rust/agama-server/src/lib.rs b/rust/agama-server/src/lib.rs index b6a2a2c18a..2b7fb5a8ac 100644 --- a/rust/agama-server/src/lib.rs +++ b/rust/agama-server/src/lib.rs @@ -24,10 +24,8 @@ pub mod dbus; pub mod error; pub mod hostname; pub mod logs; -pub mod manager; pub mod profile; pub mod security; -pub mod storage; pub mod users; pub mod web; pub use web::service; diff --git a/rust/agama-server/src/manager.rs b/rust/agama-server/src/manager.rs deleted file mode 100644 index 7540a15f61..0000000000 --- a/rust/agama-server/src/manager.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -pub mod web; -pub use web::manager_service; diff --git a/rust/agama-server/src/manager/web.rs b/rust/agama-server/src/manager/web.rs deleted file mode 100644 index 1e0485ac58..0000000000 --- a/rust/agama-server/src/manager/web.rs +++ /dev/null @@ -1,319 +0,0 @@ -// Copyright (c) [2024-2025] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module implements the web API for the manager service. -//! -//! The module offers two public functions: -//! -//! * `manager_service` which returns the Axum service. -//! * `manager_stream` which offers an stream that emits the manager events coming from D-Bus. - -use agama_lib::{ - auth::ClientId, - error::ServiceError, - event, logs, - manager::{FinishMethod, InstallationPhase, InstallerStatus, ManagerClient}, - proxies::Manager1Proxy, -}; -use anyhow::Context; -use axum::{ - body::Body, - extract::State, - http::{header, status::StatusCode, HeaderMap, HeaderValue}, - response::IntoResponse, - routing::{get, post}, - Extension, Json, Router, -}; -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Arc; -use tokio_stream::{Stream, StreamExt}; -use tokio_util::io::ReaderStream; - -use crate::{ - error::Error, - web::common::{service_status_router, ProgressClient, ProgressRouterBuilder}, -}; -use agama_lib::http::OldEvent; - -#[derive(Clone)] -pub struct ManagerState<'a> { - dbus: zbus::Connection, - manager: ManagerClient<'a>, -} - -/// Returns a stream that emits manager related events coming from D-Bus. -/// -/// It emits the Event::InstallationPhaseChanged event. -/// -/// * `connection`: D-Bus connection to listen for events. -pub async fn manager_stream( - dbus: zbus::Connection, -) -> Result + Send>>, Error> { - let proxy = Manager1Proxy::new(&dbus).await?; - let stream = proxy - .receive_current_installation_phase_changed() - .await - .then(|change| async move { - if let Ok(phase) = change.get().await { - match InstallationPhase::try_from(phase) { - Ok(phase) => Some(event!(InstallationPhaseChanged { phase })), - Err(error) => { - tracing::warn!("Ignoring the installation phase change. Error: {}", error); - None - } - } - } else { - None - } - }) - .filter_map(|e| e); - Ok(Box::pin(stream)) -} - -/// Sets up and returns the axum service for the manager module -pub async fn manager_service( - dbus: zbus::Connection, - progress: ProgressClient, -) -> Result { - const DBUS_SERVICE: &str = "org.opensuse.Agama.Manager1"; - const DBUS_PATH: &str = "/org/opensuse/Agama/Manager1"; - - let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; - // FIXME: use anyhow temporarily until we adapt all these methods to return - // the crate::error::Error instead of ServiceError. - let progress_router = ProgressRouterBuilder::new(DBUS_SERVICE, DBUS_PATH, progress) - .build() - .context("Could not build the progress router")?; - let manager = ManagerClient::new(dbus.clone()).await?; - let state = ManagerState { manager, dbus }; - Ok(Router::new() - .route("/probe", post(probe_action)) - .route("/probe_sync", post(probe_sync_action)) - .route("/reprobe_sync", post(reprobe_sync_action)) - .route("/install", post(install_action)) - .route("/finish", post(finish_action)) - .route("/installer", get(installer_status)) - .nest("/logs", logs_router()) - .merge(status_router) - .merge(progress_router) - .with_state(state)) -} - -/// Starts the probing process. -// The Probe D-Bus method is blocking and will not return until the probing is finished. To avoid a -// long-lived HTTP connection, this method returns immediately (with a 200) and runs the request on -// a separate task. -#[utoipa::path( - post, - path = "/probe", - context_path = "/api/manager", - responses( - ( - status = 200, - description = "The probing was requested but there is no way to know whether it succeeded." - ) - ) -)] -async fn probe_action( - State(state): State>, - Extension(client_id): Extension>, -) -> Result<(), Error> { - let dbus = state.dbus.clone(); - tokio::spawn(async move { - let result = dbus - .call_method( - Some("org.opensuse.Agama.Manager1"), - "/org/opensuse/Agama/Manager1", - Some("org.opensuse.Agama.Manager1"), - "Probe", - &HashMap::from([("client_id", client_id.to_string())]), - ) - .await; - if let Err(error) = result { - tracing::error!("Could not start probing: {:?}", error); - } - }); - Ok(()) -} - -/// Starts the probing process and waits until it is done. -/// We need this because the CLI (agama_lib::Store) only does sync calls. -#[utoipa::path( - post, - path = "/probe_sync", - context_path = "/api/manager", - responses( - (status = 200, description = "Probing done.") - ) -)] -async fn probe_sync_action( - State(state): State>, - Extension(client_id): Extension>, -) -> Result<(), Error> { - state.manager.probe(client_id.to_string()).await?; - Ok(()) -} - -/// Starts the reprobing process and waits until it is done. -#[utoipa::path( - post, - path = "/reprobe_sync", - context_path = "/api/manager", - responses( - (status = 200, description = "Re-probing done.") - ) -)] -async fn reprobe_sync_action( - State(state): State>, - Extension(client_id): Extension>, -) -> Result<(), Error> { - state.manager.reprobe(client_id.to_string()).await?; - Ok(()) -} - -/// Starts the installation process. -#[utoipa::path( - post, - path = "/install", - context_path = "/api/manager", - responses( - (status = 200, description = "The installation process was started.") - ) -)] -async fn install_action(State(state): State>) -> Result<(), Error> { - state.manager.install().await?; - Ok(()) -} - -/// Executes the post installation tasks (e.g., rebooting the system). -#[utoipa::path( - post, - path = "/finish", - context_path = "/api/manager", - responses( - (status = 200, description = "The installation tasks are executed.", body = Option) - ) -)] -async fn finish_action( - State(state): State>, - method: Option>, -) -> Result, Error> { - let method = match method { - Some(Json(method)) => method, - _ => FinishMethod::default(), - }; - Ok(Json(state.manager.finish(method).await?)) -} - -/// Returns the manager status. -#[utoipa::path( - get, - path = "/installer", - context_path = "/api/manager", - responses( - (status = 200, description = "Installation status.", body = InstallerStatus) - ) -)] -async fn installer_status( - State(state): State>, -) -> Result, Error> { - let phase = state.manager.current_installation_phase().await?; - // CanInstall gets blocked during installation - let can_install = match phase { - InstallationPhase::Install => false, - _ => state.manager.can_install().await?, - }; - let status = InstallerStatus { - phase, - can_install, - is_busy: state.manager.is_busy().await, - use_iguana: state.manager.use_iguana().await?, - }; - Ok(Json(status)) -} - -/// Creates router for handling /logs/* endpoints -fn logs_router() -> Router> { - Router::new() - .route("/store", get(download_logs)) - .route("/list", get(list_logs)) -} - -#[utoipa::path(get, - path = "/logs/store", - context_path = "/api/manager", - responses( - (status = 200, description = "Compressed Agama logs", content_type="application/octet-stream"), - (status = 500, description = "Cannot collect the logs"), - (status = 507, description = "Server is probably out of space"), - ) -)] -async fn download_logs() -> impl IntoResponse { - let mut headers = HeaderMap::new(); - let err_response = (headers.clone(), Body::empty()); - - match logs::store() { - Ok(path) => { - if let Ok(file) = tokio::fs::File::open(path.clone()).await { - let stream = ReaderStream::new(file); - let body = Body::from_stream(stream); - let _ = std::fs::remove_file(path.clone()); - - // See RFC2046, RFC2616 and - // https://www.iana.org/assignments/media-types/media-types.xhtml - // or /etc/mime.types - headers.insert( - header::CONTENT_TYPE, - HeaderValue::from_static("application/x-compressed-tar"), - ); - if let Some(file_name) = path.file_name() { - let disposition = - format!("attachment; filename=\"{}\"", &file_name.to_string_lossy()); - headers.insert( - header::CONTENT_DISPOSITION, - HeaderValue::from_str(&disposition) - .unwrap_or_else(|_| HeaderValue::from_static("attachment")), - ); - } - headers.insert( - header::CONTENT_ENCODING, - HeaderValue::from_static(logs::DEFAULT_COMPRESSION.1), - ); - - (StatusCode::OK, (headers, body)) - } else { - (StatusCode::INSUFFICIENT_STORAGE, err_response) - } - } - Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, err_response), - } -} - -#[utoipa::path(get, - path = "/logs/list", - context_path = "/api/manager", - responses( - (status = 200, description = "Lists of collected logs", body = logs::LogsLists) - ) -)] -pub async fn list_logs() -> Json { - Json(logs::list()) -} diff --git a/rust/agama-server/src/storage.rs b/rust/agama-server/src/storage.rs deleted file mode 100644 index 63141916f2..0000000000 --- a/rust/agama-server/src/storage.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -pub mod web; -pub use web::{storage_service, storage_streams}; diff --git a/rust/agama-server/src/storage/web.rs b/rust/agama-server/src/storage/web.rs deleted file mode 100644 index c9dc75315b..0000000000 --- a/rust/agama-server/src/storage/web.rs +++ /dev/null @@ -1,611 +0,0 @@ -// Copyright (c) [2024-2025] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module implements the web API for the storage service. -//! -//! The module offers two public functions: -//! -//! * `storage_service` which returns the Axum service. -//! * `storage_stream` which offers an stream that emits the storage events coming from D-Bus. - -use std::sync::Arc; - -use agama_lib::{ - auth::ClientId, - error::ServiceError, - event, - http::OldEvent, - storage::{ - model::{Action, Device, DeviceSid, ProposalSettings, ProposalSettingsPatch, Volume}, - proxies::Storage1Proxy, - StorageClient, StorageSettings, - }, -}; -use anyhow::Context; -use axum::{ - extract::{Query, State}, - routing::{get, post, put}, - Extension, Json, Router, -}; -use iscsi::storage_iscsi_service; -use serde::{Deserialize, Serialize}; -use serde_json::value::RawValue; -use tokio_stream::{Stream, StreamExt}; -use uuid::Uuid; -use zfcp::{zfcp_service, zfcp_stream}; - -pub mod dasd; -pub mod iscsi; -pub mod zfcp; - -use crate::{ - error::Error, - storage::web::{ - dasd::{dasd_service, dasd_stream}, - iscsi::iscsi_stream, - }, - web::common::{ - jobs_service, service_status_router, EventStreams, ProgressClient, ProgressRouterBuilder, - }, -}; - -pub async fn storage_streams(dbus: zbus::Connection) -> Result { - let mut result: EventStreams = vec![ - ( - "devices_dirty", - Box::pin(devices_dirty_stream(dbus.clone()).await?), - ), - ( - "configured", - Box::pin(configured_stream(dbus.clone()).await?), - ), - ]; - let mut iscsi = iscsi_stream(&dbus).await?; - let mut dasd = dasd_stream(&dbus).await?; - let mut zfcp = zfcp_stream(&dbus).await?; - - result.append(&mut iscsi); - result.append(&mut dasd); - result.append(&mut zfcp); - Ok(result) -} - -async fn devices_dirty_stream( - dbus: zbus::Connection, -) -> Result, Error> { - let proxy = Storage1Proxy::new(&dbus).await?; - let stream = proxy - .receive_deprecated_system_changed() - .await - .then(|change| async move { - if let Ok(value) = change.get().await { - return Some(event!(DevicesDirty { dirty: value })); - } - None - }) - .filter_map(|e| e); - Ok(stream) -} - -async fn configured_stream(dbus: zbus::Connection) -> Result, Error> { - let proxy = Storage1Proxy::new(&dbus).await?; - let stream = proxy.receive_configured().await?.filter_map(|signal| { - if let Ok(args) = signal.args() { - if let Ok(uuid) = Uuid::parse_str(args.client_id) { - return Some(event!(StorageChanged, &ClientId::new_from_uuid(uuid))); - } - } - None - }); - Ok(stream) -} - -#[derive(Clone)] -struct StorageState<'a> { - client: StorageClient<'a>, -} - -/// Sets up and returns the axum service for the storage module. -pub async fn storage_service( - dbus: zbus::Connection, - progress: ProgressClient, -) -> Result { - const DBUS_SERVICE: &str = "org.opensuse.Agama.Storage1"; - const DBUS_PATH: &str = "/org/opensuse/Agama/Storage1"; - const DBUS_DESTINATION: &str = "org.opensuse.Agama.Storage1"; - - let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; - // FIXME: use anyhow temporarily until we adapt all these methods to return - // the crate::error::Error instead of ServiceError. - let progress_router = ProgressRouterBuilder::new(DBUS_SERVICE, DBUS_PATH, progress) - .build() - .context("Could not build the progress router")?; - let iscsi_router = storage_iscsi_service(&dbus).await?; - let dasd_router = dasd_service(&dbus).await?; - let zfcp_router = zfcp_service(&dbus).await?; - let jobs_router = jobs_service(&dbus, DBUS_DESTINATION, DBUS_PATH).await?; - - let client = StorageClient::new(dbus.clone()).await?; - let state = StorageState { client }; - let router = Router::new() - .route("/config", put(set_config).get(get_config)) - .route("/config/reset", put(reset_config)) - .route("/config_model", put(set_config_model).get(get_config_model)) - .route("/config_model/solve", get(solve_config_model)) - .route("/probe", post(probe)) - .route("/reprobe", post(reprobe)) - .route("/reactivate", post(reactivate)) - .route("/devices/dirty", get(devices_dirty)) - .route("/devices/system", get(system_devices)) - .route("/devices/result", get(staging_devices)) - .route("/devices/actions", get(actions)) - .route("/devices/available_drives", get(available_drives)) - .route("/devices/candidate_drives", get(candidate_drives)) - .route("/devices/available_md_raids", get(available_md_raids)) - .route("/devices/candidate_md_raids", get(candidate_md_raids)) - .route("/product/volume_for", get(volume_for)) - .route("/product/params", get(product_params)) - .route( - "/proposal/settings", - get(get_proposal_settings).put(set_proposal_settings), - ) - .merge(progress_router) - .merge(status_router) - .merge(jobs_router) - .nest("/iscsi", iscsi_router) - .nest("/dasd", dasd_router) - .nest("/zfcp", zfcp_router) - .with_state(state); - Ok(router) -} - -/// Returns the storage configuration. -/// -/// * `state` : service state. -#[utoipa::path( - get, - path = "/config", - context_path = "/api/storage", - operation_id = "get_storage_config", - responses( - (status = 200, description = "storage configuration", body = StorageSettings), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn get_config( - State(state): State>, -) -> Result>, Error> { - // StorageSettings is just a wrapper over serde_json::value::RawValue - let settings = state.client.get_config().await.map_err(Error::Service)?; - Ok(Json(settings)) -} - -/// Sets the storage configuration. -/// -/// * `state`: service state. -/// * `config`: storage configuration. -#[utoipa::path( - put, - path = "/config", - context_path = "/api/storage", - operation_id = "set_storage_config", - responses( - (status = 200, description = "Set the storage configuration"), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn set_config( - State(state): State>, - Extension(client_id): Extension>, - Json(settings): Json, -) -> Result, Error> { - let _status: u32 = state - .client - .set_config(settings, client_id.to_string()) - .await - .map_err(Error::Service)?; - Ok(Json(())) -} - -/// Returns the storage config model. -/// -/// * `state` : service state. -#[utoipa::path( - get, - path = "/config_model", - context_path = "/api/storage", - operation_id = "get_storage_config_model", - responses( - (status = 200, description = "storage config model", body = String), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn get_config_model( - State(state): State>, - Extension(client_id): Extension>, -) -> Result>, Error> { - tracing::debug!("{client_id:?}"); - let config_model = state - .client - .get_config_model() - .await - .map_err(Error::Service)?; - Ok(Json(config_model)) -} - -/// Resets the storage config to the default value. -/// -/// * `state`: service state. -#[utoipa::path( - put, - path = "/config/reset", - context_path = "/api/storage", - operation_id = "reset_storage_config", - responses( - (status = 200, description = "Reset the storage configuration"), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn reset_config( - State(state): State>, - Extension(client_id): Extension>, -) -> Result, Error> { - let _status: u32 = state - .client - .reset_config(client_id.to_string()) - .await - .map_err(Error::Service)?; - Ok(Json(())) -} - -/// Sets the storage config model. -/// -/// * `state`: service state. -/// * `config_model`: storage config model. -#[utoipa::path( - put, - request_body = String, - path = "/config_model", - context_path = "/api/storage", - operation_id = "set_storage_config_model", - responses( - (status = 200, description = "Set the storage config model"), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn set_config_model( - State(state): State>, - Extension(client_id): Extension>, - Json(model): Json>, -) -> Result, Error> { - let _status: u32 = state - .client - .set_config_model(model, client_id.to_string()) - .await - .map_err(Error::Service)?; - Ok(Json(())) -} - -/// Solves a storage config model. -#[utoipa::path( - get, - path = "/config_model/solve", - context_path = "/api/storage", - params(SolveModelQuery), - operation_id = "solve_storage_config_model", - responses( - (status = 200, description = "Solve the storage config model", body = String), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn solve_config_model( - State(state): State>, - query: Query, -) -> Result>, Error> { - let solved_model = state - .client - .solve_config_model(query.model.as_str()) - .await - .map_err(Error::Service)?; - Ok(Json(solved_model)) -} - -#[derive(Deserialize, utoipa::IntoParams)] -struct SolveModelQuery { - /// Serialized config model. - model: String, -} - -/// Probes the storage devices. -#[utoipa::path( - post, - path = "/probe", - context_path = "/api/storage", - responses( - (status = 200, description = "Devices were probed and an initial proposal was performed"), - (status = 400, description = "The D-Bus service could not perform the action") - ), - operation_id = "storage_probe" -)] -async fn probe( - State(state): State>, - Extension(client_id): Extension>, -) -> Result, Error> { - Ok(Json(state.client.probe(client_id.to_string()).await?)) -} - -/// Reprobes the storage devices. -#[utoipa::path( - post, - path = "/reprobe", - context_path = "/api/storage", - responses( - (status = 200, description = "Devices were probed and the proposal was recalculated"), - (status = 400, description = "The D-Bus service could not perform the action") - ), - operation_id = "storage_reprobe" -)] -async fn reprobe( - State(state): State>, - Extension(client_id): Extension>, -) -> Result, Error> { - Ok(Json(state.client.reprobe(client_id.to_string()).await?)) -} - -/// Reactivate the storage devices. -#[utoipa::path( - post, - path = "/reactivate", - context_path = "/api/reactivate", - responses( - (status = 200, description = "Devices were reactivated and probed, and the proposal was recalculated"), - (status = 400, description = "The D-Bus service could not perform the action") - ), - operation_id = "storage_reactivate" -)] -async fn reactivate( - State(state): State>, - Extension(client_id): Extension>, -) -> Result, Error> { - Ok(Json(state.client.reactivate(client_id.to_string()).await?)) -} - -/// Gets whether the system is in a deprecated status. -/// -/// The system is usually set as deprecated as effect of managing some kind of devices, for example, -/// when iSCSI sessions are created or when a zFCP disk is activated. -/// -/// A deprecated system means that the probed system could not match with the current system. -/// -/// It is expected that clients probe devices again if the system is deprecated. -#[utoipa::path( - get, - path = "/devices/dirty", - context_path = "/api/storage", - responses( - (status = 200, description = "Whether the devices have changed", body = bool), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn devices_dirty(State(state): State>) -> Result, Error> { - Ok(Json(state.client.devices_dirty_bit().await?)) -} - -/// Gets the probed devices. -#[utoipa::path( - get, - path = "/devices/system", - context_path = "/api/storage", - responses( - (status = 200, description = "List of devices", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn system_devices(State(state): State>) -> Result>, Error> { - Ok(Json(state.client.system_devices().await?)) -} - -/// Gets the resulting devices of applying the requested actions. -#[utoipa::path( - get, - path = "/devices/result", - context_path = "/api/storage", - responses( - (status = 200, description = "List of devices", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn staging_devices( - State(state): State>, -) -> Result>, Error> { - Ok(Json(state.client.staging_devices().await?)) -} - -/// Gets the default values for a volume with the given mount path. -#[utoipa::path( - get, - path = "/product/volume_for", - context_path = "/api/storage", - params(VolumeForQuery), - responses( - (status = 200, description = "Volume specification", body = Volume), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn volume_for( - State(state): State>, - query: Query, -) -> Result, Error> { - Ok(Json( - state.client.volume_for(query.mount_path.as_str()).await?, - )) -} - -#[derive(Deserialize, utoipa::IntoParams)] -struct VolumeForQuery { - /// Mount path of the volume (empty for an arbitrary volume). - mount_path: String, -} - -/// Gets information about the selected product. -#[utoipa::path( - get, - path = "/product/params", - context_path = "/api/storage", - responses( - (status = 200, description = "Product information", body = ProductParams), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn product_params( - State(state): State>, -) -> Result, Error> { - let params = ProductParams { - mount_points: state.client.product_mount_points().await?, - encryption_methods: state.client.encryption_methods().await?, - }; - Ok(Json(params)) -} - -#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ProductParams { - /// Mount points defined by the product. - mount_points: Vec, - /// Encryption methods allowed by the product. - encryption_methods: Vec, -} - -/// Gets the actions to perform in the storage devices. -#[utoipa::path( - get, - path = "/devices/actions", - context_path = "/api/storage", - responses( - (status = 200, description = "List of actions", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn actions(State(state): State>) -> Result>, Error> { - Ok(Json(state.client.actions().await?)) -} - -/// Gets the SID (Storage ID) of the available drives for the installation. -#[utoipa::path( - get, - path = "/devices/available_drives", - context_path = "/api/storage", - responses( - (status = 200, description = "Lis of SIDs", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn available_drives( - State(state): State>, -) -> Result>, Error> { - let sids = state.client.available_drives().await?; - Ok(Json(sids)) -} - -/// Gets the SID (Storage ID) of the candidate drives for the installation. -#[utoipa::path( - get, - path = "/devices/candidate_drives", - context_path = "/api/storage", - responses( - (status = 200, description = "Lis of SIDs", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn candidate_drives( - State(state): State>, -) -> Result>, Error> { - let sids = state.client.candidate_drives().await?; - Ok(Json(sids)) -} - -/// Gets the SID (Storage ID) of the available MD RAIDs for the installation. -#[utoipa::path( - get, - path = "/devices/available_md_raids", - context_path = "/api/storage", - responses( - (status = 200, description = "Lis of SIDs", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn available_md_raids( - State(state): State>, -) -> Result>, Error> { - let sids = state.client.available_md_raids().await?; - Ok(Json(sids)) -} - -/// Gets the SID (Storage ID) of the candidate MD RAIDs for the installation. -#[utoipa::path( - get, - path = "/devices/candidate_md_raids", - context_path = "/api/storage", - responses( - (status = 200, description = "Lis of SIDs", body = Vec), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn candidate_md_raids( - State(state): State>, -) -> Result>, Error> { - let sids = state.client.candidate_md_raids().await?; - Ok(Json(sids)) -} - -/// Gets the settings that were used for calculating the current proposal. -#[utoipa::path( - get, - path = "/proposal/settings", - context_path = "/api/storage", - responses( - (status = 200, description = "Settings", body = ProposalSettings), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn get_proposal_settings( - State(state): State>, -) -> Result, Error> { - Ok(Json(state.client.proposal_settings().await?)) -} - -/// Tries to calculates a new proposal with the given settings. -#[utoipa::path( - put, - path = "/proposal/settings", - context_path = "/api/storage", - request_body(content = ProposalSettingsPatch, description = "Proposal settings", content_type = "application/json"), - responses( - (status = 200, description = "Whether the proposal was successfully calculated", body = bool), - (status = 400, description = "The D-Bus service could not perform the action") - ) -)] -async fn set_proposal_settings( - State(state): State>, - Json(config): Json, -) -> Result, Error> { - let result = state.client.calculate(config).await?; - Ok(Json(result == 0)) -} diff --git a/rust/agama-server/src/storage/web/dasd.rs b/rust/agama-server/src/storage/web/dasd.rs deleted file mode 100644 index 443837e1da..0000000000 --- a/rust/agama-server/src/storage/web/dasd.rs +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module implements the web API for the handling of DASD storage service. -//! -//! The module offers two public functions: -//! -//! * `dasd_service` which returns the Axum service. -//! * `dasd_stream` which offers an stream that emits the DASD-related events coming from D-Bus. - -use agama_lib::{ - error::ServiceError, - storage::{client::dasd::DASDClient, model::dasd::DASDDevice, settings::dasd::DASDConfig}, -}; -use axum::{ - extract::State, - routing::{get, post, put}, - Json, Router, -}; -use serde::Deserialize; - -use crate::{error::Error, web::common::EventStreams}; - -use self::stream::{DASDDeviceStream, DASDFormatJobStream}; - -mod stream; - -/// Returns the stream of DASD-related events. -/// -/// The stream combines the following events: -/// -/// * Changes on the DASD devices collection. -/// -/// * `dbus`: D-Bus connection to use. -pub async fn dasd_stream(dbus: &zbus::Connection) -> Result { - let stream: EventStreams = vec![ - ("dasd_devices", Box::pin(DASDDeviceStream::new(dbus).await?)), - ( - "format_jobs", - Box::pin(DASDFormatJobStream::new(dbus).await?), - ), - ]; - Ok(stream) -} - -#[derive(Clone)] -struct DASDState<'a> { - client: DASDClient<'a>, -} - -pub async fn dasd_service(dbus: &zbus::Connection) -> Result, ServiceError> { - let client = DASDClient::new(dbus.clone()).await?; - let state = DASDState { client }; - let router = Router::new() - .route("/supported", get(supported)) - .route("/config", get(get_config).put(set_config)) - .route("/devices", get(devices)) - .route("/probe", post(probe)) - .route("/format", post(format)) - .route("/enable", post(enable)) - .route("/disable", post(disable)) - .route("/diag", put(set_diag)) - .with_state(state); - Ok(router) -} - -/// Returns whether DASD technology is supported or not -#[utoipa::path( - get, - path="/supported", - context_path="/api/storage/dasd", - operation_id = "dasd_supported", - responses( - (status = OK, description = "Returns whether DASD technology is supported") - ) -)] -async fn supported(State(state): State>) -> Result, Error> { - Ok(Json(state.client.supported().await?)) -} - -/// Returns DASD config -#[utoipa::path( - get, - path="/config", - context_path="/api/storage/dasd", - operation_id = "dasd_get_config", - responses( - (status = OK, description = "Returns DASD config", body=DASDConfig) - ) -)] -async fn get_config(State(state): State>) -> Result, Error> { - Ok(Json(state.client.get_config().await?)) -} - -/// Returns DASD config -#[utoipa::path( - put, - path="/config", - context_path="/api/storage/dasd", - operation_id = "dasd_set_config", - responses( - (status = OK, description = "Sets DASD config") - ) -)] -async fn set_config( - State(state): State>, - Json(config): Json, -) -> Result<(), Error> { - Ok(state.client.set_config(config).await?) -} - -/// Returns the list of known DASD devices. -#[utoipa::path( - get, - path="/devices", - context_path="/api/storage/dasd", - responses( - (status = OK, description = "List of DASD devices", body = Vec) - ) -)] -async fn devices(State(state): State>) -> Result>, Error> { - let devices = state - .client - .devices() - .await? - .into_iter() - .map(|(_path, device)| device) - .collect(); - Ok(Json(devices)) -} - -/// Find DASD devices in the system. -#[utoipa::path( - post, - path="/probe", - context_path="/api/storage/dasd", - operation_id = "dasd_probe", - responses( - (status = OK, description = "The probing process ran successfully") - ) -)] -async fn probe(State(state): State>) -> Result, Error> { - Ok(Json(state.client.probe().await?)) -} - -/// Formats a set of devices. -#[utoipa::path( - post, - path="/format", - context_path="/api/storage/dasd", - responses( - (status = OK, description = "The formatting process started. The id of format job is in response.") - ) -)] -async fn format( - State(state): State>, - Json(devices): Json, -) -> Result, Error> { - let path = state.client.format(&devices.as_references()).await?; - Ok(Json(path)) -} - -/// Enables a set of devices. -#[utoipa::path( - post, - path="/enable", - context_path="/api/storage/dasd", - responses( - (status = OK, description = "The DASD devices are enabled.") - ) -)] -async fn enable( - State(state): State>, - Json(devices): Json, -) -> Result, Error> { - state.client.enable(&devices.as_references()).await?; - Ok(Json(())) -} - -/// Disables a set of devices. -#[utoipa::path( - post, - path="/disable", - context_path="/api/storage/dasd", - responses( - (status = OK, description = "The DASD devices are disabled.") - ) -)] -async fn disable( - State(state): State>, - Json(devices): Json, -) -> Result, Error> { - state.client.disable(&devices.as_references()).await?; - Ok(Json(())) -} - -/// Sets the diag property for a set of devices. -#[utoipa::path( - put, - path="/diag", - context_path="/api/storage/dasd", - responses( - (status = OK, description = "The DIAG properties are set.") - ) - )] -async fn set_diag( - State(state): State>, - Json(params): Json, -) -> Result, Error> { - state - .client - .set_diag(¶ms.devices.as_references(), params.diag) - .await?; - Ok(Json(())) -} - -#[derive(Deserialize, utoipa::ToSchema)] -struct SetDiagParams { - #[serde(flatten)] - pub devices: DevicesList, - pub diag: bool, -} - -#[derive(Deserialize, utoipa::ToSchema)] -struct DevicesList { - devices: Vec, -} - -impl DevicesList { - pub fn as_references(&self) -> Vec<&str> { - self.devices.iter().map(AsRef::as_ref).collect() - } -} diff --git a/rust/agama-server/src/storage/web/dasd/stream.rs b/rust/agama-server/src/storage/web/dasd/stream.rs deleted file mode 100644 index 7ec9cb7d94..0000000000 --- a/rust/agama-server/src/storage/web/dasd/stream.rs +++ /dev/null @@ -1,282 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -// FIXME: the code is pretty similar to iscsi::stream. Refactor the stream to reduce the repetition. - -use std::{collections::HashMap, task::Poll}; - -use agama_lib::{ - error::ServiceError, - event, - http::OldEvent, - storage::{ - client::dasd::DASDClient, - model::dasd::{DASDDevice, DASDFormatSummary}, - }, -}; -use agama_utils::{dbus::get_optional_property, property_from_dbus}; -use futures_util::{ready, Stream}; -use pin_project::pin_project; -use thiserror::Error; -use tokio::sync::mpsc::unbounded_channel; -use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; -use zbus::{ - fdo::{PropertiesChanged, PropertiesChangedArgs}, - message::Type as MessageType, - zvariant::{self, ObjectPath, OwnedObjectPath, OwnedValue}, - MatchRule, Message, MessageStream, -}; - -use crate::dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}; - -#[derive(Debug, Error)] -enum DASDDeviceStreamError { - #[error("Service error: {0}")] - Service(#[from] ServiceError), - #[error("Unknown DASD device: {0}")] - UnknownDevice(OwnedObjectPath), -} - -/// This stream listens for changes in the collection of DASD devices and emits -/// the updated objects. -/// -/// It relies on the [DBusObjectChangesStream] stream and uses a cache to avoid holding a bunch of -/// proxy objects. -#[pin_project] -pub struct DASDDeviceStream { - dbus: zbus::Connection, - cache: ObjectsCache, - #[pin] - inner: UnboundedReceiverStream, -} - -impl DASDDeviceStream { - /// Creates a new stream - /// - /// * `dbus`: D-Bus connection to listen on. - pub async fn new(dbus: &zbus::Connection) -> Result { - const MANAGER_PATH: &str = "/org/opensuse/Agama/Storage1"; - const NAMESPACE: &str = "/org/opensuse/Agama/Storage1/dasds"; - - let (tx, rx) = unbounded_channel(); - let mut stream = DBusObjectChangesStream::new( - dbus, - &ObjectPath::from_str_unchecked(MANAGER_PATH), - &ObjectPath::from_str_unchecked(NAMESPACE), - "org.opensuse.Agama.Storage1.DASD.Device", - ) - .await?; - - tokio::spawn(async move { - while let Some(change) = stream.next().await { - let _ = tx.send(change); - } - }); - let rx = UnboundedReceiverStream::new(rx); - - let mut cache: ObjectsCache = Default::default(); - let client = DASDClient::new(dbus.clone()).await?; - for (path, device) in client.devices().await? { - cache.add(path, device); - } - - Ok(Self { - dbus: dbus.clone(), - cache, - inner: rx, - }) - } - - fn update_device<'a>( - cache: &'a mut ObjectsCache, - path: &OwnedObjectPath, - values: &HashMap, - ) -> Result<&'a DASDDevice, ServiceError> { - let device = cache.find_or_create(path); - property_from_dbus!(device, id, "Id", values, str); - property_from_dbus!(device, enabled, "Enabled", values, bool); - property_from_dbus!(device, device_name, "DeviceName", values, str); - property_from_dbus!(device, formatted, "Formatted", values, bool); - property_from_dbus!(device, diag, "Diag", values, bool); - property_from_dbus!(device, status, "Status", values, str); - property_from_dbus!(device, device_type, "Type", values, str); - property_from_dbus!(device, access_type, "AccessType", values, str); - property_from_dbus!(device, partition_info, "PartitionInfo", values, str); - Ok(device) - } - - fn remove_device( - cache: &mut ObjectsCache, - path: &OwnedObjectPath, - ) -> Result { - cache - .remove(path) - .ok_or_else(|| DASDDeviceStreamError::UnknownDevice(path.clone())) - } - - fn handle_change( - cache: &mut ObjectsCache, - change: &DBusObjectChange, - ) -> Result { - match change { - DBusObjectChange::Added(path, values) => { - let device = Self::update_device(cache, path, values)?; - Ok(event!(DASDDeviceAdded { - device: device.clone(), - })) - } - DBusObjectChange::Changed(path, updated) => { - let device = Self::update_device(cache, path, updated)?; - Ok(event!(DASDDeviceChanged { - device: device.clone(), - })) - } - DBusObjectChange::Removed(path) => { - let device = Self::remove_device(cache, path)?; - Ok(event!(DASDDeviceRemoved { device })) - } - } - } -} - -impl Stream for DASDDeviceStream { - type Item = OldEvent; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut pinned = self.project(); - - Poll::Ready(loop { - let change = ready!(pinned.inner.as_mut().poll_next(cx)); - let next_value = match change { - Some(change) => { - if let Ok(event) = Self::handle_change(pinned.cache, &change) { - Some(event) - } else { - tracing::warn!("Could not process change {:?}", &change); - None - } - } - None => break None, - }; - if next_value.is_some() { - break next_value; - } - }) - } -} - -/// This stream listens for DASD progress changes and emits an [Event::DASDFormatJobChanged] event. -#[pin_project] -pub struct DASDFormatJobStream { - #[pin] - inner: MessageStream, -} - -impl DASDFormatJobStream { - pub async fn new(connection: &zbus::Connection) -> Result { - let rule = MatchRule::builder() - .msg_type(MessageType::Signal) - .path_namespace("/org/opensuse/Agama/Storage1/jobs")? - .interface("org.freedesktop.DBus.Properties")? - .member("PropertiesChanged")? - .build(); - let inner = MessageStream::for_match_rule(rule, connection, None).await?; - Ok(Self { inner }) - } - - fn handle_change(message: Result) -> Option { - let Ok(message) = message else { - return None; - }; - let properties = PropertiesChanged::from_message(message)?; - let args = properties.args().ok()?; - - if args.interface_name.as_str() != "org.opensuse.Agama.Storage1.DASD.Format" { - return None; - } - - let inner = properties.message(); - let id = inner.header().path()?.to_string(); - let event = Self::to_event(id, &args); - if event.is_none() { - tracing::warn!("Could not decode the DASDFormatJobChanged event"); - } - event - } - - fn to_event(path: String, properties_changed: &PropertiesChangedArgs) -> Option { - let dict = properties_changed - .changed_properties() - .get("Summary")? - .downcast_ref::() - .ok()?; - - // the key is the D-Bus path of the DASD device and the value is the progress - // of the related formatting process - let map = >>::try_from(dict).ok()?; - let mut format_summary = HashMap::new(); - - for (dasd_id, summary) in map { - let summary_values = summary.downcast_ref::().ok()?; - let fields = summary_values.fields(); - let total: &u32 = fields.first()?.downcast_ref().ok()?; - let step: &u32 = fields.get(1)?.downcast_ref().ok()?; - let done: &bool = fields.get(2)?.downcast_ref().ok()?; - format_summary.insert( - dasd_id.to_string(), - DASDFormatSummary { - total: *total, - step: *step, - done: *done, - }, - ); - } - - Some(event!(DASDFormatJobChanged { - job_id: path.to_string(), - summary: format_summary, - })) - } -} - -impl Stream for DASDFormatJobStream { - type Item = OldEvent; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut pinned = self.project(); - - Poll::Ready(loop { - let item = ready!(pinned.inner.as_mut().poll_next(cx)); - let next_value = match item { - Some(change) => Self::handle_change(change), - None => break None, - }; - if next_value.is_some() { - break next_value; - } - }) - } -} diff --git a/rust/agama-server/src/storage/web/iscsi.rs b/rust/agama-server/src/storage/web/iscsi.rs deleted file mode 100644 index 8178253a77..0000000000 --- a/rust/agama-server/src/storage/web/iscsi.rs +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module implements the web API for the iSCSI handling of the storage service. -//! -//! The module offers two public functions: -//! -//! * `iscsi_service` which returns the Axum service. -//! * `iscsi_stream` which offers an stream that emits the iSCSI-related events coming from D-Bus. - -use crate::{error::Error, web::common::EventStreams}; -use agama_lib::{ - error::ServiceError, - event, - http::OldEvent, - storage::{ - client::iscsi::{ISCSIAuth, ISCSIInitiator, ISCSINode, LoginResult}, - ISCSIClient, - }, -}; -use agama_utils::dbus::{get_optional_property, to_owned_hash}; -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::IntoResponse, - routing::{delete, get, post}, - Json, Router, -}; -use serde::Deserialize; - -mod stream; -use serde_json::value::RawValue; -use stream::ISCSINodeStream; -use tokio_stream::{Stream, StreamExt}; -use zbus::{ - fdo::{PropertiesChanged, PropertiesProxy}, - names::InterfaceName, -}; - -/// Returns the stream of iSCSI-related events. -/// -/// The stream combines the following events: -/// -/// * Changes on the iSCSI nodes collection. -/// * Changes to the initiator (name or ibft). -/// -/// * `dbus`: D-Bus connection to use. -pub async fn iscsi_stream(dbus: &zbus::Connection) -> Result { - let stream: EventStreams = vec![ - ("iscsi_nodes", Box::pin(ISCSINodeStream::new(dbus).await?)), - ("initiator", Box::pin(initiator_stream(dbus).await?)), - ]; - Ok(stream) -} - -async fn initiator_stream( - dbus: &zbus::Connection, -) -> Result + Send, Error> { - let proxy = PropertiesProxy::builder(dbus) - .destination("org.opensuse.Agama.Storage1")? - .path("/org/opensuse/Agama/Storage1")? - .build() - .await?; - let stream = proxy - .receive_properties_changed() - .await? - .filter_map(|change| match handle_initiator_change(change) { - Ok(event) => event, - Err(error) => { - tracing::warn!("Could not read the initiator change: {}", error); - None - } - }); - Ok(stream) -} - -fn handle_initiator_change(change: PropertiesChanged) -> Result, ServiceError> { - let args = change.args()?; - let iscsi_iface = - InterfaceName::from_str_unchecked("org.opensuse.Agama.Storage1.ISCSI.Initiator"); - if iscsi_iface != args.interface_name { - return Ok(None); - } - let changes = to_owned_hash(args.changed_properties())?; - let name = get_optional_property(&changes, "InitiatorName")?; - let ibft = get_optional_property(&changes, "IBFT")?; - Ok(Some(event!(ISCSIInitiatorChanged { ibft, name }))) -} - -#[derive(Clone)] -struct ISCSIState<'a> { - client: ISCSIClient<'a>, -} - -/// Sets up and returns the Axum service for the iSCSI part of the storage module. -/// -/// It acts as a proxy to Agama D-Bus service. -/// -/// note: storage_iscsi_service is used by the interactive installation (i.e., the web UI). And -/// iscsi_server is used for the new iSCSI API, which allows to load the iscsi section of the -/// configuration (at this moment, used by CLI or unattended installation). The preliminary plan is -/// moving all user interfaces to use only iscsi_service. -/// -/// * `dbus`: D-Bus connection to use. -pub async fn storage_iscsi_service(dbus: &zbus::Connection) -> Result, ServiceError> { - let client = ISCSIClient::new(dbus.clone()).await?; - let state = ISCSIState { client }; - - let router = Router::new() - .route("/initiator", get(initiator).patch(update_initiator)) - .route("/nodes", get(nodes)) - .route("/nodes/:id", delete(delete_node).patch(update_node)) - .route("/nodes/:id/login", post(login_node)) - .route("/nodes/:id/logout", post(logout_node)) - .route("/discover", post(discover)) - .with_state(state); - Ok(router) -} - -/// Sets up and returns the Axum service for the iSCSI module. -/// -/// It acts as a proxy to Agama D-Bus service. -/// -/// * `dbus`: D-Bus connection to use. -pub async fn iscsi_service(dbus: zbus::Connection) -> Result, ServiceError> { - let client = ISCSIClient::new(dbus.clone()).await?; - let state = ISCSIState { client }; - // FIXME: use anyhow temporarily until we adapt all these methods to return - // the crate::error::Error instead of ServiceError. - let router = Router::new() - .route("/config", post(set_config)) - .with_state(state); - Ok(router) -} - -/// Sets iSCSI configuration -/// -/// the json is identical to what iscsi node in profile use. -#[utoipa::path( - post, - path="/config", - context_path="/api/iscsi", - request_body=String, // FIXME: workaround to avoid defining schema here. Identical happens for storage set_config - responses( - (status = OK, description = "Set config succeed."), - (status = BAD_REQUEST, description = "It could not set the config."), - ) -)] -async fn set_config( - State(state): State>, - Json(config): Json>, -) -> Result<(), Error> { - Ok(state.client.set_config(&config).await?) -} - -/// Returns the iSCSI initiator properties. -/// -/// The iSCSI properties include the name and whether iBFT is enabled. -#[utoipa::path( - get, - path="/initiator", - context_path="/api/storage/iscsi", - responses( - (status = OK, description = "iSCSI initiator properties.", body = ISCSIInitiator), - (status = BAD_REQUEST, description = "It could not read the iSCSI initiator properties."), - ) -)] -async fn initiator(State(state): State>) -> Result, Error> { - let initiator = state.client.get_initiator().await?; - Ok(Json(initiator)) -} - -#[derive(Deserialize, utoipa::ToSchema)] -pub struct InitiatorParams { - /// iSCSI initiator name. - name: String, -} - -/// Updates the iSCSI initiator properties. -#[utoipa::path( - patch, - path="/initiator", - context_path="/api/storage/iscsi", - responses( - (status = NO_CONTENT, description = "The iSCSI initiator properties were succesfully updated."), - (status = BAD_REQUEST, description = "It could not update the iSCSI initiator properties."), - ) -)] -async fn update_initiator( - State(state): State>, - Json(params): Json, -) -> Result { - state.client.set_initiator_name(¶ms.name).await?; - Ok(StatusCode::NO_CONTENT) -} - -/// Returns the list of known iSCSI nodes. -#[utoipa::path( - get, - path="/nodes", - context_path="/api/storage/iscsi", - responses( - (status = OK, description = "List of iSCSI nodes.", body = Vec), - (status = BAD_REQUEST, description = "It was not possible to get the list of iSCSI nodes."), - ) -)] -async fn nodes(State(state): State>) -> Result>, Error> { - let nodes = state.client.get_nodes().await?; - Ok(Json(nodes)) -} - -#[derive(Deserialize, utoipa::ToSchema)] -pub struct NodeParams { - /// Startup value. - startup: String, -} - -/// Updates iSCSI node properties. -/// -/// At this point, only the startup option can be changed. -#[utoipa::path( - put, - path="/nodes/{id}", - context_path="/api/storage/iscsi", - params( - ("id" = u32, Path, description = "iSCSI artificial ID.") - ), - responses( - (status = NO_CONTENT, description = "The iSCSI node was updated.", body = NodeParams), - (status = BAD_REQUEST, description = "Could not update the iSCSI node."), - ) -)] -async fn update_node( - State(state): State>, - Path(id): Path, - Json(params): Json, -) -> Result { - state.client.set_startup(id, ¶ms.startup).await?; - Ok(StatusCode::NO_CONTENT) -} - -/// Deletes the iSCSI node. -#[utoipa::path( - delete, - path="/nodes/{id}", - context_path="/api/storage/iscsi", - params( - ("id" = u32, Path, description = "iSCSI artificial ID.") - ), - responses( - (status = NO_CONTENT, description = "The iSCSI node was deleted."), - (status = BAD_REQUEST, description = "Could not delete the iSCSI node."), - ) -)] -async fn delete_node( - State(state): State>, - Path(id): Path, -) -> Result { - state.client.delete_node(id).await?; - Ok(StatusCode::NO_CONTENT) -} - -#[derive(Deserialize, utoipa::ToSchema)] -pub struct LoginParams { - /// Authentication options. - #[serde(flatten)] - auth: ISCSIAuth, - /// Startup value. - startup: String, -} - -#[utoipa::path( - post, - path="/nodes/{id}/login", - context_path="/api/storage/iscsi", - params( - ("id" = u32, Path, description = "iSCSI artificial ID.") - ), - responses( - (status = NO_CONTENT, description = "The login request was successful."), - (status = BAD_REQUEST, description = "Could not reach the iSCSI server."), - (status = UNPROCESSABLE_ENTITY, description = "The login request failed.", - body = LoginResult), - ) -)] -async fn login_node( - State(state): State>, - Path(id): Path, - Json(params): Json, -) -> Result { - let result = state.client.login(id, params.auth, params.startup).await?; - match result { - LoginResult::Success => Ok((StatusCode::NO_CONTENT, ().into_response())), - error => Ok(( - StatusCode::UNPROCESSABLE_ENTITY, - Json(error).into_response(), - )), - } -} - -#[utoipa::path( - post, - path="/nodes/{id}/logout", - context_path="/api/storage/iscsi", - params( - ("id" = u32, Path, description = "iSCSI artificial ID.") - ), - responses( - (status = 204, description = "The logout request was successful."), - (status = 400, description = "Could not reach the iSCSI server."), - (status = 422, description = "The logout request failed."), - ) -)] -async fn logout_node( - State(state): State>, - Path(id): Path, -) -> Result { - if state.client.logout(id).await? { - Ok(StatusCode::NO_CONTENT) - } else { - Ok(StatusCode::UNPROCESSABLE_ENTITY) - } -} - -#[derive(Deserialize, utoipa::ToSchema)] -pub struct DiscoverParams { - /// iSCSI server address. - address: String, - /// iSCSI service port. - port: u32, - /// Authentication options. - #[serde(default)] - options: ISCSIAuth, -} - -/// Performs an iSCSI discovery. -#[utoipa::path( - post, - path="/discover", - context_path="/api/storage/iscsi", - responses( - (status = 204, description = "The iSCSI discovery request was successful."), - (status = 400, description = "The iSCSI discovery request failed."), - ) -)] -async fn discover( - State(state): State>, - Json(params): Json, -) -> Result { - let result = state - .client - .discover(¶ms.address, params.port, params.options) - .await?; - if result { - Ok(StatusCode::NO_CONTENT) - } else { - Ok(StatusCode::BAD_REQUEST) - } -} diff --git a/rust/agama-server/src/storage/web/iscsi/stream.rs b/rust/agama-server/src/storage/web/iscsi/stream.rs deleted file mode 100644 index 9d2228cc91..0000000000 --- a/rust/agama-server/src/storage/web/iscsi/stream.rs +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -use std::{collections::HashMap, task::Poll}; - -use agama_lib::{ - error::ServiceError, - event, - http::OldEvent, - storage::{ISCSIClient, ISCSINode}, -}; -use agama_utils::{ - dbus::{extract_id_from_path, get_optional_property}, - property_from_dbus, -}; -use futures_util::{ready, Stream}; -use pin_project::pin_project; -use thiserror::Error; -use tokio::sync::mpsc::unbounded_channel; -use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; -use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; - -use crate::dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}; - -/// This stream listens for changes in the collection ISCSI nodes and emits -/// the updated objects. -/// -/// It relies on the [DBusObjectChangesStream] stream and uses a cache to avoid holding a bunch of -/// proxy objects. -#[pin_project] -pub struct ISCSINodeStream { - dbus: zbus::Connection, - cache: ObjectsCache, - #[pin] - inner: UnboundedReceiverStream, -} - -/// Internal stream error -#[derive(Debug, Error)] -enum ISCSINodeStreamError { - #[error("Service error: {0}")] - Service(#[from] ServiceError), - #[error("Unknown ISCSI node: {0}")] - UnknownNode(OwnedObjectPath), -} - -impl ISCSINodeStream { - /// Creates a new stream. - /// - /// * `dbus`: D-Bus connection to listen on. - pub async fn new(dbus: &zbus::Connection) -> Result { - const MANAGER_PATH: &str = "/org/opensuse/Agama/Storage1"; - const NAMESPACE: &str = "/org/opensuse/Agama/Storage1/iscsi_nodes"; - - let (tx, rx) = unbounded_channel(); - let mut stream = DBusObjectChangesStream::new( - dbus, - &ObjectPath::from_str_unchecked(MANAGER_PATH), - &ObjectPath::from_str_unchecked(NAMESPACE), - "org.opensuse.Agama.Storage1.ISCSI.Node", - ) - .await?; - - tokio::spawn(async move { - while let Some(change) = stream.next().await { - let _ = tx.send(change); - } - }); - let rx = UnboundedReceiverStream::new(rx); - - // Populate the objects cache - let mut cache: ObjectsCache = Default::default(); - let client = ISCSIClient::new(dbus.clone()).await?; - for node in client.get_nodes().await? { - let path = ObjectPath::from_string_unchecked(format!("{}/{}", NAMESPACE, node.id)); - cache.add(path.into(), node); - } - - Ok(Self { - dbus: dbus.clone(), - cache, - inner: rx, - }) - } - - fn update_node<'a>( - cache: &'a mut ObjectsCache, - path: &OwnedObjectPath, - values: &HashMap, - ) -> Result<&'a ISCSINode, ServiceError> { - let node = cache.find_or_create(path); - node.id = extract_id_from_path(path)?; - property_from_dbus!(node, target, "Target", values, str); - property_from_dbus!(node, address, "Address", values, str); - property_from_dbus!(node, interface, "Interface", values, str); - property_from_dbus!(node, startup, "Startup", values, str); - property_from_dbus!(node, port, "Port", values, u32); - property_from_dbus!(node, connected, "Connected", values, bool); - Ok(node) - } - - fn remove_node( - cache: &mut ObjectsCache, - path: &OwnedObjectPath, - ) -> Result { - cache - .remove(path) - .ok_or_else(|| ISCSINodeStreamError::UnknownNode(path.clone())) - } - - fn handle_change( - cache: &mut ObjectsCache, - change: &DBusObjectChange, - ) -> Result { - match change { - DBusObjectChange::Added(path, values) => { - let node = Self::update_node(cache, path, values)?; - Ok(event!(ISCSINodeAdded { node: node.clone() })) - } - DBusObjectChange::Changed(path, updated) => { - let node = Self::update_node(cache, path, updated)?; - Ok(event!(ISCSINodeChanged { node: node.clone() })) - } - DBusObjectChange::Removed(path) => { - let node = Self::remove_node(cache, path)?; - Ok(event!(ISCSINodeRemoved { node })) - } - } - } -} - -impl Stream for ISCSINodeStream { - type Item = OldEvent; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut pinned = self.project(); - - Poll::Ready(loop { - let change = ready!(pinned.inner.as_mut().poll_next(cx)); - let next_value = match change { - Some(change) => { - if let Ok(event) = Self::handle_change(pinned.cache, &change) { - Some(event) - } else { - tracing::warn!("Could not process change {:?}", &change); - None - } - } - None => break None, - }; - if next_value.is_some() { - break next_value; - } - }) - } -} diff --git a/rust/agama-server/src/storage/web/zfcp.rs b/rust/agama-server/src/storage/web/zfcp.rs deleted file mode 100644 index ba0be3728f..0000000000 --- a/rust/agama-server/src/storage/web/zfcp.rs +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module implements the web API for the handling of zFCP storage service. -//! -//! The module offers two public functions: -//! -//! * `zfcp_service` which returns the Axum service. -//! * `zfcp_stream` which offers an stream that emits the zFCP-related events coming from D-Bus. - -use agama_lib::{ - error::ServiceError, - storage::{ - model::zfcp::{ZFCPController, ZFCPDisk}, - settings::zfcp::ZFCPConfig, - ZFCPClient, - }, -}; - -use axum::{ - extract::{Path, State}, - routing::{get, post}, - Json, Router, -}; -use serde::Serialize; -use stream::{ZFCPControllerStream, ZFCPDiskStream}; - -mod stream; - -use crate::{error::Error, web::common::EventStreams}; - -/// Returns the stream of zFCP-related events. -/// -/// The stream combines the following events: -/// -/// * Changes on the zFCP devices collection. -/// -/// * `dbus`: D-Bus connection to use. -pub async fn zfcp_stream(dbus: &zbus::Connection) -> Result { - let stream: EventStreams = vec![ - ("zfcp_disks", Box::pin(ZFCPDiskStream::new(dbus).await?)), - ( - "zfcp_controllers", - Box::pin(ZFCPControllerStream::new(dbus).await?), - ), - ]; - Ok(stream) -} - -#[derive(Clone)] -struct ZFCPState<'a> { - client: ZFCPClient<'a>, -} - -pub async fn zfcp_service(dbus: &zbus::Connection) -> Result, ServiceError> { - let client = ZFCPClient::new(dbus.clone()).await?; - let state = ZFCPState { client }; - let router = Router::new() - .route("/supported", get(supported)) - .route("/controllers", get(controllers)) - .route( - "/controllers/:controller_id/activate", - post(activate_controller), - ) - .route("/controllers/:controller_id/wwpns", get(get_wwpns)) - .route( - "/controllers/:controller_id/wwpns/:wwpn_id/luns", - get(get_luns), - ) - .route( - "/controllers/:controller_id/wwpns/:wwpn_id/luns/:lun_id/activate_disk", - post(activate_disk), - ) - .route( - "/controllers/:controller_id/wwpns/:wwpn_id/luns/:lun_id/deactivate_disk", - post(deactivate_disk), - ) - .route("/disks", get(get_disks)) - .route("/probe", post(probe)) - .route("/global_config", get(get_global_config)) - .route("/config", get(get_config).put(set_config)) - .with_state(state); - Ok(router) -} - -/// Returns whether zFCP technology is supported or not -#[utoipa::path( - get, - path="/supported", - context_path="/api/storage/zfcp", - operation_id = "zfcp_supported", - responses( - (status = OK, description = "Returns whether zFCP technology is supported") - ) -)] -async fn supported(State(state): State>) -> Result, Error> { - Ok(Json(state.client.supported().await?)) -} - -/// Returns zFCP configuration -#[utoipa::path( - get, - path="/config", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "Returns zFCP configuration", body=ZFCPGlobalConfig) - ) -)] -async fn get_config(State(_state): State>) -> Result, Error> { - // TODO: not implemented yet - Ok(Json(ZFCPConfig { devices: vec![] })) -} - -/// Sets zFCP configuration. Mainly for unattended installation. -#[utoipa::path( - put, - path="/config", - context_path="/api/storage/zfcp", - operation_id = "zfcp_set_config", - responses( - (status = OK, description = "Sets zFCP configuration") - ) -)] -async fn set_config( - State(state): State>, - Json(config): Json, -) -> Result<(), Error> { - Ok(state.client.set_config(&config).await?) -} - -/// Represents a zFCP global config (specific to s390x systems). -#[derive(Clone, Debug, Default, Serialize, utoipa::ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ZFCPGlobalConfig { - /// flag whenever allow_lun_scan is active - pub allow_lun_scan: bool, -} - -/// Returns global zFCP configuration -#[utoipa::path( - get, - path="/global_config", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "Returns global zFCP configuration", body=ZFCPGlobalConfig) - ) -)] -async fn get_global_config( - State(state): State>, -) -> Result, Error> { - let lun_scan = state.client.is_lun_scan_allowed().await?; - Ok(Json(ZFCPGlobalConfig { - allow_lun_scan: lun_scan, - })) -} - -/// Returns the list of known zFCP disks. -#[utoipa::path( - get, - path="/disks", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "List of ZFCP disks", body = Vec) - ) -)] -async fn get_disks(State(state): State>) -> Result>, Error> { - let devices = state - .client - .get_disks() - .await? - .into_iter() - .map(|(_path, device)| device) - .collect(); - Ok(Json(devices)) -} - -/// Returns the list of known zFCP controllers. -#[utoipa::path( - get, - path="/controllers", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "List of zFCP controllers", body = Vec) - ) -)] -async fn controllers( - State(state): State>, -) -> Result>, Error> { - let devices = state - .client - .get_controllers() - .await? - .into_iter() - .map(|(_path, device)| device) - .collect(); - Ok(Json(devices)) -} - -/// Activate given zFCP controller. -#[utoipa::path( - post, - path="/controllers/:controller_id/activate", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "controller activated") - ) -)] -async fn activate_controller( - State(state): State>, - Path(controller_id): Path, -) -> Result, Error> { - state - .client - .activate_controller(controller_id.as_str()) - .await?; - Ok(Json(())) -} - -/// List WWPNs for given controller. -#[utoipa::path( - post, - path="/controllers/:controller_id/wwpns", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "List of WWPNs", body=Vec) - ) -)] -async fn get_wwpns( - State(state): State>, - Path(controller_id): Path, -) -> Result>, Error> { - let result = state.client.get_wwpns(controller_id.as_str()).await?; - Ok(Json(result)) -} - -/// List LUNS for given controller and wwpn. -#[utoipa::path( - post, - path="/controllers/:controller_id/wwpns/:wwpn_id/luns", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "list of luns", body=Vec) - ) -)] -async fn get_luns( - State(state): State>, - Path((controller_id, wwpn_id)): Path<(String, String)>, -) -> Result>, Error> { - let result = state.client.get_luns(&controller_id, &wwpn_id).await?; - Ok(Json(result)) -} - -/// Activates a disk on given controller with given WWPN id and LUN id. -#[utoipa::path( - post, - path="/controllers/:controller_id/wwpns/:wwpn_id/luns/:lun_id/activate_disk", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "The activation was succesful.") - ) -)] -async fn activate_disk( - State(state): State>, - Path((controller_id, wwpn_id, lun_id)): Path<(String, String, String)>, -) -> Result, Error> { - state - .client - .activate_disk(&controller_id, &wwpn_id, &lun_id) - .await?; - Ok(Json(())) -} - -/// Deactivates disk on given controller with given WWPN id and LUN id. -#[utoipa::path( - post, - path="/controllers/:controller_id/wwpns/:wwpn_id/luns/:lun_id/deactivate_disk", - context_path="/api/storage/zfcp", - responses( - (status = OK, description = "The activation was succesful.") - ) -)] -async fn deactivate_disk( - State(state): State>, - Path((controller_id, wwpn_id, lun_id)): Path<(String, String, String)>, -) -> Result, Error> { - state - .client - .deactivate_disk(&controller_id, &wwpn_id, &lun_id) - .await?; - Ok(Json(())) -} - -/// Find zFCP devices in the system. -#[utoipa::path( - post, - path="/probe", - context_path="/api/storage/zfcp", - operation_id = "zfcp_probe", - responses( - (status = OK, description = "The probing process ran successfully") - ) -)] -async fn probe(State(state): State>) -> Result, Error> { - Ok(Json(state.client.probe().await?)) -} diff --git a/rust/agama-server/src/storage/web/zfcp/stream.rs b/rust/agama-server/src/storage/web/zfcp/stream.rs deleted file mode 100644 index e35ee94425..0000000000 --- a/rust/agama-server/src/storage/web/zfcp/stream.rs +++ /dev/null @@ -1,313 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -// FIXME: the code is pretty similar to iscsi::stream and dasd::stream. Refactor the stream to reduce the repetition. - -use std::{collections::HashMap, task::Poll}; - -use agama_lib::{ - error::ServiceError, - event, - http::OldEvent, - storage::{ - client::zfcp::ZFCPClient, - model::zfcp::{ZFCPController, ZFCPDisk}, - }, -}; -use agama_utils::{ - dbus::{extract_id_from_path, get_optional_property}, - property_from_dbus, -}; -use futures_util::{ready, Stream}; -use pin_project::pin_project; -use thiserror::Error; -use tokio::sync::mpsc::unbounded_channel; -use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; -use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; - -use crate::dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}; - -#[derive(Debug, Error)] -enum ZFCPDiskStreamError { - #[error("Service error: {0}")] - Service(#[from] ServiceError), - #[error("Unknown ZFCP disk: {0}")] - UnknownDevice(OwnedObjectPath), -} - -/// This stream listens for changes in the collection of zFCP disks and emits -/// the updated objects. -/// -/// It relies on the [DBusObjectChangesStream] stream and uses a cache to avoid holding a bunch of -/// proxy objects. -#[pin_project] -pub struct ZFCPDiskStream { - dbus: zbus::Connection, - cache: ObjectsCache, - #[pin] - inner: UnboundedReceiverStream, -} - -impl ZFCPDiskStream { - /// Creates a new stream - /// - /// * `dbus`: D-Bus connection to listen on. - pub async fn new(dbus: &zbus::Connection) -> Result { - const MANAGER_PATH: &str = "/org/opensuse/Agama/Storage1"; - const NAMESPACE: &str = "/org/opensuse/Agama/Storage1/zfcp_disks"; - - let (tx, rx) = unbounded_channel(); - let mut stream = DBusObjectChangesStream::new( - dbus, - &ObjectPath::from_str_unchecked(MANAGER_PATH), - &ObjectPath::from_str_unchecked(NAMESPACE), - "org.opensuse.Agama.Storage1.ZFCP.Disk", - ) - .await?; - - tokio::spawn(async move { - while let Some(change) = stream.next().await { - let _ = tx.send(change); - } - }); - let rx = UnboundedReceiverStream::new(rx); - - let mut cache: ObjectsCache = Default::default(); - let client = ZFCPClient::new(dbus.clone()).await?; - for (path, device) in client.get_disks().await? { - cache.add(path, device); - } - - Ok(Self { - dbus: dbus.clone(), - cache, - inner: rx, - }) - } - - fn update_device<'a>( - cache: &'a mut ObjectsCache, - path: &OwnedObjectPath, - values: &HashMap, - ) -> Result<&'a ZFCPDisk, ServiceError> { - let device = cache.find_or_create(path); - property_from_dbus!(device, name, "Name", values, str); - property_from_dbus!(device, channel, "Channel", values, str); - property_from_dbus!(device, wwpn, "WWPN", values, str); - property_from_dbus!(device, lun, "LUN", values, str); - Ok(device) - } - - fn remove_device( - cache: &mut ObjectsCache, - path: &OwnedObjectPath, - ) -> Result { - cache - .remove(path) - .ok_or_else(|| ZFCPDiskStreamError::UnknownDevice(path.clone())) - } - - fn handle_change( - cache: &mut ObjectsCache, - change: &DBusObjectChange, - ) -> Result { - match change { - DBusObjectChange::Added(path, values) => { - let device = Self::update_device(cache, path, values)?; - Ok(event!(ZFCPDiskAdded { - device: device.clone(), - })) - } - DBusObjectChange::Changed(path, updated) => { - let device = Self::update_device(cache, path, updated)?; - Ok(event!(ZFCPDiskChanged { - device: device.clone(), - })) - } - DBusObjectChange::Removed(path) => { - let device = Self::remove_device(cache, path)?; - Ok(event!(ZFCPDiskRemoved { device })) - } - } - } -} - -impl Stream for ZFCPDiskStream { - type Item = OldEvent; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut pinned = self.project(); - - Poll::Ready(loop { - let change = ready!(pinned.inner.as_mut().poll_next(cx)); - let next_value = match change { - Some(change) => { - if let Ok(event) = Self::handle_change(pinned.cache, &change) { - Some(event) - } else { - tracing::warn!("Could not process change {:?}", &change); - None - } - } - None => break None, - }; - if next_value.is_some() { - break next_value; - } - }) - } -} - -#[derive(Debug, Error)] -enum ZFCPControllerStreamError { - #[error("Service error: {0}")] - Service(#[from] ServiceError), - #[error("Unknown ZFCP controller: {0}")] - UnknownDevice(OwnedObjectPath), -} - -/// This stream listens for changes in the collection of zFCP controllers and emits -/// the updated objects. -/// -/// It relies on the [DBusObjectChangesStream] stream and uses a cache to avoid holding a bunch of -/// proxy objects. -#[pin_project] -pub struct ZFCPControllerStream { - dbus: zbus::Connection, - cache: ObjectsCache, - #[pin] - inner: UnboundedReceiverStream, -} - -impl ZFCPControllerStream { - /// Creates a new stream - /// - /// * `dbus`: D-Bus connection to listen on. - pub async fn new(dbus: &zbus::Connection) -> Result { - const MANAGER_PATH: &str = "/org/opensuse/Agama/Storage1"; - const NAMESPACE: &str = "/org/opensuse/Agama/Storage1/zfcp_controllers"; - - let (tx, rx) = unbounded_channel(); - let mut stream = DBusObjectChangesStream::new( - dbus, - &ObjectPath::from_str_unchecked(MANAGER_PATH), - &ObjectPath::from_str_unchecked(NAMESPACE), - "org.opensuse.Agama.Storage1.ZFCP.Controller", - ) - .await?; - - tokio::spawn(async move { - while let Some(change) = stream.next().await { - let _ = tx.send(change); - } - }); - let rx = UnboundedReceiverStream::new(rx); - - let mut cache: ObjectsCache = Default::default(); - let client = ZFCPClient::new(dbus.clone()).await?; - for (path, device) in client.get_controllers().await? { - cache.add(path, device); - } - - Ok(Self { - dbus: dbus.clone(), - cache, - inner: rx, - }) - } - - fn update_device<'a>( - cache: &'a mut ObjectsCache, - path: &OwnedObjectPath, - values: &HashMap, - ) -> Result<&'a ZFCPController, ServiceError> { - let device = cache.find_or_create(path); - device.id = extract_id_from_path(path)?.to_string(); - property_from_dbus!(device, channel, "Channel", values, str); - property_from_dbus!(device, lun_scan, "LUNScan", values, bool); - property_from_dbus!(device, active, "Active", values, bool); - Ok(device) - } - - fn remove_device( - cache: &mut ObjectsCache, - path: &OwnedObjectPath, - ) -> Result { - cache - .remove(path) - .ok_or_else(|| ZFCPControllerStreamError::UnknownDevice(path.clone())) - } - - fn handle_change( - cache: &mut ObjectsCache, - change: &DBusObjectChange, - ) -> Result { - match change { - DBusObjectChange::Added(path, values) => { - let device = Self::update_device(cache, path, values)?; - Ok(event!(ZFCPControllerAdded { - device: device.clone(), - })) - } - DBusObjectChange::Changed(path, updated) => { - let device = Self::update_device(cache, path, updated)?; - Ok(event!(ZFCPControllerChanged { - device: device.clone(), - })) - } - DBusObjectChange::Removed(path) => { - let device = Self::remove_device(cache, path)?; - Ok(event!(ZFCPControllerRemoved { device })) - } - } - } -} - -impl Stream for ZFCPControllerStream { - type Item = OldEvent; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut pinned = self.project(); - - Poll::Ready(loop { - let change = ready!(pinned.inner.as_mut().poll_next(cx)); - let next_value = match change { - Some(change) => { - if let Ok(event) = Self::handle_change(pinned.cache, &change) { - Some(event) - } else { - tracing::warn!("Could not process change {:?}", &change); - None - } - } - None => break None, - }; - if next_value.is_some() { - break next_value; - } - }) - } -} diff --git a/rust/agama-server/src/users.rs b/rust/agama-server/src/users.rs index 8f6a5ce1e0..a63e0359f2 100644 --- a/rust/agama-server/src/users.rs +++ b/rust/agama-server/src/users.rs @@ -20,4 +20,4 @@ pub(crate) mod password; pub mod web; -pub use web::{users_service, users_streams}; +pub use web::users_service; diff --git a/rust/agama-server/src/users/web.rs b/rust/agama-server/src/users/web.rs index bf11994162..544485b25e 100644 --- a/rust/agama-server/src/users/web.rs +++ b/rust/agama-server/src/users/web.rs @@ -24,15 +24,9 @@ //! * `users_service` which returns the Axum service. //! * `users_stream` which offers an stream that emits the users events coming from D-Bus. -use crate::{ - error::Error, - users::password::PasswordChecker, - web::common::{service_status_router, EventStreams}, -}; +use crate::{error::Error, users::password::PasswordChecker}; use agama_lib::{ error::ServiceError, - event, - http::OldEvent, users::{model::RootPatchSettings, proxies::Users1Proxy, FirstUser, RootUser, UsersClient}, }; use axum::{ @@ -52,70 +46,6 @@ struct UsersState<'a> { users: UsersClient<'a>, } -/// Returns streams that emits users related events coming from D-Bus. -/// -/// It emits the RootPasswordChange, RootSSHKeyChanged and FirstUserChanged events. -/// -/// * `connection`: D-Bus connection to listen for events. -pub async fn users_streams(dbus: zbus::Connection) -> Result { - const FIRST_USER_ID: &str = "first_user"; - const ROOT_USER_ID: &str = "root_user"; - let result: EventStreams = vec![ - ( - FIRST_USER_ID, - Box::pin(first_user_changed_stream(dbus.clone()).await?), - ), - ( - ROOT_USER_ID, - Box::pin(root_user_changed_stream(dbus.clone()).await?), - ), - ]; - - Ok(result) -} - -async fn first_user_changed_stream( - dbus: zbus::Connection, -) -> Result + Send, Error> { - let proxy = Users1Proxy::new(&dbus).await?; - let stream = proxy - .receive_first_user_changed() - .await - .then(|change| async move { - if let Ok(user) = change.get().await { - let user_struct = FirstUser { - full_name: user.0, - user_name: user.1, - password: user.2, - hashed_password: user.3, - }; - return Some(event!(FirstUserChanged(user_struct))); - } - None - }) - .filter_map(|e| e); - Ok(stream) -} - -async fn root_user_changed_stream( - dbus: zbus::Connection, -) -> Result + Send, Error> { - let proxy = Users1Proxy::new(&dbus).await?; - let stream = proxy - .receive_root_user_changed() - .await - .then(|change| async move { - if let Ok(user) = change.get().await { - if let Ok(root) = RootUser::from_dbus(user) { - return Some(event!(RootUserChanged(root))); - } - } - None - }) - .filter_map(|e| e); - Ok(stream) -} - /// Sets up and returns the axum service for the users module. pub async fn users_service(dbus: zbus::Connection) -> Result { const DBUS_SERVICE: &str = "org.opensuse.Agama.Manager1"; @@ -125,7 +55,6 @@ pub async fn users_service(dbus: zbus::Connection) -> Result Result( config: ServiceConfig, events: event::Sender, - old_events: OldSender, dbus: zbus::Connection, web_ui_dir: P, ) -> Result where P: AsRef, { - let progress = ProgressService::start(dbus.clone(), old_events.clone()).await; - - let router = MainServiceBuilder::new(events.clone(), old_events.clone(), web_ui_dir) - .add_service( - "/manager", - manager_service(dbus.clone(), progress.clone()).await?, - ) + let router = MainServiceBuilder::new(events.clone(), web_ui_dir) .add_service("/v2", server_service(events, dbus.clone()).await?) .add_service("/security", security_service(dbus.clone()).await?) - .add_service("/storage", storage_service(dbus.clone(), progress).await?) - .add_service("/iscsi", iscsi_service(dbus.clone()).await?) .add_service("/bootloader", bootloader_service(dbus.clone()).await?) .add_service("/users", users_service(dbus.clone()).await?) .add_service("/hostname", hostname_service().await?) @@ -92,66 +73,3 @@ where .build(); Ok(router) } - -/// Starts monitoring the D-Bus service progress. -/// -/// The events are sent to the `events` channel. -/// -/// * `events`: channel to send the events to. -pub async fn run_monitor(events: OldSender) -> Result<(), ServiceError> { - let connection = connection().await?; - tokio::spawn(run_events_monitor(connection, events.clone())); - - Ok(()) -} - -/// Emits the events from the system streams through the events channel. -/// -/// * `connection`: D-Bus connection. -/// * `events`: channel to send the events to. -async fn run_events_monitor(dbus: zbus::Connection, events: OldSender) -> Result<(), Error> { - let mut stream = StreamMap::new(); - - stream.insert("manager", manager_stream(dbus.clone()).await?); - stream.insert( - "manager-status", - service_status_stream( - dbus.clone(), - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await?, - ); - for (id, user_stream) in users_streams(dbus.clone()).await? { - stream.insert(id, user_stream); - } - for (id, storage_stream) in storage_streams(dbus.clone()).await? { - stream.insert(id, storage_stream); - } - stream.insert( - "storage-status", - service_status_stream( - dbus.clone(), - "org.opensuse.Agama.Storage1", - "/org/opensuse/Agama/Storage1", - ) - .await?, - ); - stream.insert( - "storage-jobs", - jobs_stream( - dbus.clone(), - "org.opensuse.Agama.Storage1", - "/org/opensuse/Agama/Storage1", - "/org/opensuse/Agama/Storage1/jobs", - ) - .await?, - ); - - tokio::pin!(stream); - let e = events.clone(); - while let Some((_, event)) = stream.next().await { - _ = e.send(event); - } - Ok(()) -} diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs deleted file mode 100644 index ab7edbd829..0000000000 --- a/rust/agama-server/src/web/common.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright (c) [2024-2025] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module defines functions to be used accross all services. - -use std::pin::Pin; - -use agama_lib::{error::ServiceError, event, proxies::ServiceStatusProxy}; -use axum::{extract::State, routing::get, Json, Router}; -use serde::Serialize; -use tokio_stream::{Stream, StreamExt}; - -use crate::error::Error; - -mod jobs; -pub use jobs::{jobs_service, jobs_stream}; -mod progress; -pub use progress::{ProgressClient, ProgressRouterBuilder, ProgressService, ProgressServiceError}; - -use super::OldEvent; - -pub type EventStreams = Vec<(&'static str, Pin + Send>>)>; - -/// Builds a router to the `org.opensuse.Agama1.ServiceStatus` interface of the -/// given D-Bus object. -/// -/// ```no_run -/// # use axum::{extract::State, routing::get, Json, Router}; -/// # use agama_lib::connection; -/// # use agama_server::web::common::service_status_router; -/// # use tokio_test; -/// -/// # tokio_test::block_on(async { -/// async fn hello(state: State) {}; -/// -/// #[derive(Clone)] -/// struct HelloWorldState {}; -/// -/// let dbus = connection().await.unwrap(); -/// let status_router = service_status_router( -/// &dbus, "org.opensuse.HelloWorld", "/org/opensuse/hello" -/// ).await.unwrap(); -/// let router: Router = Router::new() -/// .route("/hello", get(hello)) -/// .merge(status_router) -/// .with_state(HelloWorldState {}); -/// }); -/// ``` -/// -/// * `dbus`: D-Bus connection. -/// * `destination`: D-Bus service name. -/// * `path`: D-Bus object path. -pub async fn service_status_router( - dbus: &zbus::Connection, - destination: &str, - path: &str, -) -> Result, ServiceError> { - let proxy = build_service_status_proxy(dbus, destination, path).await?; - let state = ServiceStatusState { proxy }; - Ok(Router::new() - .route("/status", get(service_status)) - .with_state(state)) -} - -async fn service_status(State(state): State>) -> Json { - Json(ServiceStatus { - current: state.proxy.current().await.unwrap(), - }) -} - -#[derive(Clone)] -struct ServiceStatusState<'a> { - proxy: ServiceStatusProxy<'a>, -} - -#[derive(Clone, Serialize, utoipa::ToSchema)] -pub struct ServiceStatus { - /// Current service status (0 = idle, 1 = busy). - current: u32, -} - -/// Builds a stream of the changes in the the `org.opensuse.Agama1.ServiceStatus` -/// interface of the given D-Bus object. -/// -/// * `dbus`: D-Bus connection. -/// * `destination`: D-Bus service name. -/// * `path`: D-Bus object path. -pub async fn service_status_stream( - dbus: zbus::Connection, - destination: &'static str, - path: &'static str, -) -> Result + Send>>, Error> { - let proxy = build_service_status_proxy(&dbus, destination, path).await?; - let stream = proxy - .receive_current_changed() - .await - .then(move |change| async move { - if let Ok(status) = change.get().await { - Some(event!(ServiceStatusChanged { - service: destination.to_string(), - status, - })) - } else { - None - } - }) - .filter_map(|e| e); - Ok(Box::pin(stream)) -} - -async fn build_service_status_proxy<'a>( - dbus: &zbus::Connection, - destination: &str, - path: &str, -) -> Result, zbus::Error> { - let proxy = ServiceStatusProxy::builder(dbus) - .destination(destination.to_string())? - .path(path.to_string())? - .build() - .await?; - Ok(proxy) -} diff --git a/rust/agama-server/src/web/common/jobs.rs b/rust/agama-server/src/web/common/jobs.rs deleted file mode 100644 index 7a932d8924..0000000000 --- a/rust/agama-server/src/web/common/jobs.rs +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -use std::{collections::HashMap, pin::Pin, task::Poll}; - -use agama_lib::{ - error::ServiceError, - event, - http::OldEvent, - jobs::{client::JobsClient, Job}, -}; -use agama_utils::{dbus::get_optional_property, property_from_dbus}; -use axum::{extract::State, routing::get, Json, Router}; -use futures_util::{ready, Stream}; -use pin_project::pin_project; -use tokio::sync::mpsc::unbounded_channel; -use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; -use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; - -use crate::{ - dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}, - error::Error, -}; - -/// Builds a router for the jobs objects. -pub async fn jobs_service( - dbus: &zbus::Connection, - destination: &'static str, - path: &'static str, -) -> Result, ServiceError> { - let client = JobsClient::new(dbus.clone(), destination, path).await?; - let state = JobsState { client }; - Ok(Router::new().route("/jobs", get(jobs)).with_state(state)) -} - -#[derive(Clone)] -struct JobsState<'a> { - client: JobsClient<'a>, -} - -async fn jobs(State(state): State>) -> Result>, Error> { - let jobs = state - .client - .jobs() - .await? - .into_iter() - .map(|(_path, job)| job) - .collect(); - Ok(Json(jobs)) -} - -/// Returns the stream of jobs-related events. -/// -/// The stream combines the following events: -/// -/// * Changes on the DASD devices collection. -/// -/// * `dbus`: D-Bus connection to use. -pub async fn jobs_stream( - dbus: zbus::Connection, - destination: &'static str, - manager: &'static str, - namespace: &'static str, -) -> Result + Send>>, Error> { - let stream = JobsStream::new(&dbus, destination, manager, namespace).await?; - Ok(Box::pin(stream)) -} - -#[pin_project] -pub struct JobsStream { - dbus: zbus::Connection, - cache: ObjectsCache, - #[pin] - inner: UnboundedReceiverStream, -} - -#[derive(Debug, thiserror::Error)] -enum JobsStreamError { - #[error("Service error: {0}")] - Service(#[from] ServiceError), - #[error("Unknown job: {0}")] - UnknownJob(OwnedObjectPath), -} - -impl JobsStream { - pub async fn new( - dbus: &zbus::Connection, - destination: &'static str, - manager: &'static str, - namespace: &'static str, - ) -> Result { - let (tx, rx) = unbounded_channel(); - let mut stream = DBusObjectChangesStream::new( - dbus, - &ObjectPath::from_static_str(manager)?, - &ObjectPath::from_static_str(namespace)?, - "org.opensuse.Agama.Storage1.Job", - ) - .await?; - - tokio::spawn(async move { - while let Some(change) = stream.next().await { - let _ = tx.send(change); - } - }); - let rx = UnboundedReceiverStream::new(rx); - - let mut cache: ObjectsCache = Default::default(); - let client = JobsClient::new(dbus.clone(), destination, manager).await?; - for (path, job) in client.jobs().await? { - cache.add(path, job); - } - - Ok(Self { - dbus: dbus.clone(), - cache, - inner: rx, - }) - } - - fn update_job<'a>( - cache: &'a mut ObjectsCache, - path: &OwnedObjectPath, - values: &HashMap, - ) -> Result<&'a Job, ServiceError> { - let job = cache.find_or_create(path); - job.id = path.to_string(); - property_from_dbus!(job, running, "Running", values, bool); - property_from_dbus!(job, exit_code, "ExitCode", values, u32); - Ok(job) - } - - fn remove_job( - cache: &mut ObjectsCache, - path: &OwnedObjectPath, - ) -> Result { - cache - .remove(path) - .ok_or_else(|| JobsStreamError::UnknownJob(path.clone())) - } - - fn handle_change( - cache: &mut ObjectsCache, - change: &DBusObjectChange, - ) -> Result { - match change { - DBusObjectChange::Added(path, values) => { - let job = Self::update_job(cache, path, values)?; - Ok(event!(JobAdded { job: job.clone() })) - } - DBusObjectChange::Changed(path, updated) => { - let job = Self::update_job(cache, path, updated)?; - Ok(event!(JobChanged { job: job.clone() })) - } - DBusObjectChange::Removed(path) => { - let job = Self::remove_job(cache, path)?; - Ok(event!(JobRemoved { job })) - } - } - } -} - -impl Stream for JobsStream { - type Item = OldEvent; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut pinned = self.project(); - - Poll::Ready(loop { - let change = ready!(pinned.inner.as_mut().poll_next(cx)); - let next_value = match change { - Some(change) => { - if let Ok(event) = Self::handle_change(pinned.cache, &change) { - Some(event) - } else { - tracing::warn!("Could not process change {:?}", &change); - None - } - } - None => break None, - }; - if next_value.is_some() { - break next_value; - } - }) - } -} diff --git a/rust/agama-server/src/web/common/progress.rs b/rust/agama-server/src/web/common/progress.rs deleted file mode 100644 index b935d7aa24..0000000000 --- a/rust/agama-server/src/web/common/progress.rs +++ /dev/null @@ -1,297 +0,0 @@ -// Copyright (c) [2025] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! Defines a service that keep tracks of the Agama progress. -//! -//! It is responsible for: -//! -//! * Querying the progress via D-Bus and keeping them in a cache. -//! * Listening to D-Bus signals to keep the cache up-to-date. -//! * Emitting `ProgressChanged` events. -//! -//! The following components are included: -//! -//! * [ProgressService] that runs on a separate task to hold the status. -//! * [ProgressClient] that allows querying the [ProgressService] server about the -//! progress. -//! * [ProgressRouterBuilder] which allows building a router. -//! -//! At this point, it only handles the progress that are exposed through D-Bus. - -use agama_lib::{ - event, - http::{self, OldEvent}, - progress::{Progress, ProgressSequence}, - proxies::{ProgressChanged, ProgressProxy}, -}; -use axum::{extract::State, routing::get, Json, Router}; -use std::collections::HashMap; -use tokio::sync::{broadcast, mpsc, oneshot}; -use tokio_stream::StreamExt; -use zbus::{message::Type as MessageType, MatchRule, MessageStream}; - -type ProgressServiceResult = Result; - -#[derive(Debug, thiserror::Error)] -pub enum ProgressServiceError { - #[error("Could not return the progress")] - SendProgress, - #[error("Could not get an answer from the service: {0}")] - RecvProgress(#[from] oneshot::error::RecvError), - #[error("Could not set the command: {0}")] - SendCommand(#[from] mpsc::error::SendError), - #[error("Error parsing progress from D-Bus: {0}")] - InvalidProgress(#[from] zbus::zvariant::Error), - #[error("Error reading the progress: {0}")] - DBus(#[from] zbus::Error), - #[error("Invalid D-Bus name: {0}")] - DBusName(#[from] zbus::names::Error), - #[error("Could not send the event: {0}")] - SendEvent(#[from] broadcast::error::SendError), -} - -#[derive(Debug)] -pub enum ProgressCommand { - Get(String, String, oneshot::Sender), -} - -/// Implements a Tokio task that holds the progress for each service. -pub struct ProgressService { - cache: HashMap, - commands: mpsc::Receiver, - events: http::event::OldSender, - dbus: zbus::Connection, -} - -impl ProgressService { - /// Sets up and starts the service as a Tokio task. - /// - /// Once it is started, the service waits for: - /// - /// * Commands from a client ([ProgressClient]). - /// * Relevant events from D-Bus. - pub async fn start(dbus: zbus::Connection, events: http::event::OldSender) -> ProgressClient { - let (tx, rx) = mpsc::channel(4); - let mut service = ProgressService { - cache: HashMap::new(), - dbus, - events, - commands: rx, - }; - - tokio::spawn(async move { - if let Err(e) = service.run().await { - tracing::error!("Could not start the progress service: {e:?}") - } - }); - ProgressClient(tx) - } - - /// Main loop of the service. - async fn run(&mut self) -> ProgressServiceResult<()> { - let mut messages = build_progress_changed_stream(&self.dbus).await?; - loop { - tokio::select! { - Some(cmd) = self.commands.recv() => { - if let Err(e) = self.handle_command(cmd).await { - tracing::error!("{e}"); - } - } - - Some(Ok(message)) = messages.next() => { - if let Some(changed) = ProgressChanged::from_message(message) { - if let Err(e) = self.handle_progress_changed(changed).await { - tracing::error!("ProgressService: could not handle change: {:?}", e); - } - } - } - } - } - } - - /// Handles commands from the client. - async fn handle_command(&mut self, command: ProgressCommand) -> ProgressServiceResult<()> { - match command { - ProgressCommand::Get(service, path, tx) => { - let progress = self.get(&service, &path).await?; - tx.send(progress) - .map_err(|_| ProgressServiceError::SendProgress)?; - } - } - - Ok(()) - } - - /// Handles ProgressChanged events. - /// - /// It reports an error if something went work. If the message was processed or skipped - /// it returns Ok(()). - async fn handle_progress_changed( - &mut self, - message: ProgressChanged, - ) -> ProgressServiceResult<()> { - let args = message.args()?; - let inner = message.message(); - let header = inner.header(); - - // Given that it is a ProcessChanged, it should not happen. - let Some(path) = header.path() else { - tracing::warn!("Found a ProgressChanged signal without a path"); - return Ok(()); - }; - - let (current_step, current_title) = args.current_step(); - let progress = Progress { - current_title: current_title.to_string(), - current_step: current_step.clone(), - max_steps: args.total_steps, - finished: args.finished, - }; - let sequence = ProgressSequence { - steps: args.steps().iter().map(ToString::to_string).collect(), - progress: progress.clone(), - }; - self.cache.insert(path.to_string(), sequence.clone()); - - let event = event!(ProgressChanged { - path: path.to_string(), - progress, - }); - self.events.send(event)?; - Ok(()) - } - - /// Gets the progress for a given D-Bus service and path. - /// - /// This method uses a cache to store the values. If the value is not in the cache, - /// it asks the D-Bus service about the progress (and cache them). - /// - /// * `service`: D-Bus service to connect to. - /// * `path`: path of the D-Bus object implementing the - /// "org.opensuse.Agama1.Progress" interface. - async fn get(&mut self, service: &str, path: &str) -> ProgressServiceResult { - if let Some(sequence) = self.cache.get(path) { - return Ok(sequence.clone()); - } - - let proxy = ProgressProxy::builder(&self.dbus) - .destination(service)? - .path(path)? - .build() - .await?; - - let progress = Progress::from_proxy(&proxy).await?; - let steps = proxy.steps().await?; - let sequence = ProgressSequence { steps, progress }; - - self.cache.insert(path.to_string(), sequence.clone()); - Ok(sequence) - } -} - -/// It allows querying the [ProgressService]. -/// -/// It is cheap to clone the client and use it from several -/// places. -#[derive(Clone)] -pub struct ProgressClient(mpsc::Sender); - -impl ProgressClient { - /// Get the progress for the given D-Bus service and path. - pub async fn get(&self, service: &str, path: &str) -> ProgressServiceResult { - let (tx, rx) = oneshot::channel(); - self.0 - .send(ProgressCommand::Get( - service.to_string(), - path.to_string(), - tx, - )) - .await?; - Ok(rx.await?) - } -} - -/// It allows building an Axum router for the progress service. -pub struct ProgressRouterBuilder { - service: String, - path: String, - client: ProgressClient, -} - -impl ProgressRouterBuilder { - /// Creates a new builder. - /// - /// * `service`: D-Bus service to connect to. - /// * `path`: path of the D-Bus object implementing the - /// "org.opensuse.Agama1.Progress" interface. - /// * `client`: client to access the progress. - pub fn new(service: &str, path: &str, client: ProgressClient) -> Self { - ProgressRouterBuilder { - service: service.to_string(), - path: path.to_string(), - client, - } - } - - /// Builds the Axum router. - pub fn build(self) -> Result, crate::error::Error> { - let state = ProgressState { - service: self.service, - path: self.path, - client: self.client, - }; - - Ok(Router::new() - .route("/progress", get(Self::progress)) - .with_state(state)) - } - - /// Handler of the GET /progress endpoint. - async fn progress( - State(state): State, - ) -> Result, crate::error::Error> { - let progress = state.client.get(&state.service, &state.path).await?; - Ok(Json(progress)) - } -} - -/// State for the router. -#[derive(Clone)] -struct ProgressState { - service: String, - path: String, - client: ProgressClient, -} - -/// Returns a stream of properties changes. -/// -/// It listens for changes in several objects that are related to a network device. -pub async fn build_progress_changed_stream( - connection: &zbus::Connection, -) -> Result { - let rule = MatchRule::builder() - .msg_type(MessageType::Signal) - .interface("org.opensuse.Agama1.Progress")? - .member("ProgressChanged")? - .build(); - // The third parameter corresponds to the max_queue. We rely on the default (64). - let stream = MessageStream::for_match_rule(rule, connection, None).await?; - Ok(stream) -} diff --git a/rust/agama-server/src/web/docs.rs b/rust/agama-server/src/web/docs.rs index d9e8dd15ce..919b7dfc20 100644 --- a/rust/agama-server/src/web/docs.rs +++ b/rust/agama-server/src/web/docs.rs @@ -24,19 +24,14 @@ mod config; pub use config::ConfigApiDocBuilder; mod hostname; pub use hostname::HostnameApiDocBuilder; -mod storage; -pub use storage::StorageApiDocBuilder; mod bootloader; pub use bootloader::BootloaderApiDocBuilder; mod profile; pub use profile::ProfileApiDocBuilder; -mod manager; -pub use manager::ManagerApiDocBuilder; mod users; pub use users::UsersApiDocBuilder; mod misc; pub use misc::MiscApiDocBuilder; -pub mod common; pub trait ApiDocBuilder { fn title(&self) -> String { diff --git a/rust/agama-server/src/web/docs/common.rs b/rust/agama-server/src/web/docs/common.rs deleted file mode 100644 index 0b0a2d7e4c..0000000000 --- a/rust/agama-server/src/web/docs/common.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -//! This module implements builders for the generation of OpenAPI documentation for the common APIs -//! (e.g., issues, service status or progress). - -use super::ApiDocBuilder; -use crate::web::common::ServiceStatus; -use agama_lib::progress::Progress; -use utoipa::openapi::{ - path::OperationBuilder, schema::RefBuilder, Components, ComponentsBuilder, ContentBuilder, - HttpMethod, PathItem, Paths, PathsBuilder, ResponseBuilder, ResponsesBuilder, -}; - -/// Implements a builder for the service status API documentation. -pub struct ServiceStatusApiDocBuilder { - path: String, -} - -impl ServiceStatusApiDocBuilder { - /// Creates a new builder. - /// - /// * `path`: path of the API (e.g., "/api/storage/status"). - pub fn new(path: &str) -> Self { - Self { - path: path.to_string(), - } - } -} - -impl ApiDocBuilder for ServiceStatusApiDocBuilder { - fn title(&self) -> String { - "Services status HTTP API".to_string() - } - - fn paths(&self) -> Paths { - let path_item = PathItem::new( - HttpMethod::Get, - OperationBuilder::new() - .summary(Some("Service status".to_string())) - .operation_id(Some("status")) - .responses( - ResponsesBuilder::new().response( - "200", - ResponseBuilder::new() - .description("Current service status") - .content( - "application/json", - ContentBuilder::new() - .schema(Some(RefBuilder::new().ref_location( - "#/components/schemas/ServiceStatus".to_string(), - ))) - .build(), - ), - ), - ), - ); - - PathsBuilder::new() - .path(self.path.to_string(), path_item) - .build() - } - - fn components(&self) -> Components { - ComponentsBuilder::new() - .schema_from::() - .build() - } -} - -/// Implements a builder for the progress API documentation. -pub struct ProgressApiDocBuilder { - path: String, -} - -impl ProgressApiDocBuilder { - /// Creates a new builder. - /// - /// * `path`: path of the API (e.g., "/api/storage/progress"). - pub fn new(path: &str) -> Self { - Self { - path: path.to_string(), - } - } -} - -impl ApiDocBuilder for ProgressApiDocBuilder { - fn title(&self) -> String { - "Progress HTTP API".to_string() - } - - fn paths(&self) -> Paths { - let path_item = - PathItem::new( - HttpMethod::Get, - OperationBuilder::new() - .summary(Some("Service progress".to_string())) - .operation_id(Some("progress")) - .responses( - ResponsesBuilder::new().response( - "200", - ResponseBuilder::new() - .description("Current operation progress") - .content( - "application/json", - ContentBuilder::new() - .schema(Some(RefBuilder::new().ref_location( - "#/components/schemas/Progress".to_string(), - ))) - .build(), - ), - ), - ), - ); - - PathsBuilder::new() - .path(self.path.to_string(), path_item) - .build() - } - - fn components(&self) -> Components { - ComponentsBuilder::new().schema_from::().build() - } -} diff --git a/rust/agama-server/src/web/docs/manager.rs b/rust/agama-server/src/web/docs/manager.rs deleted file mode 100644 index f7aa82e07f..0000000000 --- a/rust/agama-server/src/web/docs/manager.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -use utoipa::openapi::{ComponentsBuilder, OpenApi, PathsBuilder}; - -use super::{ - common::{ProgressApiDocBuilder, ServiceStatusApiDocBuilder}, - ApiDocBuilder, -}; - -pub struct ManagerApiDocBuilder; - -impl ApiDocBuilder for ManagerApiDocBuilder { - fn title(&self) -> String { - "Manager HTTP API".to_string() - } - - fn paths(&self) -> utoipa::openapi::Paths { - PathsBuilder::new() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .build() - } - - fn components(&self) -> utoipa::openapi::Components { - ComponentsBuilder::new() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .build() - } - - fn nested(&self) -> Option { - let mut status = ServiceStatusApiDocBuilder::new("/api/storage/status").build(); - let progress = ProgressApiDocBuilder::new("/api/storage/progress").build(); - status.merge(progress); - Some(status) - } -} diff --git a/rust/agama-server/src/web/docs/storage.rs b/rust/agama-server/src/web/docs/storage.rs deleted file mode 100644 index 89601598dd..0000000000 --- a/rust/agama-server/src/web/docs/storage.rs +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (c) [2024] SUSE LLC -// -// All Rights Reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation; either version 2 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, contact SUSE LLC. -// -// To contact SUSE LLC about this file by physical or electronic mail, you may -// find current contact information at www.suse.com. - -use utoipa::openapi::{Components, ComponentsBuilder, OpenApi, Paths, PathsBuilder}; - -use super::{ - common::{ProgressApiDocBuilder, ServiceStatusApiDocBuilder}, - ApiDocBuilder, -}; - -pub struct StorageApiDocBuilder; - -impl ApiDocBuilder for StorageApiDocBuilder { - fn title(&self) -> String { - "Storage HTTP API".to_string() - } - - fn paths(&self) -> Paths { - PathsBuilder::new() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .path_from::() - .build() - } - - fn components(&self) -> Components { - ComponentsBuilder::new() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .schema_from::() - .build() - } - - fn nested(&self) -> Option { - let mut status = ServiceStatusApiDocBuilder::new("/api/storage/status").build(); - let progress = ProgressApiDocBuilder::new("/api/storage/progress").build(); - status.merge(progress); - Some(status) - } -} diff --git a/rust/agama-server/src/web/docs/users.rs b/rust/agama-server/src/web/docs/users.rs index f61b558d67..57fcd71df9 100644 --- a/rust/agama-server/src/web/docs/users.rs +++ b/rust/agama-server/src/web/docs/users.rs @@ -18,9 +18,9 @@ // To contact SUSE LLC about this file by physical or electronic mail, you may // find current contact information at www.suse.com. -use utoipa::openapi::{ComponentsBuilder, OpenApi, Paths, PathsBuilder}; +use utoipa::openapi::{ComponentsBuilder, Paths, PathsBuilder}; -use super::{common::ServiceStatusApiDocBuilder, ApiDocBuilder}; +use super::ApiDocBuilder; pub struct UsersApiDocBuilder; @@ -55,9 +55,4 @@ impl ApiDocBuilder for UsersApiDocBuilder { ) .build() } - - fn nested(&self) -> Option { - let status = ServiceStatusApiDocBuilder::new("/api/storage/status").build(); - Some(status) - } } diff --git a/rust/agama-server/src/web/service.rs b/rust/agama-server/src/web/service.rs index 6a0ba0d95a..8948b9103d 100644 --- a/rust/agama-server/src/web/service.rs +++ b/rust/agama-server/src/web/service.rs @@ -57,7 +57,6 @@ use tracing::Span; pub struct MainServiceBuilder { config: ServiceConfig, events: event::Sender, - old_events: http::event::OldSender, api_router: Router, public_dir: PathBuf, } @@ -67,7 +66,7 @@ impl MainServiceBuilder { /// /// * `events`: channel to send events through the WebSocket. /// * `public_dir`: path to the public directory. - pub fn new

(events: event::Sender, old_events: http::event::OldSender, public_dir: P) -> Self + pub fn new

(events: event::Sender, public_dir: P) -> Self where P: AsRef, { @@ -76,7 +75,6 @@ impl MainServiceBuilder { Self { events, - old_events, api_router, config, public_dir: PathBuf::from(public_dir.as_ref()), @@ -107,7 +105,6 @@ impl MainServiceBuilder { let state = ServiceState { config: self.config, events: self.events, - old_events: self.old_events, public_dir: self.public_dir.clone(), }; diff --git a/rust/agama-server/src/web/state.rs b/rust/agama-server/src/web/state.rs index a761ad224e..120bfa56da 100644 --- a/rust/agama-server/src/web/state.rs +++ b/rust/agama-server/src/web/state.rs @@ -21,7 +21,6 @@ //! Implements the web service state. use super::config::ServiceConfig; -use agama_lib::http; use agama_utils::api::event; use std::path::PathBuf; @@ -32,6 +31,5 @@ use std::path::PathBuf; pub struct ServiceState { pub config: ServiceConfig, pub events: event::Sender, - pub old_events: http::event::OldSender, pub public_dir: PathBuf, } diff --git a/rust/agama-server/src/web/ws.rs b/rust/agama-server/src/web/ws.rs index 2eddcd1bf3..c822a6a6f8 100644 --- a/rust/agama-server/src/web/ws.rs +++ b/rust/agama-server/src/web/ws.rs @@ -55,13 +55,6 @@ pub async fn ws_handler( async fn handle_socket(mut socket: WebSocket, events: event::Sender, client_id: Arc) { let mut events_rx = events.subscribe(); - let conn_event = agama_lib::event!(ClientConnected, client_id.as_ref()); - if let Ok(json) = serde_json::to_string(&conn_event) { - if socket.send(Message::Text(json)).await.is_err() { - return; - } - } - loop { tokio::select! { // Handle messages from the client diff --git a/rust/agama-server/tests/service.rs b/rust/agama-server/tests/service.rs index 5232486246..aa0a8a3e21 100644 --- a/rust/agama-server/tests/service.rs +++ b/rust/agama-server/tests/service.rs @@ -41,8 +41,7 @@ fn public_dir() -> PathBuf { async fn test_ping() -> Result<(), Box> { let config = ServiceConfig::default(); let (events_tx, _) = channel(16); - let (tx, _) = channel(16); - let web_service = MainServiceBuilder::new(events_tx, tx, public_dir()) + let web_service = MainServiceBuilder::new(events_tx, public_dir()) .add_service("/protected", get(protected)) .with_config(config) .build(); @@ -69,8 +68,7 @@ async fn access_protected_route(token: &str, jwt_secret: &str) -> Response { jwt_secret: jwt_secret.to_string(), }; let (events_tx, _) = channel(16); - let (tx, _) = channel(16); - let web_service = MainServiceBuilder::new(events_tx, tx, public_dir()) + let web_service = MainServiceBuilder::new(events_tx, public_dir()) .add_service("/protected", get(protected)) .with_config(config) .build(); diff --git a/rust/xtask/src/main.rs b/rust/xtask/src/main.rs index e65ca611ea..20600ea40f 100644 --- a/rust/xtask/src/main.rs +++ b/rust/xtask/src/main.rs @@ -5,8 +5,8 @@ mod tasks { use agama_cli::Cli; use agama_server::web::docs::{ - ApiDocBuilder, ConfigApiDocBuilder, HostnameApiDocBuilder, ManagerApiDocBuilder, - MiscApiDocBuilder, ProfileApiDocBuilder, StorageApiDocBuilder, UsersApiDocBuilder, + ApiDocBuilder, ConfigApiDocBuilder, HostnameApiDocBuilder, MiscApiDocBuilder, + ProfileApiDocBuilder, UsersApiDocBuilder, }; use clap::CommandFactory; use clap_complete::aot; @@ -65,10 +65,8 @@ mod tasks { write_openapi(ConfigApiDocBuilder {}, out_dir.join("config.json"))?; write_openapi(HostnameApiDocBuilder {}, out_dir.join("hostname.json"))?; - write_openapi(ManagerApiDocBuilder {}, out_dir.join("manager.json"))?; write_openapi(MiscApiDocBuilder {}, out_dir.join("misc.json"))?; write_openapi(ProfileApiDocBuilder {}, out_dir.join("profile.json"))?; - write_openapi(StorageApiDocBuilder {}, out_dir.join("storage.json"))?; write_openapi(UsersApiDocBuilder {}, out_dir.join("users.json"))?; println!( "Generate the OpenAPI specification at {}.",