Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
nitisht committed Dec 29, 2023
1 parent f48aba0 commit c118cd8
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ env-file
parseable
parseable_*
parseable-env-secret
cache
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
human-size = "0.4.3"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
Expand Down
36 changes: 20 additions & 16 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ fn print_ascii_art() {
"#;

eprint!("{ascii_name}");
eprintln!(
"
Welcome to Parseable Server!"
);
}

fn status_info(config: &Config, scheme: &str, id: Uid) {
Expand All @@ -71,26 +67,30 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
None => "Not Configured".grey(),
};

eprintln!(
"
Welcome to Parseable Server! Deployment UID: \"{}\"",
id.to_string(),
);

eprintln!(
"
{}
Address: {}
Credentials: {}
Deployment UID: \"{}\"
LLM: \"{}\"",
"Server:".to_string().bold(),
address,
credentials,
id.to_string(),
llm_status
);
}

/// Prints information about the `ObjectStorage`.
/// - Mode (`Local drive`, `S3 bucket`)
/// - Staging (temporary landing point for incoming events)
/// - Store (path where the data is stored)
/// - Latency
/// - Cache (local cache of data)
/// - Store (path where the data is stored and its latency)
async fn storage_info(config: &Config) {
let storage = config.storage();
let latency = storage.get_object_store().get_latency().await;
Expand All @@ -99,29 +99,33 @@ async fn storage_info(config: &Config) {
"
{}
Mode: \"{}\"
Staging: \"{}\"
Store: \"{}\"
Latency: \"{:?}\"",
Staging: \"{}\"",
"Storage:".to_string().bold(),
config.mode_string(),
config.staging_dir().to_string_lossy(),
storage.get_endpoint(),
latency
);

if let Some(path) = &config.parseable.local_cache_path {
let size: SpecificSize<human_size::Gigabyte> =
let size: SpecificSize<human_size::Gigibyte> =
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
.unwrap()
.into();

eprintln!(
"\
{:8}Cache: \"{}\"
Cache Size: \"{}\"",
{:8}Cache: \"{}\", (size: {})",
"",
path.display(),
size
);
}

eprintln!(
"\
{:8}Store: \"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
);

}
3 changes: 2 additions & 1 deletion server/src/catalog/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum SortOrder {
}

pub type SortInfo = (String, SortOrder);
/// Version string stamped into newly created manifests (see `Manifest::default`);
/// bump this when the manifest format changes.
pub const CURRENT_MANIFEST_VERSION: &str = "v1";

/// An entry in a manifest which points to a single file.
/// Additionally, it is meant to store the statistics for the file it
Expand All @@ -67,7 +68,7 @@ pub struct Manifest {
impl Default for Manifest {
fn default() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_MANIFEST_VERSION.to_string(),
files: Vec::default(),
}
}
Expand Down
4 changes: 3 additions & 1 deletion server/src/catalog/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use chrono::{DateTime, Utc};

use crate::query::PartialTimeFilter;

/// Version string stamped into newly created snapshots (see `Snapshot::default`);
/// bump this when the snapshot format changes.
pub const CURRENT_SNAPSHOT_VERSION: &str = "v1";

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
pub version: String,
Expand All @@ -31,7 +33,7 @@ pub struct Snapshot {
impl Default for Snapshot {
fn default() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_SNAPSHOT_VERSION.to_string(),
manifest_list: Vec::default(),
}
}
Expand Down
24 changes: 16 additions & 8 deletions server/src/localcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ use itertools::{Either, Itertools};
use object_store::{local::LocalFileSystem, ObjectStore};
use once_cell::sync::OnceCell;
use tokio::{fs, sync::Mutex};
use human_size::{SpecificSize, Gigibyte, Byte};

use crate::option::CONFIG;

