Skip to content

Commit

Permalink
fix: help messages for environment variables / flags (#588)
Browse files Browse the repository at this point in the history
This PR also adds

* fix startup check for cache size and improve the error message
* gRPC port number in startup banner.

Fixes #586
  • Loading branch information
nitisht authored Dec 29, 2023
1 parent 631c8f4 commit a3f9ccb
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ env-file
parseable
parseable_*
parseable-env-secret
cache
46 changes: 29 additions & 17 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ fn print_ascii_art() {
"#;

eprint!("{ascii_name}");
eprintln!(
"
Welcome to Parseable Server!"
);
}

fn status_info(config: &Config, scheme: &str, id: Uid) {
let url = format!("\"{}://{}\"", scheme, config.parseable.address).underlined();
let address = format!(
"\"{}://{}\" ({}), \":{}\" (gRPC)",
scheme,
config.parseable.address,
scheme.to_ascii_uppercase(),
config.parseable.grpc_port
);

let mut credentials =
String::from("\"As set in P_USERNAME and P_PASSWORD environment variables\"");

Expand All @@ -65,15 +68,21 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
None => "Not Configured".grey(),
};

eprintln!(
"
Welcome to Parseable Server! Deployment UID: \"{}\"",
id.to_string(),
);

eprintln!(
"
{}
URL: {}
Address: {}
Credentials: {}
Deployment UID: \"{}\"
LLM Status: \"{}\"",
"Server:".to_string().bold(),
url,
address,
credentials,
id.to_string(),
llm_status
Expand All @@ -83,8 +92,8 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
/// Prints information about the `ObjectStorage`.
/// - Mode (`Local drive`, `S3 bucket`)
/// - Staging (temporary landing point for incoming events)
/// - Store (path where the data is stored)
/// - Latency
/// - Cache (local cache of data)
/// - Store (path where the data is stored and its latency)
async fn storage_info(config: &Config) {
let storage = config.storage();
let latency = storage.get_object_store().get_latency().await;
Expand All @@ -93,29 +102,32 @@ async fn storage_info(config: &Config) {
"
{}
Mode: \"{}\"
Staging: \"{}\"
Store: \"{}\"
Latency: \"{:?}\"",
Staging: \"{}\"",
"Storage:".to_string().bold(),
config.mode_string(),
config.staging_dir().to_string_lossy(),
storage.get_endpoint(),
latency
);

if let Some(path) = &config.parseable.local_cache_path {
let size: SpecificSize<human_size::Gigabyte> =
let size: SpecificSize<human_size::Gigibyte> =
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
.unwrap()
.into();

eprintln!(
"\
{:8}Cache: \"{}\"
Cache Size: \"{}\"",
{:8}Cache: \"{}\", (size: {})",
"",
path.display(),
size
);
}

eprintln!(
"\
{:8}Store: \"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
);
}
3 changes: 2 additions & 1 deletion server/src/catalog/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum SortOrder {
}

pub type SortInfo = (String, SortOrder);
pub const CURRENT_MANIFEST_VERSION: &str = "v1";

/// An entry in a manifest which points to a single file.
/// Additionally, it is meant to store the statistics for the file it
Expand All @@ -67,7 +68,7 @@ pub struct Manifest {
impl Default for Manifest {
fn default() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_MANIFEST_VERSION.to_string(),
files: Vec::default(),
}
}
Expand Down
3 changes: 2 additions & 1 deletion server/src/catalog/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use chrono::{DateTime, Utc};

use crate::query::PartialTimeFilter;

pub const CURRENT_SNAPSHOT_VERSION: &str = "v1";
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
pub version: String,
Expand All @@ -31,7 +32,7 @@ pub struct Snapshot {
impl Default for Snapshot {
fn default() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_SNAPSHOT_VERSION.to_string(),
manifest_list: Vec::default(),
}
}
Expand Down
29 changes: 21 additions & 8 deletions server/src/localcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{io, path::PathBuf};
use fs_extra::file::CopyOptions;
use futures_util::TryFutureExt;
use hashlru::Cache;
use human_size::{Byte, Gigibyte, SpecificSize};
use itertools::{Either, Itertools};
use object_store::{local::LocalFileSystem, ObjectStore};
use once_cell::sync::OnceCell;
Expand All @@ -30,6 +31,7 @@ use crate::option::CONFIG;

pub const STREAM_CACHE_FILENAME: &str = ".cache.json";
pub const CACHE_META_FILENAME: &str = ".cache_meta.json";
pub const CURRENT_CACHE_VERSION: &str = "v1";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LocalCache {
Expand All @@ -42,7 +44,7 @@ pub struct LocalCache {
impl LocalCache {
fn new() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_CACHE_VERSION.to_string(),
current_size: 0,
files: Cache::new(100),
}
Expand All @@ -58,7 +60,7 @@ pub struct CacheMeta {
impl CacheMeta {
fn new() -> Self {
Self {
version: "v1".to_string(),
version: CURRENT_CACHE_VERSION.to_string(),
size_capacity: 0,
}
}
Expand Down Expand Up @@ -97,7 +99,8 @@ impl LocalCacheManager {

pub async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> {
fs::create_dir_all(&self.cache_path).await?;
let path = cache_meta_path(&self.cache_path).unwrap();
let path = cache_meta_path(&self.cache_path)
.map_err(|err| CacheError::ObjectStoreError(err.into()))?;
let resp = self
.filesystem
.get(&path)
Expand All @@ -107,7 +110,21 @@ impl LocalCacheManager {
let updated_cache = match resp {
Ok(bytes) => {
let mut meta: CacheMeta = serde_json::from_slice(&bytes)?;
if !meta.size_capacity == config_capacity {
if meta.size_capacity != config_capacity {
// log the change in cache size
let configured_size_human: SpecificSize<Gigibyte> =
SpecificSize::new(config_capacity as f64, Byte)
.unwrap()
.into();
let current_size_human: SpecificSize<Gigibyte> =
SpecificSize::new(meta.size_capacity as f64, Byte)
.unwrap()
.into();
log::warn!(
"Cache size is updated from {} to {}",
current_size_human,
configured_size_human
);
meta.size_capacity = config_capacity;
Some(meta)
} else {
Expand All @@ -123,10 +140,6 @@ impl LocalCacheManager {
};

if let Some(updated_cache) = updated_cache {
log::info!(
"Cache is updated to new size of {} Bytes",
&updated_cache.size_capacity
);
self.filesystem
.put(&path, serde_json::to_vec(&updated_cache)?.into())
.await?
Expand Down
1 change: 0 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use crate::localcache::LocalCacheManager;
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
CONFIG.validate();
let storage = CONFIG.storage().get_object_store();
CONFIG.validate_staging()?;
migration::run_metadata_migration(&CONFIG).await?;
Expand Down
Loading

0 comments on commit a3f9ccb

Please sign in to comment.