diff --git a/src/db/models.rs b/src/db/models.rs
index b50ce751..efb36124 100644
--- a/src/db/models.rs
+++ b/src/db/models.rs
@@ -57,7 +57,9 @@ impl From for String {
 /// Represents a scan event.
 ///
 /// A scan event is created when a file is added by [Triggers](crate::service::triggers).
-#[derive(Queryable, Selectable, Serialize, Clone, Debug, AsChangeset, Identifiable)]
+#[derive(
+    Queryable, Selectable, Serialize, Clone, Debug, AsChangeset, Identifiable, Hash, Eq, PartialEq,
+)]
 #[diesel(table_name = crate::db::schema::scan_events)]
 pub struct ScanEvent {
     /// The [uuid](crate::utils::generate_uuid::generate_uuid) of the scan event.
diff --git a/src/service/targets/fileflows.rs b/src/service/targets/fileflows.rs
index 571c7750..83e892f0 100644
--- a/src/service/targets/fileflows.rs
+++ b/src/service/targets/fileflows.rs
@@ -3,7 +3,8 @@ use std::collections::HashMap;
 use crate::{db::models::ScanEvent, settings::target::TargetProcess};
 use reqwest::header;
 use serde::{Deserialize, Serialize};
-use tracing::error;
+use serde_json::Value;
+use tracing::{error, trace};
 
 #[derive(Deserialize, Clone)]
 pub struct FileFlows {
@@ -11,6 +12,13 @@ pub struct FileFlows {
     pub url: String,
 }
 
+#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
+#[doc(hidden)]
+#[serde(rename_all = "PascalCase")]
+struct FileFlowsFlow {
+    uid: String,
+}
+
 #[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
 #[doc(hidden)]
 #[serde(rename_all = "PascalCase")]
@@ -18,16 +26,63 @@ struct FileFlowsLibrary {
     uid: String,
     enabled: bool,
     path: Option<String>,
+    flow: Option<FileFlowsFlow>,
+}
+
+// #[derive(Serialize)]
+// #[doc(hidden)]
+// #[serde(rename_all = "PascalCase")]
+// struct FileFlowsRescanLibraryRequest {
+//     uids: Vec<String>,
+// }
+
+#[derive(Serialize, Debug)]
+#[doc(hidden)]
+#[serde(rename_all = "PascalCase")]
+struct FileFlowsManuallyAddRequest {
+    flow_uid: String,
+    files: Vec<String>,
+    #[serde(default)]
+    custom_variables: HashMap<String, Value>,
 }
 
 #[derive(Serialize)]
 #[doc(hidden)]
 #[serde(rename_all = "PascalCase")]
-struct FileFlowsRescanLibraryRequest {
+struct FileFlowsSearchRequest {
+    path: String,
+    limit: u32, // set to 1
+}
+
+#[derive(Serialize, Default, Debug)]
+#[doc(hidden)]
+#[serde(rename_all = "PascalCase")]
+struct FileFlowsReprocessRequest {
     uids: Vec<String>,
+    custom_variables: HashMap<String, Value>,
+    mode: u8,
+    flow: Option<FileFlowsFlow>,
+    node: Option,
+    bottom_of_queue: bool,
 }
 
-// TODO: get library files and then reprocess them
+#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
+#[doc(hidden)]
+#[serde(rename_all = "PascalCase")]
+struct FileFlowsLibraryFile {
+    uid: String,
+    flow_uid: String,
+    name: String, // filename, maybe use output_path later..
+}
+
+// How to "scan" a file in fileflows
+// First, get the libraries
+// Group files with their library
+// If the library is disabled, error
+// Next, get each file and check their status
+// If they are processed, send a reprocess request individually
+// For the rest, send a manual-add request, again still in a group with their library
+
 impl FileFlows {
     fn get_client(&self) -> anyhow::Result<reqwest::Client> {
         let headers = header::HeaderMap::new();
@@ -56,25 +111,95 @@ impl FileFlows {
         }
     }
 
-    async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
+    async fn get_library_file(
+        &self,
+        ev: &ScanEvent,
+    ) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
+        let client = self.get_client()?;
+
+        let url = url::Url::parse(&self.url)?.join("/api/library-file/search")?;
+
+        let req = FileFlowsSearchRequest {
+            path: ev.file_path.clone(),
+            limit: 1,
+        };
+
+        let res = client.post(url.to_string()).json(&req).send().await?;
+
+        if res.status().is_success() {
+            let files: Vec<FileFlowsLibraryFile> = res.json().await?;
+
+            Ok(files.first().cloned())
+        } else {
+            let body = res.text().await?;
+            Err(anyhow::anyhow!("unable to get library file: {}", body))
+        }
+    }
+
+    async fn reprocess_library_files(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
+        let client = self.get_client()?;
+
+        let url = url::Url::parse(&self.url)?.join("/api/library-file/reprocess")?;
+
+        let req = FileFlowsReprocessRequest {
+            uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
+            ..Default::default()
+        };
+
+        let res = client.post(url.to_string()).json(&req).send().await?;
+
+        if res.status().is_success() {
+            Ok(())
+        } else {
+            let body = res.text().await?;
+            Err(anyhow::anyhow!("unable to send reprocess: {}", body))
+        }
+    }
+
+    async fn manually_add_files(
+        &self,
+        library: &FileFlowsLibrary,
+        files: Vec<&ScanEvent>,
+    ) -> anyhow::Result<()> {
         let client = self.get_client()?;
 
-        let url = url::Url::parse(&self.url)?.join("/api/library/rescan")?;
+        let url = url::Url::parse(&self.url)?.join("/api/library-file/manually-add")?;
 
-        let req = FileFlowsRescanLibraryRequest {
-            uids: vec![libraries.uid.clone()],
+        let req = FileFlowsManuallyAddRequest {
+            flow_uid: library.flow.as_ref().unwrap().uid.clone(),
+            files: files.iter().map(|ev| ev.file_path.clone()).collect(),
+            custom_variables: HashMap::new(),
         };
 
-        let res = client.put(url.to_string()).json(&req).send().await?;
+        let res = client.post(url.to_string()).json(&req).send().await?;
 
         if res.status().is_success() {
             Ok(())
         } else {
             let body = res.text().await?;
-            Err(anyhow::anyhow!("unable to send rescan: {}", body))
+            Err(anyhow::anyhow!("unable to send manual-add: {}", body))
         }
     }
 
+    // async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
+    //     let client = self.get_client()?;
+
+    //     let url = url::Url::parse(&self.url)?.join("/api/library/rescan")?;
+
+    //     let req = FileFlowsRescanLibraryRequest {
+    //         uids: vec![libraries.uid.clone()],
+    //     };
+
+    //     let res = client.put(url.to_string()).json(&req).send().await?;
+
+    //     if res.status().is_success() {
+    //         Ok(())
+    //     } else {
+    //         let body = res.text().await?;
+    //         Err(anyhow::anyhow!("unable to send rescan: {}", body))
+    //     }
+    // }
+
+    // No longer in fileflows..
     // async fn scan(&self, ev: &ScanEvent, library: &FileFlowsLibrary) -> anyhow::Result<()> {
     //     let client = self.get_client()?;
@@ -99,50 +224,91 @@ impl TargetProcess for FileFlows {
         let mut succeeded = Vec::new();
 
         let libraries = self.get_libraries().await?;
 
-        let mut to_scan: HashMap<&FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
+        let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
 
-        for ev in evs {
-            let library = libraries.iter().find(|l| {
-                l.path
-                    .as_ref()
-                    .map_or(false, |path| ev.file_path.starts_with(path))
-            });
+        for library in libraries {
+            let files = evs
+                .iter()
+                .filter_map(|ev| {
+                    if ev.file_path.starts_with(library.path.as_deref()?) {
+                        Some(*ev)
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>();
 
-            if library.is_none() {
-                error!("unable to find library for file: {}", ev.file_path);
+            if files.is_empty() {
                 continue;
             }
 
-            let library = library.unwrap();
-
             if !library.enabled {
-                error!("library '{}' is disabled", library.uid);
+                error!(
+                    "library '{}' is disabled but {} files will fail to scan",
+                    library.uid,
+                    files.len()
+                );
                 continue;
             }
 
-            // let res = self.scan(ev, library).await;
-
-            // match res {
-            //     Ok(_) => {
-            //         succeeded.push(ev.file_path.clone());
-            //     }
-            //     Err(e) => {
-            //         error!("failed to process '{}': {:?}", ev.file_path, e);
-            //     }
-            // }
-
-            to_scan.entry(library).or_default().push(ev);
+            to_scan.insert(library, files);
         }
 
         for (library, evs) in to_scan {
-            if let Err(e) = self.rescan_library(library).await {
-                error!(
-                    "failed to rescan '{}': {:?}",
-                    library.path.clone().unwrap_or_else(|| library.uid.clone()),
-                    e
-                );
-            } else {
-                succeeded.extend(evs.iter().map(|ev| ev.id.clone()));
+            let mut library_files = HashMap::new();
+
+            for ev in evs {
+                let file = self.get_library_file(ev).await?;
+
+                if let Some(file) = file {
+                    library_files.insert(ev, Some(file));
+                } else {
+                    library_files.insert(ev, None);
+                }
+            }
+
+            let (processed, not_processed): (Vec<_>, Vec<_>) =
+                library_files.iter().partition(|(_, file)| file.is_some());
+
+            trace!(
+                "library {} has {} processed and {} not processed files",
+                library.uid,
+                processed.len(),
+                not_processed.len()
+            );
+
+            if !processed.is_empty() {
+                match self
+                    .reprocess_library_files(
+                        processed
+                            .iter()
+                            .filter_map(|(_, file)| file.as_ref())
+                            .collect(),
+                    )
+                    .await
+                {
+                    Ok(()) => {
+                        trace!("reprocessed {} files", processed.len());
+                        succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
+                    }
+                    Err(e) => error!("failed to reprocess files: {}", e),
+                }
+            }
+
+            if !not_processed.is_empty() {
+                match self
+                    .manually_add_files(
+                        &library,
+                        not_processed.iter().map(|(ev, _)| **ev).collect(),
+                    )
+                    .await
+                {
+                    Ok(()) => {
+                        trace!("manually added {} files", not_processed.len());
+                        succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
+                    }
+                    Err(e) => error!("failed to manually add files: {}", e),
+                }
             }
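
As a side note, the new request structs all rely on #[serde(rename_all = "PascalCase")] to produce the key casing the FileFlows endpoints expect. Below is a minimal standalone sketch (separate from the patch above) showing that behaviour; the struct mirrors FileFlowsSearchRequest, and the String/u32 field types and the example path are assumptions for illustration only, not a confirmed FileFlows API schema.

use serde::Serialize;
use serde_json::json;

// Mirrors the shape of FileFlowsSearchRequest from the patch; field types are assumed.
#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
struct SearchRequest {
    path: String,
    limit: u32,
}

fn main() {
    // Hypothetical file path, purely illustrative.
    let req = SearchRequest {
        path: "/media/movies/example.mkv".into(),
        limit: 1,
    };

    // rename_all = "PascalCase" turns the snake_case field names into
    // PascalCase JSON keys: {"Path": "...", "Limit": 1}.
    assert_eq!(
        serde_json::to_value(&req).unwrap(),
        json!({ "Path": "/media/movies/example.mkv", "Limit": 1 })
    );
}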