Skip to content

Commit

Permalink
Merge pull request #140 from terminusdb/rollup_file_synchronization
Browse files Browse the repository at this point in the history
use locks and syncs for rollup file synchronization
  • Loading branch information
GavinMendelGleason authored Jul 19, 2023
2 parents 87a195c + 117d823 commit 03fa9a4
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/storage/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::structure::{

use super::{
consts::{LayerFileEnum, FILENAME_ENUM_MAP},
locking::{ExclusiveLockedFile, LockedFile},
name_to_string, string_to_name, FileLoad, FileStore, PersistentLayerStore, SyncableFile,
};

Expand Down Expand Up @@ -298,24 +299,44 @@ impl ArchiveMetadataBackend for DirectoryArchiveBackend {
}

async fn get_rollup(&self, id: [u32; 5]) -> io::Result<Option<[u32; 5]>> {
// acquire a shared lock on the layer. This ensures nobody will write a rollup file while we're retrieving it.
let layer_path = self.path_for_layer(id);
let layer_lock = LockedFile::open(layer_path).await;
if layer_lock.is_err() && layer_lock.as_ref().err().unwrap().kind() == ErrorKind::NotFound {
// no such layer - therefore no such rollup
return Ok(None);
}
let _layer_lock = layer_lock.unwrap();

let path = self.path_for_rollup(id);
let result = fs::read_to_string(path).await;

if result.is_err() && result.as_ref().err().unwrap().kind() == ErrorKind::NotFound {
return Ok(None);
}
let data = result?;
let name = data.lines().skip(1).next().unwrap();
let name = data.lines().skip(1).next().expect(
"Expected rollup file to have two lines but was unable to skip to the second line",
);
Ok(Some(string_to_name(&name)?))
}

async fn set_rollup(&self, id: [u32; 5], rollup: [u32; 5]) -> io::Result<()> {
// acquire an exclusive lock on the layer. This ensures nobody tries to lookup the rollup while we're writing it.
let layer_path = self.path_for_layer(id);
let _layer_lock = ExclusiveLockedFile::open(layer_path).await?;

let path = self.path_for_rollup(id);
let mut data = Vec::with_capacity(43);
data.extend_from_slice(b"1\n");
data.extend_from_slice(name_to_string(rollup).as_bytes());
data.extend_from_slice(b"\n");
fs::write(path, data).await
let mut file = fs::File::create(path).await?;
file.write_all(&data).await?;
file.flush().await?;
file.sync_all().await?;

Ok(())
}

async fn get_parent(&self, id: [u32; 5]) -> io::Result<Option<[u32; 5]>> {
Expand Down

0 comments on commit 03fa9a4

Please sign in to comment.