diff --git a/.gitignore b/.gitignore
index 27ff2bd97..c759f2831 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ env-file
 parseable
 parseable_*
 parseable-env-secret
+cache
diff --git a/server/Cargo.toml b/server/Cargo.toml
index a948a39d3..8d03fcb4c 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -96,7 +96,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
 xz2 = { version = "*", features = ["static"] }
 nom = "7.1.3"
 humantime = "2.1.0"
-human-size = "0.4"
+human-size = "0.4.3"
 openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
 url = "2.4.0"
 http-auth-basic = "0.3.3"
diff --git a/server/src/banner.rs b/server/src/banner.rs
index 05c8fbc01..f48ae92ba 100644
--- a/server/src/banner.rs
+++ b/server/src/banner.rs
@@ -45,10 +45,6 @@ fn print_ascii_art() {
     "#;
     eprint!("{ascii_name}");
-    eprintln!(
-        "
-    Welcome to Parseable Server!"
-    );
 }

 fn status_info(config: &Config, scheme: &str, id: Uid) {
@@ -71,17 +67,21 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
         None => "Not Configured".grey(),
     };

+    eprintln!(
+        "
+    Welcome to Parseable Server! Deployment UID: \"{}\"",
+        id.to_string(),
+    );
+
     eprintln!(
         "
     {}
         Address:            {}
        Credentials:        {}
-        Deployment UID:     \"{}\"
         LLM:                \"{}\"",
         "Server:".to_string().bold(),
         address,
         credentials,
-        id.to_string(),
         llm_status
     );
 }
@@ -89,8 +89,8 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
 /// Prints information about the `ObjectStorage`.
 /// - Mode (`Local drive`, `S3 bucket`)
 /// - Staging (temporary landing point for incoming events)
-/// - Store (path where the data is stored)
-/// - Latency
+/// - Cache (local cache of data)
+/// - Store (path where the data is stored and its latency)
 async fn storage_info(config: &Config) {
     let storage = config.storage();
     let latency = storage.get_object_store().get_latency().await;
@@ -99,29 +99,33 @@ async fn storage_info(config: &Config) {
         "
     {}
         Mode:               \"{}\"
-        Staging:            \"{}\"
-        Store:              \"{}\"
-        Latency:            \"{:?}\"",
+        Staging:            \"{}\"",
         "Storage:".to_string().bold(),
         config.mode_string(),
         config.staging_dir().to_string_lossy(),
-        storage.get_endpoint(),
-        latency
     );

     if let Some(path) = &config.parseable.local_cache_path {
-        let size: SpecificSize<human_size::Gigabyte> =
+        let size: SpecificSize<human_size::Gigibyte> =
             SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
                 .unwrap()
                 .into();

         eprintln!(
             "\
-            {:8}Cache:      \"{}\"
-            Cache Size:     \"{}\"",
+            {:8}Cache:      \"{}\", (size: {})",
             "",
             path.display(),
             size
         );
     }
+
+    eprintln!(
+        "\
+        {:8}Store: \"{}\", (latency: {:?})",
+        "",
+        storage.get_endpoint(),
+        latency
+    );
 }
diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs
index d29adea4e..2f9f2e57f 100644
--- a/server/src/catalog/manifest.rs
+++ b/server/src/catalog/manifest.rs
@@ -43,6 +43,7 @@ pub enum SortOrder {
 }

 pub type SortInfo = (String, SortOrder);
+pub const CURRENT_MANIFEST_VERSION: &str = "v1";

 /// An entry in a manifest which points to a single file.
 /// Additionally, it is meant to store the statistics for the file it
@@ -67,7 +68,7 @@ pub struct Manifest {
 impl Default for Manifest {
     fn default() -> Self {
         Self {
-            version: "v1".to_string(),
+            version: CURRENT_MANIFEST_VERSION.to_string(),
             files: Vec::default(),
         }
     }
diff --git a/server/src/catalog/snapshot.rs b/server/src/catalog/snapshot.rs
index 3c953eb93..b55838310 100644
--- a/server/src/catalog/snapshot.rs
+++ b/server/src/catalog/snapshot.rs
@@ -22,6 +22,8 @@ use chrono::{DateTime, Utc};

 use crate::query::PartialTimeFilter;

+pub const CURRENT_SNAPSHOT_VERSION: &str = "v1";
+
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct Snapshot {
     pub version: String,
@@ -31,7 +33,7 @@ pub struct Snapshot {
 impl Default for Snapshot {
     fn default() -> Self {
         Self {
-            version: "v1".to_string(),
+            version: CURRENT_SNAPSHOT_VERSION.to_string(),
             manifest_list: Vec::default(),
         }
     }
diff --git a/server/src/localcache.rs b/server/src/localcache.rs
index d15963f21..2f50bf9eb 100644
--- a/server/src/localcache.rs
+++ b/server/src/localcache.rs
@@ -25,11 +25,13 @@ use itertools::{Either, Itertools};
 use object_store::{local::LocalFileSystem, ObjectStore};
 use once_cell::sync::OnceCell;
 use tokio::{fs, sync::Mutex};
+use human_size::{Byte, Gigibyte, SpecificSize};

 use crate::option::CONFIG;

 pub const STREAM_CACHE_FILENAME: &str = ".cache.json";
 pub const CACHE_META_FILENAME: &str = ".cache_meta.json";
+pub const CURRENT_CACHE_VERSION: &str = "v1";

 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct LocalCache {
@@ -42,7 +44,7 @@ pub struct LocalCache {
 impl LocalCache {
     fn new() -> Self {
         Self {
-            version: "v1".to_string(),
+            version: CURRENT_CACHE_VERSION.to_string(),
             current_size: 0,
             files: Cache::new(100),
         }
@@ -58,7 +60,7 @@ pub struct CacheMeta {
 impl CacheMeta {
     fn new() -> Self {
         Self {
-            version: "v1".to_string(),
+            version: CURRENT_CACHE_VERSION.to_string(),
             size_capacity: 0,
         }
     }
@@ -97,7 +99,9 @@ impl LocalCacheManager {
     pub async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> {
         fs::create_dir_all(&self.cache_path).await?;

-        let path = cache_meta_path(&self.cache_path).unwrap();
+        let path = cache_meta_path(&self.cache_path)
+            .map_err(|err| CacheError::ObjectStoreError(err.into()))?;
+
         let resp = self
             .filesystem
             .get(&path)
@@ -107,7 +111,15 @@
         let updated_cache = match resp {
             Ok(bytes) => {
                 let mut meta: CacheMeta = serde_json::from_slice(&bytes)?;
-                if !meta.size_capacity == config_capacity {
+                if meta.size_capacity != config_capacity {
+                    // log the change in cache size
+                    let configured_size_human: SpecificSize<Gigibyte> = SpecificSize::new(config_capacity as f64, Byte).unwrap().into();
+                    let current_size_human: SpecificSize<Gigibyte> = SpecificSize::new(meta.size_capacity as f64, Byte).unwrap().into();
+                    log::warn!(
+                        "Cache size updated from {} to {}",
+                        current_size_human,
+                        configured_size_human
+                    );
                     meta.size_capacity = config_capacity;
                     Some(meta)
                 } else {
@@ -123,10 +135,6 @@
         };

         if let Some(updated_cache) = updated_cache {
-            log::info!(
-                "Cache is updated to new size of {} Bytes",
-                &updated_cache.size_capacity
-            );
             self.filesystem
                 .put(&path, serde_json::to_vec(&updated_cache)?.into())
                 .await?
diff --git a/server/src/main.rs b/server/src/main.rs
index 6856e587e..d61568c1f 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -54,7 +54,6 @@ use crate::localcache::LocalCacheManager;
 #[actix_web::main]
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
-    CONFIG.validate();
     let storage = CONFIG.storage().get_object_store();
     CONFIG.validate_staging()?;
     migration::run_metadata_migration(&CONFIG).await?;
diff --git a/server/src/option.rs b/server/src/option.rs
index 9a390436e..3f84474df 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -26,10 +26,11 @@ use std::sync::Arc;
 use url::Url;

 use crate::oidc::{self, OpenidConfig};
-use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL};
+use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
 use crate::utils::validate_path_is_writeable;

 pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
+pub const MIN_QUERY_MEM_POOL_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GB (10^9 bytes)

 pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));

@@ -98,13 +99,6 @@ impl Config {
             _ => unreachable!(),
         }
     }
-
-    pub fn validate(&self) {
-        if CONFIG.parseable.upload_interval < LOCAL_SYNC_INTERVAL {
-            panic!("object storage upload_interval (P_STORAGE_UPLOAD_INTERVAL) must be 60 seconds or more");
-        }
-    }
-
     pub fn validate_staging(&self) -> anyhow::Result<()> {
         let staging_path = self.staging_dir();
         validate_path_is_writeable(staging_path)
@@ -412,7 +406,7 @@ impl Server {
                 .env("P_CACHE_DIR")
                 .value_name("DIR")
                 .value_parser(validation::canonicalize_path)
-                .help("Local path to be used for caching latest files")
+                .help("Local path on this device to be used for caching data")
                 .next_line_help(true),
         )
         .arg(
@@ -420,9 +414,9 @@
                 .long(Self::CACHE_SIZE)
                 .env("P_CACHE_SIZE")
                 .value_name("size")
-                .default_value("1Gib")
-                .value_parser(validation::human_size_to_bytes)
-                .help("Size for cache in human readable format (e.g 1GiB, 2GiB, 100MB)")
+                .default_value("1GiB")
+                .value_parser(validation::cache_size)
+                .help("Maximum allowed cache size for all streams combined (in human-readable format, e.g. 1GiB, 2GiB, 100MB)")
                 .next_line_help(true),
         )
         .arg(
@@ -431,7 +425,7 @@
                 .env("P_STORAGE_UPLOAD_INTERVAL")
                 .value_name("SECONDS")
                 .default_value("60")
-                .value_parser(value_parser!(u64))
+                .value_parser(validation::upload_interval)
                 .help("Interval in seconds after which staging data would be sent to the storage")
                 .next_line_help(true),
         )
@@ -441,7 +435,7 @@
                 .env("P_USERNAME")
                 .value_name("STRING")
                 .required(true)
-                .help("Admin username for this server"),
+                .help("Admin username to be set for this Parseable server"),
         )
         .arg(
             Arg::new(Self::PASSWORD)
@@ -449,7 +443,7 @@
                 .env("P_PASSWORD")
                 .value_name("STRING")
                 .required(true)
-                .help("Admin password for this server"),
+                .help("Admin password to be set for this Parseable server"),
         )
         .arg(
             Arg::new(Self::CHECK_UPDATE)
@@ -459,7 +453,7 @@
                 .required(false)
                 .default_value("true")
                 .value_parser(value_parser!(bool))
-                .help("Disable/Enable checking for updates"),
+                .help("Enable/Disable checking for new Parseable releases"),
         )
         .arg(
             Arg::new(Self::SEND_ANALYTICS)
@@ -469,7 +463,7 @@
                 .required(false)
                 .default_value("true")
                 .value_parser(value_parser!(bool))
-                .help("Disable/Enable sending anonymous telemetry"),
+                .help("Enable/Disable anonymous telemetry data collection"),
         )
         .arg(
             Arg::new(Self::OPEN_AI_KEY)
@@ -477,7 +471,7 @@
                 .env("P_OPENAI_API_KEY")
                 .value_name("STRING")
                 .required(false)
-                .help("OpenAI key to enable llm feature"),
+                .help("OpenAI key to enable LLM features"),
         )
         .arg(
             Arg::new(Self::OPENID_CLIENT_ID)
@@ -539,8 +533,8 @@
                 .env("P_QUERY_MEMORY_LIMIT")
                 .value_name("STRING")
                 .required(false)
-                .value_parser(validation::size)
-                .help("Memory allocated to query in human readable format (e.g 1GiB, 2GiB, 100MB)"),
+                .value_parser(validation::query_memory_pool_size)
+                .help("Memory allocated to the query subsystem (in human-readable format, e.g. 1GiB, 2GiB, 100MB)"),
         )
         .arg(
             Arg::new(Self::ROW_GROUP_SIZE)
@@ -612,10 +606,11 @@
         str::FromStr,
     };

-    use human_size::SpecificSize;
-
-    use crate::option::MIN_CACHE_SIZE_BYTES;
+    use human_size::{SpecificSize, multiples};
+
+    use crate::option::{MIN_CACHE_SIZE_BYTES, MIN_QUERY_MEM_POOL_SIZE_BYTES};
+    use crate::storage::LOCAL_SYNC_INTERVAL;

     pub fn file_path(s: &str) -> Result<PathBuf, String> {
         if s.is_empty() {
             return Err("empty path".to_owned());
@@ -652,8 +647,7 @@
         url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
     }

-    pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
-        use human_size::multiples;
+    fn human_size_to_bytes(s: &str) -> Result<u64, String> {
         fn parse_and_map<T: human_size::Multiple>(
             s: &str,
         ) -> Result<u64, human_size::ParsingError> {
@@ -668,12 +662,35 @@
             .or(parse_and_map::<multiples::Terabyte>(s))
             .map_err(|_| "Could not parse given size".to_string())?;

+        Ok(size)
+    }
+
+    pub fn cache_size(s: &str) -> Result<u64, String> {
+        let size = human_size_to_bytes(s)?;
         if size < MIN_CACHE_SIZE_BYTES {
             return Err(
                 "Specified value of cache size is smaller than current minimum of 1GiB".to_string(),
             );
         }
+        Ok(size)
+    }
+
+    pub fn query_memory_pool_size(s: &str) -> Result<u64, String> {
+        let size = human_size_to_bytes(s)?;
+        if size < MIN_QUERY_MEM_POOL_SIZE_BYTES {
+            return Err(
+                "Specified value of query memory pool size is smaller than current minimum of 1GiB".to_string(),
+            );
+        }

         Ok(size)
     }
+
+    pub fn upload_interval(s: &str) -> Result<u64, String> {
+        let u = s.parse::<u64>().map_err(|_| "invalid upload interval".to_string())?;
+        if u < LOCAL_SYNC_INTERVAL {
+            return Err("object storage upload interval must be 60 seconds or more".to_string());
+        }
+        Ok(u)
+    }
 }
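--

Reviewer note (not part of the patch): the warn log added in localcache.rs converts raw byte
counts into a human-readable GiB value via human-size, presumably the reason the dependency is
pinned to 0.4.3 above. Below is a minimal standalone sketch of that conversion, assuming only
the APIs already exercised in this diff (SpecificSize::new, the crate's root re-exports of
Byte/Gigibyte, conversion between multiples via Into, and Display on SpecificSize). The byte
values are illustrative; note the crate spells "gibibyte" as Gigibyte.

    use human_size::{Byte, Gigibyte, SpecificSize};

    fn main() {
        // Illustrative capacities in bytes; in the server these come from
        // CacheMeta::size_capacity and the parsed P_CACHE_SIZE value.
        let current: u64 = 1_073_741_824; // 1 GiB
        let configured: u64 = 4 * 1_073_741_824; // 4 GiB

        // Same two-step conversion the patch performs before logging:
        // wrap the byte count as SpecificSize<Byte>, then convert the
        // multiple to Gigibyte via Into.
        let current_human: SpecificSize<Gigibyte> =
            SpecificSize::new(current as f64, Byte).unwrap().into();
        let configured_human: SpecificSize<Gigibyte> =
            SpecificSize::new(configured as f64, Byte).unwrap().into();

        // Mirrors: log::warn!("Cache size updated from {} to {}", ...)
        println!("Cache size updated from {} to {}", current_human, configured_human);
    }

Run as a plain binary, this should print something like "Cache size updated from 1 GiB to 4 GiB";
the same Display impl is what makes the new warn line readable compared to the removed
"new size of {} Bytes" log.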