Skip to content

Commit

Permalink
feat(fileflows): enhance file processing with reprocess/manual-add (#146
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dan-online authored Dec 29, 2024
1 parent 1119854 commit 6ae41dd
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 42 deletions.
4 changes: 3 additions & 1 deletion src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ impl From<ProcessStatus> for String {
/// Represents a scan event.
///
/// A scan event is created when a file is added by [Triggers](crate::service::triggers).
#[derive(Queryable, Selectable, Serialize, Clone, Debug, AsChangeset, Identifiable)]
#[derive(
Queryable, Selectable, Serialize, Clone, Debug, AsChangeset, Identifiable, Hash, Eq, PartialEq,
)]
#[diesel(table_name = crate::db::schema::scan_events)]
pub struct ScanEvent {
/// The [uuid](crate::utils::generate_uuid::generate_uuid) of the scan event.
Expand Down
248 changes: 207 additions & 41 deletions src/service/targets/fileflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,86 @@ use std::collections::HashMap;
use crate::{db::models::ScanEvent, settings::target::TargetProcess};
use reqwest::header;
use serde::{Deserialize, Serialize};
use tracing::error;
use serde_json::Value;
use tracing::{error, trace};

#[derive(Deserialize, Clone)]
pub struct FileFlows {
/// URL to the FileFlows server
pub url: String,
}

#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
#[doc(hidden)]
#[serde(rename_all = "PascalCase")]
struct FileFlowsFlow {
uid: String,
}

#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
#[doc(hidden)]
#[serde(rename_all = "PascalCase")]
struct FileFlowsLibrary {
uid: String,
enabled: bool,
path: Option<String>,
flow: Option<FileFlowsFlow>,
}

// #[derive(Serialize)]
// #[doc(hidden)]
// #[serde(rename_all = "PascalCase")]
// struct FileFlowsRescanLibraryRequest {
// uids: Vec<String>,
// }

#[derive(Serialize, Debug)]
#[doc(hidden)]
#[serde(rename_all = "PascalCase")]
struct FileFlowsManuallyAddRequest {
flow_uid: String,
files: Vec<String>,
#[serde(default)]
custom_variables: HashMap<String, String>,
}

#[derive(Serialize)]
#[doc(hidden)]
#[serde(rename_all = "PascalCase")]
struct FileFlowsRescanLibraryRequest {
struct FileFlowsSearchRequest {
path: String,
limit: u32, // set to 1
}

#[derive(Serialize, Default, Debug)]
#[doc(hidden)]
#[serde(rename_all = "PascalCase")]
struct FileFlowsReprocessRequest {
uids: Vec<String>,
custom_variables: HashMap<String, String>,
mode: u8,
flow: Option<Value>,
node: Option<Value>,
bottom_of_queue: bool,
}

// TODO: get library files and then reprocess them
#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
#[doc(hidden)]
#[serde(rename_all = "PascalCase")]
struct FileFlowsLibraryFile {
uid: String,
flow_uid: String,
name: String, // filename, maybe use output_path later..
}

// How to "scan" a file in fileflows
// First, get the libraries
// Group files with their library
// if the library disabled- error
// Next get each file and check their status
// If they are processed, send a reprocess request individually
// For the rest, send a manual-add request, again still in a group with their library

impl FileFlows {
fn get_client(&self) -> anyhow::Result<reqwest::Client> {
let headers = header::HeaderMap::new();
Expand Down Expand Up @@ -56,25 +111,95 @@ impl FileFlows {
}
}

async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
async fn get_library_file(
&self,
ev: &ScanEvent,
) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
let client = self.get_client()?;

let url = url::Url::parse(&self.url)?.join("/api/library-file/search")?;

let req = FileFlowsSearchRequest {
path: ev.file_path.clone(),
limit: 1,
};

let res = client.post(url.to_string()).json(&req).send().await?;

if res.status().is_success() {
let files: Vec<FileFlowsLibraryFile> = res.json().await?;

Ok(files.first().cloned())
} else {
let body = res.text().await?;
Err(anyhow::anyhow!("unable to get library file: {}", body))
}
}

async fn reprocess_library_filse(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
let client = self.get_client()?;

let url = url::Url::parse(&self.url)?.join("/api/library-file/reprocess")?;

let req = FileFlowsReprocessRequest {
uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
..Default::default()
};

let res = client.post(url.to_string()).json(&req).send().await?;

if res.status().is_success() {
Ok(())
} else {
let body = res.text().await?;
Err(anyhow::anyhow!("unable to send reprocess: {}", body))
}
}

async fn manually_add_files(
&self,
library: &FileFlowsLibrary,
files: Vec<&ScanEvent>,
) -> anyhow::Result<()> {
let client = self.get_client()?;

let url = url::Url::parse(&self.url)?.join("/api/library/rescan")?;
let url = url::Url::parse(&self.url)?.join("/api/library-file/manually-add")?;

let req = FileFlowsRescanLibraryRequest {
uids: vec![libraries.uid.clone()],
let req = FileFlowsManuallyAddRequest {
flow_uid: library.flow.as_ref().unwrap().uid.clone(),
files: files.iter().map(|ev| ev.file_path.clone()).collect(),
custom_variables: HashMap::new(),
};

let res = client.put(url.to_string()).json(&req).send().await?;
let res = client.post(url.to_string()).json(&req).send().await?;

if res.status().is_success() {
Ok(())
} else {
let body = res.text().await?;
Err(anyhow::anyhow!("unable to send rescan: {}", body))
Err(anyhow::anyhow!("unable to send manual-add: {}", body))
}
}

// async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
// let client = self.get_client()?;

// let url = url::Url::parse(&self.url)?.join("/api/library/rescan")?;

// let req = FileFlowsRescanLibraryRequest {
// uids: vec![libraries.uid.clone()],
// };

// let res = client.put(url.to_string()).json(&req).send().await?;

// if res.status().is_success() {
// Ok(())
// } else {
// let body = res.text().await?;
// Err(anyhow::anyhow!("unable to send rescan: {}", body))
// }
// }

// No longer in fileflows..
// async fn scan(&self, ev: &ScanEvent, library: &FileFlowsLibrary) -> anyhow::Result<()> {
// let client = self.get_client()?;
Expand All @@ -99,50 +224,91 @@ impl TargetProcess for FileFlows {
let mut succeeded = Vec::new();
let libraries = self.get_libraries().await?;

let mut to_scan: HashMap<&FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();

for ev in evs {
let library = libraries.iter().find(|l| {
l.path
.as_ref()
.map_or(false, |path| ev.file_path.starts_with(path))
});
for library in libraries {
let files = evs
.iter()
.filter_map(|ev| {
if ev.file_path.starts_with(library.path.as_deref()?) {
Some(*ev)
} else {
None
}
})
.collect::<Vec<_>>();

if library.is_none() {
error!("unable to find library for file: {}", ev.file_path);
if files.is_empty() {
continue;
}

let library = library.unwrap();

if !library.enabled {
error!("library '{}' is disabled", library.uid);
error!(
"library '{}' is disabled but {} files will fail to scan",
library.uid,
files.len()
);
continue;
}

// let res = self.scan(ev, library).await;

// match res {
// Ok(_) => {
// succeeded.push(ev.file_path.clone());
// }
// Err(e) => {
// error!("failed to process '{}': {:?}", ev.file_path, e);
// }
// }

to_scan.entry(library).or_default().push(ev);
to_scan.insert(library, files);
}

for (library, evs) in to_scan {
if let Err(e) = self.rescan_library(library).await {
error!(
"failed to rescan '{}': {:?}",
library.path.clone().unwrap_or_else(|| library.uid.clone()),
e
);
} else {
succeeded.extend(evs.iter().map(|ev| ev.id.clone()));
let mut library_files = HashMap::new();

for ev in evs {
let file = self.get_library_file(ev).await?;

if let Some(file) = file {
library_files.insert(ev, Some(file));
} else {
library_files.insert(ev, None);
}
}

let (processed, not_processed): (Vec<_>, Vec<_>) =
library_files.iter().partition(|(_, file)| file.is_some());

trace!(
"library {} has {} processed and {} not processed files",
library.uid,
processed.len(),
not_processed.len()
);

if !processed.is_empty() {
match self
.reprocess_library_filse(
processed
.iter()
.filter_map(|(_, file)| file.as_ref())
.collect(),
)
.await
{
Ok(()) => {
trace!("reprocessed {} files", processed.len());
succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
}
Err(e) => error!("failed to reprocess files: {}", e),
}
}

if !not_processed.is_empty() {
match self
.manually_add_files(
&library,
not_processed.iter().map(|(ev, _)| **ev).collect(),
)
.await
{
Ok(()) => {
trace!("manually added {} files", not_processed.len());
succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
}
Err(e) => error!("failed to manually add files: {}", e),
}
}
}

Expand Down

0 comments on commit 6ae41dd

Please sign in to comment.