LRU DashMap to cache objectMeta #10125

Closed · wants to merge 1 commit
13 changes: 10 additions & 3 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use super::cache_unit;
use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

/// The cache of listing files statistics.
/// If set via [`CacheManagerConfig::with_files_statistics_cache`],
/// it will avoid inferring the same file statistics repeatedly during the session lifetime,
@@ -73,7 +73,7 @@ impl CacheManager {
}
}

#[derive(Clone, Default)]
#[derive(Clone)]
pub struct CacheManagerConfig {
/// Enable caching of file statistics when listing files.
/// Avoids fetching the same file statistics repeatedly within the same DataFusion session.
@@ -87,7 +87,14 @@ pub struct CacheManagerConfig {
/// Default is disabled.
pub list_files_cache: Option<ListFilesCache>,
}

impl Default for CacheManagerConfig {
fn default() -> Self {
Self {
table_files_statistics_cache: None,
list_files_cache: Some(Arc::new(cache_unit::LruMetaCache::default())),
}
}
}
impl CacheManagerConfig {
pub fn with_files_statistics_cache(
mut self,
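A minimal sketch of what the new explicit Default means for callers of CacheManagerConfig; the module path datafusion_execution::cache::cache_manager is assumed here, and the snippet is illustrative rather than part of the diff:

use datafusion_execution::cache::cache_manager::CacheManagerConfig;

// The derived Default previously left `list_files_cache` unset; with this
// change it is pre-populated with an LruMetaCache, so file listings are
// cached out of the box, while the statistics cache remains opt-in via
// `with_files_statistics_cache`.
let config = CacheManagerConfig::default();
assert!(config.list_files_cache.is_some());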
177 changes: 171 additions & 6 deletions datafusion/execution/src/cache/cache_unit.rs
@@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::cache::CacheAccessor;

use datafusion_common::Statistics;

use core::panic;
use dashmap::DashMap;
use datafusion_common::Statistics;
use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;

/// Collected statistics for files
/// Cache is invalidated when the file size or last modification time has changed
@@ -156,10 +156,115 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
"DefaultListFilesCache".to_string()
}
}
pub struct LruMetaCache {
    // the underlying map from path to cached object metadata
    stores: DashMap<Path, Arc<Vec<ObjectMeta>>>,
    // maximum number of entries to keep
    capacity: usize,
    // recency order: least recently used at the front, most recently used at the back
    order: Mutex<VecDeque<Path>>,
    // position of each path within `order`
    key_position: DashMap<Path, usize>,
}
impl LruMetaCache {
    fn update_order(&self, k: &Path) {
        // Move `k` to the back of the recency queue (most recently used).
        let mut order = self.order.lock();
        if let Some(pos) = self.key_position.get(k).map(|x| *x.value()) {
            order.remove(pos);
            // entries behind the removed slot shift forward by one
            for mut entry in self.key_position.iter_mut() {
                if *entry.value() > pos {
                    *entry.value_mut() -= 1;
                }
            }
            order.push_back(k.clone());
            self.key_position.insert(k.clone(), order.len() - 1);
        }
    }
}
impl Default for LruMetaCache {
fn default() -> Self {
Self {
capacity: 50,
stores: DashMap::new(),
order: Mutex::new(VecDeque::new()),
key_position: DashMap::new(),
}
}
}
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for LruMetaCache {
type Extra = ObjectMeta;

fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
let value = self.stores.get(k).map(|x| x.clone());
if value.is_some() {
self.update_order(k);
}
value
}

fn get_with_extra(&self, _k: &Path, _e: &Self::Extra) -> Option<Arc<Vec<ObjectMeta>>> {
    panic!("Get from LruMetaCache with Extra is not supported.")
}

fn put(
    &self,
    key: &Path,
    value: Arc<Vec<ObjectMeta>>,
) -> Option<Arc<Vec<ObjectMeta>>> {
    let mut order = self.order.lock();
    if order.len() == self.capacity {
        // evict the least recently used entry
        let oldest = order.pop_front().unwrap();
        self.stores.remove(&oldest);
        self.key_position.remove(&oldest);
        // popping the front shifts every remaining entry forward by one
        for mut entry in self.key_position.iter_mut() {
            *entry.value_mut() -= 1;
        }
    }
    let res = self.stores.insert(key.clone(), value);
    order.push_back(key.clone());
    self.key_position.insert(key.clone(), order.len() - 1);
    res
}

fn put_with_extra(
    &self,
    _key: &Path,
    _value: Arc<Vec<ObjectMeta>>,
    _e: &Self::Extra,
) -> Option<Arc<Vec<ObjectMeta>>> {
    panic!("Put into LruMetaCache with Extra is not supported.")
}

fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
    let removed_value = self.stores.remove(k);
    if let Some(value) = removed_value {
        let mut order = self.order.lock();
        if let Some(pos) = self.key_position.get(k).map(|x| *x.value()) {
            order.remove(pos);
            self.key_position.remove(k);
            // entries behind the removed slot shift forward by one
            for mut entry in self.key_position.iter_mut() {
                if *entry.value() > pos {
                    *entry.value_mut() -= 1;
                }
            }
        }
        Some(value.1)
    } else {
        None
    }
}

fn contains_key(&self, k: &Path) -> bool {
self.stores.contains_key(k)
}

fn len(&self) -> usize {
self.stores.len()
}

fn clear(&self) {
self.stores.clear();
let mut order = self.order.lock();
order.clear();
self.key_position.clear();
}

fn name(&self) -> String {
"LruMetaCache".to_string()
}
}
#[cfg(test)]
mod tests {
use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
use crate::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultListFilesCache, LruMetaCache,
};
use crate::cache::CacheAccessor;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
@@ -232,4 +337,64 @@ mod tests {
meta.clone()
);
}
Review comments:

Contributor: One thing that I think would be really helpful in validating whether the cache is being used is showing it in our plans with EXPLAIN (I don't know which setting it would best belong in, though, i.e. verbose or analyze). For example, I believe Postgres exposes information like this through its EXPLAIN BUFFERS. At this stage I don't think we need a new keyword though.

Then we could have a test that just reads from a local file, and we can assert on the plan output that the source of the file metadata is the cache. This could also be extended for when we implement file caching.

Contributor: Ref to Postgres. Specifically:

"Include information on buffer usage. Specifically, include the number of shared blocks hit, read, dirtied, and written, the number of local blocks hit, read, dirtied, and written, the number of temp blocks read and written, and the time spent reading and writing data file blocks and temporary file blocks (in milliseconds) if [track_io_timing](https://www.postgresql.org/docs/current/runtime-config-statistics.html#GUC-TRACK-IO-TIMING) is enabled. A hit means that a read was avoided because the block was found already in cache when needed."

Contributor Author: Got it, I would first try adding the filter as you suggested on the issue page. Thanks!

Contributor: I thought about this a little more, and what I mentioned above may be tangential to this PR, since I think it would be more about the cache trait method usage than this specific cache implementation. So I don't think it would be necessary for this.

#[test]
fn test_lru_cache_single() {
let meta = ObjectMeta {
location: Path::from("test"),
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
.unwrap()
.into(),
size: 1024,
e_tag: None,
version: None,
};

let cache = LruMetaCache::default();
assert!(cache.get(&meta.location).is_none());

cache.put(&meta.location, vec![meta.clone()].into());
assert_eq!(
cache.get(&meta.location).unwrap().first().unwrap().clone(),
meta.clone()
);
}
use std::sync::Arc;
use std::thread;
#[test]
fn test_lru_cache_concurrency() {
let meta = ObjectMeta {
location: Path::from("test"),
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
.unwrap()
.into(),
size: 1024,
e_tag: None,
version: None,
};

let cache = Arc::new(LruMetaCache::default());
assert!(cache.get(&meta.location).is_none());

let cache_clone = cache.clone();
let meta_clone = meta.clone();
let handle_put = thread::spawn(move || {
cache_clone.put(&meta_clone.location, Arc::new(vec![meta_clone.clone()]));
});

handle_put.join().unwrap();

let mut handles = vec![];
for _ in 0..10 {
let cache_clone = cache.clone();
let meta_clone = meta.clone();
handles.push(thread::spawn(move || {
let retrieved_meta = cache_clone.get(&meta_clone.location).unwrap();
assert_eq!(retrieved_meta.first().unwrap().clone(), meta_clone);
}));
}

for handle in handles {
handle.join().unwrap();
}
}
}
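For completeness, a sketch of an eviction check that could be added inside the tests module above; it is not part of this PR, assumes the default capacity of 50 from LruMetaCache::default(), and reuses the LruMetaCache, Path, and Arc imports already in scope there:

#[test]
fn test_lru_cache_eviction() {
    let cache = LruMetaCache::default();
    // Insert one entry more than the default capacity of 50.
    for i in 0..51 {
        let path = Path::from(format!("file_{i}"));
        cache.put(&path, Arc::new(vec![]));
    }
    // The least recently inserted entry is evicted; the newest is retained.
    assert!(!cache.contains_key(&Path::from("file_0")));
    assert!(cache.contains_key(&Path::from("file_50")));
    assert_eq!(cache.len(), 50);
}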