Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
849a678
[spark tests]
shehabgamin Aug 1, 2025
54fde8e
Merge branch 'main' of github.com:lakehq/sail into feat-parquet-metad…
shehabgamin Aug 2, 2025
c94f908
save
shehabgamin Aug 2, 2025
3fde715
Save
shehabgamin Aug 2, 2025
9b55e9b
save
shehabgamin Aug 2, 2025
0f9a474
[spark tests]
shehabgamin Aug 2, 2025
9556080
Merge branch 'main' of github.com:lakehq/sail into feat-parquet-metad…
shehabgamin Aug 2, 2025
32b503c
save
shehabgamin Aug 2, 2025
40461a2
save
shehabgamin Aug 2, 2025
b85ff5d
save
shehabgamin Aug 2, 2025
6a2c6b3
[spark tests]
shehabgamin Aug 2, 2025
81c5cd9
save
shehabgamin Aug 3, 2025
223984d
save
shehabgamin Aug 3, 2025
ee8613b
save
shehabgamin Aug 3, 2025
3e0863c
save
shehabgamin Aug 3, 2025
cb92bb1
Save
shehabgamin Aug 3, 2025
1dadc79
save
shehabgamin Aug 3, 2025
22a7912
save
shehabgamin Aug 3, 2025
29ebce0
save
shehabgamin Aug 3, 2025
225ecb5
save
shehabgamin Aug 3, 2025
5580d8b
save
shehabgamin Aug 3, 2025
b1df777
fix lint
shehabgamin Aug 3, 2025
24d6947
Merge branch 'main' of github.com:lakehq/sail into feat-parquet-metad…
shehabgamin Aug 4, 2025
085e75b
bump
shehabgamin Aug 4, 2025
fc70acc
chore arrow 56
shehabgamin Aug 5, 2025
cdcbc0d
[spark tests]
shehabgamin Aug 5, 2025
4d5c4ba
[spark tests]
shehabgamin Aug 5, 2025
9b1d203
[spark tests]
shehabgamin Aug 5, 2025
80b7b13
save
shehabgamin Aug 5, 2025
436304b
save
shehabgamin Aug 5, 2025
1fe14d4
save
shehabgamin Aug 5, 2025
d9631a4
save
shehabgamin Aug 5, 2025
4e71494
save
shehabgamin Aug 5, 2025
56bb0a0
Save
shehabgamin Aug 5, 2025
fcf74ea
Save
shehabgamin Aug 5, 2025
faa474e
Save
shehabgamin Aug 5, 2025
e9d5937
subset
shehabgamin Aug 5, 2025
df3c137
Save
shehabgamin Aug 6, 2025
7012884
[spark tests]
shehabgamin Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 285 additions & 115 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ percent-encoding = "2.3.1"
rustls = "0.23.29"
dashmap = "6.1.0"
itertools = "0.14.0"
moka = { version = "0.12.10", features = ["sync"] }

######
# The versions of the following dependencies are managed manually.
Expand Down Expand Up @@ -134,6 +135,7 @@ arrow-buffer = { version = "55.2.0" }
arrow-schema = { version = "55.2.0", features = ["serde"] }
arrow-flight = { version = "55.2.0" }
arrow-pyarrow = { version = "55.2.0" }
parquet = { version = "55.2.0" }
serde_arrow = { version = "0.13.5", features = ["arrow-55"] }
# The `object_store` version must match the one used in DataFusion.
object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
Expand All @@ -142,7 +144,6 @@ hdfs-native-object-store = "0.14.2"
# Lakehouse
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "9a2c8ea", default-features = false, features = ["rustls"] }
delta_kernel = { version = "0.13.0", features = ["arrow-55", "internal-api", "default-engine"] }
parquet = { version = "55.2.0" }
bytes = "1.9.0"
indexmap = "2.10.0"

