Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ md-5 = "0.10.5"
os_info = "3.0.7"
hostname = "0.3"
rand = "0.8.4"
relative-path = "1.7.2"
relative-path = { version = "1.7.2", features = ["serde"] }
rustls = "0.20.6"
rustls-pemfile = "1.0.1"
rust-flatten-json = "0.2.0"
Expand Down
3 changes: 3 additions & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mod validator;

use option::CONFIG;

use crate::storage::resolve_parseable_metadata;

// Global configurations
const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000;
const API_BASE_PATH: &str = "/api";
Expand All @@ -69,6 +71,7 @@ async fn main() -> anyhow::Result<()> {
if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
warn!("could not populate local metadata. {:?}", e);
}
resolve_parseable_metadata().await?;
// track all parquet files already in the data directory
storage::CACHED_FILES.track_parquet();

Expand Down
12 changes: 2 additions & 10 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ pub struct Server {
long,
env = "P_STAGING_DIR",
default_value = "./data",
value_name = "path",
value_parser = validation::absolute_path
value_name = "path"
)]
pub local_staging_path: PathBuf,

Expand Down Expand Up @@ -245,7 +244,7 @@ impl Server {
}
}

pub(self) mod validation {
pub mod validation {
use std::path::PathBuf;

pub fn file_path(s: &str) -> Result<PathBuf, String> {
Expand All @@ -261,11 +260,4 @@ pub(self) mod validation {

Ok(path)
}

pub fn absolute_path(s: &str) -> Result<PathBuf, String> {
std::fs::canonicalize(s).map_err(|_| {
"Could not construct absolute path from given path value for staging directory"
.to_string()
})
}
}
55 changes: 52 additions & 3 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,22 @@ use datafusion::parquet::errors::ParquetError;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};

use std::fs::create_dir_all;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

mod file_link;
mod localfs;
mod object_storage;
mod s3;
mod store_metadata;

pub use localfs::{FSConfig, LocalFS};
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::{S3Config, S3};
pub use store_metadata::StorageMetadata;

use self::store_metadata::{put_staging_metadata, startup_check, EnvChange};

/// local sync interval to move data.records to /tmp dir of that stream.
/// 60 sec is a reasonable value.
Expand Down Expand Up @@ -132,6 +137,50 @@ impl ObjectStoreFormat {
}
}

/// Validate the Parseable metadata found in the staging directory against
/// the copy kept in object storage, creating both on first boot.
///
/// # Errors
/// Returns `ObjectStorageError::UnhandledError` when the two copies
/// disagree, or when exactly one of them is missing — starting against a
/// changed environment is refused rather than risking inconsistent data.
pub async fn resolve_parseable_metadata() -> Result<(), ObjectStorageError> {
    let check = startup_check().await?;
    const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage";
    let err: Option<&str> = match check {
        EnvChange::None => None,
        // Both mismatch variants refuse startup with the same message.
        EnvChange::StagingMismatch | EnvChange::StorageMismatch => Some(MISMATCH),
        EnvChange::NewRemote => {
            Some("Could not start the server because metadata not found in storage")
        }
        EnvChange::NewStaging => {
            Some("Could not start the server because metadata not found in staging")
        }
        // First boot: neither copy exists yet, so write fresh metadata to both.
        EnvChange::CreateBoth => {
            create_staging_metadata()?;
            create_remote_metadata().await?;
            None
        }
    };

    if let Some(err) = err {
        let err = format!(
            "{}. {}",
            err,
            "Join us on Parseable Slack to report this incident : https://launchpass.com/parseable"
        );
        let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into();
        Err(ObjectStorageError::UnhandledError(err))
    } else {
        Ok(())
    }
}

/// Serialize a fresh [`StorageMetadata`] snapshot and upload it to object storage.
async fn create_remote_metadata() -> Result<(), ObjectStorageError> {
    let store = CONFIG.storage().get_object_store();
    let meta = StorageMetadata::new();
    store.put_metadata(&meta).await
}

/// Ensure the staging directory exists, then write a fresh metadata file into it.
fn create_staging_metadata() -> std::io::Result<()> {
    // The directory must be created before StorageMetadata::new(), which
    // canonicalizes the staging path and would fail on a missing directory.
    create_dir_all(CONFIG.staging_dir())?;
    put_staging_metadata(&StorageMetadata::new())
}

