From 6a89aa15267f40289a6723d14c40c0a8ba38faaa Mon Sep 17 00:00:00 2001 From: dantengsky Date: Sun, 19 May 2024 09:36:47 +0800 Subject: [PATCH] feat: fuzzy table data disk cache key reload (#15566) * feat: fuzzy table data disk cache key reload During query node restart, if the config item `data_cache_key_reload_policy` is set to "fuzzy", disk cache keys will be reloaded from the cache directory instead of directly removing previous cache data. This means that the cache data existing before the restart will not be deleted. Note that during the reloading of cache keys, cache capacity will NOT be checked. Therefore, if `cache.disk.max_bytes` is decreased between restarts, no cached items on disk will be removed immediately. Instead, items will be removed when the first new item is put into the cache. New config item introduced: ~~~ [cache] Policy of data cache key reloading: - Available options: [reset|fuzzy] - "reset": remove previous data cache during restart - "fuzzy": reload cache keys from cache dir, retaining the cache data that existed before the restart data_cache_key_reload_policy = "reset" ~~~ * Update src/query/storages/common/cache/src/providers/disk_cache.rs Co-authored-by: Bohu * cargo fmt * parallel deletion * cleanup --------- Co-authored-by: Bohu --- Cargo.lock | 1 + src/query/config/src/config.rs | 48 ++++- src/query/config/src/inner.rs | 28 ++- src/query/config/src/lib.rs | 1 + .../storages/testdata/configs_table_basic.txt | 1 + src/query/storages/common/cache/Cargo.toml | 1 + .../common/cache/src/providers/disk_cache.rs | 199 ++++++++++++++++-- .../cache/src/providers/table_data_cache.rs | 7 +- .../cache/tests/it/providers/disk_cache.rs | 21 +- .../common/cache_manager/src/cache_manager.rs | 9 + 10 files changed, 290 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc3e480a187fa..75e17f7c4a0c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4881,6 +4881,7 @@ dependencies = [ "hex", "log", "parking_lot 0.12.1", + "rayon", 
"siphasher", "tempfile", ] diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 64f5fda49d3a0..426c4818b6072 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -71,8 +71,6 @@ use super::inner::QueryConfig as InnerQueryConfig; use crate::background_config::BackgroundConfig; use crate::DATABEND_COMMIT_VERSION; -// FIXME: too much boilerplate here - const CATALOG_HIVE: &str = "hive"; /// Config for `query`. @@ -2806,6 +2804,15 @@ pub struct CacheConfig { )] pub data_cache_storage: CacheStorageTypeConfig, + /// Policy of disk cache restart + #[clap( + long = "cache-data-cache-key-reload-policy", + value_name = "VALUE", + value_enum, + default_value_t + )] + pub data_cache_key_reload_policy: DiskCacheKeyReloadPolicy, + /// Max size of external cache population queue length /// /// the items being queued reference table column raw data, which are @@ -2890,6 +2897,22 @@ impl Default for CacheStorageTypeConfig { } } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ValueEnum)] +#[serde(rename_all = "lowercase")] +pub enum DiskCacheKeyReloadPolicy { + // remove all the disk cache during restart + Reset, + // recover the cache keys during restart, + // but cache capacity will not be checked + Fuzzy, +} + +impl Default for DiskCacheKeyReloadPolicy { + fn default() -> Self { + Self::Reset + } +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args, Default)] #[serde(default, deny_unknown_fields)] pub struct DiskCacheConfig { @@ -2991,6 +3014,7 @@ mod cache_config_converters { table_data_cache_population_queue_size: value .table_data_cache_population_queue_size, disk_cache_config: value.disk_cache_config.try_into()?, + data_cache_key_reload_policy: value.data_cache_key_reload_policy.try_into()?, table_data_deserialized_data_bytes: value.table_data_deserialized_data_bytes, table_data_deserialized_memory_ratio: value.table_data_deserialized_memory_ratio, }) @@ -3013,6 +3037,7 @@ mod 
cache_config_converters { inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio, table_prune_partitions_count: value.table_prune_partitions_count, data_cache_storage: value.data_cache_storage.into(), + data_cache_key_reload_policy: value.data_cache_key_reload_policy.into(), table_data_cache_population_queue_size: value .table_data_cache_population_queue_size, disk_cache_config: value.disk_cache_config.into(), @@ -3060,4 +3085,23 @@ mod cache_config_converters { } } } + + impl TryFrom for inner::DiskCacheKeyReloadPolicy { + type Error = ErrorCode; + fn try_from(value: DiskCacheKeyReloadPolicy) -> std::result::Result { + Ok(match value { + DiskCacheKeyReloadPolicy::Reset => inner::DiskCacheKeyReloadPolicy::Reset, + DiskCacheKeyReloadPolicy::Fuzzy => inner::DiskCacheKeyReloadPolicy::Fuzzy, + }) + } + } + + impl From for DiskCacheKeyReloadPolicy { + fn from(value: inner::DiskCacheKeyReloadPolicy) -> Self { + match value { + inner::DiskCacheKeyReloadPolicy::Reset => DiskCacheKeyReloadPolicy::Reset, + inner::DiskCacheKeyReloadPolicy::Fuzzy => DiskCacheKeyReloadPolicy::Fuzzy, + } + } + } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index 9e85c61beb47d..ed428eaba3c17 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -568,6 +568,9 @@ pub struct CacheConfig { /// Storage that hold the raw data caches pub disk_cache_config: DiskCacheConfig, + /// Policy of reloading disk cache keys + pub data_cache_key_reload_policy: DiskCacheKeyReloadPolicy, + /// Max size of in memory table column object cache. By default it is 0 (disabled) /// /// CAUTION: The cache items are deserialized table column objects, may take a lot of memory. 
@@ -589,7 +592,6 @@ pub struct CacheConfig { pub enum CacheStorageTypeConfig { None, Disk, - // Redis, } impl Default for CacheStorageTypeConfig { @@ -598,6 +600,20 @@ impl Default for CacheStorageTypeConfig { } } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum DiskCacheKeyReloadPolicy { + // remove all the disk cache during restart + Reset, + // recover the cache keys during restart, + // but cache capacity will not be checked + Fuzzy, +} +impl Default for DiskCacheKeyReloadPolicy { + fn default() -> Self { + Self::Reset + } +} + impl ToString for CacheStorageTypeConfig { fn to_string(&self) -> String { match self { @@ -607,6 +623,15 @@ impl ToString for CacheStorageTypeConfig { } } +impl ToString for DiskCacheKeyReloadPolicy { + fn to_string(&self) -> String { + match self { + DiskCacheKeyReloadPolicy::Reset => "reset".to_string(), + DiskCacheKeyReloadPolicy::Fuzzy => "fuzzy".to_string(), + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct DiskCacheConfig { /// Max bytes of cached raw table data. Default 20GB, set it to 0 to disable it. 
@@ -643,6 +668,7 @@ impl Default for CacheConfig { data_cache_storage: Default::default(), table_data_cache_population_queue_size: 0, disk_cache_config: Default::default(), + data_cache_key_reload_policy: Default::default(), table_data_deserialized_data_bytes: 0, table_data_deserialized_memory_ratio: 0, } diff --git a/src/query/config/src/lib.rs b/src/query/config/src/lib.rs index 568de4004231b..9fbb452ce70dd 100644 --- a/src/query/config/src/lib.rs +++ b/src/query/config/src/lib.rs @@ -46,6 +46,7 @@ pub use inner::CacheConfig; pub use inner::CacheStorageTypeConfig as CacheStorageTypeInnerConfig; pub use inner::CatalogConfig; pub use inner::CatalogHiveConfig; +pub use inner::DiskCacheKeyReloadPolicy; pub use inner::InnerConfig; pub use inner::ThriftProtocol; pub use version::DATABEND_COMMIT_VERSION; diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 46b141adb2947..4826106fc5b72 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -4,6 +4,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo +-----------+--------------------------------------------+----------------------------------------------------------------+----------+ | Column 0 | Column 1 | Column 2 | Column 3 | +-----------+--------------------------------------------+----------------------------------------------------------------+----------+ +| 'cache' | 'data_cache_key_reload_policy' | 'reset' | '' | | 'cache' | 'data_cache_storage' | 'none' | '' | | 'cache' | 'disk.max_bytes' | '21474836480' | '' | | 'cache' | 'disk.path' | './.databend/_cache' | '' | diff --git a/src/query/storages/common/cache/Cargo.toml b/src/query/storages/common/cache/Cargo.toml index 6208ac4a93264..add2b0a56060e 100644 --- a/src/query/storages/common/cache/Cargo.toml +++ 
b/src/query/storages/common/cache/Cargo.toml @@ -25,6 +25,7 @@ crossbeam-channel = "0.5.6" hex = "0.4.3" log = { workspace = true } parking_lot = { workspace = true } +rayon = "1.9.0" siphasher = "0.3.10" [dev-dependencies] diff --git a/src/query/storages/common/cache/src/providers/disk_cache.rs b/src/query/storages/common/cache/src/providers/disk_cache.rs index f4d0e421cf701..f628fef9b65cf 100644 --- a/src/query/storages/common/cache/src/providers/disk_cache.rs +++ b/src/query/storages/common/cache/src/providers/disk_cache.rs @@ -20,7 +20,10 @@ use std::io::Read; use std::io::Write; use std::path::Path; use std::path::PathBuf; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Instant; use bytes::Bytes; use databend_common_cache::Cache; @@ -31,8 +34,11 @@ use databend_common_cache::LruCache; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use log::error; +use log::info; use log::warn; use parking_lot::RwLock; +use rayon::prelude::*; +use rayon::ThreadPoolBuilder; use siphasher::sip128; use siphasher::sip128::Hasher128; @@ -69,7 +75,7 @@ impl From<&DiskCacheKey> for PathBuf { } impl DiskCache -where C: Cache +where C: Cache + Send + Sync + 'static { /// Create an `DiskCache` with `hashbrown::hash_map::DefaultHashBuilder` that stores files in `path`, /// limited to `size` bytes. @@ -80,18 +86,19 @@ where C: Cache /// /// The cache is not observant of changes to files under `path` from external sources, it /// expects to have sole maintenance of the contents. 
- pub fn new(path: T, size: u64) -> self::result::Result + pub fn new(path: T, size: u64, fuzzy_reload_cache_keys: bool) -> self::result::Result where PathBuf: From { DiskCache { cache: C::with_meter_and_hasher(size, FileSize, DefaultHashBuilder::default()), root: PathBuf::from(path), } - .init() + .init(fuzzy_reload_cache_keys) } } +type CacheHolder = Arc>>>; impl DiskCache -where C: Cache +where C: Cache + Send + Sync + 'static { /// Return the current size of all the files in the cache. pub fn size(&self) -> u64 { @@ -122,14 +129,152 @@ where C: Cache self.root.join(rel_path) } - fn init(self) -> self::result::Result { - // remove dir when init, ignore remove error - if let Err(e) = fs::remove_dir_all(&self.root) { - warn!("remove disk cache dir {:?} error {}", self.root, e); + fn reset_restart(cache_root: &PathBuf) -> result::Result<()> { + // remove dir recursively, ignore error + fn parallel_delete(path: &PathBuf, counter: &AtomicUsize) { + if let Ok(entries) = fs::read_dir(path) { + entries.par_bridge().for_each(|entry| { + if let Ok(entry) = entry { + let entry_path = entry.path(); + if entry_path.is_dir() { + info!("deleting content of path {:?}", entry_path); + parallel_delete(&entry_path, counter); + info!("path {:?} cleaned", entry_path); + } else if let Err(e) = fs::remove_file(&entry_path) { + warn!("failed to remove file {:?}. {}", entry_path, e); + } else { + let count = counter.fetch_add(1, Ordering::SeqCst) + 1; + if count % 1000 == 0 { + info!("Deleted {} files", count); + } + } + } + }); + } + if let Err(e) = fs::remove_dir(path) { + warn!("failed to remove path {:?}. 
{}", path, e); + } } - fs::create_dir_all(&self.root)?; - Ok(self) + let counter = AtomicUsize::new(0); + parallel_delete(cache_root, &counter); + + info!("all reset tasks done, {:?} cache items removed", counter); + Ok(()) + } + + fn fuzzy_restart(root: &PathBuf, me: CacheHolder) -> result::Result> { + fn populate_cache_key( + prefix: &Path, + path: &Path, + entry: &fs::DirEntry, + cache_holder: &CacheHolder, + ) -> result::Result<()> + where + C: Cache + Send + Sync + 'static, + { + let size = entry.metadata()?.len(); + let relative_path = path + .strip_prefix(prefix) + .map_err(|_| self::Error::MalformedPath(path.to_path_buf()))?; + let cache_key = recovery_cache_key_from_path(relative_path); + { + let mut cache_guard = cache_holder.write(); + let dick_cache_opt = (*cache_guard).as_mut(); + dick_cache_opt + .expect("unreachable, disk cache should be there") + .cache + .put(cache_key, size); + } + Ok(()) + } + + fn parallel_scan( + cache_root: &Path, + working_path: &PathBuf, + cache_holder: &CacheHolder, + counter: &AtomicUsize, + ) where + C: Cache + Send + Sync + 'static, + { + if let Ok(entries) = fs::read_dir(working_path) { + entries.par_bridge().for_each(|entry| { + if let Ok(entry) = entry { + let entry_path = entry.path(); + if entry_path.is_dir() { + info!("scanning path {:?}", entry_path); + parallel_scan(cache_root, &entry_path, cache_holder, counter); + info!("path {:?} processed", entry_path); + } else if let Err(e) = + populate_cache_key(cache_root, &entry_path, &entry, cache_holder) + { + warn!("failed to process path {:?}, error: {}", entry, e); + } else { + let count = counter.fetch_add(1, Ordering::SeqCst) + 1; + if count % 1000 == 0 { + info!("scanned {} files", count); + } + } + } + }); + } + } + + let counter = AtomicUsize::new(0); + parallel_scan(root, root, &me, &counter); + info!( + "all reload-cache-key tasks done, {:?} keys reloaded", + counter, + ); + + Ok(me) + } + + fn init(self, fuzzy_reload_cache_keys: bool) -> self::result::Result 
{ + let begin = Instant::now(); + let parallelism = match std::thread::available_parallelism() { + Ok(degree) => degree.get(), + Err(e) => { + error!( + "failed to detect the number of parallelism: {}, fallback to 8", + e + ); + 8 + } + }; + + info!( + "initializing disk cache, parallelism set to {}", + parallelism + ); + + let thread_pool = ThreadPoolBuilder::new() + .num_threads(parallelism) + .thread_name(|index| format!("data-cache-restart-worker-{}", index)) + .build() + .expect("failed to build disk cache restart thread pool"); + + let ret = if fuzzy_reload_cache_keys { + info!("disk cache fuzzy restart"); + let cache_root = self.root.clone(); + let cache_holder = thread_pool + .install(|| Self::fuzzy_restart(&cache_root, Arc::new(RwLock::new(Some(self)))))?; + let me = { + let mut write_guard = cache_holder.write(); + std::mem::take(&mut *write_guard).expect("failed to take back cache object") + }; + me + } else { + info!("disk cache reset restart"); + thread_pool.install(|| Self::reset_restart(&self.root))?; + self + }; + + fs::create_dir_all(&ret.root)?; + + // error(if any) will be reported by the caller site + info!("disk cache initialized. time used: {:?}", begin.elapsed()); + Ok(ret) } /// Returns `true` if the disk cache can store a file of `size` bytes. @@ -216,6 +361,7 @@ pub mod result { use std::error::Error as StdError; use std::fmt; use std::io; + use std::path::PathBuf; /// Errors returned by this crate. #[derive(Debug)] @@ -223,17 +369,21 @@ pub mod result { /// The file was too large to fit in the cache. FileTooLarge, /// The file was not in the cache. - MalformedPath, + MalformedPath(PathBuf), /// An IO Error occurred. 
Io(io::Error), + + /// unclassified errors + Misc(String), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::FileTooLarge => write!(f, "File too large"), - Error::MalformedPath => write!(f, "Malformed catch file path"), + Error::MalformedPath(p) => write!(f, "Malformed catch file path: {:?}", p), Error::Io(ref e) => write!(f, "{e}"), + Error::Misc(msg) => write!(f, "{msg}"), } } } @@ -280,7 +430,7 @@ impl CacheAccessor { if let Err(e) = validate_checksum(bytes.as_slice()) { - error!("data cache, of key {k}, crc validation failure: {e}"); + error!("disk cache, of key {k}, crc validation failure: {e}"); { // remove the invalid cache, error of removal ignored let r = { @@ -346,6 +496,23 @@ impl CacheAccessor String { + let key_string = match relative_path.file_name() { + Some(file_name) => match file_name.to_str() { + Some(str) => str.to_owned(), + None => { + // relative_path is constructed by ourself, and shall be valid utf8 string + unreachable!() + } + }, + None => { + // only called during init, and only path of files are passed in + unreachable!() + } + }; + key_string +} + /// The crc32 checksum is stored at the end of `bytes` and encoded as le u32. 
// Although parquet page has built-in crc, but it is optional (and not generated in parquet2) fn validate_checksum(bytes: &[u8]) -> Result<()> { @@ -377,9 +544,11 @@ impl LruDiskCacheBuilder { pub fn new_disk_cache( path: &PathBuf, disk_cache_bytes_size: u64, + fuzzy_reload_cache_keys: bool, ) -> Result { - let external_cache = DiskCache::new(path, disk_cache_bytes_size) - .map_err(|e| ErrorCode::StorageOther(format!("create disk cache failed, {e}")))?; + let external_cache = + DiskCache::new(path, disk_cache_bytes_size, fuzzy_reload_cache_keys) + .map_err(|e| ErrorCode::StorageOther(format!("create disk cache failed, {e}")))?; Ok(Arc::new(RwLock::new(external_cache))) } } diff --git a/src/query/storages/common/cache/src/providers/table_data_cache.rs b/src/query/storages/common/cache/src/providers/table_data_cache.rs index 7796eaa92112e..62096d6a376f8 100644 --- a/src/query/storages/common/cache/src/providers/table_data_cache.rs +++ b/src/query/storages/common/cache/src/providers/table_data_cache.rs @@ -77,8 +77,13 @@ impl TableDataCacheBuilder { path: &PathBuf, population_queue_size: u32, disk_cache_bytes_size: u64, + fuzzy_reload_cache_keys: bool, ) -> Result> { - let disk_cache = LruDiskCacheBuilder::new_disk_cache(path, disk_cache_bytes_size)?; + let disk_cache = LruDiskCacheBuilder::new_disk_cache( + path, + disk_cache_bytes_size, + fuzzy_reload_cache_keys, + )?; let (tx, rx) = crossbeam_channel::bounded(population_queue_size as usize); let num_population_thread = 1; Ok(TableDataCache { diff --git a/src/query/storages/common/cache/tests/it/providers/disk_cache.rs b/src/query/storages/common/cache/tests/it/providers/disk_cache.rs index fdc29a7564caf..3cf3a2cc8a5c5 100644 --- a/src/query/storages/common/cache/tests/it/providers/disk_cache.rs +++ b/src/query/storages/common/cache/tests/it/providers/disk_cache.rs @@ -64,19 +64,22 @@ impl TestFixture { #[test] fn test_empty_dir() { let f = TestFixture::new(); - DiskCache::new(f.tmp(), 1024).unwrap(); + let 
fuzzy_reload_cache_keys = false; + DiskCache::new(f.tmp(), 1024, fuzzy_reload_cache_keys).unwrap(); } #[test] fn test_missing_root() { let f = TestFixture::new(); - DiskCache::new(f.tmp().join("not-here"), 1024).unwrap(); + let fuzzy_reload_cache_keys = false; + DiskCache::new(f.tmp().join("not-here"), 1024, fuzzy_reload_cache_keys).unwrap(); } #[test] fn test_insert_bytes() { let f = TestFixture::new(); - let mut c = DiskCache::new(f.tmp(), 25).unwrap(); + let fuzzy_reload_cache_keys = false; + let mut c = DiskCache::new(f.tmp(), 25, fuzzy_reload_cache_keys).unwrap(); c.insert_single_slice("a/b/c", &[0; 10]).unwrap(); assert!(c.contains_key("a/b/c")); c.insert_single_slice("a/b/d", &[0; 10]).unwrap(); @@ -95,7 +98,8 @@ fn test_insert_bytes() { fn test_insert_bytes_exact() { // Test that files adding up to exactly the size limit works. let f = TestFixture::new(); - let mut c = DiskCache::new(f.tmp(), 20).unwrap(); + let fuzzy_reload_cache_keys = false; + let mut c = DiskCache::new(f.tmp(), 20, fuzzy_reload_cache_keys).unwrap(); c.insert_single_slice("file1", &[1; 10]).unwrap(); c.insert_single_slice("file2", &[2; 10]).unwrap(); assert_eq!(c.size(), 20); @@ -108,7 +112,8 @@ fn test_insert_bytes_exact() { fn test_add_get_lru() { let f = TestFixture::new(); { - let mut c = DiskCache::new(f.tmp(), 25).unwrap(); + let fuzzy_reload_cache_keys = false; + let mut c = DiskCache::new(f.tmp(), 25, fuzzy_reload_cache_keys).unwrap(); c.insert_single_slice("file1", &[1; 10]).unwrap(); c.insert_single_slice("file2", &[2; 10]).unwrap(); // Get the file to bump its LRU status. 
@@ -127,7 +132,8 @@ fn test_add_get_lru() { #[test] fn test_insert_bytes_too_large() { let f = TestFixture::new(); - let mut c = DiskCache::new(f.tmp(), 1).unwrap(); + let fuzzy_reload_cache_keys = false; + let mut c = DiskCache::new(f.tmp(), 1, fuzzy_reload_cache_keys).unwrap(); match c.insert_single_slice("a/b/c", &[0; 2]) { Err(DiskCacheError::FileTooLarge) => {} x => panic!("Unexpected result: {x:?}"), @@ -137,7 +143,8 @@ fn test_insert_bytes_too_large() { #[test] fn test_evict_until_enough_space() { let f = TestFixture::new(); - let mut c = DiskCache::new(f.tmp(), 4).unwrap(); + let fuzzy_reload_cache_keys = false; + let mut c = DiskCache::new(f.tmp(), 4, fuzzy_reload_cache_keys).unwrap(); c.insert_single_slice("file1", &[1; 1]).unwrap(); c.insert_single_slice("file2", &[2; 2]).unwrap(); c.insert_single_slice("file3", &[3; 1]).unwrap(); diff --git a/src/query/storages/common/cache_manager/src/cache_manager.rs b/src/query/storages/common/cache_manager/src/cache_manager.rs index 4d1b651d4e904..48b7a627c2bd5 100644 --- a/src/query/storages/common/cache_manager/src/cache_manager.rs +++ b/src/query/storages/common/cache_manager/src/cache_manager.rs @@ -20,6 +20,7 @@ use databend_common_cache::CountableMeter; use databend_common_cache::DefaultHashBuilder; use databend_common_config::CacheConfig; use databend_common_config::CacheStorageTypeInnerConfig; +use databend_common_config::DiskCacheKeyReloadPolicy; use databend_common_exception::Result; use databend_storages_common_cache::InMemoryCacheBuilder; use databend_storages_common_cache::InMemoryItemCacheHolder; @@ -97,6 +98,7 @@ impl CacheManager { &real_disk_cache_root, queue_size, config.disk_cache_config.max_bytes, + config.data_cache_key_reload_policy.clone(), )? 
} } @@ -270,12 +272,19 @@ impl CacheManager { path: &PathBuf, population_queue_size: u32, disk_cache_bytes_size: u64, + restart_policy: DiskCacheKeyReloadPolicy, ) -> Result> { + let fuzzy_reload_cache_keys = match restart_policy { + DiskCacheKeyReloadPolicy::Reset => false, + DiskCacheKeyReloadPolicy::Fuzzy => true, + }; + if disk_cache_bytes_size > 0 { let cache_holder = TableDataCacheBuilder::new_table_data_disk_cache( path, population_queue_size, disk_cache_bytes_size, + fuzzy_reload_cache_keys, )?; Ok(Some(cache_holder)) } else {