Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ async fn main() -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
CONFIG.validate_staging()?;
CONFIG.validate_storage().await?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
metadata.set_global();
Expand Down
33 changes: 32 additions & 1 deletion server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::sync::Arc;
use url::Url;

use crate::oidc::{self, OpenidConfig};
use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
use crate::utils::validate_path_is_writeable;

pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
Expand Down Expand Up @@ -104,6 +104,37 @@ impl Config {
validate_path_is_writeable(staging_path)
}

pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> {
let obj_store = self.storage.get_object_store();
let rel_path = relative_path::RelativePathBuf::from(".parseable.json");

let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();

let has_dirs = match obj_store.list_dirs_in_storage().await {
Ok(dirs) => dirs.is_empty(),
Err(_) => false,
};

let has_streams = obj_store.list_streams().await.is_ok();

if has_dirs || has_parseable_json && has_streams {
Ok(())
} else if has_parseable_json && !has_streams {
Err(ObjectStorageError::Custom(
"Parseable config found, but the Storage contains some stale data.".to_owned(),
Copy link
Contributor

@nikhilsinhaparseable nikhilsinhaparseable Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please correct the error to -
Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server. Join us on Parseable Slack to report this incident : https://launchpass.com/parseable

Also, Parseable.json file gets created in storage, staging directory gets created, parseable.json gets created in staging -- none of these should happen in case of error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error will be triggered if someone added some data to the storage where parseable stores data.
This case is the least likeliest to happen.

))
} else if !has_parseable_json && !has_streams && !has_dirs {
Err(ObjectStorageError::Custom(
"Storage contains some stale data, Please provide an Empty Storage".to_owned(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide the scenario when can this happen -!has_parseable_json && !has_streams && !has_dirs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable has_dirs does not properly convey the intention.

In this case .parseable.json, folders with .stream.json do not exists, but there is some data present in the storage.
This will happen if parseable is given a storage that is not empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am updating the logic of the if condition, so the intention is clearer.

))
} else {
Err(ObjectStorageError::Custom(
"Parseable config is missing, but streams are present in Storage.\nJoin us on Parseable Slack"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide the scenario when can this happen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is again a highly unlikely scenario to happen.
This error will be triggered if .parseable.json was deleted from the storage manually.

.to_owned()
))
}
}

pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {
self.storage.clone()
}
Expand Down
17 changes: 17 additions & 0 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,23 @@ impl ObjectStorage for LocalFS {
Ok(logstreams)
}

async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
.try_collect::<Vec<DirEntry>>()
.await?
.into_iter()
.map(dir_name);

let dirs = FuturesUnordered::from_iter(dirs)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten()
.collect::<Vec<_>>();

Ok(dirs)
}

async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
let path = self.root.join(stream_name);
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
Expand Down
1 change: 1 addition & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn check(&self) -> Result<(), ObjectStorageError>;
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;

Expand Down
12 changes: 12 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,18 @@ impl ObjectStorage for S3 {
fn store_url(&self) -> url::Url {
url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
}

async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
let pre = object_store::path::Path::from("/");
let resp = self.client.list_with_delimiter(Some(&pre)).await?;

Ok(resp
.common_prefixes
.iter()
.flat_map(|path| path.parts())
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>())
}
}

impl From<object_store::Error> for ObjectStorageError {
Expand Down