lazy_static! {
pub static ref CACHED_FILES: Mutex<FileTable<FileLink>> = Mutex::new(FileTable::new());
pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = CONFIG.storage().get_datafusion_runtime();
Expand Down Expand Up @@ -231,7 +280,7 @@ pub enum ObjectStorageError {

// Could not connect to object storage
#[error("Connection Error: {0}")]
ConnectionError(Box<dyn std::error::Error + Send + 'static>),
ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),

// IO Error when reading a file or listing path
#[error("IO Error: {0}")]
Expand All @@ -242,8 +291,8 @@ pub enum ObjectStorageError {
DataFusionError(#[from] datafusion::error::DataFusionError),

#[error("Unhandled Error: {0}")]
UnhandledError(Box<dyn std::error::Error + Send + 'static>),
UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),

#[error("Authentication Error: {0}")]
AuthenticationError(Box<dyn std::error::Error + Send + 'static>),
AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>),
}
62 changes: 47 additions & 15 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use super::{
file_link::CacheState, AccessObject, LogStream, MoveDataError, ObjectStorageError,
ObjectStoreFormat, StorageDir, CACHED_FILES,
ObjectStoreFormat, StorageDir, StorageMetadata, CACHED_FILES,
};
use crate::{alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats};

Expand All @@ -44,7 +44,8 @@ use std::{
};

// metadata file names in a Stream prefix
const METADATA_FILE_NAME: &str = ".metadata.json";
const STREAM_METADATA_FILE_NAME: &str = ".metadata.json";
pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
const SCHEMA_FILE_NAME: &str = ".schema";
const ALERT_FILE_NAME: &str = ".alert.json";

Expand Down Expand Up @@ -89,7 +90,7 @@ pub trait ObjectStorage: Sync + 'static {

self.put_object(&schema_path(stream_name), "".into())
.await?;
self.put_object(&metadata_json_path(stream_name), format_json)
self.put_object(&stream_json_path(stream_name), format_json)
.await?;

Ok(())
Expand All @@ -105,15 +106,23 @@ pub trait ObjectStorage: Sync + 'static {
}

async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
let path = metadata_json_path(stream_name);
let parseable_metadata = self.get_object(&path).await?;
let path = stream_json_path(stream_name);
let stream_metadata = self.get_object(&path).await?;
let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
let mut parseable_metadata: serde_json::Value =
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
let mut stream_metadata: serde_json::Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");

parseable_metadata["stats"] = stats;
stream_metadata["stats"] = stats;

self.put_object(&path, to_bytes(&parseable_metadata)).await
self.put_object(&path, to_bytes(&stream_metadata)).await
}

/// Upload the global Parseable metadata file to object storage.
async fn put_metadata(
    &self,
    parseable_metadata: &StorageMetadata,
) -> Result<(), ObjectStorageError> {
    let path = parseable_json_path();
    let body = to_bytes(parseable_metadata);
    self.put_object(&path, body).await
}

async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError> {
Expand All @@ -133,17 +142,35 @@ pub trait ObjectStorage: Sync + 'static {
}

async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
let parseable_metadata = self.get_object(&metadata_json_path(stream_name)).await?;
let parseable_metadata: Value =
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
let stream_metadata: Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");

let stats = &parseable_metadata["stats"];
let stats = &stream_metadata["stats"];

let stats = serde_json::from_value(stats.clone()).unwrap_or_default();

Ok(stats)
}

/// Fetch the global Parseable metadata file from object storage.
/// Returns `Ok(None)` when the object does not exist yet.
async fn get_metadata(&self) -> Result<Option<StorageMetadata>, ObjectStorageError> {
    match self.get_object(&parseable_json_path()).await {
        Ok(bytes) => {
            let meta = serde_json::from_slice(&bytes).expect("parseable config is valid json");
            Ok(Some(meta))
        }
        // A missing key simply means no metadata has been written yet.
        Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
        Err(err) => Err(err),
    }
}

