From 9fea2199856f61fb2c312ef8b88ad8eecb5eddc5 Mon Sep 17 00:00:00 2001
From: chubei <914745487@qq.com>
Date: Tue, 17 Oct 2023 22:06:36 +0800
Subject: [PATCH] feat: Split cache name and label

---
 dozer-api/src/cache_builder/mod.rs              |   7 +-
 dozer-api/src/lib.rs                            |   2 +-
 dozer-api/src/test_utils.rs                     |   1 +
 dozer-cache/benches/cache.rs                    |   1 +
 .../src/cache/lmdb/cache/dump_restore.rs        |  12 +-
 .../conflict_resolution_tests.rs                |   2 +-
 .../cache/main_environment/dump_restore.rs      |   7 +-
 .../lmdb/cache/main_environment/hash_tests.rs   |   2 +-
 .../cache/lmdb/cache/main_environment/mod.rs    |  23 +--
 .../main_environment/operation_log/mod.rs       |  23 ++-
 .../main_environment/operation_log/tests.rs     |   6 +-
 dozer-cache/src/cache/lmdb/cache/mod.rs         |  31 ++--
 .../secondary_environment/dump_restore.rs       |   8 +-
 .../lmdb/cache/secondary_environment/mod.rs     |  16 +-
 dozer-cache/src/cache/lmdb/cache_manager.rs     | 146 +++++++++++++++---
 dozer-cache/src/cache/lmdb/indexing.rs          |   4 +-
 .../src/cache/lmdb/tests/read_write.rs          |   7 +-
 dozer-cache/src/cache/lmdb/tests/utils.rs       |   2 +-
 dozer-cache/src/cache/lmdb/utils.rs             |  32 ++--
 dozer-cache/src/cache/mod.rs                    |  16 +-
 dozer-cache/src/errors.rs                       |   2 +
 dozer-cache/src/main.rs                         |  17 +-
 .../src/cache_tests/film/load_database.rs       |   6 +-
 23 files changed, 266 insertions(+), 107 deletions(-)

diff --git a/dozer-api/src/cache_builder/mod.rs b/dozer-api/src/cache_builder/mod.rs
index b41fff5d32..7f1ee38ca2 100644
--- a/dozer-api/src/cache_builder/mod.rs
+++ b/dozer-api/src/cache_builder/mod.rs
@@ -77,13 +77,18 @@ pub fn open_or_create_cache(
     connections: &HashSet,
     write_options: CacheWriteOptions,
 ) -> Result, CacheError> {
-    match cache_manager.open_rw_cache(labels.clone(), write_options)? {
+    match cache_manager.open_rw_cache(
+        labels.to_non_empty_string().into_owned(),
+        labels.clone(),
+        write_options,
+    )?
{ Some(cache) => { debug_assert!(cache.get_schema() == &schema); Ok(cache) } None => { let cache = cache_manager.create_cache( + labels.to_non_empty_string().into_owned(), labels, schema.0, schema.1, diff --git a/dozer-api/src/lib.rs b/dozer-api/src/lib.rs index fe984597dd..1fda4ede40 100644 --- a/dozer-api/src/lib.rs +++ b/dozer-api/src/lib.rs @@ -136,7 +136,7 @@ fn open_cache_reader( labels: Labels, ) -> Result, ApiInitError> { let cache = cache_manager - .open_ro_cache(labels) + .open_ro_cache(labels.to_non_empty_string().into_owned(), labels) .map_err(ApiInitError::OpenOrCreateCache)?; Ok(cache.map(CacheReader::new)) } diff --git a/dozer-api/src/test_utils.rs b/dozer-api/src/test_utils.rs index 1a8ba2a27a..48ef76e4e6 100644 --- a/dozer-api/src/test_utils.rs +++ b/dozer-api/src/test_utils.rs @@ -110,6 +110,7 @@ pub fn initialize_cache( let (schema, secondary_indexes) = schema.unwrap_or_else(get_schema); let mut cache = cache_manager .create_cache( + labels.to_non_empty_string().into_owned(), labels, schema, secondary_indexes, diff --git a/dozer-cache/benches/cache.rs b/dozer-cache/benches/cache.rs index 11b7a4ba8d..9b38d73b90 100644 --- a/dozer-cache/benches/cache.rs +++ b/dozer-cache/benches/cache.rs @@ -58,6 +58,7 @@ fn cache(c: &mut Criterion) { let cache = Mutex::new( cache_manager .create_cache( + "temp".to_string(), Default::default(), schema, secondary_indexes, diff --git a/dozer-cache/src/cache/lmdb/cache/dump_restore.rs b/dozer-cache/src/cache/lmdb/cache/dump_restore.rs index c00cd70873..1aa380a518 100644 --- a/dozer-cache/src/cache/lmdb/cache/dump_restore.rs +++ b/dozer-cache/src/cache/lmdb/cache/dump_restore.rs @@ -80,21 +80,21 @@ pub async fn dump<'txn, T: Transaction, C: LmdbCache>( } pub async fn restore( - options: &CacheOptions, + options: CacheOptions, write_options: CacheWriteOptions, indexing_thread_pool: Arc>, reader: &mut (impl AsyncRead + Unpin), ) -> Result { info!("Restoring cache with options {options:?}"); let rw_main_env = - main_environment::dump_restore::restore(options, write_options, reader).await?; + main_environment::dump_restore::restore(&options, write_options, reader).await?; let options = CacheOptions { path: Some(( rw_main_env.base_path().to_path_buf(), - rw_main_env.labels().clone(), + rw_main_env.name().to_string(), )), - ..*options + ..options }; let ro_main_env = rw_main_env.share(); @@ -103,7 +103,7 @@ pub async fn restore( for index in 0..ro_main_env.schema().1.len() { let name = secondary_environment_name(index); let rw_secondary_env = - secondary_environment::dump_restore::restore(name, &options, reader).await?; + secondary_environment::dump_restore::restore(name, options.clone(), reader).await?; let ro_secondary_env = rw_secondary_env.share(); rw_secondary_envs.push(rw_secondary_env); @@ -156,7 +156,7 @@ mod tests { } let restored_cache = restore( - &Default::default(), + Default::default(), Default::default(), indexing_thread_pool.clone(), &mut data.as_slice(), diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs index 35847f7642..f7f15d994b 100644 --- a/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs +++ b/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs @@ -18,7 +18,7 @@ fn init_env(conflict_resolution: ConflictResolution) -> (RwMainEnvironment, Sche ..Default::default() }; let main_env = - RwMainEnvironment::new(Some(&schema), None, &Default::default(), 
write_options).unwrap(); + RwMainEnvironment::new(Some(&schema), None, Default::default(), write_options).unwrap(); (main_env, schema.0) } diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs index a232f75ffa..643e5caf32 100644 --- a/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs +++ b/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs @@ -53,7 +53,7 @@ pub async fn restore( reader: &mut (impl AsyncRead + Unpin), ) -> Result { info!("Restoring main environment with options {options:?}"); - let (mut env, (base_path, labels), temp_dir) = create_env(options)?; + let (mut env, (base_path, name), temp_dir) = create_env(options)?; info!("Restoring schema"); dozer_storage::restore(&mut env, reader).await?; @@ -62,7 +62,8 @@ pub async fn restore( info!("Restoring connection snapshotting done"); dozer_storage::restore(&mut env, reader).await?; info!("Restoring operation log"); - let operation_log = OperationLog::restore(&mut env, reader, labels).await?; + let operation_log = + OperationLog::restore(&mut env, reader, name, options.labels.clone()).await?; let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?; let commit_state = LmdbOption::open(&env, Some(COMMIT_STATE_DB_NAME))?; @@ -142,7 +143,7 @@ pub mod tests { let mut env = RwMainEnvironment::new( Some(&(schema, vec![])), None, - &Default::default(), + Default::default(), Default::default(), ) .unwrap(); diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/hash_tests.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/hash_tests.rs index ea77767475..c0caf737d7 100644 --- a/dozer-cache/src/cache/lmdb/cache/main_environment/hash_tests.rs +++ b/dozer-cache/src/cache/lmdb/cache/main_environment/hash_tests.rs @@ -10,7 +10,7 @@ fn test_hash_insert_delete_insert() { let mut env = RwMainEnvironment::new( Some(&(schema, vec![])), None, - &Default::default(), + Default::default(), Default::default(), ) .unwrap(); diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs index d0801abdfd..94f89c1186 100644 --- a/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs +++ b/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs @@ -53,6 +53,10 @@ pub trait MainEnvironment: LmdbEnvironment { &self.common().base_path } + fn name(&self) -> &str { + self.common().operation_log.name() + } + fn labels(&self) -> &Labels { self.common().operation_log.labels() } @@ -154,12 +158,12 @@ impl RwMainEnvironment { pub fn new( schema: Option<&SchemaWithIndex>, connections: Option<&HashSet>, - options: &CacheOptions, + options: CacheOptions, write_options: CacheWriteOptions, ) -> Result { - let (mut env, (base_path, labels), temp_dir) = create_env(options)?; + let (mut env, (base_path, name), temp_dir) = create_env(&options)?; - let operation_log = OperationLog::create(&mut env, labels.clone())?; + let operation_log = OperationLog::create(&mut env, name.clone(), options.labels)?; let schema_option = LmdbOption::create(&mut env, Some(SCHEMA_DB_NAME))?; let commit_state = LmdbOption::create(&mut env, Some(COMMIT_STATE_DB_NAME))?; let connection_snapshotting_done = @@ -173,7 +177,7 @@ impl RwMainEnvironment { (Some(schema), Some(old_schema)) => { if &old_schema != schema { return Err(CacheError::SchemaMismatch { - name: labels.to_string(), + name: name.clone(), given: Box::new(schema.clone()), stored: Box::new(old_schema), }); @@ -206,7 +210,7 @@ impl 
RwMainEnvironment { if &existing_connections != connections { return Err(CacheError::ConnectionsMismatch(Box::new( ConnectionMismatch { - name: labels.to_string(), + name, given: connections.clone(), stored: existing_connections, }, @@ -587,10 +591,11 @@ impl MainEnvironment for RoMainEnvironment { } impl RoMainEnvironment { - pub fn new(options: &CacheOptions) -> Result { - let (env, (base_path, labels), _temp_dir) = open_env(options)?; + pub fn new(options: CacheOptions) -> Result { + let (env, (base_path, name)) = open_env(&options)?; + let base_path = base_path.to_path_buf(); - let operation_log = OperationLog::open(&env, labels.clone())?; + let operation_log = OperationLog::open(&env, name.to_string(), options.labels)?; let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?; let commit_state = LmdbOption::open(&env, Some(COMMIT_STATE_DB_NAME))?; let connection_snapshotting_done = @@ -604,7 +609,7 @@ impl RoMainEnvironment { Ok(Self { env, common: MainEnvironmentCommon { - base_path: base_path.to_path_buf(), + base_path, schema, schema_option, commit_state, diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/mod.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/mod.rs index a3f3c64f4a..2d7b102afa 100644 --- a/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/mod.rs +++ b/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/mod.rs @@ -52,6 +52,8 @@ pub struct OperationLog { next_operation_id: LmdbCounter, /// Operation_id -> operation. operation_id_to_operation: LmdbMap, + /// The cache name. + name: String, /// The cache labels. labels: Labels, } @@ -69,7 +71,11 @@ const OPERATION_ID_TO_OPERATION_DB_NAME: &str = "operation_id_to_operation"; const CACHE_OPERATION_LOG_COUNTER_NAME: &str = "cache_operation_log"; impl OperationLog { - pub fn create(env: &mut RwLmdbEnvironment, labels: Labels) -> Result { + pub fn create( + env: &mut RwLmdbEnvironment, + name: String, + labels: Labels, + ) -> Result { describe_counter!( CACHE_OPERATION_LOG_COUNTER_NAME, "Number of operations stored in the cache" @@ -87,11 +93,16 @@ impl OperationLog { present_operation_ids, next_operation_id, operation_id_to_operation, + name, labels, }) } - pub fn open(env: &E, labels: Labels) -> Result { + pub fn open( + env: &E, + name: String, + labels: Labels, + ) -> Result { let primary_key_metadata = PrimaryKeyMetadata::open(env)?; let hash_metadata = HashMetadata::open(env)?; let present_operation_ids = LmdbSet::open(env, Some(PRESENT_OPERATION_IDS_DB_NAME))?; @@ -104,10 +115,15 @@ impl OperationLog { present_operation_ids, next_operation_id, operation_id_to_operation, + name, labels, }) } + pub fn name(&self) -> &str { + &self.name + } + pub fn labels(&self) -> &Labels { &self.labels } @@ -506,6 +522,7 @@ impl OperationLog { pub async fn restore<'txn, R: tokio::io::AsyncRead + Unpin>( env: &mut RwLmdbEnvironment, reader: &mut R, + name: String, labels: Labels, ) -> Result { info!("Restoring primary key metadata"); @@ -518,7 +535,7 @@ impl OperationLog { dozer_storage::restore(env, reader).await?; info!("Restoring operation id to operation"); dozer_storage::restore(env, reader).await?; - Self::open(env, labels).map_err(Into::into) + Self::open(env, name, labels).map_err(Into::into) } } diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/tests.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/tests.rs index dc15567ea6..0d40d00982 100644 --- 
a/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/tests.rs +++ b/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/tests.rs @@ -53,7 +53,7 @@ pub fn assert_operation_log_equal( #[test] fn test_operation_log_append_only() { let mut env = create_env(&Default::default()).unwrap().0; - let log = OperationLog::create(&mut env, Default::default()).unwrap(); + let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap(); let txn = env.txn_mut().unwrap(); let append_only = true; @@ -96,7 +96,7 @@ fn test_operation_log_append_only() { #[test] fn test_operation_log_with_primary_key() { let mut env = create_env(&Default::default()).unwrap().0; - let log = OperationLog::create(&mut env, Default::default()).unwrap(); + let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap(); let txn = env.txn_mut().unwrap(); let append_only = false; @@ -219,7 +219,7 @@ fn test_operation_log_with_primary_key() { #[test] fn test_operation_log_without_primary_key() { let mut env = create_env(&Default::default()).unwrap().0; - let log = OperationLog::create(&mut env, Default::default()).unwrap(); + let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap(); let txn = env.txn_mut().unwrap(); let append_only = false; diff --git a/dozer-cache/src/cache/lmdb/cache/mod.rs b/dozer-cache/src/cache/lmdb/cache/mod.rs index 0c61f0d5ca..3b79165828 100644 --- a/dozer-cache/src/cache/lmdb/cache/mod.rs +++ b/dozer-cache/src/cache/lmdb/cache/mod.rs @@ -39,8 +39,11 @@ pub struct CacheOptions { pub intersection_chunk_size: usize, /// Provide a path where db will be created. If nothing is provided, will default to a temp location. - /// Db path will be `PathBuf.join(Labels.to_non_empty_string())`. - pub path: Option<(PathBuf, Labels)>, + /// Db path will be `PathBuf.join(name)`. + pub path: Option<(PathBuf, String)>, + + /// The labels to attach to the cache. 
+ pub labels: Labels, } impl Default for CacheOptions { @@ -51,6 +54,7 @@ impl Default for CacheOptions { max_size: 1024 * 1024 * 1024, intersection_chunk_size: 100, path: None, + labels: Labels::default(), } } } @@ -62,10 +66,12 @@ pub struct LmdbRoCache { } impl LmdbRoCache { - pub fn new(options: &CacheOptions) -> Result { - let main_env = RoMainEnvironment::new(options)?; + pub fn new(options: CacheOptions) -> Result { + let main_env = RoMainEnvironment::new(options.clone())?; let secondary_envs = (0..main_env.schema().1.len()) - .map(|index| RoSecondaryEnvironment::new(secondary_environment_name(index), options)) + .map(|index| { + RoSecondaryEnvironment::new(secondary_environment_name(index), options.clone()) + }) .collect::>()?; Ok(Self { main_env, @@ -85,18 +91,19 @@ impl LmdbRwCache { pub fn new( schema: Option<&SchemaWithIndex>, connections: Option<&HashSet>, - options: &CacheOptions, + options: CacheOptions, write_options: CacheWriteOptions, indexing_thread_pool: Arc>, ) -> Result { - let rw_main_env = RwMainEnvironment::new(schema, connections, options, write_options)?; + let rw_main_env = + RwMainEnvironment::new(schema, connections, options.clone(), write_options)?; let options = CacheOptions { path: Some(( rw_main_env.base_path().to_path_buf(), - rw_main_env.labels().clone(), + rw_main_env.name().to_string(), )), - ..*options + ..options }; let ro_main_env = rw_main_env.share(); @@ -105,7 +112,7 @@ impl LmdbRwCache { for (index, index_definition) in ro_main_env.schema().1.iter().enumerate() { let name = secondary_environment_name(index); let rw_secondary_env = - RwSecondaryEnvironment::new(index_definition, name.clone(), &options)?; + RwSecondaryEnvironment::new(index_definition, name.clone(), options.clone())?; let ro_secondary_env = rw_secondary_env.share(); rw_secondary_envs.push(rw_secondary_env); @@ -125,6 +132,10 @@ impl LmdbRwCache { } impl RoCache for C { + fn name(&self) -> &str { + self.main_env().name() + } + fn labels(&self) -> &Labels { self.main_env().labels() } diff --git a/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs b/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs index 1b34157185..d946058b83 100644 --- a/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs +++ b/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs @@ -43,7 +43,7 @@ pub async fn dump<'txn, E: SecondaryEnvironment, T: Transaction>( pub async fn restore( name: String, - options: &CacheOptions, + options: CacheOptions, reader: &mut (impl AsyncRead + Unpin), ) -> Result { info!("Restoring secondary environment {name} with options {options:?}"); @@ -139,7 +139,7 @@ pub mod tests { let mut main_env = RwMainEnvironment::new( Some(&(schema, vec![])), None, - &Default::default(), + Default::default(), Default::default(), ) .unwrap(); @@ -160,7 +160,7 @@ pub mod tests { let mut env = RwSecondaryEnvironment::new( &IndexDefinition::SortedInverted(vec![0]), "0".to_string(), - &Default::default(), + Default::default(), ) .unwrap(); { @@ -186,7 +186,7 @@ pub mod tests { } } - let restored_env = restore("0".to_string(), &Default::default(), &mut data.as_slice()) + let restored_env = restore("0".to_string(), Default::default(), &mut data.as_slice()) .await .unwrap(); assert_secondary_env_equal(&env, &restored_env); diff --git a/dozer-cache/src/cache/lmdb/cache/secondary_environment/mod.rs b/dozer-cache/src/cache/lmdb/cache/secondary_environment/mod.rs index 717a15fa4a..843ff30ae2 100644 --- 
a/dozer-cache/src/cache/lmdb/cache/secondary_environment/mod.rs +++ b/dozer-cache/src/cache/lmdb/cache/secondary_environment/mod.rs @@ -80,7 +80,7 @@ impl RwSecondaryEnvironment { pub fn new( index_definition: &IndexDefinition, name: String, - options: &CacheOptions, + options: CacheOptions, ) -> Result { let mut env = create_env(&get_cache_options(name.clone(), options))?.0; @@ -212,7 +212,7 @@ impl SecondaryEnvironment for RoSecondaryEnvironment { } impl RoSecondaryEnvironment { - pub fn new(name: String, options: &CacheOptions) -> Result { + pub fn new(name: String, options: CacheOptions) -> Result { let env = open_env(&get_cache_options(name.clone(), options))?.0; let database = LmdbMultimap::open(&env, Some(DATABASE_DB_NAME))?; @@ -239,14 +239,12 @@ impl RoSecondaryEnvironment { pub mod dump_restore; -fn get_cache_options(name: String, options: &CacheOptions) -> CacheOptions { - let path = options.path.as_ref().map(|(main_base_path, main_labels)| { - let base_path = main_base_path.join(format!("{}_index", main_labels)); - let mut labels = Labels::empty(); - labels.push("secondary_index", name); - (base_path, labels) +fn get_cache_options(name: String, options: CacheOptions) -> CacheOptions { + let path = options.path.as_ref().map(|(main_base_path, main_name)| { + let base_path = main_base_path.join(format!("{}_index", main_name)); + (base_path, format!("secondary_index_{name}")) }); - CacheOptions { path, ..*options } + CacheOptions { path, ..options } } fn set_comparator( diff --git a/dozer-cache/src/cache/lmdb/cache_manager.rs b/dozer-cache/src/cache/lmdb/cache_manager.rs index 6f672c1612..96d6c7aa29 100644 --- a/dozer-cache/src/cache/lmdb/cache_manager.rs +++ b/dozer-cache/src/cache/lmdb/cache_manager.rs @@ -1,8 +1,11 @@ use std::collections::HashSet; +use std::ops::Deref; use std::{path::PathBuf, sync::Arc}; use dozer_storage::{lmdb_storage::LmdbEnvironmentManager, LmdbMap, RwLmdbEnvironment}; +use dozer_storage::{LmdbEnvironment, RoLmdbEnvironment}; use dozer_tracing::Labels; +use dozer_types::borrow::IntoOwned; use dozer_types::parking_lot::RwLock; use dozer_types::{ parking_lot::Mutex, @@ -61,6 +64,8 @@ impl Default for CacheManagerOptions { pub struct LmdbRoCacheManager { options: CacheManagerOptions, base_path: PathBuf, + alias_to_real_name: LmdbMap, + env: RoLmdbEnvironment, } impl LmdbRoCacheManager { @@ -70,12 +75,30 @@ impl LmdbRoCacheManager { .as_deref() .ok_or(CacheError::PathNotInitialized)?; let base_path = base_path.to_path_buf(); - Ok(Self { options, base_path }) + + let env = LmdbEnvironmentManager::create_ro( + &base_path, + LMDB_CACHE_MANAGER_ALIAS_ENV_NAME, + Default::default(), + )?; + let alias_to_real_name = LmdbMap::open(&env, None)?; + + Ok(Self { + options, + base_path, + alias_to_real_name, + env, + }) } // HACK: We're leaking internal types here. 
- pub fn open_lmdb_cache(&self, labels: Labels) -> Result, CacheError> { - open_ro_cache(self.base_path.clone(), labels, &self.options) + pub fn open_lmdb_cache( + &self, + name_or_alias: String, + labels: Labels, + ) -> Result, CacheError> { + let name = resolve_alias(&self.env, self.alias_to_real_name, name_or_alias)?; + open_ro_cache(self.base_path.clone(), name, labels, &self.options) } } @@ -83,8 +106,12 @@ impl LmdbRoCacheManager { pub use super::cache::dump_restore::{begin_dump_txn, dump}; impl RoCacheManager for LmdbRoCacheManager { - fn open_ro_cache(&self, labels: Labels) -> Result>, CacheError> { - self.open_lmdb_cache(labels) + fn open_ro_cache( + &self, + name_or_alias: String, + labels: Labels, + ) -> Result>, CacheError> { + self.open_lmdb_cache(name_or_alias, labels) .map(|cache| cache.map(|cache| Box::new(cache) as _)) } } @@ -143,12 +170,13 @@ impl LmdbRwCacheManager { pub async fn restore_cache( &self, + name: String, labels: Labels, write_options: CacheWriteOptions, reader: &mut (impl AsyncRead + Unpin), ) -> Result<(), CacheError> { dump_restore::restore( - &cache_options(&self.options, self.base_path.clone(), labels), + cache_options(&self.options, self.base_path.clone(), name, labels), write_options, self.indexing_thread_pool.clone(), reader, @@ -159,13 +187,23 @@ impl LmdbRwCacheManager { } impl RoCacheManager for LmdbRwCacheManager { - fn open_ro_cache(&self, labels: Labels) -> Result>, CacheError> { + fn open_ro_cache( + &self, + name_or_alias: String, + labels: Labels, + ) -> Result>, CacheError> { + let name = resolve_alias( + self.env.read().deref(), + self.alias_to_real_name, + name_or_alias, + )?; + // Check if the cache is already opened. - if let Some(cache) = self.indexing_thread_pool.lock().find_cache(&labels) { + if let Some(cache) = self.indexing_thread_pool.lock().find_cache(&name) { return Ok(Some(Box::new(cache) as _)); } - open_ro_cache(self.base_path.clone(), labels, &self.options) + open_ro_cache(self.base_path.clone(), name, labels, &self.options) .map(|cache| cache.map(|cache| Box::new(cache) as _)) } } @@ -173,15 +211,21 @@ impl RoCacheManager for LmdbRwCacheManager { impl RwCacheManager for LmdbRwCacheManager { fn open_rw_cache( &self, + name_or_alias: String, labels: Labels, write_options: CacheWriteOptions, ) -> Result>, CacheError> { + let name = resolve_alias( + self.env.read().deref(), + self.alias_to_real_name, + name_or_alias, + )?; let cache: Option> = - if LmdbEnvironmentManager::exists(&self.base_path, &labels.to_non_empty_string()) { + if LmdbEnvironmentManager::exists(&self.base_path, &name) { let cache = LmdbRwCache::new( None, None, - &cache_options(&self.options, self.base_path.clone(), labels), + cache_options(&self.options, self.base_path.clone(), name, labels), write_options, self.indexing_thread_pool.clone(), )?; @@ -194,16 +238,21 @@ impl RwCacheManager for LmdbRwCacheManager { fn create_cache( &self, + name: String, labels: Labels, schema: Schema, indexes: Vec, connections: &HashSet, write_options: CacheWriteOptions, ) -> Result, CacheError> { + if name.is_empty() { + return Err(CacheError::EmptyName); + } + let cache = LmdbRwCache::new( Some(&(schema, indexes)), Some(connections), - &cache_options(&self.options, self.base_path.clone(), labels), + cache_options(&self.options, self.base_path.clone(), name, labels), write_options, self.indexing_thread_pool.clone(), )?; @@ -221,9 +270,24 @@ impl RwCacheManager for LmdbRwCacheManager { const LMDB_CACHE_MANAGER_ALIAS_ENV_NAME: &str = "__DOZER_CACHE_MANAGER_ALIAS__"; +fn 
resolve_alias( + env: &E, + alias_to_real_name: LmdbMap, + name_or_alias: String, +) -> Result { + Ok( + if let Some(real_name) = alias_to_real_name.get(&env.begin_txn()?, &name_or_alias)? { + real_name.into_owned() + } else { + name_or_alias + }, + ) +} + fn cache_options( options: &CacheManagerOptions, base_path: PathBuf, + name: String, labels: Labels, ) -> CacheOptions { CacheOptions { @@ -231,17 +295,19 @@ fn cache_options( max_readers: options.max_readers, max_size: options.max_size, intersection_chunk_size: options.intersection_chunk_size, - path: Some((base_path, labels)), + path: Some((base_path, name)), + labels, } } fn open_ro_cache( base_path: PathBuf, + name: String, labels: Labels, options: &CacheManagerOptions, ) -> Result, CacheError> { - let cache = if LmdbEnvironmentManager::exists(&base_path, &labels.to_non_empty_string()) { - let cache = LmdbRoCache::new(&cache_options(options, base_path, labels))?; + let cache = if LmdbEnvironmentManager::exists(&base_path, &name) { + let cache = LmdbRoCache::new(cache_options(options, base_path, name, labels))?; Some(cache) } else { None @@ -256,8 +322,9 @@ mod tests { #[test] fn test_lmdb_cache_manager() { let cache_manager = LmdbRwCacheManager::new(Default::default()).unwrap(); - let labels = cache_manager + let name = cache_manager .create_cache( + "temp".to_string(), Default::default(), Schema::default(), vec![], @@ -265,24 +332,53 @@ mod tests { Default::default(), ) .unwrap() - .labels() - .clone(); - // Test open with labels. + .name() + .to_string(); + // Test open with name. assert_eq!( cache_manager - .open_rw_cache(labels.clone(), Default::default()) + .open_rw_cache(name.clone(), Default::default(), Default::default()) .unwrap() .unwrap() - .labels(), - &labels + .name(), + &name ); + // Test open with alias. + let alias = "alias".to_string(); + cache_manager.create_alias(&name, &alias).unwrap(); + assert_eq!( + cache_manager + .open_rw_cache(alias.clone(), Default::default(), Default::default()) + .unwrap() + .unwrap() + .name(), + &name + ); + assert_eq!( + cache_manager + .open_ro_cache(alias.clone(), Default::default()) + .unwrap() + .unwrap() + .name(), + &name + ); + let LmdbRwCacheManager { + base_path, + _temp_dir, + .. 
+ } = cache_manager; + let cache_manager = LmdbRoCacheManager::new(CacheManagerOptions { + path: Some(base_path), + ..Default::default() + }) + .unwrap(); assert_eq!( cache_manager - .open_ro_cache(labels.clone()) + .open_ro_cache(alias, Default::default()) .unwrap() .unwrap() - .labels(), - &labels + .name(), + &name ); } } diff --git a/dozer-cache/src/cache/lmdb/indexing.rs b/dozer-cache/src/cache/lmdb/indexing.rs index bc61706b3f..863b00baf9 100644 --- a/dozer-cache/src/cache/lmdb/indexing.rs +++ b/dozer-cache/src/cache/lmdb/indexing.rs @@ -63,9 +63,9 @@ impl IndexingThreadPool { } } - pub fn find_cache(&self, labels: &Labels) -> Option { + pub fn find_cache(&self, name: &str) -> Option { for cache in self.caches.iter() { - if cache.main_env.labels() == labels { + if cache.main_env.name() == name { let secondary_envs = cache .secondary_envs .iter() diff --git a/dozer-cache/src/cache/lmdb/tests/read_write.rs b/dozer-cache/src/cache/lmdb/tests/read_write.rs index 3c83ee250f..65a4fa6db1 100644 --- a/dozer-cache/src/cache/lmdb/tests/read_write.rs +++ b/dozer-cache/src/cache/lmdb/tests/read_write.rs @@ -12,7 +12,7 @@ use tempdir::TempDir; #[test] fn read_and_write() { let path = TempDir::new("dozer").unwrap(); - let path = (path.path().to_path_buf(), Default::default()); + let path = (path.path().to_path_buf(), "temp".to_string()); // write and read from cache from two different threads. @@ -21,12 +21,13 @@ fn read_and_write() { let mut cache_writer = LmdbRwCache::new( Some(&schema), None, - &CacheOptions { + CacheOptions { max_readers: 2, max_db_size: 100, max_size: 1024 * 1024, path: Some(path.clone()), intersection_chunk_size: 1, + labels: Default::default(), }, Default::default(), indexing_thread_pool.clone(), @@ -51,7 +52,7 @@ fn read_and_write() { path: Some(path), ..Default::default() }; - let cache_reader = LmdbRoCache::new(&read_options).unwrap(); + let cache_reader = LmdbRoCache::new(read_options).unwrap(); for (a, b, c) in items { let rec = cache_reader.get(&Field::Int(a).encode()).unwrap(); let values = vec![ diff --git a/dozer-cache/src/cache/lmdb/tests/utils.rs b/dozer-cache/src/cache/lmdb/tests/utils.rs index 716d9610df..225bc9c1fb 100644 --- a/dozer-cache/src/cache/lmdb/tests/utils.rs +++ b/dozer-cache/src/cache/lmdb/tests/utils.rs @@ -24,7 +24,7 @@ pub fn create_cache( let cache = LmdbRwCache::new( Some(&schema), None, - &Default::default(), + Default::default(), Default::default(), indexing_thread_pool.clone(), ) diff --git a/dozer-cache/src/cache/lmdb/utils.rs b/dozer-cache/src/cache/lmdb/utils.rs index e815f2d997..0ea98059fa 100644 --- a/dozer-cache/src/cache/lmdb/utils.rs +++ b/dozer-cache/src/cache/lmdb/utils.rs @@ -10,7 +10,6 @@ use dozer_storage::{ LmdbEnvironmentManager, LmdbEnvironmentOptions, RoLmdbEnvironment, RwLmdbEnvironment, }, }; -use dozer_tracing::Labels; use tempdir::TempDir; use super::cache::CacheOptions; @@ -18,18 +17,20 @@ use super::cache::CacheOptions; #[allow(clippy::type_complexity)] pub fn create_env( options: &CacheOptions, -) -> Result<(RwLmdbEnvironment, (PathBuf, Labels), Option), CacheError> { - let (base_path, labels, temp_dir) = match &options.path { +) -> Result<(RwLmdbEnvironment, (PathBuf, String), Option), CacheError> { + let (base_path, name, temp_dir) = match &options.path { None => { let base_path = TempDir::new("dozer").map_err(|e| CacheError::Io("tempdir".into(), e))?; - let mut labels = Labels::empty(); - labels.push("cache_name", "temp"); - (base_path.path().to_path_buf(), labels, Some(base_path)) + ( + 
base_path.path().to_path_buf(), + "temp".to_string(), + Some(base_path), + ) } - Some((base_path, labels)) => { + Some((base_path, name)) => { fs::create_dir_all(base_path).map_err(|e| CacheError::Io(base_path.clone(), e))?; - (base_path.clone(), labels.clone(), None) + (base_path.clone(), name.clone(), None) } }; @@ -41,17 +42,15 @@ pub fn create_env( ); Ok(( - LmdbEnvironmentManager::create_rw(&base_path, &labels.to_non_empty_string(), options)?, - (base_path, labels), + LmdbEnvironmentManager::create_rw(&base_path, &name, options)?, + (base_path, name), temp_dir, )) } #[allow(clippy::type_complexity)] -pub fn open_env( - options: &CacheOptions, -) -> Result<(RoLmdbEnvironment, (&Path, &Labels), Option), CacheError> { - let (base_path, labels) = options +pub fn open_env(options: &CacheOptions) -> Result<(RoLmdbEnvironment, (&Path, &str)), CacheError> { + let (base_path, name) = options .path .as_ref() .ok_or(CacheError::PathNotInitialized)?; @@ -64,9 +63,8 @@ pub fn open_env( ); Ok(( - LmdbEnvironmentManager::create_ro(base_path, &labels.to_non_empty_string(), env_options)?, - (base_path, labels), - None, + LmdbEnvironmentManager::create_ro(base_path, name, env_options)?, + (base_path, name), )) } diff --git a/dozer-cache/src/cache/mod.rs b/dozer-cache/src/cache/mod.rs index 88c05c95a4..e8cf085cae 100644 --- a/dozer-cache/src/cache/mod.rs +++ b/dozer-cache/src/cache/mod.rs @@ -40,8 +40,12 @@ impl CacheRecord { } pub trait RoCacheManager: Send + Sync + Debug { - /// Opens a cache in read-only mode with given labels. - fn open_ro_cache(&self, labels: Labels) -> Result>, CacheError>; + /// Opens a cache in read-only mode, and attach given labels. + fn open_ro_cache( + &self, + name_or_alias: String, + labels: Labels, + ) -> Result>, CacheError>; } #[derive(Debug, Clone, Copy, Default)] @@ -53,9 +57,10 @@ pub struct CacheWriteOptions { } pub trait RwCacheManager: RoCacheManager { - /// Opens a cache in read-write mode with given labels. + /// Opens a cache in read-write mode, and attach given labels. fn open_rw_cache( &self, + name_or_alias: String, labels: Labels, write_options: CacheWriteOptions, ) -> Result>, CacheError>; @@ -64,9 +69,10 @@ pub trait RwCacheManager: RoCacheManager { /// /// Schemas cannot be changed after the cache is created. /// - /// The labels must be unique. + /// The name must be unique and non-empty. fn create_cache( &self, + name: String, labels: Labels, schema: Schema, indexes: Vec, @@ -81,6 +87,8 @@ pub trait RwCacheManager: RoCacheManager { } pub trait RoCache: Send + Sync + Debug { + /// Returns the name of the cache. + fn name(&self) -> &str; /// Returns the labels of the cache. 
fn labels(&self) -> &Labels; diff --git a/dozer-cache/src/errors.rs b/dozer-cache/src/errors.rs index 7e80d19b85..d58ae8adb4 100644 --- a/dozer-cache/src/errors.rs +++ b/dozer-cache/src/errors.rs @@ -39,6 +39,8 @@ pub enum CacheError { #[error("Storage error: {0}")] Storage(#[from] dozer_storage::errors::StorageError), + #[error("Cache name is empty")] + EmptyName, #[error("Schema is not found")] SchemaNotFound, #[error("Schema for {name} mismatch: given {given:?}, stored {stored:?}")] diff --git a/dozer-cache/src/main.rs b/dozer-cache/src/main.rs index f27ec3964a..9309a60821 100644 --- a/dozer-cache/src/main.rs +++ b/dozer-cache/src/main.rs @@ -51,7 +51,10 @@ async fn main() { ..Default::default() }) .unwrap(); - let cache = cache_manager.open_lmdb_cache(labels).unwrap().unwrap(); + let cache = cache_manager + .open_lmdb_cache(labels.to_non_empty_string().into_owned(), labels) + .unwrap() + .unwrap(); let count = cache.count(&QueryExpression::with_no_limit()).unwrap(); println!("Count: {}", count); } @@ -61,7 +64,10 @@ async fn main() { ..Default::default() }) .unwrap(); - let cache = &cache_manager.open_lmdb_cache(labels).unwrap().unwrap(); + let cache = &cache_manager + .open_lmdb_cache(labels.to_non_empty_string().into_owned(), labels) + .unwrap() + .unwrap(); let file = tokio::fs::File::create(path).await.unwrap(); let mut writer = tokio::io::BufWriter::new(file); @@ -84,7 +90,12 @@ async fn main() { let file = tokio::fs::File::open(path).await.unwrap(); let mut reader = tokio::io::BufReader::new(file); cache_manager - .restore_cache(labels, Default::default(), &mut reader) + .restore_cache( + labels.to_non_empty_string().into_owned(), + labels, + Default::default(), + &mut reader, + ) .await .unwrap(); } diff --git a/dozer-tests/src/cache_tests/film/load_database.rs b/dozer-tests/src/cache_tests/film/load_database.rs index aa16dc218a..4f96ad4132 100644 --- a/dozer-tests/src/cache_tests/film/load_database.rs +++ b/dozer-tests/src/cache_tests/film/load_database.rs @@ -21,6 +21,7 @@ pub async fn load_database( let labels = Labels::default(); let mut cache = cache_manager .create_cache( + labels.to_non_empty_string().into_owned(), labels.clone(), schema.clone(), secondary_indexes, @@ -70,7 +71,10 @@ pub async fn load_database( drop(cache); ( - cache_manager.open_ro_cache(labels).unwrap().unwrap(), + cache_manager + .open_ro_cache(labels.to_non_empty_string().into_owned(), labels) + .unwrap() + .unwrap(), mongo_collection, ) }
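
After this patch, callers identify a cache by an explicit, non-empty name (or an alias that resolves to one) and attach metric labels separately, instead of deriving both from Labels. Below is a minimal usage sketch modeled on test_lmdb_cache_manager from this patch; the cache name "films", the label key "endpoint", and the module paths in the use statements are illustrative assumptions, not code from the patch itself.

use dozer_cache::cache::lmdb::cache_manager::LmdbRwCacheManager;
use dozer_cache::cache::{RoCacheManager, RwCacheManager};
use dozer_cache::errors::CacheError;
use dozer_tracing::Labels;
use dozer_types::types::Schema;

fn example() -> Result<(), CacheError> {
    let cache_manager = LmdbRwCacheManager::new(Default::default())?;

    // Labels are now metadata only; the cache name decides the on-disk
    // environment (`base_path.join(name)`).
    let mut labels = Labels::default();
    labels.push("endpoint", "films");

    // The name must be non-empty, otherwise `CacheError::EmptyName` is returned.
    let cache = cache_manager.create_cache(
        "films".to_string(),   // cache name (unique, non-empty)
        labels.clone(),        // labels attached for metrics/tracing
        Schema::default(),
        vec![],                // secondary index definitions
        &Default::default(),   // source connections
        Default::default(),    // CacheWriteOptions
    )?;
    assert_eq!(cache.name(), "films");
    assert_eq!(cache.labels(), &labels);

    // Open again by name, or by an alias that resolves to the real name.
    cache_manager.create_alias("films", "films_alias")?;
    let reopened = cache_manager
        .open_ro_cache("films_alias".to_string(), labels)?
        .expect("cache exists");
    assert_eq!(reopened.name(), "films");
    Ok(())
}

On disk, the main environment now lives at base_path joined with the cache name, and each secondary index lives under "<name>_index" as "secondary_index_<i>" (see get_cache_options and create_env above), so changing only a cache's labels no longer changes where its data is stored.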