Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Split cache name and label #2161

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dozer-api/src/cache_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,18 @@ pub fn open_or_create_cache(
connections: &HashSet<String>,
write_options: CacheWriteOptions,
) -> Result<Box<dyn RwCache>, CacheError> {
match cache_manager.open_rw_cache(labels.clone(), write_options)? {
match cache_manager.open_rw_cache(
labels.to_non_empty_string().into_owned(),
labels.clone(),
write_options,
)? {
Some(cache) => {
debug_assert!(cache.get_schema() == &schema);
Ok(cache)
}
None => {
let cache = cache_manager.create_cache(
labels.to_non_empty_string().into_owned(),
labels,
schema.0,
schema.1,
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn open_cache_reader(
labels: Labels,
) -> Result<Option<CacheReader>, ApiInitError> {
let cache = cache_manager
.open_ro_cache(labels)
.open_ro_cache(labels.to_non_empty_string().into_owned(), labels)
.map_err(ApiInitError::OpenOrCreateCache)?;
Ok(cache.map(CacheReader::new))
}
Expand Down
1 change: 1 addition & 0 deletions dozer-api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub fn initialize_cache(
let (schema, secondary_indexes) = schema.unwrap_or_else(get_schema);
let mut cache = cache_manager
.create_cache(
labels.to_non_empty_string().into_owned(),
labels,
schema,
secondary_indexes,
Expand Down
1 change: 1 addition & 0 deletions dozer-cache/benches/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn cache(c: &mut Criterion) {
let cache = Mutex::new(
cache_manager
.create_cache(
"temp".to_string(),
Default::default(),
schema,
secondary_indexes,
Expand Down
12 changes: 6 additions & 6 deletions dozer-cache/src/cache/lmdb/cache/dump_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,21 @@ pub async fn dump<'txn, T: Transaction, C: LmdbCache>(
}

pub async fn restore(
options: &CacheOptions,
options: CacheOptions,
write_options: CacheWriteOptions,
indexing_thread_pool: Arc<Mutex<IndexingThreadPool>>,
reader: &mut (impl AsyncRead + Unpin),
) -> Result<LmdbRwCache, CacheError> {
info!("Restoring cache with options {options:?}");
let rw_main_env =
main_environment::dump_restore::restore(options, write_options, reader).await?;
main_environment::dump_restore::restore(&options, write_options, reader).await?;

let options = CacheOptions {
path: Some((
rw_main_env.base_path().to_path_buf(),
rw_main_env.labels().clone(),
rw_main_env.name().to_string(),
)),
..*options
..options
};
let ro_main_env = rw_main_env.share();

Expand All @@ -103,7 +103,7 @@ pub async fn restore(
for index in 0..ro_main_env.schema().1.len() {
let name = secondary_environment_name(index);
let rw_secondary_env =
secondary_environment::dump_restore::restore(name, &options, reader).await?;
secondary_environment::dump_restore::restore(name, options.clone(), reader).await?;
let ro_secondary_env = rw_secondary_env.share();

rw_secondary_envs.push(rw_secondary_env);
Expand Down Expand Up @@ -156,7 +156,7 @@ mod tests {
}

let restored_cache = restore(
&Default::default(),
Default::default(),
Default::default(),
indexing_thread_pool.clone(),
&mut data.as_slice(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn init_env(conflict_resolution: ConflictResolution) -> (RwMainEnvironment, Sche
..Default::default()
};
let main_env =
RwMainEnvironment::new(Some(&schema), None, &Default::default(), write_options).unwrap();
RwMainEnvironment::new(Some(&schema), None, Default::default(), write_options).unwrap();
(main_env, schema.0)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn restore(
reader: &mut (impl AsyncRead + Unpin),
) -> Result<RwMainEnvironment, CacheError> {
info!("Restoring main environment with options {options:?}");
let (mut env, (base_path, labels), temp_dir) = create_env(options)?;
let (mut env, (base_path, name), temp_dir) = create_env(options)?;

info!("Restoring schema");
dozer_storage::restore(&mut env, reader).await?;
Expand All @@ -62,7 +62,8 @@ pub async fn restore(
info!("Restoring connection snapshotting done");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring operation log");
let operation_log = OperationLog::restore(&mut env, reader, labels).await?;
let operation_log =
OperationLog::restore(&mut env, reader, name, options.labels.clone()).await?;

let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let commit_state = LmdbOption::open(&env, Some(COMMIT_STATE_DB_NAME))?;
Expand Down Expand Up @@ -142,7 +143,7 @@ pub mod tests {
let mut env = RwMainEnvironment::new(
Some(&(schema, vec![])),
None,
&Default::default(),
Default::default(),
Default::default(),
)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn test_hash_insert_delete_insert() {
let mut env = RwMainEnvironment::new(
Some(&(schema, vec![])),
None,
&Default::default(),
Default::default(),
Default::default(),
)
.unwrap();
Expand Down
23 changes: 14 additions & 9 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub trait MainEnvironment: LmdbEnvironment {
&self.common().base_path
}

/// Returns the cache name, as recorded by this environment's operation log.
fn name(&self) -> &str {
    let common = self.common();
    common.operation_log.name()
}

/// Returns the cache labels, as recorded by this environment's operation log.
fn labels(&self) -> &Labels {
    let operation_log = &self.common().operation_log;
    operation_log.labels()
}
Expand Down Expand Up @@ -154,12 +158,12 @@ impl RwMainEnvironment {
pub fn new(
schema: Option<&SchemaWithIndex>,
connections: Option<&HashSet<String>>,
options: &CacheOptions,
options: CacheOptions,
write_options: CacheWriteOptions,
) -> Result<Self, CacheError> {
let (mut env, (base_path, labels), temp_dir) = create_env(options)?;
let (mut env, (base_path, name), temp_dir) = create_env(&options)?;

let operation_log = OperationLog::create(&mut env, labels.clone())?;
let operation_log = OperationLog::create(&mut env, name.clone(), options.labels)?;
let schema_option = LmdbOption::create(&mut env, Some(SCHEMA_DB_NAME))?;
let commit_state = LmdbOption::create(&mut env, Some(COMMIT_STATE_DB_NAME))?;
let connection_snapshotting_done =
Expand All @@ -173,7 +177,7 @@ impl RwMainEnvironment {
(Some(schema), Some(old_schema)) => {
if &old_schema != schema {
return Err(CacheError::SchemaMismatch {
name: labels.to_string(),
name: name.clone(),
given: Box::new(schema.clone()),
stored: Box::new(old_schema),
});
Expand Down Expand Up @@ -206,7 +210,7 @@ impl RwMainEnvironment {
if &existing_connections != connections {
return Err(CacheError::ConnectionsMismatch(Box::new(
ConnectionMismatch {
name: labels.to_string(),
name,
given: connections.clone(),
stored: existing_connections,
},
Expand Down Expand Up @@ -587,10 +591,11 @@ impl MainEnvironment for RoMainEnvironment {
}

impl RoMainEnvironment {
pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
let (env, (base_path, labels), _temp_dir) = open_env(options)?;
pub fn new(options: CacheOptions) -> Result<Self, CacheError> {
let (env, (base_path, name)) = open_env(&options)?;
let base_path = base_path.to_path_buf();

let operation_log = OperationLog::open(&env, labels.clone())?;
let operation_log = OperationLog::open(&env, name.to_string(), options.labels)?;
let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let commit_state = LmdbOption::open(&env, Some(COMMIT_STATE_DB_NAME))?;
let connection_snapshotting_done =
Expand All @@ -604,7 +609,7 @@ impl RoMainEnvironment {
Ok(Self {
env,
common: MainEnvironmentCommon {
base_path: base_path.to_path_buf(),
base_path,
schema,
schema_option,
commit_state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct OperationLog {
next_operation_id: LmdbCounter,
/// Operation_id -> operation.
operation_id_to_operation: LmdbMap<u64, Operation>,
/// The cache name.
name: String,
/// The cache labels.
labels: Labels,
}
Expand All @@ -69,7 +71,11 @@ const OPERATION_ID_TO_OPERATION_DB_NAME: &str = "operation_id_to_operation";
const CACHE_OPERATION_LOG_COUNTER_NAME: &str = "cache_operation_log";

impl OperationLog {
pub fn create(env: &mut RwLmdbEnvironment, labels: Labels) -> Result<Self, StorageError> {
pub fn create(
env: &mut RwLmdbEnvironment,
name: String,
labels: Labels,
) -> Result<Self, StorageError> {
describe_counter!(
CACHE_OPERATION_LOG_COUNTER_NAME,
"Number of operations stored in the cache"
Expand All @@ -87,11 +93,16 @@ impl OperationLog {
present_operation_ids,
next_operation_id,
operation_id_to_operation,
name,
labels,
})
}

pub fn open<E: LmdbEnvironment>(env: &E, labels: Labels) -> Result<Self, StorageError> {
pub fn open<E: LmdbEnvironment>(
env: &E,
name: String,
labels: Labels,
) -> Result<Self, StorageError> {
let primary_key_metadata = PrimaryKeyMetadata::open(env)?;
let hash_metadata = HashMetadata::open(env)?;
let present_operation_ids = LmdbSet::open(env, Some(PRESENT_OPERATION_IDS_DB_NAME))?;
Expand All @@ -104,10 +115,15 @@ impl OperationLog {
present_operation_ids,
next_operation_id,
operation_id_to_operation,
name,
labels,
})
}

/// Returns the cache name stored in this operation log.
pub fn name(&self) -> &str {
    self.name.as_str()
}

pub fn labels(&self) -> &Labels {
&self.labels
}
Expand Down Expand Up @@ -506,6 +522,7 @@ impl OperationLog {
pub async fn restore<'txn, R: tokio::io::AsyncRead + Unpin>(
env: &mut RwLmdbEnvironment,
reader: &mut R,
name: String,
labels: Labels,
) -> Result<Self, dozer_storage::RestoreError> {
info!("Restoring primary key metadata");
Expand All @@ -518,7 +535,7 @@ impl OperationLog {
dozer_storage::restore(env, reader).await?;
info!("Restoring operation id to operation");
dozer_storage::restore(env, reader).await?;
Self::open(env, labels).map_err(Into::into)
Self::open(env, name, labels).map_err(Into::into)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn assert_operation_log_equal<T1: Transaction, T2: Transaction>(
#[test]
fn test_operation_log_append_only() {
let mut env = create_env(&Default::default()).unwrap().0;
let log = OperationLog::create(&mut env, Default::default()).unwrap();
let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap();
let txn = env.txn_mut().unwrap();
let append_only = true;

Expand Down Expand Up @@ -96,7 +96,7 @@ fn test_operation_log_append_only() {
#[test]
fn test_operation_log_with_primary_key() {
let mut env = create_env(&Default::default()).unwrap().0;
let log = OperationLog::create(&mut env, Default::default()).unwrap();
let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap();
let txn = env.txn_mut().unwrap();
let append_only = false;

Expand Down Expand Up @@ -219,7 +219,7 @@ fn test_operation_log_with_primary_key() {
#[test]
fn test_operation_log_without_primary_key() {
let mut env = create_env(&Default::default()).unwrap().0;
let log = OperationLog::create(&mut env, Default::default()).unwrap();
let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap();
let txn = env.txn_mut().unwrap();
let append_only = false;

Expand Down
31 changes: 21 additions & 10 deletions dozer-cache/src/cache/lmdb/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ pub struct CacheOptions {
pub intersection_chunk_size: usize,

/// Provide a path where db will be created. If nothing is provided, will default to a temp location.
/// Db path will be `PathBuf.join(Labels.to_non_empty_string())`.
pub path: Option<(PathBuf, Labels)>,
/// Db path will be `PathBuf.join(name)`.
pub path: Option<(PathBuf, String)>,

/// The labels to attach to the cache.
pub labels: Labels,
}

impl Default for CacheOptions {
Expand All @@ -51,6 +54,7 @@ impl Default for CacheOptions {
max_size: 1024 * 1024 * 1024,
intersection_chunk_size: 100,
path: None,
labels: Labels::default(),
}
}
}
Expand All @@ -62,10 +66,12 @@ pub struct LmdbRoCache {
}

impl LmdbRoCache {
pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
let main_env = RoMainEnvironment::new(options)?;
pub fn new(options: CacheOptions) -> Result<Self, CacheError> {
let main_env = RoMainEnvironment::new(options.clone())?;
let secondary_envs = (0..main_env.schema().1.len())
.map(|index| RoSecondaryEnvironment::new(secondary_environment_name(index), options))
.map(|index| {
RoSecondaryEnvironment::new(secondary_environment_name(index), options.clone())
})
.collect::<Result<_, _>>()?;
Ok(Self {
main_env,
Expand All @@ -85,18 +91,19 @@ impl LmdbRwCache {
pub fn new(
schema: Option<&SchemaWithIndex>,
connections: Option<&HashSet<String>>,
options: &CacheOptions,
options: CacheOptions,
write_options: CacheWriteOptions,
indexing_thread_pool: Arc<Mutex<IndexingThreadPool>>,
) -> Result<Self, CacheError> {
let rw_main_env = RwMainEnvironment::new(schema, connections, options, write_options)?;
let rw_main_env =
RwMainEnvironment::new(schema, connections, options.clone(), write_options)?;

let options = CacheOptions {
path: Some((
rw_main_env.base_path().to_path_buf(),
rw_main_env.labels().clone(),
rw_main_env.name().to_string(),
)),
..*options
..options
};
let ro_main_env = rw_main_env.share();

Expand All @@ -105,7 +112,7 @@ impl LmdbRwCache {
for (index, index_definition) in ro_main_env.schema().1.iter().enumerate() {
let name = secondary_environment_name(index);
let rw_secondary_env =
RwSecondaryEnvironment::new(index_definition, name.clone(), &options)?;
RwSecondaryEnvironment::new(index_definition, name.clone(), options.clone())?;
let ro_secondary_env = rw_secondary_env.share();

rw_secondary_envs.push(rw_secondary_env);
Expand All @@ -125,6 +132,10 @@ impl LmdbRwCache {
}

impl<C: LmdbCache> RoCache for C {
/// Returns the cache name, delegating to the main environment.
fn name(&self) -> &str {
    let main_env = self.main_env();
    main_env.name()
}

/// Returns the cache labels, delegating to the main environment.
fn labels(&self) -> &Labels {
    let main_env = self.main_env();
    main_env.labels()
}
Expand Down
Loading
Loading