Expand Down
23 changes: 23 additions & 0 deletions crates/sail-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Manifest for the `sail-cache` crate: shared caches (file listings,
# table/file statistics) used by the query engine.
[package]
name = "sail-cache"
# Version and all package metadata are inherited from the workspace manifest.
version.workspace = true
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true

[lints]
workspace = true

[dependencies]
sail-common = { path = "../sail-common" }

# All external dependency versions are pinned in the workspace root.
thiserror = { workspace = true }
object_store = { workspace = true }
datafusion = { workspace = true }
chrono = { workspace = true }
log = { workspace = true }
moka = { workspace = true }
44 changes: 44 additions & 0 deletions crates/sail-cache/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use sail_common::error::CommonError;
use thiserror::Error;

/// Convenience alias for fallible operations in the cache crate.
pub type CacheResult<T> = Result<T, CacheError>;

/// Error type for the cache crate.
///
/// Variants mirror [`sail_common::error::CommonError`] one-to-one so errors
/// can be converted losslessly at the crate boundary (see the `From` impl).
#[derive(Debug, Error)]
pub enum CacheError {
    /// A required argument was not provided.
    #[error("missing argument: {0}")]
    MissingArgument(String),
    /// An argument was provided but its value is invalid.
    #[error("invalid argument: {0}")]
    InvalidArgument(String),
    /// The requested operation or feature is not supported.
    #[error("not supported: {0}")]
    NotSupported(String),
    /// An unexpected internal failure.
    #[error("internal error: {0}")]
    InternalError(String),
}

impl CacheError {
pub fn missing(message: impl Into<String>) -> Self {
CacheError::MissingArgument(message.into())
}

pub fn invalid(message: impl Into<String>) -> Self {
CacheError::InvalidArgument(message.into())
}

pub fn unsupported(message: impl Into<String>) -> Self {
CacheError::NotSupported(message.into())
}

pub fn internal(message: impl Into<String>) -> Self {
CacheError::InternalError(message.into())
}
}

impl From<CommonError> for CacheError {
    /// Map each `CommonError` variant onto the like-named cache variant,
    /// moving the message through unchanged.
    fn from(error: CommonError) -> Self {
        match error {
            CommonError::MissingArgument(m) => Self::MissingArgument(m),
            CommonError::InvalidArgument(m) => Self::InvalidArgument(m),
            CommonError::NotSupported(m) => Self::NotSupported(m),
            CommonError::InternalError(m) => Self::InternalError(m),
        }
    }
}
42 changes: 42 additions & 0 deletions crates/sail-cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use log::error;

pub mod error;
pub mod list_file_cache;
pub mod table_files_statistics_cache;

/// Parse a human-readable memory limit such as `"512M"` or `"1.5G"` into bytes.
///
/// The last character is the unit (`K`, `M`, or `G`); everything before it is
/// parsed as an `f64` and scaled accordingly. Returns `None` (after logging an
/// error) for an empty string, an unparsable number, or an unsupported unit.
#[allow(dead_code)]
pub(crate) fn try_parse_memory_limit(limit: &str) -> Option<usize> {
    // Guard the split: `limit.len() - 1` underflows (panics) on an empty
    // string, and `split_at` panics if the final character is multi-byte
    // (the index would fall inside a UTF-8 sequence).
    let split_idx = match limit.len().checked_sub(1) {
        Some(idx) if limit.is_char_boundary(idx) => idx,
        _ => {
            error!("Memory limit not set! Failed to parse number from '{limit}'");
            return None;
        }
    };
    let (number, unit) = limit.split_at(split_idx);
    let number: f64 = match number.parse() {
        Ok(n) => n,
        Err(_) => {
            error!("Memory limit not set! Failed to parse number from '{limit}'");
            return None;
        }
    };
    match unit {
        // Binary (1024-based) scaling, truncated to whole bytes.
        "K" => Some((number * 1024.0) as usize),
        "M" => Some((number * 1024.0 * 1024.0) as usize),
        "G" => Some((number * 1024.0 * 1024.0 * 1024.0) as usize),
        _ => {
            error!("Memory limit not set! Unsupported unit '{unit}' in memory limit '{limit}'.");
            None
        }
    }
}

/// Parse `number` as a strictly positive `u64`.
///
/// Returns `None` when the string does not parse as a `u64` (logging an
/// error) or when it parses to zero (silently — a zero value is treated as
/// "disabled" by callers).
pub(crate) fn try_parse_non_zero_u64(number: &str) -> Option<u64> {
    match number.parse::<u64>() {
        Ok(0) => None,
        Ok(n) => Some(n),
        Err(_) => {
            error!("Failed to parse '{number}' as u64");
            None
        }
    }
}
130 changes: 130 additions & 0 deletions crates/sail-cache/src/list_file_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use datafusion::execution::cache::CacheAccessor;
use log::debug;
use moka::sync::Cache;
use object_store::path::Path;
use object_store::ObjectMeta;

use crate::try_parse_non_zero_u64;

/// Process-wide list-files cache, built lazily on first access.
///
/// Configuration is read once from the environment:
/// - `SAIL_RUNTIME__LIST_FILES_CACHE_TTL`: entry time-to-live in seconds.
/// - `SAIL_RUNTIME__LIST_FILES_CACHE_MAX_ENTRIES`: maximum number of entries.
///
/// Missing or invalid values fall back to the cache's defaults (see
/// [`MokaListFilesCache::new`]).
pub static GLOBAL_LIST_FILES_CACHE: LazyLock<Arc<MokaListFilesCache>> = LazyLock::new(|| {
    let ttl = std::env::var("SAIL_RUNTIME__LIST_FILES_CACHE_TTL").ok();
    let max_entries = std::env::var("SAIL_RUNTIME__LIST_FILES_CACHE_MAX_ENTRIES").ok();
    Arc::new(MokaListFilesCache::new(ttl, max_entries))
});

/// A thread-safe file-listing cache backed by a moka [`Cache`], keyed by an
/// object-store [`Path`] and storing the listing (`Vec<ObjectMeta>`) for it.
pub struct MokaListFilesCache {
    // NOTE(review): the field holds file listings, not statistics — the name
    // looks carried over from a statistics cache; consider renaming.
    statistics: Cache<Path, Arc<Vec<ObjectMeta>>>,
}

impl MokaListFilesCache {
    /// Create a cache, configuring the entry TTL (seconds) and the maximum
    /// entry count from the given raw strings.
    ///
    /// A missing, zero, or unparsable value leaves the corresponding setting
    /// at the builder's default (no expiry / unbounded capacity); each outcome
    /// is logged at debug level.
    pub fn new(ttl: Option<String>, max_entries: Option<String>) -> Self {
        let mut builder = Cache::builder();

        match ttl {
            None => debug!("No TTL set for MokaListFilesCache"),
            Some(raw) => match try_parse_non_zero_u64(&raw) {
                Some(ttl) => {
                    debug!("Setting TTL for MokaListFilesCache to {ttl}");
                    builder = builder.time_to_live(Duration::from_secs(ttl));
                }
                None => debug!("Disabled or invalid TTL for MokaListFilesCache: {raw}"),
            },
        }

        match max_entries {
            None => debug!("No max entries set for MokaListFilesCache"),
            Some(raw) => match try_parse_non_zero_u64(&raw) {
                Some(max_entries) => {
                    debug!("Setting max entries for MokaListFilesCache to {max_entries}");
                    builder = builder.max_capacity(max_entries);
                }
                None => {
                    debug!("Disabled or invalid max entries for MokaListFilesCache: {raw}")
                }
            },
        }

        Self {
            statistics: builder.build(),
        }
    }
}

impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for MokaListFilesCache {
    type Extra = ObjectMeta;

    /// Look up the cached listing for `key`, if present.
    fn get(&self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.get(key)
    }

    /// The extra metadata is ignored; behaves exactly like [`Self::get`].
    fn get_with_extra(&self, key: &Path, _extra: &Self::Extra) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.get(key)
    }

    /// Insert (or replace) the listing for `key`.
    ///
    /// Always returns `None`: moka's `insert` does not hand back the previous
    /// value, so the replaced entry cannot be reported.
    fn put(&self, key: &Path, value: Arc<Vec<ObjectMeta>>) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.insert(key.clone(), value);
        None
    }

    /// The extra metadata is ignored; behaves exactly like [`Self::put`].
    fn put_with_extra(
        &self,
        key: &Path,
        value: Arc<Vec<ObjectMeta>>,
        _extra: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.insert(key.clone(), value);
        None
    }

    /// Remove and return the listing for `key`, if present.
    fn remove(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.remove(key)
    }

    /// Whether a listing is currently cached for `key`.
    fn contains_key(&self, key: &Path) -> bool {
        self.statistics.contains_key(key)
    }

    /// Number of cached entries.
    ///
    /// NOTE(review): moka's `entry_count` may lag behind very recent
    /// inserts/removals — confirm callers tolerate an approximate count.
    fn len(&self) -> usize {
        self.statistics.entry_count() as usize
    }

    /// Drop every cached listing.
    fn clear(&self) {
        self.statistics.invalidate_all()
    }

    /// Human-readable cache name for diagnostics.
    fn name(&self) -> String {
        String::from("MokaListFilesCache")
    }
}

#[allow(clippy::unwrap_used)]
#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use object_store::path::Path;
    use object_store::ObjectMeta;

    use super::*;

    /// Round-trip a single listing through an unconfigured cache:
    /// miss before insertion, exact hit after.
    #[test]
    fn test_list_file_cache() {
        let location = Path::from("test");
        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        };

        let cache = MokaListFilesCache::new(None, None);

        // Nothing cached yet for this path.
        assert!(cache.get(&location).is_none());

        // After insertion, the same metadata comes back.
        cache.put(&location, Arc::new(vec![meta.clone()]));
        let cached = cache.get(&location).unwrap();
        assert_eq!(cached.as_slice(), &[meta]);
    }
}
Loading