async fn sync(&self) -> Result<(), MoveDataError> {
if !Path::new(&CONFIG.staging_dir()).exists() {
return Ok(());
Expand Down Expand Up @@ -265,8 +292,13 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
}

#[inline(always)]
fn metadata_json_path(stream_name: &str) -> RelativePathBuf {
RelativePathBuf::from_iter([stream_name, METADATA_FILE_NAME])
fn stream_json_path(stream_name: &str) -> RelativePathBuf {
    // <stream_name>/.metadata.json — the per-stream metadata object
    RelativePathBuf::from(stream_name).join(STREAM_METADATA_FILE_NAME)
}

#[inline(always)]
fn parseable_json_path() -> RelativePathBuf {
    // Root-level .parseable.json holding deployment-wide metadata
    RelativePathBuf::from_iter([PARSEABLE_METADATA_FILE_NAME])
}

#[inline(always)]
Expand Down
104 changes: 104 additions & 0 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::{
fs::{self, OpenOptions},
path::PathBuf,
};

use serde::{Deserialize, Serialize};
use std::io;

use crate::{option::CONFIG, utils::hostname_unchecked};

use super::{object_storage::PARSEABLE_METADATA_FILE_NAME, ObjectStorageError};

/// Deployment metadata persisted as `.parseable.json`, kept both in the
/// staging directory and in object storage; the two copies are compared
/// at startup to detect environment changes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StorageMetadata {
    /// Metadata schema version (currently "v1").
    pub version: String,
    /// Storage mode name, taken from the server configuration.
    pub mode: String,
    /// Canonicalized staging directory path.
    pub staging: PathBuf,
    /// Storage endpoint identifier from the configured backend.
    pub storage: String,
    /// Host name of the machine that created this metadata.
    pub deployment_id: String,
    /// Configured users (empty on first boot).
    pub user: Vec<User>,
    /// Known stream names (empty on first boot).
    pub stream: Vec<String>,
}

/// A user record persisted inside the deployment metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct User {
    username: String,
    // NOTE(review): serialized as-is — confirm the password is hashed upstream
    password: String,
    role: String,
}

impl StorageMetadata {
    /// Build a metadata snapshot for the current deployment from the live
    /// server configuration.
    ///
    /// # Panics
    /// Panics if the staging directory cannot be canonicalized (e.g. it does
    /// not exist yet) — callers must create the directory first.
    pub fn new() -> Self {
        Self {
            version: "v1".to_string(),
            mode: CONFIG.storage_name.to_owned(),
            staging: CONFIG
                .staging_dir()
                .canonicalize()
                .expect("staging directory exists and is accessible"),
            storage: CONFIG.storage().get_endpoint(),
            deployment_id: hostname_unchecked(),
            user: Vec::new(),
            stream: Vec::new(),
        }
    }
}

/// Load both copies of the Parseable metadata — from the staging directory
/// and from object storage — and report how they compare.
pub async fn startup_check() -> Result<EnvChange, ObjectStorageError> {
    let staging = get_staging_metadata()?;
    let remote = CONFIG
        .storage()
        .get_object_store()
        .get_metadata()
        .await?;

    Ok(check_metadata_conflict(staging, remote))
}

/// Compare the staging-directory metadata with the remote copy and classify
/// the difference. The match is exhaustive, so no `unreachable!()` catch-all
/// (and its panic path) is needed.
fn check_metadata_conflict(
    staging_metadata: Option<StorageMetadata>,
    remote_metadata: Option<StorageMetadata>,
) -> EnvChange {
    match (staging_metadata, remote_metadata) {
        // Both copies exist: differing mode or endpoint is a storage
        // mismatch; a differing staging path is a staging mismatch.
        (Some(staging), Some(remote)) => {
            if staging.mode != remote.mode || staging.storage != remote.storage {
                EnvChange::StorageMismatch
            } else if staging.staging != remote.staging {
                EnvChange::StagingMismatch
            } else {
                EnvChange::None
            }
        }
        // First boot: neither copy exists yet.
        (None, None) => EnvChange::CreateBoth,
        (None, Some(_)) => EnvChange::NewStaging,
        (Some(_), None) => EnvChange::NewRemote,
    }
}
/// Result of comparing the staging-directory metadata with the copy in
/// object storage at startup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnvChange {
    /// Both copies exist and agree — nothing to do.
    None,
    /// Both copies exist but record different staging directories.
    StagingMismatch,
    /// Both copies exist but disagree on storage mode or endpoint.
    StorageMismatch,
    /// Metadata exists in staging only — the remote copy is missing.
    NewRemote,
    /// Metadata exists in storage only — the staging copy is missing.
    NewStaging,
    /// Neither copy exists — first boot; create both.
    CreateBoth,
}

/// Read `.parseable.json` from the staging directory.
///
/// Returns `Ok(None)` when the file does not exist. A file that exists but
/// contains invalid JSON is reported as an `InvalidData` I/O error instead
/// of panicking — the on-disk file is external input and may be corrupted.
fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
    let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);

    let bytes = match fs::read(path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err),
    };

    let meta: StorageMetadata = serde_json::from_slice(&bytes)
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;

    Ok(Some(meta))
}

/// Write the metadata file into the staging directory.
///
/// Uses `create_new`, so this fails with `AlreadyExists` if a metadata
/// file is already present rather than overwriting it.
pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
    let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
    let file = OpenOptions::new().write(true).create_new(true).open(path)?;
    serde_json::to_writer(file, meta)?;
    Ok(())
}