Merged
Changes from 4 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
target
data
staging
limitcache
examples
cert.pem
key.pem
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions server/Cargo.toml
@@ -96,10 +96,12 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }

[build-dependencies]
cargo_toml = "0.15"
18 changes: 17 additions & 1 deletion server/src/banner.rs
@@ -18,6 +18,7 @@
*/

use crossterm::style::Stylize;
use human_size::SpecificSize;

use crate::about;
use crate::utils::uid::Uid;
@@ -100,5 +101,20 @@ async fn storage_info(config: &Config) {
config.staging_dir().to_string_lossy(),
storage.get_endpoint(),
latency
)
);

if let Some(path) = &config.parseable.local_cache_path {
let size: SpecificSize<human_size::Gigabyte> =
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
.unwrap()
.into();

eprintln!(
"
Cache: \"{}\"
Cache Size: \"{}\"",
path.display(),
size
);
}
}
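
For reference, a minimal standalone sketch of the byte-to-gigabyte conversion used above, assuming the human-size 0.4 API added by this PR (the byte count is illustrative):

use human_size::{Byte, Gigabyte, SpecificSize};

fn main() {
    // 1 GiB worth of bytes, converted to SI gigabytes for display.
    let local_cache_size: u64 = 1_073_741_824;
    let size: SpecificSize<Gigabyte> = SpecificSize::new(local_cache_size as f64, Byte)
        .expect("a finite, non-negative value is a valid size")
        .into();
    // Displays the value with its SI unit, e.g. "1.073741824 GB".
    println!("Cache Size: \"{}\"", size);
}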
189 changes: 189 additions & 0 deletions server/src/localcache.rs
@@ -0,0 +1,189 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::{io, path::PathBuf, sync::Arc};

use fs_extra::file::CopyOptions;
use futures_util::TryFutureExt;
use hashlru::Cache;
use itertools::{Either, Itertools};
use object_store::ObjectStore;
use once_cell::sync::OnceCell;
use tokio::{fs, sync::Mutex};

use crate::option::CONFIG;

pub const STREAM_CACHE_FILENAME: &str = ".cache.json";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LocalCache {
version: String,
current_size: u64,
capacity: u64,
files: Cache<String, PathBuf>,
}

impl LocalCache {
fn new_with_size(capacity: u64) -> Self {
Self {
version: "v1".to_string(),
current_size: 0,
capacity,
files: Cache::new(100),
Member: Why 100 only here?

Contributor Author: The backing hashlru cache is capacity-based; the initial capacity here is arbitrary and is increased when needed.

Member: Who increases it, the user or the backing cache?
}
}

fn can_push(&self, size: u64) -> bool {
self.capacity >= self.current_size + size
}
}
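
As the code below suggests in answer to the reviewer's question, move_to_cache doubles the map's entry capacity whenever it fills up, so the backing cache never evicts by entry count and eviction stays driven by the byte accounting in can_push. A minimal sketch of that pattern, assuming hashlru 0.11's entry-count semantics (file names are illustrative):

use hashlru::Cache;

fn main() {
    // Capacity is an entry count, not a byte budget: two slots.
    let mut files: Cache<String, String> = Cache::new(2);
    files.push("a.parquet".to_string(), "/cache/a.parquet".to_string());
    files.push("b.parquet".to_string(), "/cache/b.parquet".to_string());

    // Grow before pushing, as move_to_cache does, so the only eviction
    // path is the explicit, byte-driven pop_lru loop.
    if files.is_full() {
        let doubled = files.capacity() * 2;
        files.resize(doubled);
    }
    files.push("c.parquet".to_string(), "/cache/c.parquet".to_string());
    assert!(!files.is_full()); // three entries in a four-slot cache
}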

pub struct LocalCacheManager {
object_store: Arc<dyn ObjectStore>,
cache_path: PathBuf,
cache_capacity: u64,
copy_options: CopyOptions,
semaphore: Mutex<()>,
}

impl LocalCacheManager {
pub fn global() -> Option<&'static LocalCacheManager> {
static INSTANCE: OnceCell<LocalCacheManager> = OnceCell::new();

let Some(cache_path) = &CONFIG.parseable.local_cache_path else {
return None;
};

Some(INSTANCE.get_or_init(|| {
let cache_path = cache_path.clone();
std::fs::create_dir_all(&cache_path).unwrap();
let object_store = Arc::new(object_store::local::LocalFileSystem::new());
LocalCacheManager {
object_store,
cache_path,
cache_capacity: CONFIG.parseable.local_cache_size,
copy_options: CopyOptions {
overwrite: true,
skip_exist: false,
..CopyOptions::new()
},
semaphore: Mutex::new(()),
}
}))
}

pub async fn get_cache(&self, stream: &str) -> Result<LocalCache, CacheError> {
let path = cache_file_path(&self.cache_path, stream).unwrap();
let res = self
.object_store
.get(&path)
.and_then(|resp| resp.bytes())
.await;
let cache = match res {
Ok(bytes) => serde_json::from_slice(&bytes)?,
Err(object_store::Error::NotFound { .. }) => {
LocalCache::new_with_size(self.cache_capacity)
}
Err(err) => return Err(err.into()),
};
Ok(cache)
}

pub async fn put_cache(&self, stream: &str, cache: &LocalCache) -> Result<(), CacheError> {
let path = cache_file_path(&self.cache_path, stream).unwrap();
let bytes = serde_json::to_vec(cache)?.into();
Ok(self.object_store.put(&path, bytes).await?)
}

pub async fn move_to_cache(
&self,
stream: &str,
key: String,
staging_path: PathBuf,
) -> Result<(), CacheError> {
let lock = self.semaphore.lock().await;
let mut cache_path = self.cache_path.join(stream);
fs::create_dir_all(&cache_path).await?;
cache_path.push(staging_path.file_name().unwrap());
fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?;
let file_size = std::fs::metadata(&cache_path)?.len();
let mut cache = self.get_cache(stream).await?;

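// Evict least-recently-used files until the incoming file fits
// within the configured byte capacity.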
while !cache.can_push(file_size) {
if let Some((_, file_for_removal)) = cache.files.pop_lru() {
let lru_file_size = std::fs::metadata(&file_for_removal)?.len();
cache.current_size = cache.current_size.saturating_sub(lru_file_size);
log::info!("removing cache entry");
tokio::spawn(fs::remove_file(file_for_removal));
} else {
log::error!("Cache size too small");
break;
}
}

if cache.files.is_full() {
cache.files.resize(cache.files.capacity() * 2);
}
cache.files.push(key, cache_path);
cache.current_size += file_size;
self.put_cache(stream, &cache).await?;
drop(lock);
Ok(())
}

pub async fn partition_on_cached<T>(
&self,
stream: &str,
collection: Vec<T>,
key: fn(&T) -> &String,
) -> Result<(Vec<(T, PathBuf)>, Vec<T>), CacheError> {
let lock = self.semaphore.lock().await;
let mut cache = self.get_cache(stream).await?;
let (cached, remainder): (Vec<_>, Vec<_>) = collection.into_iter().partition_map(|item| {
let key = key(&item);
match cache.files.get(key).cloned() {
Some(path) => Either::Left((item, path)),
None => Either::Right(item),
}
});
self.put_cache(stream, &cache).await?;
drop(lock);
Ok((cached, remainder))
}
}

fn cache_file_path(
root: impl AsRef<std::path::Path>,
stream: &str,
) -> Result<object_store::path::Path, object_store::path::Error> {
// Build "<root>/<stream>/.cache.json". Push the filename onto the stream
// directory; set_file_name would replace the stream component and make
// every stream share one manifest file.
let path = root.as_ref().join(stream).join(STREAM_CACHE_FILENAME);
object_store::path::Path::from_absolute_path(path)
}

#[derive(Debug, thiserror::Error)]
pub enum CacheError {
#[error("{0}")]
Serde(#[from] serde_json::Error),
#[error("{0}")]
IOError(#[from] io::Error),
#[error("{0}")]
MoveError(#[from] fs_extra::error::Error),
#[error("{0}")]
ObjectStoreError(#[from] object_store::Error),
}
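
A hypothetical call site, sketching how a query path might use partition_on_cached; the ManifestFile type and its file_path field are illustrative, not part of this PR:

use std::path::PathBuf;

pub struct ManifestFile {
    pub file_path: String,
}

pub async fn split_by_cache(
    stream: &str,
    files: Vec<ManifestFile>,
) -> Result<(Vec<(ManifestFile, PathBuf)>, Vec<ManifestFile>), CacheError> {
    match LocalCacheManager::global() {
        // Cached entries carry a local path and can be read from disk;
        // the remainder must still be fetched from object storage.
        Some(manager) => manager.partition_on_cached(stream, files, |f| &f.file_path).await,
        None => Ok((Vec::new(), files)),
    }
}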
1 change: 1 addition & 0 deletions server/src/main.rs
@@ -33,6 +33,7 @@ mod catalog;
mod event;
mod handlers;
mod livetail;
mod localcache;
mod metadata;
mod metrics;
mod migration;