diff --git a/server/src/cli.rs b/server/src/cli.rs index cec579e9a..857c165d3 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -101,6 +101,12 @@ pub struct Cli { /// CORS behaviour pub cors: bool, + + /// The local hot_tier path is used for optimising the query performance in the distributed systems + pub hot_tier_storage_path: Option, + + ///maximum disk usage allowed + pub max_disk_usage: f64, } impl Cli { @@ -134,6 +140,8 @@ impl Cli { pub const DEFAULT_PASSWORD: &'static str = "admin"; pub const FLIGHT_PORT: &'static str = "flight-port"; pub const CORS: &'static str = "cors"; + pub const HOT_TIER_PATH: &'static str = "hot-tier-path"; + pub const MAX_DISK_USAGE: &'static str = "max-disk-usage"; pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) @@ -395,7 +403,27 @@ impl Cli { "lz4", "zstd"]) .help("Parquet compression algorithm"), - ).group( + ) + .arg( + Arg::new(Self::HOT_TIER_PATH) + .long(Self::HOT_TIER_PATH) + .env("P_HOT_TIER_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for hot tier data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::MAX_DISK_USAGE) + .long(Self::MAX_DISK_USAGE) + .env("P_MAX_DISK_USAGE_PERCENT") + .value_name("percentage") + .default_value("80.0") + .value_parser(validation::validate_disk_usage) + .help("Maximum allowed disk usage in percentage e.g 90.0 for 90%") + .next_line_help(true), + ) + .group( ArgGroup::new("oidc") .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) @@ -532,6 +560,12 @@ impl FromArgMatches for Cli { _ => unreachable!(), }; + self.hot_tier_storage_path = m.get_one::(Self::HOT_TIER_PATH).cloned(); + self.max_disk_usage = m + .get_one::(Self::MAX_DISK_USAGE) + .cloned() + .expect("default for max disk usage"); + Ok(()) } } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index b13cd1679..1a777efdd 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -27,6 +27,7 @@ use crate::handlers::{ CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, }; +use crate::hottier::{HotTierManager, StreamHotTier}; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; use crate::option::{Mode, CONFIG}; @@ -37,6 +38,7 @@ use crate::{ catalog::{self, remove_manifest_from_snapshot}, event, stats, }; + use crate::{metadata, validator}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; @@ -919,6 +921,122 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, +) -> Result { + if CONFIG.parseable.mode != Mode::Query { + return Err(StreamError::Custom { + msg: "Hot tier can only be enabled in query mode".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if STREAM_INFO + .get_time_partition(&stream_name) + .unwrap() + .is_some() + { + return Err(StreamError::Custom { + msg: "Hot tier can not be enabled for stream with time partition".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + let body = body.into_inner(); + let mut hottier: StreamHotTier = match serde_json::from_value(body) { + Ok(hottier) => hottier, + Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), + }; + + validator::hot_tier(&hottier.size.to_string())?; + + STREAM_INFO.set_hot_tier(&stream_name, true)?; + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager + .validate_hot_tier_size(&stream_name, &hottier.size) + .await?; + hottier.used_size = Some("0GiB".to_string()); + hottier.available_size = Some(hottier.size.clone()); + hot_tier_manager + .put_hot_tier(&stream_name, &mut hottier) + .await?; + let storage = CONFIG.storage().get_object_store(); + let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; + stream_metadata.hot_tier_enabled = Some(true); + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + } + + Ok(( + format!("hot tier set for stream {stream_name}"), + StatusCode::OK, + )) +} + +pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { + if CONFIG.parseable.mode != Mode::Query { + return Err(StreamError::Custom { + msg: "Hot tier can only be enabled in query mode".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + let hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?; + Ok((web::Json(hot_tier), StatusCode::OK)) + } else { + Err(StreamError::Custom { + msg: format!("hot tier not initialised for stream {}", stream_name), + status: (StatusCode::BAD_REQUEST), + }) + } +} + +pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result { + if CONFIG.parseable.mode != Mode::Query { + return Err(StreamError::Custom { + msg: "Hot tier can only be enabled in query mode".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.delete_hot_tier(&stream_name).await?; + } + Ok(( + format!("hot tier deleted for stream {stream_name}"), + StatusCode::OK, + )) +} #[allow(unused)] fn classify_json_error(kind: serde_json::error::Category) -> StatusCode { match kind { @@ -935,9 +1053,12 @@ pub mod error { use http::StatusCode; use crate::{ + hottier::HotTierError, metadata::error::stream_info::MetadataError, storage::ObjectStorageError, - validator::error::{AlertValidationError, StreamNameValidationError}, + validator::error::{ + AlertValidationError, HotTierValidationError, StreamNameValidationError, + }, }; #[allow(unused)] @@ -997,6 +1118,16 @@ pub mod error { Network(#[from] reqwest::Error), #[error("Could not deserialize into JSON object, {0}")] SerdeError(#[from] serde_json::Error), + #[error( + "Hot tier is not enabled at the server config, cannot enable hot tier for stream {0}" + )] + HotTierNotEnabled(String), + #[error("failed to enable hottier due to err: {0}")] + InvalidHotTierConfig(serde_json::Error), + #[error("Hot tier validation failed due to {0}")] + HotTierValidation(#[from] HotTierValidationError), + #[error("{0}")] + HotTierError(#[from] HotTierError), } impl actix_web::ResponseError for StreamError { @@ -1030,6 +1161,10 @@ pub mod error { StreamError::Network(err) => { err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) } + StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST, + StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST, + StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST, + StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index c1323f8e2..166524c21 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -20,7 +20,7 @@ use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; - +use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync; use crate::users::dashboards::DASHBOARDS; @@ -188,6 +188,10 @@ impl QueryServer { if matches!(init_cluster_metrics_schedular(), Ok(())) { log::info!("Cluster metrics scheduler started successfully"); } + + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.download_from_s3()?; + }; let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync(); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index ceaf35810..213d279a4 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -343,6 +343,25 @@ impl Server { .to(logstream::get_cache_enabled) .authorize_for_stream(Action::GetCacheEnabled), ), + ) + .service( + web::resource("/hottier") + // PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream + .route( + web::put() + .to(logstream::put_stream_hot_tier) + .authorize_for_stream(Action::PutHotTierEnabled), + ) + .route( + web::get() + .to(logstream::get_stream_hot_tier) + .authorize_for_stream(Action::GetHotTierEnabled), + ) + .route( + web::delete() + .to(logstream::delete_stream_hot_tier) + .authorize_for_stream(Action::DeleteHotTierEnabled), + ), ), ) } diff --git a/server/src/hottier.rs b/server/src/hottier.rs new file mode 100644 index 000000000..df130ba5b --- /dev/null +++ b/server/src/hottier.rs @@ -0,0 +1,779 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{ + collections::BTreeMap, + io, + path::{Path, PathBuf}, + sync::Arc, +}; + +use crate::{ + catalog::manifest::{File, Manifest}, + metadata::{error::stream_info::MetadataError, STREAM_INFO}, + option::{ + validation::{bytes_to_human_size, human_size_to_bytes}, + CONFIG, + }, + storage::{ObjectStorage, ObjectStorageError}, + utils::extract_datetime, + validator::error::HotTierValidationError, +}; +use chrono::NaiveDate; +use clokwerk::{AsyncScheduler, Interval, Job}; +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures_util::TryFutureExt; +use object_store::{local::LocalFileSystem, ObjectStore}; +use once_cell::sync::OnceCell; +use parquet::errors::ParquetError; +use relative_path::RelativePathBuf; +use std::time::Duration; +use sysinfo::{Disks, System}; +use tokio::fs::{self, DirEntry}; +use tokio::io::AsyncWriteExt; +use tokio_stream::wrappers::ReadDirStream; + +pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; +pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB +const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct StreamHotTier { + #[serde(rename = "size")] + pub size: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub used_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub available_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub oldest_date_time_entry: Option, +} + +pub struct HotTierManager { + filesystem: LocalFileSystem, + hot_tier_path: PathBuf, +} + +impl HotTierManager { + pub fn global() -> Option<&'static HotTierManager> { + static INSTANCE: OnceCell = OnceCell::new(); + + let hot_tier_path = CONFIG.parseable.hot_tier_storage_path.as_ref()?; + + Some(INSTANCE.get_or_init(|| { + let hot_tier_path = hot_tier_path.clone(); + std::fs::create_dir_all(&hot_tier_path).unwrap(); + HotTierManager { + filesystem: LocalFileSystem::new(), + hot_tier_path, + } + })) + } + + ///get the total hot tier size for all streams + pub async fn get_hot_tiers_size( + &self, + current_stream: &str, + ) -> Result<(u64, u64), HotTierError> { + let mut total_hot_tier_size = 0; + let mut total_hot_tier_used_size = 0; + for stream in STREAM_INFO.list_streams() { + if self.check_stream_hot_tier_exists(&stream) && stream != current_stream { + let stream_hot_tier = self.get_hot_tier(&stream).await?; + total_hot_tier_size += human_size_to_bytes(&stream_hot_tier.size).unwrap(); + total_hot_tier_used_size += + human_size_to_bytes(&stream_hot_tier.used_size.unwrap()).unwrap(); + } + } + Ok((total_hot_tier_size, total_hot_tier_used_size)) + } + + /// validate if hot tier size can be fit in the disk + /// check disk usage and hot tier size of all other streams + /// check if total hot tier size of all streams is less than max disk usage + /// delete all the files from hot tier once validation is successful and hot tier is ready to be updated + pub async fn validate_hot_tier_size( + &self, + stream: &str, + size: &str, + ) -> Result<(), HotTierError> { + let mut existing_hot_tier_used_size = 0; + if self.check_stream_hot_tier_exists(stream) { + //delete existing hot tier if its size is less than the updated hot tier size else return error + let existing_hot_tier = self.get_hot_tier(stream).await?; + existing_hot_tier_used_size = + human_size_to_bytes(&existing_hot_tier.used_size.unwrap()).unwrap(); + if human_size_to_bytes(size) < human_size_to_bytes(&existing_hot_tier.size) { + return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( + "The hot tier size for the stream is already set to {} which is greater than the updated hot tier size of {}, reducing the hot tier size is not allowed", + existing_hot_tier.size, + size + )))); + } + } + + let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage(); + + if let (Some(total_disk_space), Some(available_disk_space), Some(used_disk_space)) = + (total_disk_space, available_disk_space, used_disk_space) + { + let stream_hot_tier_size = human_size_to_bytes(size).unwrap(); + let (total_hot_tier_size, total_hot_tier_used_size) = + self.get_hot_tiers_size(stream).await?; + let projected_disk_usage = total_hot_tier_size + stream_hot_tier_size + used_disk_space + - existing_hot_tier_used_size + - total_hot_tier_used_size; + let usage_percentage = + ((projected_disk_usage as f64 / total_disk_space as f64) * 100.0).round(); + if usage_percentage > CONFIG.parseable.max_disk_usage { + return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( + "Including the hot tier size of all the streams, the projected disk usage will be {}% which is above the set threshold of {}%, hence unable to set the hot tier for the stream. Total Disk Size: {}, Available Disk Size: {}, Used Disk Size: {}, Total Hot Tier Size (all other streams): {}", + usage_percentage, + CONFIG.parseable.max_disk_usage, + bytes_to_human_size(total_disk_space), + bytes_to_human_size(available_disk_space), + bytes_to_human_size(used_disk_space), + bytes_to_human_size(total_hot_tier_size) + )))); + } + } + + Ok(()) + } + + ///get the hot tier metadata file for the stream + pub async fn get_hot_tier(&self, stream: &str) -> Result { + if !self.check_stream_hot_tier_exists(stream) { + return Err(HotTierError::HotTierValidationError( + HotTierValidationError::NotFound(stream.to_owned()), + )); + } + let path = hot_tier_file_path(&self.hot_tier_path, stream)?; + let res = self + .filesystem + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + match res { + Ok(bytes) => { + let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?; + let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?; + stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry; + Ok(stream_hot_tier) + } + Err(err) => Err(err.into()), + } + } + + pub async fn delete_hot_tier(&self, stream: &str) -> Result<(), HotTierError> { + if self.check_stream_hot_tier_exists(stream) { + let path = self.hot_tier_path.join(stream); + fs::remove_dir_all(path).await?; + Ok(()) + } else { + Err(HotTierError::HotTierValidationError( + HotTierValidationError::NotFound(stream.to_owned()), + )) + } + } + + ///put the hot tier metadata file for the stream + /// set the updated_date_range in the hot tier metadata file + pub async fn put_hot_tier( + &self, + stream: &str, + hot_tier: &mut StreamHotTier, + ) -> Result<(), HotTierError> { + let path = hot_tier_file_path(&self.hot_tier_path, stream)?; + let bytes = serde_json::to_vec(&hot_tier)?.into(); + self.filesystem.put(&path, bytes).await?; + Ok(()) + } + + ///schedule the download of the hot tier files from S3 every minute + pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> + where + 'a: 'static, + { + let mut scheduler = AsyncScheduler::new(); + scheduler + .every(HOT_TIER_SYNC_DURATION) + .plus(Interval::Seconds(5)) + .run(move || async { + if let Err(err) = self.sync_hot_tier().await { + log::error!("Error in hot tier scheduler: {:?}", err); + } + }); + + tokio::spawn(async move { + loop { + scheduler.run_pending().await; + tokio::time::sleep(Duration::from_secs(10)).await; + } + }); + Ok(()) + } + + ///sync the hot tier files from S3 to the hot tier directory for all streams + async fn sync_hot_tier(&self) -> Result<(), HotTierError> { + let streams = STREAM_INFO.list_streams(); + let sync_hot_tier_tasks = FuturesUnordered::new(); + for stream in streams { + if self.check_stream_hot_tier_exists(&stream) { + sync_hot_tier_tasks.push(async move { self.process_stream(stream).await }); + //self.process_stream(stream).await?; + } + } + + let res: Vec<_> = sync_hot_tier_tasks.collect().await; + for res in res { + if let Err(err) = res { + log::error!("Failed to run hot tier sync task {err:?}"); + return Err(err); + } + } + Ok(()) + } + + ///process the hot tier files for the stream + /// delete the files from the hot tier directory if the available date range is outside the hot tier range + async fn process_stream(&self, stream: String) -> Result<(), HotTierError> { + let stream_hot_tier = self.get_hot_tier(&stream).await?; + let mut parquet_file_size = + human_size_to_bytes(stream_hot_tier.used_size.as_ref().unwrap()).unwrap(); + + let object_store = CONFIG.storage().get_object_store(); + let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?; + self.process_manifest( + &stream, + &mut s3_manifest_file_list, + &mut parquet_file_size, + object_store.clone(), + ) + .await?; + + Ok(()) + } + + ///process the hot tier files for the date for the stream + /// collect all manifests from S3 for the date, sort the parquet file list + /// in order to download the latest files first + /// download the parquet files if not present in hot tier directory + async fn process_manifest( + &self, + stream: &str, + manifest_files_to_download: &mut BTreeMap>, + parquet_file_size: &mut u64, + object_store: Arc, + ) -> Result<(), HotTierError> { + if manifest_files_to_download.is_empty() { + return Ok(()); + } + for (str_date, manifest_files) in manifest_files_to_download.iter().rev() { + let mut storage_combined_manifest = Manifest::default(); + + for manifest_file in manifest_files { + let manifest_path: RelativePathBuf = RelativePathBuf::from(manifest_file.clone()); + let storage_manifest_bytes = object_store.get_object(&manifest_path).await?; + let storage_manifest: Manifest = serde_json::from_slice(&storage_manifest_bytes)?; + storage_combined_manifest + .files + .extend(storage_manifest.files); + } + + storage_combined_manifest + .files + .sort_by_key(|file| file.file_path.clone()); + + while let Some(parquet_file) = storage_combined_manifest.files.pop() { + let parquet_file_path = &parquet_file.file_path; + let parquet_path = self.hot_tier_path.join(parquet_file_path); + + if !parquet_path.exists() { + if let Ok(date) = + NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") + { + if !self + .process_parquet_file( + stream, + &parquet_file, + parquet_file_size, + parquet_path, + date, + ) + .await? + { + break; + } + } else { + log::warn!("Invalid date format: {}", str_date); + } + } + } + } + + Ok(()) + } + + ///process the parquet file for the stream + /// check if the disk is available to download the parquet file + /// if not available, delete the oldest entry from the hot tier directory + /// download the parquet file from S3 to the hot tier directory + /// update the used and available size in the hot tier metadata + /// return true if the parquet file is processed successfully + async fn process_parquet_file( + &self, + stream: &str, + parquet_file: &File, + parquet_file_size: &mut u64, + parquet_path: PathBuf, + date: NaiveDate, + ) -> Result { + let mut file_processed = false; + let mut stream_hot_tier = self.get_hot_tier(stream).await?; + if !self.is_disk_available(parquet_file.file_size).await? + || human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()).unwrap() + <= parquet_file.file_size + { + if !self + .cleanup_hot_tier_old_data( + stream, + &mut stream_hot_tier, + &parquet_path, + parquet_file.file_size, + ) + .await? + { + return Ok(file_processed); + } + *parquet_file_size = + human_size_to_bytes(&stream_hot_tier.used_size.clone().unwrap()).unwrap(); + } + let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); + fs::create_dir_all(parquet_path.parent().unwrap()).await?; + let mut file = fs::File::create(parquet_path.clone()).await?; + let parquet_data = CONFIG + .storage() + .get_object_store() + .get_object(&parquet_file_path) + .await?; + file.write_all(&parquet_data).await?; + *parquet_file_size += parquet_file.file_size; + stream_hot_tier.used_size = Some(bytes_to_human_size(*parquet_file_size)); + + stream_hot_tier.available_size = Some(bytes_to_human_size( + human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()).unwrap() + - parquet_file.file_size, + )); + self.put_hot_tier(stream, &mut stream_hot_tier).await?; + file_processed = true; + let mut hot_tier_manifest = self + .get_stream_hot_tier_manifest_for_date(stream, &date) + .await?; + hot_tier_manifest.files.push(parquet_file.clone()); + // write the manifest file to the hot tier directory + let manifest_path = self + .hot_tier_path + .join(stream) + .join(format!("date={}/hottier.manifest.json", date)); + fs::create_dir_all(manifest_path.parent().unwrap()).await?; + fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + Ok(file_processed) + } + + ///delete the files for the date range given from the hot tier directory for the stream + /// update the used and available size in the hot tier metadata + pub async fn delete_files_from_hot_tier( + &self, + stream: &str, + dates: &[NaiveDate], + ) -> Result<(), HotTierError> { + for date in dates.iter() { + let path = self.hot_tier_path.join(format!("{}/date={}", stream, date)); + if path.exists() { + fs::remove_dir_all(path.clone()).await?; + } + } + + Ok(()) + } + + ///fetch the list of dates available in the hot tier directory for the stream and sort them + pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result, HotTierError> { + let mut date_list = Vec::new(); + let path = self.hot_tier_path.join(stream); + if path.exists() { + let directories = ReadDirStream::new(fs::read_dir(&path).await?); + let dates: Vec = directories.try_collect().await?; + for date in dates { + if !date.path().is_dir() { + continue; + } + let date = date.file_name().into_string().unwrap(); + date_list.push( + NaiveDate::parse_from_str(date.trim_start_matches("date="), "%Y-%m-%d") + .unwrap(), + ); + } + } + date_list.sort(); + Ok(date_list) + } + + ///get hot tier manifest for the stream and date + pub async fn get_stream_hot_tier_manifest_for_date( + &self, + stream: &str, + date: &NaiveDate, + ) -> Result { + let mut hot_tier_manifest = Manifest::default(); + let path = self + .hot_tier_path + .join(stream) + .join(format!("date={}", date)); + if path.exists() { + let date_dirs = ReadDirStream::new(fs::read_dir(&path).await?); + let manifest_files: Vec = date_dirs.try_collect().await?; + for manifest in manifest_files { + if !manifest + .file_name() + .to_string_lossy() + .ends_with(".manifest.json") + { + continue; + } + let file = fs::read(manifest.path()).await?; + let manifest: Manifest = serde_json::from_slice(&file)?; + hot_tier_manifest.files.extend(manifest.files); + } + } + Ok(hot_tier_manifest) + } + + ///get the list of files from all the manifests present in hot tier directory for the stream + pub async fn get_hot_tier_manifest_files( + &self, + stream: &str, + manifest_files: Vec, + ) -> Result<(Vec, Vec), HotTierError> { + let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?; + hot_tier_files.retain(|file| { + manifest_files + .iter() + .any(|manifest_file| manifest_file.file_path.eq(&file.file_path)) + }); + let remaining_files: Vec = manifest_files + .into_iter() + .filter(|manifest_file| { + hot_tier_files + .iter() + .all(|file| !file.file_path.eq(&manifest_file.file_path)) + }) + .collect(); + Ok((hot_tier_files, remaining_files)) + } + + ///get the list of parquet files from the hot tier directory for the stream + pub async fn get_hot_tier_parquet_files( + &self, + stream: &str, + ) -> Result, HotTierError> { + let mut hot_tier_parquet_files: Vec = Vec::new(); + let date_list = self.fetch_hot_tier_dates(stream).await?; + for date in date_list { + let manifest = self + .get_stream_hot_tier_manifest_for_date(stream, &date) + .await?; + + for parquet_file in manifest.files { + hot_tier_parquet_files.push(parquet_file.clone()); + } + } + Ok(hot_tier_parquet_files) + } + + ///check if the hot tier metadata file exists for the stream + pub fn check_stream_hot_tier_exists(&self, stream: &str) -> bool { + let path = self + .hot_tier_path + .join(stream) + .join(STREAM_HOT_TIER_FILENAME); + path.exists() + } + + ///delete the parquet file from the hot tier directory for the stream + /// loop through all manifests in the hot tier directory for the stream + /// loop through all parquet files in the manifest + /// check for the oldest entry to delete if the path exists in hot tier + /// update the used and available size in the hot tier metadata + /// loop if available size is still less than the parquet file size + pub async fn cleanup_hot_tier_old_data( + &self, + stream: &str, + stream_hot_tier: &mut StreamHotTier, + download_file_path: &Path, + parquet_file_size: u64, + ) -> Result { + let mut delete_successful = false; + let dates = self.fetch_hot_tier_dates(stream).await?; + 'loop_dates: for date in dates { + let date_str = date.to_string(); + let path = &self + .hot_tier_path + .join(stream) + .join(format!("date={}", date_str)); + if !path.exists() { + continue; + } + + let date_dirs = ReadDirStream::new(fs::read_dir(&path).await?); + let mut manifest_files: Vec = date_dirs.try_collect().await?; + manifest_files.retain(|manifest| { + manifest + .file_name() + .to_string_lossy() + .ends_with(".manifest.json") + }); + for manifest_file in manifest_files { + let file = fs::read(manifest_file.path()).await?; + let mut manifest: Manifest = serde_json::from_slice(&file)?; + + manifest.files.sort_by_key(|file| file.file_path.clone()); + manifest.files.reverse(); + + 'loop_files: while let Some(file_to_delete) = manifest.files.pop() { + let file_size = file_to_delete.file_size; + let path_to_delete = CONFIG + .parseable + .hot_tier_storage_path + .as_ref() + .unwrap() + .join(&file_to_delete.file_path); + + if path_to_delete.exists() { + if let (Some(download_date_time), Some(delete_date_time)) = ( + extract_datetime(download_file_path.to_str().unwrap()), + extract_datetime(path_to_delete.to_str().unwrap()), + ) { + if download_date_time <= delete_date_time { + delete_successful = false; + break 'loop_files; + } + } + + fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + + fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; + delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?; + + stream_hot_tier.used_size = Some(bytes_to_human_size( + human_size_to_bytes(&stream_hot_tier.used_size.clone().unwrap()) + .unwrap() + - file_size, + )); + stream_hot_tier.available_size = Some(bytes_to_human_size( + human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()) + .unwrap() + + file_size, + )); + self.put_hot_tier(stream, stream_hot_tier).await?; + delete_successful = true; + + if human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()) + .unwrap() + <= parquet_file_size + { + continue 'loop_files; + } else { + break 'loop_dates; + } + } else { + fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + } + } + } + } + + Ok(delete_successful) + } + + ///check if the disk is available to download the parquet file + /// check if the disk usage is above the threshold + pub async fn is_disk_available(&self, size_to_download: u64) -> Result { + let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage(); + + if let (Some(total_disk_space), Some(available_disk_space), Some(used_disk_space)) = + (total_disk_space, available_disk_space, used_disk_space) + { + if available_disk_space < size_to_download { + return Ok(false); + } + + if ((used_disk_space + size_to_download) as f64 * 100.0 / total_disk_space as f64) + > CONFIG.parseable.max_disk_usage + { + return Ok(false); + } + } + + Ok(true) + } + + pub async fn get_oldest_date_time_entry( + &self, + stream: &str, + ) -> Result, HotTierError> { + let date_list = self.fetch_hot_tier_dates(stream).await?; + if date_list.is_empty() { + return Ok(None); + } + + for date in date_list { + let path = self + .hot_tier_path + .join(stream) + .join(format!("date={}", date)); + + let hours_dir = ReadDirStream::new(fs::read_dir(&path).await?); + let mut hours: Vec = hours_dir.try_collect().await?; + hours.retain(|entry| { + entry.path().is_dir() && entry.file_name().to_string_lossy().starts_with("hour=") + }); + hours.sort_by_key(|entry| entry.file_name().to_string_lossy().to_string()); + + for hour in hours { + let hour_str = hour + .file_name() + .to_string_lossy() + .trim_start_matches("hour=") + .to_string(); + + let minutes_dir = ReadDirStream::new(fs::read_dir(hour.path()).await?); + let mut minutes: Vec = minutes_dir.try_collect().await?; + minutes.retain(|entry| { + entry.path().is_dir() + && entry.file_name().to_string_lossy().starts_with("minute=") + }); + minutes.sort_by_key(|entry| entry.file_name().to_string_lossy().to_string()); + + if let Some(minute) = minutes.first() { + let minute_str = minute + .file_name() + .to_string_lossy() + .trim_start_matches("minute=") + .to_string(); + let oldest_date_time = format!("{} {}:{}:00", date, hour_str, minute_str); + return Ok(Some(oldest_date_time)); + } + } + } + + Ok(None) + } +} + +/// get the hot tier file path for the stream +pub fn hot_tier_file_path( + root: impl AsRef, + stream: &str, +) -> Result { + let path = root.as_ref().join(stream).join(STREAM_HOT_TIER_FILENAME); + object_store::path::Path::from_absolute_path(path) +} + +///get the disk usage for the hot tier storage path +pub fn get_disk_usage() -> (Option, Option, Option) { + let mut sys = System::new_all(); + sys.refresh_all(); + let path = CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap(); + + let mut disks = Disks::new_with_refreshed_list(); + disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len()); + disks.reverse(); + + for disk in disks.iter() { + if path.starts_with(disk.mount_point().to_str().unwrap()) { + let total_disk_space = disk.total_space(); + let available_disk_space = disk.available_space(); + let used_disk_space = total_disk_space - available_disk_space; + return ( + Some(total_disk_space), + Some(available_disk_space), + Some(used_disk_space), + ); + } + } + + (None, None, None) +} + +async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> { + async fn delete_helper(path: &Path) -> io::Result<()> { + if path.is_dir() { + let mut read_dir = fs::read_dir(path).await?; + let mut subdirs = vec![]; + + while let Some(entry) = read_dir.next_entry().await? { + let entry_path = entry.path(); + if entry_path.is_dir() { + subdirs.push(entry_path); + } + } + let mut tasks = vec![]; + for subdir in &subdirs { + tasks.push(delete_empty_directory_hot_tier(subdir)); + } + futures::stream::iter(tasks) + .buffer_unordered(10) + .try_collect::>() + .await?; + + // Re-check the directory after deleting its subdirectories + let mut read_dir = fs::read_dir(path).await?; + if read_dir.next_entry().await?.is_none() { + fs::remove_dir(path).await?; + } + } + Ok(()) + } + delete_helper(path).await +} + +#[derive(Debug, thiserror::Error)] +pub enum HotTierError { + #[error("{0}")] + Serde(#[from] serde_json::Error), + #[error("{0}")] + IOError(#[from] io::Error), + #[error("{0}")] + MoveError(#[from] fs_extra::error::Error), + #[error("{0}")] + ObjectStoreError(#[from] object_store::Error), + #[error("{0}")] + ObjectStorePathError(#[from] object_store::path::Error), + #[error("{0}")] + ObjectStorageError(#[from] ObjectStorageError), + #[error("{0}")] + ParquetError(#[from] ParquetError), + #[error("{0}")] + MetadataError(#[from] MetadataError), + #[error("{0}")] + HotTierValidationError(#[from] HotTierValidationError), + #[error("{0}")] + Anyhow(#[from] anyhow::Error), +} diff --git a/server/src/main.rs b/server/src/main.rs index e9d92abfc..fca2ca307 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -24,6 +24,7 @@ mod catalog; mod cli; mod event; mod handlers; +mod hottier; mod livetail; mod localcache; mod metadata; diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 06eb995e7..40b47720a 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -61,6 +61,7 @@ pub struct LogStreamMetadata { pub time_partition_limit: Option, pub custom_partition: Option, pub static_schema_flag: Option, + pub hot_tier_enabled: Option, } // It is very unlikely that panic will occur when dealing with metadata. @@ -248,6 +249,15 @@ impl StreamInfo { Ok(()) } + pub fn set_hot_tier(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + let stream = map + .get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?; + stream.hot_tier_enabled = Some(enable); + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub fn add_stream( &self, @@ -330,6 +340,7 @@ impl StreamInfo { time_partition_limit: meta.time_partition_limit, custom_partition: meta.custom_partition, static_schema_flag: meta.static_schema_flag, + hot_tier_enabled: meta.hot_tier_enabled, }; let mut map = self.write().expect(LOCK_EXPECT); @@ -459,6 +470,7 @@ pub async fn load_stream_metadata_on_server_start( time_partition_limit, custom_partition, static_schema_flag: meta.static_schema_flag.clone(), + hot_tier_enabled: meta.hot_tier_enabled, }; let mut map = STREAM_INFO.write().expect(LOCK_EXPECT); diff --git a/server/src/option.rs b/server/src/option.rs index 06e693405..fa90b2cd2 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -28,7 +28,8 @@ use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use std::env; use std::path::PathBuf; use std::sync::Arc; -pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB +pub const MIN_CACHE_SIZE_BYTES: u64 = 1073741824; // 1 GiB + pub const JOIN_COMMUNITY: &str = "Join us on Parseable Slack community for questions : https://logg.ing/community"; pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); @@ -328,7 +329,7 @@ pub mod validation { url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string()) } - fn human_size_to_bytes(s: &str) -> Result { + pub fn human_size_to_bytes(s: &str) -> Result { fn parse_and_map( s: &str, ) -> Result { @@ -342,23 +343,51 @@ pub mod validation { .or(parse_and_map::(s)) .or(parse_and_map::(s)) .map_err(|_| "Could not parse given size".to_string())?; + Ok(size) + } - if size < MIN_CACHE_SIZE_BYTES { - return Err( - "Specified value of cache size is smaller than current minimum of 1GiB".to_string(), - ); + pub fn bytes_to_human_size(bytes: u64) -> String { + const KIB: u64 = 1024; + const MIB: u64 = KIB * 1024; + const GIB: u64 = MIB * 1024; + const TIB: u64 = GIB * 1024; + const PIB: u64 = TIB * 1024; + + if bytes < KIB { + format!("{} B", bytes) + } else if bytes < MIB { + format!("{:.2} KB", bytes as f64 / KIB as f64) + } else if bytes < GIB { + format!("{:.2} MiB", bytes as f64 / MIB as f64) + } else if bytes < TIB { + format!("{:.2} GiB", bytes as f64 / GIB as f64) + } else if bytes < PIB { + format!("{:.2} TiB", bytes as f64 / TIB as f64) + } else { + format!("{:.2} PiB", bytes as f64 / PIB as f64) } - - Ok(size) } pub fn cache_size(s: &str) -> Result { let size = human_size_to_bytes(s)?; if size < MIN_CACHE_SIZE_BYTES { - return Err( - "Specified value of cache size is smaller than current minimum of 1GiB".to_string(), - ); + return Err(format!( + "Specified value of cache size is smaller than current minimum of {}", + human_size_to_bytes(&MIN_CACHE_SIZE_BYTES.to_string()).unwrap() + )); } Ok(size) } + + pub fn validate_disk_usage(max_disk_usage: &str) -> Result { + if let Ok(max_disk_usage) = max_disk_usage.parse::() { + if (0.0..=100.0).contains(&max_disk_usage) { + Ok(max_disk_usage) + } else { + Err("Invalid value for max disk usage. It should be between 0 and 100".to_string()) + } + } else { + Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string()) + } + } } diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 6f9eb5608..98d097c10 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -16,6 +16,7 @@ * */ +use crate::hottier::HotTierManager; use crate::Mode; use crate::{ catalog::snapshot::{self, Snapshot}, @@ -293,6 +294,7 @@ impl TableProvider for StandardTableProvider { ) -> Result, DataFusionError> { let mut memory_exec = None; let mut cache_exec = None; + let mut hot_tier_exec = None; let object_store = state .runtime_env() .object_store_registry @@ -416,10 +418,53 @@ impl TableProvider for StandardTableProvider { cache_exec = Some(plan) } + // Hot tier data fetch + if let Some(hot_tier_manager) = HotTierManager::global() { + if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) { + let (hot_tier_files, remainder) = hot_tier_manager + .get_hot_tier_manifest_files(&self.stream, manifest_files) + .await + .map_err(|err| DataFusionError::External(Box::new(err)))?; + // Assign remaining entries back to manifest list + // This is to be used for remote query + manifest_files = remainder; + + let hot_tier_files = hot_tier_files + .into_iter() + .map(|mut file| { + let path = CONFIG + .parseable + .hot_tier_storage_path + .as_ref() + .unwrap() + .join(&file.file_path); + file.file_path = path.to_str().unwrap().to_string(); + file + }) + .collect(); + + let (partitioned_files, statistics) = + partitioned_files(hot_tier_files, &self.schema, 1); + let plan = create_parquet_physical_plan( + ObjectStoreUrl::parse("file:///").unwrap(), + partitioned_files, + statistics, + self.schema.clone(), + projection, + filters, + limit, + state, + time_partition.clone(), + ) + .await?; + + hot_tier_exec = Some(plan) + } + } if manifest_files.is_empty() { QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); return final_plan( - vec![memory_exec, cache_exec], + vec![memory_exec, cache_exec, hot_tier_exec], projection, self.schema.clone(), ); @@ -440,7 +485,7 @@ impl TableProvider for StandardTableProvider { .await?; Ok(final_plan( - vec![memory_exec, cache_exec, Some(remote_exec)], + vec![memory_exec, cache_exec, hot_tier_exec, Some(remote_exec)], projection, self.schema.clone(), )?) diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index accbd471a..1532c7c21 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -32,6 +32,9 @@ pub enum Action { PutRetention, GetCacheEnabled, PutCacheEnabled, + PutHotTierEnabled, + GetHotTierEnabled, + DeleteHotTierEnabled, PutAlert, GetAlert, PutUser, @@ -117,6 +120,9 @@ impl RoleBuilder { | Action::ListCluster | Action::ListClusterMetrics | Action::Deleteingestor + | Action::PutHotTierEnabled + | Action::GetHotTierEnabled + | Action::DeleteHotTierEnabled | Action::ListDashboard | Action::GetDashboard | Action::CreateDashboard @@ -206,6 +212,9 @@ pub mod model { Action::PutRetention, Action::PutCacheEnabled, Action::GetCacheEnabled, + Action::PutHotTierEnabled, + Action::GetHotTierEnabled, + Action::DeleteHotTierEnabled, Action::PutAlert, Action::GetAlert, Action::GetAbout, @@ -249,6 +258,7 @@ pub mod model { Action::GetAbout, Action::QueryLLM, Action::ListCluster, + Action::GetHotTierEnabled, ], stream: None, tag: None, diff --git a/server/src/storage.rs b/server/src/storage.rs index d28dcb00e..fe081b617 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -100,6 +100,8 @@ pub struct ObjectStoreFormat { pub custom_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub hot_tier_enabled: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -167,6 +169,7 @@ impl Default for ObjectStoreFormat { time_partition_limit: None, custom_partition: None, static_schema_flag: None, + hot_tier_enabled: None, } } } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 78b02e8d5..4aa2d65d8 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -17,6 +17,7 @@ */ use std::{ + collections::BTreeMap, path::{Path, PathBuf}, sync::Arc, time::Instant, @@ -411,6 +412,14 @@ impl ObjectStorage for LocalFS { Ok(dates.into_iter().flatten().collect()) } + async fn list_manifest_files( + &self, + _stream_name: &str, + ) -> Result>, ObjectStorageError> { + //unimplemented + Ok(BTreeMap::new()) + } + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { let op = CopyOptions { overwrite: true, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 433327778..67119a164 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -49,6 +49,7 @@ use itertools::Itertools; use relative_path::RelativePath; use relative_path::RelativePathBuf; +use std::collections::BTreeMap; use std::{ collections::HashMap, fs, @@ -87,6 +88,10 @@ pub trait ObjectStorage: Sync + 'static { async fn get_all_saved_filters(&self) -> Result, ObjectStorageError>; async fn get_all_dashboards(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; + async fn list_manifest_files( + &self, + stream_name: &str, + ) -> Result>, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>; async fn get_ingestor_meta_file_paths( diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d94b937d3..ff4d26162 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -33,6 +33,7 @@ use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::collections::BTreeMap; use std::iter::Iterator; use std::path::Path as StdPath; use std::sync::Arc; @@ -343,6 +344,36 @@ impl S3 { Ok(dates) } + async fn _list_manifest_files( + &self, + stream: &str, + ) -> Result>, ObjectStorageError> { + let mut result_file_list: BTreeMap> = BTreeMap::new(); + let resp = self + .client + .list_with_delimiter(Some(&(stream.into()))) + .await?; + + let dates = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .filter(|name| name.as_ref() != stream && name.as_ref() != STREAM_ROOT_DIRECTORY) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for date in dates { + let date_path = object_store::path::Path::from(format!("{}/{}", stream, &date)); + let resp = self.client.list_with_delimiter(Some(&date_path)).await?; + let manifests: Vec = resp + .objects + .iter() + .filter(|name| name.location.filename().unwrap().ends_with("manifest.json")) + .map(|name| name.location.to_string()) + .collect(); + result_file_list.entry(date).or_default().extend(manifests); + } + Ok(result_file_list) + } async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { let instant = Instant::now(); @@ -604,6 +635,15 @@ impl ObjectStorage for S3 { Ok(streams) } + async fn list_manifest_files( + &self, + stream_name: &str, + ) -> Result>, ObjectStorageError> { + let files = self._list_manifest_files(stream_name).await?; + + Ok(files) + } + async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { self._upload_file(key, path).await?; diff --git a/server/src/utils.rs b/server/src/utils.rs index b8435b25e..34630c375 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -23,13 +23,13 @@ pub mod json; pub mod uid; pub mod update; use crate::option::CONFIG; -use chrono::{DateTime, NaiveDate, Timelike, Utc}; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc}; use itertools::Itertools; +use regex::Regex; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::env; use url::Url; - #[allow(dead_code)] pub fn hostname() -> Option { hostname::get() @@ -305,6 +305,22 @@ pub fn get_ingestor_id() -> String { result } +pub fn extract_datetime(path: &str) -> Option { + let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})/hour=(\d{2})/minute=(\d{2})").unwrap(); + if let Some(caps) = re.captures(path) { + let date_str = caps.get(1)?.as_str(); + let hour_str = caps.get(2)?.as_str(); + let minute_str = caps.get(3)?.as_str(); + + let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?; + let time = + NaiveTime::parse_from_str(&format!("{}:{}", hour_str, minute_str), "%H:%M").ok()?; + Some(NaiveDateTime::new(date, time)) + } else { + None + } +} + #[cfg(test)] mod tests { use chrono::DateTime; diff --git a/server/src/validator.rs b/server/src/validator.rs index a1648ff22..cd8b69710 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -16,11 +16,15 @@ * */ +use error::HotTierValidationError; + use self::error::{AlertValidationError, StreamNameValidationError, UsernameValidationError}; use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; use crate::handlers::http::cluster::INTERNAL_STREAM_NAME; +use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES; +use crate::option::validation::{bytes_to_human_size, human_size_to_bytes}; // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ @@ -144,6 +148,21 @@ pub fn user_name(username: &str) -> Result<(), UsernameValidationError> { Ok(()) } +pub fn hot_tier(size: &str) -> Result<(), HotTierValidationError> { + if human_size_to_bytes(size).is_err() { + return Err(HotTierValidationError::Size(bytes_to_human_size( + MIN_STREAM_HOT_TIER_SIZE_BYTES, + ))); + } + + if human_size_to_bytes(size).unwrap() < MIN_STREAM_HOT_TIER_SIZE_BYTES { + return Err(HotTierValidationError::Size(bytes_to_human_size( + MIN_STREAM_HOT_TIER_SIZE_BYTES, + ))); + } + + Ok(()) +} pub mod error { #[derive(Debug, thiserror::Error)] @@ -191,4 +210,19 @@ pub mod error { )] SpecialChar, } + + #[derive(Debug, thiserror::Error)] + pub enum HotTierValidationError { + #[error("Invalid size given for hot tier, please provide size in human readable format, e.g 1GiB, 2GiB, minimum {0}")] + Size(String), + + #[error("While generating times for 'now' failed to parse duration")] + NotValidDuration(#[from] humantime::DurationError), + + #[error("Parsed duration out of range")] + OutOfRange(#[from] chrono::OutOfRangeError), + + #[error("Hot tier not found for stream {0}")] + NotFound(String), + } }