Skip to content

Commit

Permalink
Fix help messages for environment variables / flags
Browse files Browse the repository at this point in the history
Fixes #586

This PR also adds

* gRPC port number in startup banner.
* Human readable input for size / capacity type environment variable
  • Loading branch information
nitisht committed Dec 25, 2023
1 parent 83f3cc9 commit a24e3ed
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 40 deletions.
14 changes: 10 additions & 4 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ fn print_ascii_art() {
}

fn status_info(config: &Config, scheme: &str, id: Uid) {
let url = format!("\"{}://{}\"", scheme, config.parseable.address).underlined();
let address = format!(
"\"{}://{}\" ({}), \":{}\" (gRPC)",
scheme,
config.parseable.address,
scheme.to_ascii_uppercase(),
config.parseable.grpc_port
);
let mut credentials =
String::from("\"As set in P_USERNAME and P_PASSWORD environment variables\"");

Expand All @@ -67,12 +73,12 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
eprintln!(
"
{}
URL: {}
Address: {}
Credentials: {}
Deployment UID: \"{}\"
LLM Status: \"{}\"",
LLM: \"{}\"",
"Server:".to_string().bold(),
url,
address,
credentials,
id.to_string(),
llm_status
Expand Down
69 changes: 43 additions & 26 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ pub struct Server {
/// Rows in Parquet Rowgroup
pub row_group_size: usize,

/// Query memory limit in bytes
pub query_memory_pool_size: Option<usize>,
/// Query memory limit in human readable format (e.g 1GiB, 2GiB, 100MB)
pub query_memory_pool_size: Option<String>,

/// Parquet compression algorithm
pub parquet_compression: Compression,
Expand Down Expand Up @@ -263,12 +263,9 @@ impl FromArgMatches for Server {
self.livetail_channel_capacity = m
.get_one::<usize>(Self::LIVETAIL_CAPACITY)
.cloned()
.expect("default for livetail port");
.expect("default for livetail capacity");
// converts Gib to bytes before assigning
self.query_memory_pool_size = m
.get_one::<u8>(Self::QUERY_MEM_POOL_SIZE)
.cloned()
.map(|gib| gib as usize * 1024usize.pow(3));
self.query_memory_pool_size = m.get_one::<String>(Self::QUERY_MEM_POOL_SIZE).cloned();
self.row_group_size = m
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
Expand Down Expand Up @@ -356,15 +353,15 @@ impl Server {
.env("P_TLS_CERT_PATH")
.value_name("PATH")
.value_parser(validation::file_path)
.help("The location of TLS Cert file"),
.help("Local path on this device where certificate file is located. Required to enable TLS"),
)
.arg(
Arg::new(Self::TLS_KEY)
.long(Self::TLS_KEY)
.env("P_TLS_KEY_PATH")
.value_name("PATH")
.value_parser(validation::file_path)
.help("The location of TLS Private Key file"),
.help("Local path on this device where private key file is located. Required to enable TLS"),
)
.arg(
Arg::new(Self::ADDRESS)
Expand All @@ -373,7 +370,7 @@ impl Server {
.value_name("ADDR:PORT")
.default_value("0.0.0.0:8000")
.value_parser(validation::socket_addr)
.help("The address on which the http server will listen."),
.help("Address and port for Parseable HTTP(s) server"),
)
.arg(
Arg::new(Self::STAGING)
Expand All @@ -382,7 +379,7 @@ impl Server {
.value_name("DIR")
.default_value("./staging")
.value_parser(validation::canonicalize_path)
.help("The local staging path is used as a temporary landing point for incoming events and local cache")
.help("Local path on this device to be used as landing point for incoming events")
.next_line_help(true),
)
.arg(
Expand All @@ -392,7 +389,7 @@ impl Server {
.value_name("SECONDS")
.default_value("60")
.value_parser(value_parser!(u64))
.help("Interval in seconds after which un-committed data would be sent to the storage")
.help("Interval in seconds after which staging data would be sent to the storage")
.next_line_help(true),
)
.arg(
Expand All @@ -401,15 +398,15 @@ impl Server {
.env("P_USERNAME")
.value_name("STRING")
.required(true)
.help("Username for the basic authentication on the server"),
.help("Admin username for this server"),
)
.arg(
Arg::new(Self::PASSWORD)
.long(Self::PASSWORD)
.env("P_PASSWORD")
.value_name("STRING")
.required(true)
.help("Password for the basic authentication on the server"),
.help("Admin password for this server"),
)
.arg(
Arg::new(Self::CHECK_UPDATE)
Expand All @@ -429,31 +426,31 @@ impl Server {
.required(false)
.default_value("true")
.value_parser(value_parser!(bool))
.help("Disable/Enable sending anonymous user data"),
.help("Disable/Enable sending anonymous telemetry"),
)
.arg(
Arg::new(Self::OPEN_AI_KEY)
.long(Self::OPEN_AI_KEY)
.env("P_OPENAI_API_KEY")
.value_name("STRING")
.required(false)
.help("Set OpenAI key to enable llm feature"),
.help("OpenAI key to enable llm feature"),
)
.arg(
Arg::new(Self::OPENID_CLIENT_ID)
.long(Self::OPENID_CLIENT_ID)
.env("P_OIDC_CLIENT_ID")
.value_name("STRING")
.required(false)
.help("Set client id for oidc provider"),
.help("Client id for OIDC provider"),
)
.arg(
Arg::new(Self::OPENID_CLIENT_SECRET)
.long(Self::OPENID_CLIENT_SECRET)
.env("P_OIDC_CLIENT_SECRET")
.value_name("STRING")
.required(false)
.help("Set client secret for oidc provider"),
.help("Client secret for OIDC provider"),
)
.arg(
Arg::new(Self::OPENID_ISSUER)
Expand All @@ -462,7 +459,7 @@ impl Server {
.value_name("URl")
.required(false)
.value_parser(validation::url)
.help("Set OIDC provider's host address."),
.help("OIDC provider's host address"),
)
.arg(
Arg::new(Self::DOMAIN_URI)
Expand All @@ -471,7 +468,7 @@ impl Server {
.value_name("URL")
.required(false)
.value_parser(validation::url)
.help("Set host global domain address"),
.help("Parseable server global domain address"),
)
.arg(
Arg::new(Self::GRPC_PORT)
Expand All @@ -481,7 +478,7 @@ impl Server {
.default_value("8001")
.required(false)
.value_parser(value_parser!(u16))
.help("Set port for livetail arrow flight server"),
.help("Port for gRPC server"),
)
.arg(
Arg::new(Self::LIVETAIL_CAPACITY)
Expand All @@ -491,16 +488,16 @@ impl Server {
.default_value("1000")
.required(false)
.value_parser(value_parser!(usize))
.help("Set port for livetail arrow flight server"),
.help("Number of rows in livetail channel"),
)
.arg(
Arg::new(Self::QUERY_MEM_POOL_SIZE)
.long(Self::QUERY_MEM_POOL_SIZE)
.env("P_QUERY_MEMORY_LIMIT")
.value_name("Gib")
.value_name("STRING")
.required(false)
.value_parser(value_parser!(u8))
.help("Set a fixed memory limit for query"),
.value_parser(validation::size)
.help("Memory allocated to query in human readable format (e.g 1GiB, 2GiB, 100MB)"),
)
.arg(
Arg::new(Self::ROW_GROUP_SIZE)
Expand All @@ -510,7 +507,7 @@ impl Server {
.required(false)
.default_value("16384")
.value_parser(value_parser!(usize))
.help("Number of rows in a row groups"),
.help("Number of rows in a row group"),
)
.arg(
Arg::new(Self::PARQUET_COMPRESSION_ALGO)
Expand Down Expand Up @@ -571,6 +568,8 @@ pub mod validation {
path::PathBuf,
};

use crate::storage::SIZE_WITH_UNIT_REGEX;

pub fn file_path(s: &str) -> Result<PathBuf, String> {
if s.is_empty() {
return Err("empty path".to_owned());
Expand Down Expand Up @@ -606,4 +605,22 @@ pub mod validation {
pub fn url(s: &str) -> Result<url::Url, String> {
url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
}

pub fn size(s: &str) -> Result<(), String> {
if let Ok(re) = regex::Regex::new(SIZE_WITH_UNIT_REGEX) {
if re.captures(s).is_some() {
Ok(())
} else {
Err(
"Invalid size provided. Please provide size in the format '1GiB', '3MiB', etc"
.to_string(),
)
}
} else {
Err(
"Invalid size provided. Please provide size in the format '1GiB', '3MiB', etc"
.to_string(),
)
}
}
}
20 changes: 11 additions & 9 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sysinfo::{System, SystemExt};
use crate::event;
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::utils;

use self::error::ExecuteError;

Expand All @@ -68,15 +69,16 @@ impl Query {
.get_datafusion_runtime()
.with_disk_manager(DiskManagerConfig::NewOs);

let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
Some(size) => (size, 1.),
None => {
let mut system = System::new();
system.refresh_memory();
let available_mem = system.available_memory();
(available_mem as usize, 0.85)
}
};
let (pool_size, fraction) =
match utils::parse_storage_size(CONFIG.parseable.query_memory_pool_size.clone()) {
Some(size) => (size as usize, 1.),
None => {
let mut system = System::new();
system.refresh_memory();
let available_mem = system.available_memory();
(available_mem as usize, 0.85)
}
};

let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
Expand Down
2 changes: 2 additions & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const ACCESS_ALL: &str = "all";
pub const CURRENT_OBJECT_STORE_VERSION: &str = "v3";
pub const CURRENT_SCHEMA_VERSION: &str = "v3";

pub const SIZE_WITH_UNIT_REGEX: &str = r"^(\d+)([TGMKP][iI]?[Bb]?)$";

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ObjectStoreFormat {
/// Version of schema registry
Expand Down
Loading

0 comments on commit a24e3ed

Please sign in to comment.