From 60451c0ea99a78a87048a2890103245a80a53834 Mon Sep 17 00:00:00 2001 From: Matthijs van Otterdijk Date: Tue, 19 Sep 2023 11:33:56 +0200 Subject: [PATCH] base and child layers create indexed property collections --- src/layer/internal/base.rs | 22 ++++++++++++- src/layer/internal/child.rs | 32 +++++++++++++++++++ src/layer/simple_builder.rs | 2 ++ src/storage/archive.rs | 28 +++++++++++++++-- src/storage/file.rs | 51 ++++++++++++++++++++++++++++--- src/storage/layer.rs | 20 +++++++++++- src/storage/memory.rs | 38 +++++++++++++++++++++++ src/structure/indexed_property.rs | 18 ++++++++--- 8 files changed, 197 insertions(+), 14 deletions(-) diff --git a/src/layer/internal/base.rs b/src/layer/internal/base.rs index 1233556f..46d9a697 100644 --- a/src/layer/internal/base.rs +++ b/src/layer/internal/base.rs @@ -395,9 +395,22 @@ impl BaseLayerFileBuilderPhase2 { self.indexed_properties_builder.add(subject, index, object) } + pub fn set_index_triples>( + &mut self, + triples: I, + ) { + self.indexed_properties_builder.add_triples(triples) + } + pub(crate) async fn partial_finalize(self) -> io::Result> { self.triple_builder.finalize().await?; - let indexed_properties_collection_bufs = self.indexed_properties_builder.finalize(); + if let Some(indexed_properties_collection_bufs) = self.indexed_properties_builder.finalize() + { + self.files + .indexed_property_files + .write_from_maps(indexed_properties_collection_bufs.into()) + .await?; + } chrono_log!("finalized base triples builder"); Ok(self.files) @@ -405,6 +418,13 @@ impl BaseLayerFileBuilderPhase2 { pub async fn finalize(self) -> io::Result<()> { self.triple_builder.finalize().await?; + if let Some(indexed_properties_collection_bufs) = self.indexed_properties_builder.finalize() + { + self.files + .indexed_property_files + .write_from_maps(indexed_properties_collection_bufs.into()) + .await?; + } chrono_log!("finalized base triples builder"); let s_p_adjacency_list_files = self.files.s_p_adjacency_list_files.clone(); let sp_o_adjacency_list_files = self.files.sp_o_adjacency_list_files.clone(); diff --git a/src/layer/internal/child.rs b/src/layer/internal/child.rs index 2fd8a566..a76af624 100644 --- a/src/layer/internal/child.rs +++ b/src/layer/internal/child.rs @@ -7,6 +7,8 @@ use super::super::builder::*; use super::super::id_map::*; use crate::layer::*; use crate::storage::*; +use crate::structure::indexed_property::IndexedPropertyBuilder; +use crate::structure::indexed_property::IndexedPropertyCollection; use crate::structure::*; use rayon::prelude::*; @@ -49,6 +51,8 @@ pub struct ChildLayer { pub(super) pos_predicate_wavelet_tree: WaveletTree, pub(super) neg_predicate_wavelet_tree: WaveletTree, + + pub(super) indexed_property_collection: Option, } impl ChildLayer { @@ -164,6 +168,10 @@ impl ChildLayer { neg_predicate_wavelet_tree_width, ); + let indexed_property_collection = maps + .index_property_maps + .map(|m| IndexedPropertyCollection::from_buffers(m.into())); + InternalLayer::Child(ChildLayer { name, parent, @@ -193,6 +201,8 @@ impl ChildLayer { pos_predicate_wavelet_tree, neg_predicate_wavelet_tree, + + indexed_property_collection, }) } } @@ -388,6 +398,7 @@ pub struct ChildLayerFileBuilderPhase2, neg_builder: TripleFileBuilder, + indexed_properties_builder: IndexedPropertyBuilder, } impl ChildLayerFileBuilderPhase2 { @@ -420,12 +431,15 @@ impl ChildLayerFileBuil ) .await?; + let indexed_properties_builder = IndexedPropertyBuilder::new(); + Ok(ChildLayerFileBuilderPhase2 { parent, files, pos_builder, neg_builder, + indexed_properties_builder, }) } @@ -507,6 +521,17 @@ impl ChildLayerFileBuil Ok(()) } + pub fn set_index_triple(&mut self, subject: u64, index: usize, object: u64) { + self.indexed_properties_builder.add(subject, index, object) + } + + pub fn set_index_triples>( + &mut self, + triples: I, + ) { + self.indexed_properties_builder.add_triples(triples) + } + /// Remove the given triple. /// /// This will panic if a greater triple has already been removed, @@ -532,6 +557,13 @@ impl ChildLayerFileBuil pub async fn finalize(self) -> io::Result<()> { let pos_task = tokio::spawn(self.pos_builder.finalize()); let neg_task = tokio::spawn(self.neg_builder.finalize()); + if let Some(indexed_properties_collection_bufs) = self.indexed_properties_builder.finalize() + { + self.files + .indexed_property_files + .write_from_maps(indexed_properties_collection_bufs.into()) + .await?; + } pos_task.await??; neg_task.await??; diff --git a/src/layer/simple_builder.rs b/src/layer/simple_builder.rs index 4d2ff0a1..7c949581 100644 --- a/src/layer/simple_builder.rs +++ b/src/layer/simple_builder.rs @@ -459,6 +459,7 @@ impl LayerBuilder for SimpleLayerBuil builder.add_id_triples(id_additions).await?; builder.remove_id_triples(id_removals).await?; + builder.set_index_triples(index_id_additions); builder.finalize().await } @@ -474,6 +475,7 @@ impl LayerBuilder for SimpleLayerBuil let mut builder = builder.into_phase2().await?; builder.add_id_triples(id_additions).await?; + builder.set_index_triples(index_id_additions); builder.finalize().await } diff --git a/src/storage/archive.rs b/src/storage/archive.rs index caab5ab5..8953fe08 100644 --- a/src/storage/archive.rs +++ b/src/storage/archive.rs @@ -645,6 +645,7 @@ impl ArchiveMetadataBackend } pub enum ConstructionFileState { + Fresh, UnderConstruction(BytesMut), Finalizing, Finalized(Bytes), @@ -655,9 +656,7 @@ pub struct ConstructionFile(Arc>); impl ConstructionFile { fn new() -> Self { - Self(Arc::new(RwLock::new( - ConstructionFileState::UnderConstruction(BytesMut::new()), - ))) + Self(Arc::new(RwLock::new(ConstructionFileState::Fresh))) } fn new_finalized(bytes: Bytes) -> Self { @@ -691,6 +690,17 @@ impl FileStore for ConstructionFile { async fn open_write(&self) -> io::Result { Ok(self.clone()) } + + async fn write_bytes(&self, bytes: Bytes) -> io::Result<()> { + let mut guard = self.0.write().unwrap(); + match *guard { + ConstructionFileState::Fresh => { + *guard = ConstructionFileState::Finalized(bytes); + } + _ => panic!("tried to write bytes to a construction file that was already written to"), + } + Ok(()) + } } impl AsyncWrite for ConstructionFile { @@ -701,6 +711,12 @@ impl AsyncWrite for ConstructionFile { ) -> Poll> { let mut guard = self.0.write().unwrap(); match &mut *guard { + ConstructionFileState::Fresh => { + let mut bytes = BytesMut::new(); + bytes.put_slice(buf); + *guard = ConstructionFileState::UnderConstruction(bytes); + Poll::Ready(Ok(buf.len())) + } ConstructionFileState::UnderConstruction(x) => { x.put_slice(buf); @@ -738,6 +754,11 @@ impl SyncableFile for ConstructionFile { std::mem::swap(&mut state, &mut *guard); match state { + ConstructionFileState::Fresh => { + *guard = ConstructionFileState::Finalized(Bytes::new()); + + Ok(()) + } ConstructionFileState::UnderConstruction(x) => { let buf = x.freeze(); *guard = ConstructionFileState::Finalized(buf); @@ -1344,6 +1365,7 @@ impl ArchiveLayerStore { pub fn write_bytes(&self, name: [u32; 5], file: LayerFileEnum, bytes: Bytes) { let mut guard = self.construction.write().unwrap(); if let Some(map) = guard.get_mut(&name) { + // TODO this could be fine if the file is there but in fresh state, but it's not a pattern that is in use now. if map.contains_key(&file) { panic!("tried to write bytes to an archive, but file is already open"); } diff --git a/src/storage/file.rs b/src/storage/file.rs index ed9b4df5..5904084a 100644 --- a/src/storage/file.rs +++ b/src/storage/file.rs @@ -20,6 +20,12 @@ pub trait SyncableFile: AsyncWrite + Unpin + Send { pub trait FileStore: Clone + Send + Sync { type Write: SyncableFile; async fn open_write(&self) -> io::Result; + async fn write_bytes(&self, mut bytes: Bytes) -> io::Result<()> { + let mut writable = self.open_write().await?; + writable.write_all_buf(&mut bytes).await?; + writable.flush().await?; + writable.sync_all().await + } } #[async_trait] @@ -102,7 +108,7 @@ pub struct BaseLayerFiles { pub s_p_adjacency_list_files: AdjacencyListFiles, pub sp_o_adjacency_list_files: AdjacencyListFiles, - pub index_property_files: IndexPropertyFiles, + pub indexed_property_files: IndexedPropertyFiles, pub o_ps_adjacency_list_files: AdjacencyListFiles, @@ -142,7 +148,7 @@ impl BaseLayerFiles { let s_p_adjacency_list_maps = self.s_p_adjacency_list_files.map_all().await?; let sp_o_adjacency_list_maps = self.sp_o_adjacency_list_files.map_all().await?; - let index_property_maps = self.index_property_files.map_all_if_exists().await?; + let index_property_maps = self.indexed_property_files.map_all_if_exists().await?; let o_ps_adjacency_list_maps = self.o_ps_adjacency_list_files.map_all().await?; let predicate_wavelet_tree_maps = self.predicate_wavelet_tree_files.map_all().await?; @@ -189,6 +195,8 @@ pub struct ChildLayerFiles, pub neg_predicate_wavelet_tree_files: BitIndexFiles, + + pub indexed_property_files: IndexedPropertyFiles, } #[derive(Clone)] @@ -213,6 +221,7 @@ pub struct ChildLayerMaps { pub pos_predicate_wavelet_tree_maps: BitIndexMaps, pub neg_predicate_wavelet_tree_maps: BitIndexMaps, + pub index_property_maps: Option, } impl ChildLayerFiles { @@ -241,6 +250,8 @@ impl ChildLayerFiles { let neg_predicate_wavelet_tree_maps = self.neg_predicate_wavelet_tree_files.map_all().await?; + let index_property_maps = self.indexed_property_files.map_all_if_exists().await?; + Ok(ChildLayerMaps { node_dictionary_maps, predicate_dictionary_maps, @@ -262,6 +273,7 @@ impl ChildLayerFiles { pos_predicate_wavelet_tree_maps, neg_predicate_wavelet_tree_maps, + index_property_maps, }) } } @@ -378,7 +390,7 @@ impl DictionaryFiles { } #[derive(Clone)] -pub struct IndexPropertyFiles { +pub struct IndexedPropertyFiles { pub subjects_logarray_file: F, pub adjacency_files: AdjacencyListFiles, pub objects_logarray_file: F, @@ -391,7 +403,7 @@ pub struct IndexPropertyMaps { pub objects_logarray_map: Bytes, } -impl IndexPropertyFiles { +impl IndexedPropertyFiles { pub async fn map_all_if_exists(&self) -> io::Result> { if let Some(subjects_logarray_map) = self.subjects_logarray_file.map_if_exists().await? { Ok(Some(IndexPropertyMaps { @@ -403,6 +415,20 @@ impl IndexPropertyFiles { Ok(None) } } + + pub async fn write_from_maps(&self, maps: IndexPropertyMaps) -> io::Result<()> { + self.subjects_logarray_file + .write_bytes(maps.subjects_logarray_map) + .await?; + self.adjacency_files + .write_from_maps(maps.adjacency_maps) + .await?; + self.objects_logarray_file + .write_bytes(maps.objects_logarray_map) + .await?; + + Ok(()) + } } // a little silly @@ -489,6 +515,14 @@ impl BitIndexFiles { Ok(None) } } + + pub async fn write_from_maps(&self, maps: BitIndexMaps) -> io::Result<()> { + self.bits_file.write_bytes(maps.bits_map).await?; + self.blocks_file.write_bytes(maps.blocks_map).await?; + self.sblocks_file.write_bytes(maps.sblocks_map).await?; + + Ok(()) + } } #[derive(Clone)] @@ -548,4 +582,13 @@ impl AdjacencyListFiles { nums_map, }) } + + pub async fn write_from_maps(&self, maps: AdjacencyListMaps) -> io::Result<()> { + self.bitindex_files + .write_from_maps(maps.bitindex_maps) + .await?; + self.nums_file.write_bytes(maps.nums_map).await?; + + Ok(()) + } } diff --git a/src/storage/layer.rs b/src/storage/layer.rs index f0728fe7..ccad3d0b 100644 --- a/src/storage/layer.rs +++ b/src/storage/layer.rs @@ -502,7 +502,7 @@ pub trait PersistentLayerStore: 'static + Send + Sync + Clone { blocks_file: files[29].clone(), sblocks_file: files[30].clone(), }, - index_property_files: IndexPropertyFiles { + indexed_property_files: IndexedPropertyFiles { subjects_logarray_file: files[31].clone(), adjacency_files: AdjacencyListFiles { nums_file: files[32].clone(), @@ -567,6 +567,12 @@ pub trait PersistentLayerStore: 'static + Send + Sync + Clone { FILENAMES.neg_predicate_wavelet_tree_bits, FILENAMES.neg_predicate_wavelet_tree_bit_index_blocks, FILENAMES.neg_predicate_wavelet_tree_bit_index_sblocks, + FILENAMES.index_property_subjects, + FILENAMES.index_property_adjacency_list_nums, + FILENAMES.index_property_adjacency_list_bits, + FILENAMES.index_property_adjacency_list_bit_index_blocks, + FILENAMES.index_property_adjacency_list_bit_index_sblocks, + FILENAMES.index_property_objects, ]; let mut files = Vec::with_capacity(filenames.len()); @@ -666,6 +672,18 @@ pub trait PersistentLayerStore: 'static + Send + Sync + Clone { blocks_file: files[46].clone(), sblocks_file: files[47].clone(), }, + indexed_property_files: IndexedPropertyFiles { + subjects_logarray_file: files[48].clone(), + adjacency_files: AdjacencyListFiles { + nums_file: files[49].clone(), + bitindex_files: BitIndexFiles { + bits_file: files[50].clone(), + blocks_file: files[51].clone(), + sblocks_file: files[52].clone(), + }, + }, + objects_logarray_file: files[53].clone(), + }, }) } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 26150a5a..8ee987b8 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -88,6 +88,20 @@ impl FileStore for MemoryBackedStore { bytes: BytesMut::new(), }) } + + async fn write_bytes(&self, bytes: Bytes) -> io::Result<()> { + let mut guard = self.contents.write().unwrap(); + match *guard { + MemoryBackedStoreContents::Nonexistent => { + *guard = MemoryBackedStoreContents::Existent(bytes) + } + MemoryBackedStoreContents::Existent(_) => { + panic!("tried to write to existing memory file") + } + } + + Ok(()) + } } pub struct MemoryBackedStoreReader { @@ -378,6 +392,18 @@ pub fn base_layer_memory_files() -> BaseLayerFiles { blocks_file: MemoryBackedStore::new(), sblocks_file: MemoryBackedStore::new(), }, + indexed_property_files: IndexedPropertyFiles { + subjects_logarray_file: MemoryBackedStore::new(), + adjacency_files: AdjacencyListFiles { + bitindex_files: BitIndexFiles { + bits_file: MemoryBackedStore::new(), + blocks_file: MemoryBackedStore::new(), + sblocks_file: MemoryBackedStore::new(), + }, + nums_file: MemoryBackedStore::new(), + }, + objects_logarray_file: MemoryBackedStore::new(), + }, } } @@ -475,6 +501,18 @@ pub fn child_layer_memory_files() -> ChildLayerFiles { blocks_file: MemoryBackedStore::new(), sblocks_file: MemoryBackedStore::new(), }, + indexed_property_files: IndexedPropertyFiles { + subjects_logarray_file: MemoryBackedStore::new(), + adjacency_files: AdjacencyListFiles { + bitindex_files: BitIndexFiles { + bits_file: MemoryBackedStore::new(), + blocks_file: MemoryBackedStore::new(), + sblocks_file: MemoryBackedStore::new(), + }, + nums_file: MemoryBackedStore::new(), + }, + objects_logarray_file: MemoryBackedStore::new(), + }, } } diff --git a/src/structure/indexed_property.rs b/src/structure/indexed_property.rs index ff46bcaf..60bdf455 100644 --- a/src/structure/indexed_property.rs +++ b/src/structure/indexed_property.rs @@ -1,5 +1,7 @@ use bytes::{Bytes, BytesMut}; +use crate::layer::IndexIdTriple; + use super::{ util::calculate_width, AdjacencyList, AdjacencyListBufBuilder, AdjacencyListBuffers, LogArray, LogArrayBufBuilder, MonotonicLogArray, @@ -27,13 +29,19 @@ impl IndexedPropertyBuilder { self.added.push((subject_id, array_index, object_id)); } + pub fn add_triples>(&mut self, triples: I) { + for triple in triples { + self.add(triple.subject, triple.index, triple.object); + } + } + pub fn remove(&mut self, subject_id: u64, array_index: usize) { self.added.push((subject_id, array_index, 0)); } - pub fn finalize(mut self) -> IndexPropertyBuffers { + pub fn finalize(mut self) -> Option { if self.added.len() == 0 { - panic!("no data was added"); + return None; } self.added.sort(); @@ -70,11 +78,11 @@ impl IndexedPropertyBuilder { let aj_bufs = aj_builder.finalize(); let objects_buf = objects_logarray.finalize().freeze(); - IndexPropertyBuffers { + Some(IndexPropertyBuffers { subjects_logarray_buf: subjects_buf, adjacency_bufs: aj_bufs, objects_logarray_buf: objects_buf, - } + }) } } @@ -154,7 +162,7 @@ mod tests { builder.add(3, 4, 42); builder.add(3, 7, 420); builder.add(5, 1, 21); - let buffers = builder.finalize(); + let buffers = builder.finalize().unwrap(); let collection = IndexedPropertyCollection::from_buffers(buffers); assert_eq!(Some(42), collection.lookup_index(3, 4));