diff --git a/server/Cargo.toml b/server/Cargo.toml index 148968776..6bfa31428 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -36,7 +36,7 @@ md-5 = "0.10.5" os_info = "3.0.7" hostname = "0.3" rand = "0.8.4" -relative-path = "1.7.2" +relative-path = { version = "1.7.2", features = ["serde"] } rustls = "0.20.6" rustls-pemfile = "1.0.1" rust-flatten-json = "0.2.0" diff --git a/server/src/main.rs b/server/src/main.rs index b80cfe875..d44543dfe 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -54,6 +54,8 @@ mod validator; use option::CONFIG; +use crate::storage::resolve_parseable_metadata; + // Global configurations const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000; const API_BASE_PATH: &str = "/api"; @@ -69,6 +71,7 @@ async fn main() -> anyhow::Result<()> { if let Err(e) = metadata::STREAM_INFO.load(&*storage).await { warn!("could not populate local metadata. {:?}", e); } + resolve_parseable_metadata().await?; // track all parquet files already in the data directory storage::CACHED_FILES.track_parquet(); diff --git a/server/src/option.rs b/server/src/option.rs index 062c2ef9e..4d1987319 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -199,8 +199,7 @@ pub struct Server { long, env = "P_STAGING_DIR", default_value = "./data", - value_name = "path", - value_parser = validation::absolute_path + value_name = "path" )] pub local_staging_path: PathBuf, @@ -245,7 +244,7 @@ impl Server { } } -pub(self) mod validation { +pub mod validation { use std::path::PathBuf; pub fn file_path(s: &str) -> Result<PathBuf, String> { @@ -261,11 +260,4 @@ pub(self) mod validation { Ok(path) } - - pub fn absolute_path(s: &str) -> Result<PathBuf, String> { - std::fs::canonicalize(s).map_err(|_| { - "Could not construct absolute path from given path value for staging directory" - .to_string() - }) - } } diff --git a/server/src/storage.rs b/server/src/storage.rs index 7e7fedf5c..2eb1e830f 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -29,6 +29,7 @@ use 
datafusion::parquet::errors::ParquetError; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; +use std::fs::create_dir_all; use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -36,10 +37,14 @@ mod file_link; mod localfs; mod object_storage; mod s3; +mod store_metadata; pub use localfs::{FSConfig, LocalFS}; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::{S3Config, S3}; +pub use store_metadata::StorageMetadata; + +use self::store_metadata::{put_staging_metadata, startup_check, EnvChange}; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. @@ -132,6 +137,50 @@ impl ObjectStoreFormat { } } +pub async fn resolve_parseable_metadata() -> Result<(), ObjectStorageError> { + let check = startup_check().await?; + const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage"; + let err: Option<&str> = match check { + EnvChange::None => None, + EnvChange::StagingMismatch => Some(MISMATCH), + EnvChange::StorageMismatch => Some(MISMATCH), + EnvChange::NewRemote => { + Some("Could not start the server because metadata not found in storage") + } + EnvChange::NewStaging => { + Some("Could not start the server because metadata not found in staging") + } + EnvChange::CreateBoth => { + create_staging_metadata()?; + create_remote_metadata().await?; + None + } + }; + + if let Some(err) = err { + let err = format!( + "{}. 
{}", + err, + "Join us on Parseable Slack to report this incident : https://launchpass.com/parseable" + ); + let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into(); + Err(ObjectStorageError::UnhandledError(err)) + } else { + Ok(()) + } +} + +async fn create_remote_metadata() -> Result<(), ObjectStorageError> { + let client = CONFIG.storage().get_object_store(); + client.put_metadata(&StorageMetadata::new()).await +} + +fn create_staging_metadata() -> std::io::Result<()> { + create_dir_all(CONFIG.staging_dir())?; + let metadata = StorageMetadata::new(); + put_staging_metadata(&metadata) +} + lazy_static! { pub static ref CACHED_FILES: Mutex<FileTable<FileLink>> = Mutex::new(FileTable::new()); pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = CONFIG.storage().get_datafusion_runtime(); @@ -231,7 +280,7 @@ pub enum ObjectStorageError { // Could not connect to object storage #[error("Connection Error: {0}")] - ConnectionError(Box<dyn std::error::Error>), + ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>), // IO Error when reading a file or listing path #[error("IO Error: {0}")] @@ -242,8 +291,8 @@ pub enum ObjectStorageError { DataFusionError(#[from] datafusion::error::DataFusionError), #[error("Unhandled Error: {0}")] - UnhandledError(Box<dyn std::error::Error>), + UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>), #[error("Authentication Error: {0}")] - AuthenticationError(Box<dyn std::error::Error>), + AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>), } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 056e5d447..758ecd2cf 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -18,7 +18,7 @@ use super::{ file_link::CacheState, AccessObject, LogStream, MoveDataError, ObjectStorageError, - ObjectStoreFormat, StorageDir, CACHED_FILES, + ObjectStoreFormat, StorageDir, StorageMetadata, CACHED_FILES, }; use crate::{alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats}; @@ -44,7 +44,8 @@ use std::{ }; // metadata file names in a Stream prefix -const METADATA_FILE_NAME: &str = ".metadata.json"; +const STREAM_METADATA_FILE_NAME: &str = ".metadata.json"; +pub(super) const 
PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; const SCHEMA_FILE_NAME: &str = ".schema"; const ALERT_FILE_NAME: &str = ".alert.json"; @@ -89,7 +90,7 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&schema_path(stream_name), "".into()) .await?; - self.put_object(&metadata_json_path(stream_name), format_json) + self.put_object(&stream_json_path(stream_name), format_json) .await?; Ok(()) @@ -105,15 +106,23 @@ } async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { - let path = metadata_json_path(stream_name); - let parseable_metadata = self.get_object(&path).await?; + let path = stream_json_path(stream_name); + let stream_metadata = self.get_object(&path).await?; let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); - let mut parseable_metadata: serde_json::Value = - serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); + let mut stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); - parseable_metadata["stats"] = stats; + stream_metadata["stats"] = stats; - self.put_object(&path, to_bytes(&parseable_metadata)).await + self.put_object(&path, to_bytes(&stream_metadata)).await + } + + async fn put_metadata( + &self, + parseable_metadata: &StorageMetadata, + ) -> Result<(), ObjectStorageError> { + self.put_object(&parseable_json_path(), to_bytes(parseable_metadata)) + .await } async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError> { @@ -133,17 +142,35 @@ } async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> { - let parseable_metadata = self.get_object(&metadata_json_path(stream_name)).await?; - let parseable_metadata: Value = - serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); + let stream_metadata = 
self.get_object(&stream_json_path(stream_name)).await?; + let stream_metadata: Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); - let stats = &parseable_metadata["stats"]; + let stats = &stream_metadata["stats"]; let stats = serde_json::from_value(stats.clone()).unwrap_or_default(); Ok(stats) } + async fn get_metadata(&self) -> Result<Option<StorageMetadata>, ObjectStorageError> { + let parseable_metadata: Option<StorageMetadata> = + match self.get_object(&parseable_json_path()).await { + Ok(bytes) => { + Some(serde_json::from_slice(&bytes).expect("parseable config is valid json")) + } + Err(err) => { + if matches!(err, ObjectStorageError::NoSuchKey(_)) { + None + } else { + return Err(err); + } + } + }; + + Ok(parseable_metadata) + } + async fn sync(&self) -> Result<(), MoveDataError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); @@ -265,8 +292,13 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { } #[inline(always)] -fn metadata_json_path(stream_name: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([stream_name, METADATA_FILE_NAME]) +fn stream_json_path(stream_name: &str) -> RelativePathBuf { + RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]) +} + +#[inline(always)] +fn parseable_json_path() -> RelativePathBuf { + RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME) } #[inline(always)] diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs new file mode 100644 index 000000000..47fcf0d98 --- /dev/null +++ b/server/src/storage/store_metadata.rs @@ -0,0 +1,104 @@ +use std::{ + fs::{self, OpenOptions}, + path::PathBuf, +}; + +use serde::{Deserialize, Serialize}; +use std::io; + +use crate::{option::CONFIG, utils::hostname_unchecked}; + +use super::{object_storage::PARSEABLE_METADATA_FILE_NAME, ObjectStorageError}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StorageMetadata { + pub version: String, + pub mode: String, + pub staging: PathBuf, + 
pub storage: String, + pub deployment_id: String, + pub user: Vec<User>, + pub stream: Vec<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct User { + username: String, + password: String, + role: String, +} + +impl StorageMetadata { + pub fn new() -> Self { + Self { + version: "v1".to_string(), + mode: CONFIG.storage_name.to_owned(), + staging: CONFIG.staging_dir().canonicalize().unwrap(), + storage: CONFIG.storage().get_endpoint(), + deployment_id: hostname_unchecked(), + user: Vec::new(), + stream: Vec::new(), + } + } +} + +pub async fn startup_check() -> Result<EnvChange, ObjectStorageError> { + let staging_metadata = get_staging_metadata()?; + let storage = CONFIG.storage().get_object_store(); + let remote_metadata = storage.get_metadata().await?; + + Ok(check_metadata_conflict(staging_metadata, remote_metadata)) +} + +fn check_metadata_conflict( + staging_metadata: Option<StorageMetadata>, + remote_metadata: Option<StorageMetadata>, +) -> EnvChange { + match (staging_metadata, remote_metadata) { + (Some(staging), Some(remote)) if staging.mode == remote.mode => { + if staging.storage != remote.storage { + EnvChange::StorageMismatch + } else if staging.staging != remote.staging { + EnvChange::StagingMismatch + } else { + EnvChange::None + } + } + (Some(staging), Some(remote)) if staging.mode != remote.mode => EnvChange::StorageMismatch, + (None, None) => EnvChange::CreateBoth, + (None, Some(_)) => EnvChange::NewStaging, + (Some(_), None) => EnvChange::NewRemote, + _ => unreachable!(), + } +} +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EnvChange { + None, + StagingMismatch, + StorageMismatch, + NewRemote, + NewStaging, + CreateBoth, +} + +fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> { + let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); + let bytes = match fs::read(path) { + Ok(bytes) => bytes, + Err(err) => match err.kind() { + io::ErrorKind::NotFound => return Ok(None), + _ => return Err(err), + }, + }; + + let meta: StorageMetadata = serde_json::from_slice(&bytes).unwrap(); 
+ + Ok(Some(meta)) +} + +pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { + let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); + let mut file = OpenOptions::new().create_new(true).write(true).open(path)?; + serde_json::to_writer(&mut file, meta)?; + Ok(()) +}