// Per-stream cache manifest file kept inside the cache directory.
pub const STREAM_CACHE_FILENAME: &str = ".cache.json";
// Cache-wide metadata file; `LocalCacheManager::validate` deserializes a
// `CacheMeta` (version + configured size capacity) from this path.
pub const CACHE_META_FILENAME: &str = ".cache_meta.json";
// Version string written into cache/meta records; bump when their format changes.
pub const CURRENT_CACHE_VERSION: &str = "v1";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LocalCache {
Expand All @@ -42,7 +44,7 @@ pub struct LocalCache {
impl LocalCache {
fn new() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_CACHE_VERSION.to_string(),
current_size: 0,
files: Cache::new(100),
}
Expand All @@ -58,7 +60,7 @@ pub struct CacheMeta {
impl CacheMeta {
fn new() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_CACHE_VERSION.to_string(),
size_capacity: 0,
}
}
Expand Down Expand Up @@ -97,7 +99,9 @@ impl LocalCacheManager {

pub async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> {
fs::create_dir_all(&self.cache_path).await?;
let path = cache_meta_path(&self.cache_path).unwrap();
let path = cache_meta_path(&self.cache_path)
.map_err(|err| CacheError::ObjectStoreError(err.into()))?;

let resp = self
.filesystem
.get(&path)
Expand All @@ -107,7 +111,15 @@ impl LocalCacheManager {
let updated_cache = match resp {
Ok(bytes) => {
let mut meta: CacheMeta = serde_json::from_slice(&bytes)?;
if !meta.size_capacity == config_capacity {
if meta.size_capacity != config_capacity {
// log the change in cache size
let configured_size_human: SpecificSize<Gigibyte> = SpecificSize::new(config_capacity as f64, Byte).unwrap().into();
let current_size_human: SpecificSize<Gigibyte> = SpecificSize::new(meta.size_capacity as f64, Byte).unwrap().into();
log::warn!(
"Cache size is updated from {} to {}",
current_size_human,
configured_size_human
);
meta.size_capacity = config_capacity;
Some(meta)
} else {
Expand All @@ -123,10 +135,6 @@ impl LocalCacheManager {
};

if let Some(updated_cache) = updated_cache {
log::info!(
"Cache is updated to new size of {} Bytes",
&updated_cache.size_capacity
);
self.filesystem
.put(&path, serde_json::to_vec(&updated_cache)?.into())
.await?
Expand Down
1 change: 0 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use crate::localcache::LocalCacheManager;
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
CONFIG.validate();
let storage = CONFIG.storage().get_object_store();
CONFIG.validate_staging()?;
migration::run_metadata_migration(&CONFIG).await?;
Expand Down
67 changes: 42 additions & 25 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use std::sync::Arc;
use url::Url;

use crate::oidc::{self, OpenidConfig};
use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL};
use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
use crate::utils::validate_path_is_writeable;

// NOTE(review): 1000^3 is 10^9 bytes — an SI gigabyte (GB), not 1 GiB
// (1024^3 = 1_073_741_824) as the old comment and the user-facing error
// strings ("minimum of 1GiB") suggest. Confirm which unit is intended.
pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // minimum allowed cache size
pub const MIN_QUERY_MEM_POOL_SIZE_BYTES: u64 = 1000u64.pow(3); // minimum allowed query memory pool size

pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));

Expand Down Expand Up @@ -98,13 +99,6 @@ impl Config {
_ => unreachable!(),
}
}

pub fn validate(&self) {
if CONFIG.parseable.upload_interval < LOCAL_SYNC_INTERVAL {
panic!("object storage upload_interval (P_STORAGE_UPLOAD_INTERVAL) must be 60 seconds or more");
}
}

pub fn validate_staging(&self) -> anyhow::Result<()> {
let staging_path = self.staging_dir();
validate_path_is_writeable(staging_path)
Expand Down Expand Up @@ -412,17 +406,17 @@ impl Server {
.env("P_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path to be used for caching latest files")
.help("Local path on this device to be used for caching data")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE_SIZE)
.long(Self::CACHE_SIZE)
.env("P_CACHE_SIZE")
.value_name("size")
.default_value("1Gib")
.value_parser(validation::human_size_to_bytes)
.help("Size for cache in human readable format (e.g 1GiB, 2GiB, 100MB)")
.default_value("1GiB")
.value_parser(validation::cache_size)
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
Expand All @@ -431,7 +425,7 @@ impl Server {
.env("P_STORAGE_UPLOAD_INTERVAL")
.value_name("SECONDS")
.default_value("60")
.value_parser(value_parser!(u64))
.value_parser(validation::upload_interval)
.help("Interval in seconds after which staging data would be sent to the storage")
.next_line_help(true),
)
Expand All @@ -441,15 +435,15 @@ impl Server {
.env("P_USERNAME")
.value_name("STRING")
.required(true)
.help("Admin username for this server"),
.help("Admin username to be set for this Parseable server"),
)
.arg(
Arg::new(Self::PASSWORD)
.long(Self::PASSWORD)
.env("P_PASSWORD")
.value_name("STRING")
.required(true)
.help("Admin password for this server"),
.help("Admin password to be set for this Parseable server"),
)
.arg(
Arg::new(Self::CHECK_UPDATE)
Expand All @@ -459,7 +453,7 @@ impl Server {
.required(false)
.default_value("true")
.value_parser(value_parser!(bool))
.help("Disable/Enable checking for updates"),
.help("Enable/Disable checking for new Parseable release"),
)
.arg(
Arg::new(Self::SEND_ANALYTICS)
Expand All @@ -469,15 +463,15 @@ impl Server {
.required(false)
.default_value("true")
.value_parser(value_parser!(bool))
.help("Disable/Enable sending anonymous telemetry"),
.help("Enable/Disable anonymous telemetry data collection"),
)
.arg(
Arg::new(Self::OPEN_AI_KEY)
.long(Self::OPEN_AI_KEY)
.env("P_OPENAI_API_KEY")
.value_name("STRING")
.required(false)
.help("OpenAI key to enable llm feature"),
.help("OpenAI key to enable llm features"),
)
.arg(
Arg::new(Self::OPENID_CLIENT_ID)
Expand Down Expand Up @@ -539,8 +533,8 @@ impl Server {
.env("P_QUERY_MEMORY_LIMIT")
.value_name("STRING")
.required(false)
.value_parser(validation::size)
.help("Memory allocated to query in human readable format (e.g 1GiB, 2GiB, 100MB)"),
.value_parser(validation::query_memory_pool_size)
.help("Memory allocated to query sub system (In human readable format, e.g 1GiB, 2GiB, 100MB)"),
)
.arg(
Arg::new(Self::ROW_GROUP_SIZE)
Expand Down Expand Up @@ -612,10 +606,11 @@ pub mod validation {
str::FromStr,
};

use human_size::SpecificSize;

use crate::option::MIN_CACHE_SIZE_BYTES;
use human_size::{SpecificSize,multiples};

use crate::option::{MIN_CACHE_SIZE_BYTES, MIN_QUERY_MEM_POOL_SIZE_BYTES};
use crate::storage::LOCAL_SYNC_INTERVAL;

pub fn file_path(s: &str) -> Result<PathBuf, String> {
if s.is_empty() {
return Err("empty path".to_owned());
Expand Down Expand Up @@ -652,8 +647,7 @@ pub mod validation {
url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
}

pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
use human_size::multiples;
fn human_size_to_bytes(s: &str) -> Result<u64, String> {
fn parse_and_map<T: human_size::Multiple>(
s: &str,
) -> Result<u64, human_size::ParsingError> {
Expand All @@ -668,12 +662,35 @@ pub mod validation {
.or(parse_and_map::<multiples::Terabyte>(s))
.map_err(|_| "Could not parse given size".to_string())?;

Ok(size)
}

/// Parses a human-readable size string (e.g. "1GiB", "100MB") into bytes and
/// rejects values below the minimum permitted cache size.
pub fn cache_size(s: &str) -> Result<u64, String> {
    let bytes = human_size_to_bytes(s)?;
    if bytes >= MIN_CACHE_SIZE_BYTES {
        Ok(bytes)
    } else {
        Err(
            "Specified value of cache size is smaller than current minimum of 1GiB".to_string(),
        )
    }
}

/// Parses a human-readable size string (e.g. "1GiB", "100MB") into bytes and
/// rejects values below the minimum permitted query memory pool size.
pub fn query_memory_pool_size(s: &str) -> Result<u64, String> {
    let bytes = human_size_to_bytes(s)?;
    if bytes >= MIN_QUERY_MEM_POOL_SIZE_BYTES {
        Ok(bytes)
    } else {
        Err(
            "Specified value of query memory pool size is smaller than current minimum of 1GiB".to_string(),
        )
    }
}

pub fn upload_interval(s: &str) -> Result<u64, String> {
let u = s.parse::<u64>().map_err(|_| "invalid upload interval".to_string())?;
if u < LOCAL_SYNC_INTERVAL {
return Err("object storage upload interval must be 60 seconds or more".to_string());
}
Ok(u)
}

}

0 comments on commit c118cd8

Please sign in to comment.