diff --git a/src/layer/internal/base.rs b/src/layer/internal/base.rs index 26653fd2..8a984094 100644 --- a/src/layer/internal/base.rs +++ b/src/layer/internal/base.rs @@ -490,7 +490,7 @@ pub async fn open_base_triple_stream( } #[cfg(test)] -pub mod tests { +pub mod base_tests { use super::*; use crate::storage::memory::*; use futures::stream::TryStreamExt; diff --git a/src/layer/internal/child.rs b/src/layer/internal/child.rs index ef915eb7..2fd8a566 100644 --- a/src/layer/internal/child.rs +++ b/src/layer/internal/child.rs @@ -702,9 +702,9 @@ pub async fn open_child_triple_stream( } #[cfg(test)] -pub mod tests { +pub mod child_tests { use super::*; - use crate::layer::base::tests::*; + use crate::layer::base::base_tests::*; use crate::storage::memory::*; use futures::stream::TryStreamExt; diff --git a/src/layer/internal/mod.rs b/src/layer/internal/mod.rs index c5a84fc7..2045ff6d 100644 --- a/src/layer/internal/mod.rs +++ b/src/layer/internal/mod.rs @@ -1098,7 +1098,7 @@ mod tests { assert_eq!(1, layer.triple_layer_removal_count().unwrap()); } - use crate::layer::base::tests::*; + use crate::layer::base::base_tests::*; #[tokio::test] async fn base_layer_with_gaps_addition_count() { let files = base_layer_files(); diff --git a/src/layer/internal/object_iterator.rs b/src/layer/internal/object_iterator.rs index aa2faf9e..bbfb9189 100644 --- a/src/layer/internal/object_iterator.rs +++ b/src/layer/internal/object_iterator.rs @@ -227,7 +227,7 @@ impl Iterator for InternalTripleObjectIterator { #[cfg(test)] mod tests { use super::*; - use crate::layer::base::tests::*; + use crate::layer::base::base_tests::*; use crate::storage::memory::*; use crate::storage::*; diff --git a/src/layer/internal/predicate_iterator.rs b/src/layer/internal/predicate_iterator.rs index 35a54a8b..db99e087 100644 --- a/src/layer/internal/predicate_iterator.rs +++ b/src/layer/internal/predicate_iterator.rs @@ -182,8 +182,8 @@ impl Iterator for InternalTriplePredicateIterator { #[cfg(test)] mod tests { - use crate::layer::base::tests::*; - use crate::layer::child::tests::*; + use crate::layer::base::base_tests::*; + use crate::layer::child::child_tests::*; use crate::layer::*; use std::sync::Arc; diff --git a/src/layer/internal/subject_iterator.rs b/src/layer/internal/subject_iterator.rs index 7321d349..92e27447 100644 --- a/src/layer/internal/subject_iterator.rs +++ b/src/layer/internal/subject_iterator.rs @@ -452,8 +452,8 @@ impl Iterator for InternalTripleStackIterator { #[cfg(test)] mod tests { - use crate::layer::base::tests::*; - use crate::layer::child::tests::*; + use crate::layer::base::base_tests::*; + use crate::layer::child::child_tests::*; use crate::layer::*; use crate::structure::TdbDataType; diff --git a/src/layer/layer.rs b/src/layer/layer.rs index 51652384..a03437d3 100644 --- a/src/layer/layer.rs +++ b/src/layer/layer.rs @@ -405,9 +405,9 @@ impl ObjectType { #[cfg(test)] mod tests { use super::*; - use crate::layer::internal::base::tests::base_layer_files; + use crate::layer::internal::base::base_tests::base_layer_files; use crate::layer::internal::base::BaseLayer; - use crate::layer::internal::child::tests::child_layer_files; + use crate::layer::internal::child::child_tests::child_layer_files; use crate::layer::internal::child::ChildLayer; use crate::layer::internal::InternalLayer; use crate::layer::simple_builder::{LayerBuilder, SimpleLayerBuilder}; diff --git a/src/structure/adjacencylist.rs b/src/structure/adjacencylist.rs index 8644bd94..5a123a78 100644 --- a/src/structure/adjacencylist.rs +++ b/src/structure/adjacencylist.rs @@ -15,6 +15,7 @@ use std::io; use std::pin::Pin; use bytes::Bytes; +use bytes::BytesMut; use super::bitarray::*; use super::bitindex::*; @@ -36,6 +37,15 @@ impl AdjacencyList { AdjacencyList { nums, bits } } + pub fn from_buffers(buffers: AdjacencyListBuffers) -> AdjacencyList { + Self::parse( + buffers.nums, + buffers.bits, + buffers.bitindex_blocks, + buffers.bitindex_sblocks, + ) + } + pub fn parse( nums_slice: Bytes, bits_slice: Bytes, @@ -224,6 +234,82 @@ pub async fn adjacency_list_stream_pairs( ) } +pub struct UnindexedAdjacencyListBufBuilder { + bitarray: BitArrayBufBuilder, + nums: LogArrayBufBuilder, + last_left: u64, + last_right: u64, +} + +impl UnindexedAdjacencyListBufBuilder { + pub fn new(width: u8) -> Self { + Self { + bitarray: BitArrayBufBuilder::new(BytesMut::new()), + nums: LogArrayBufBuilder::new(BytesMut::new(), width), + last_left: 0, + last_right: 0, + } + } + + pub fn push(&mut self, left: u64, right: u64) { + // the tricky thing with this code is that the bitarray lags one entry behind the logarray. + // The reason for this is that at push time, we do not yet know if this entry is going to be + // the last entry for `left`, we only know this when we push a greater `left` later on. + if left < self.last_left || (left == self.last_left && right <= self.last_right) { + panic!("tried to push an unordered adjacent pair"); + } + + // the left hand side of the adjacencylist is expected to be a continuous range from 1 up to the max + // but when adding entries, there may be holes. We handle holes by writing a '0' to the logarray + // (which is otherwise an invalid right-hand side) and pushing a 1 onto the bitarray to immediately close the segment. + let skip = left - self.last_left; + + if self.last_left == 0 && skip == 1 { + // this is the first entry. we can't push a bit yet + } else if skip == 0 { + // same `left` as before. so the previous entry was not the last one, and the bitarray gets a 0 appended. + self.bitarray.push(false); + } else { + // if this is the first element, but we do need to skip, make sure we write one less bit than we'd usually do + let bitskip = if self.last_left == 0 { skip - 1 } else { skip }; + // there's a different `left`. we push a bunch of 1s to the bitarray, and 0s to the num array. + for _ in 0..bitskip { + self.bitarray.push(true); + } + for _ in 0..(skip - 1) { + self.nums.push(0); + } + } + + // finally push right to the logarray + self.nums.push(right); + self.last_left = left; + self.last_right = right; + } + + pub fn push_all>(&mut self, mut iter: I) { + while let Some((left, right)) = iter.next() { + self.push(left, right); + } + } + + pub fn finalize(mut self) -> (Bytes, Bytes) { + if self.nums.count() != 0 { + // push last bit to bitarray + self.bitarray.push(true); + } + + let ba = self.bitarray.finalize(); + let nums = self.nums.finalize(); + + (ba.freeze(), nums.freeze()) + } + + pub fn count(&self) -> u64 { + self.bitarray.count() + } +} + pub struct UnindexedAdjacencyListBuilder { bitarray: BitArrayFileBuilder, nums: LogArrayFileBuilder, @@ -307,6 +393,59 @@ impl UnindexedAdjacencyListBuilder { } } +pub struct AdjacencyListBufBuilder { + builder: UnindexedAdjacencyListBufBuilder, + bitindex_blocks: BytesMut, + bitindex_sblocks: BytesMut, +} + +impl AdjacencyListBufBuilder { + pub fn new(width: u8) -> AdjacencyListBufBuilder { + AdjacencyListBufBuilder { + builder: UnindexedAdjacencyListBufBuilder::new(width), + bitindex_blocks: BytesMut::new(), + bitindex_sblocks: BytesMut::new(), + } + } + + pub fn push(&mut self, left: u64, right: u64) { + self.builder.push(left, right) + } + + pub fn push_all>(&mut self, iter: I) { + self.builder.push_all(iter) + } + + pub fn finalize(self) -> AdjacencyListBuffers { + let AdjacencyListBufBuilder { + builder, + mut bitindex_blocks, + mut bitindex_sblocks, + } = self; + let (bitfile, nums) = builder.finalize(); + + build_bitindex_from_buf(&bitfile[..], &mut bitindex_blocks, &mut bitindex_sblocks); + + AdjacencyListBuffers { + nums, + bits: bitfile, + bitindex_blocks: bitindex_blocks.freeze(), + bitindex_sblocks: bitindex_sblocks.freeze(), + } + } + + pub fn count(&self) -> u64 { + self.builder.count() + } +} + +pub struct AdjacencyListBuffers { + nums: Bytes, + bits: Bytes, + bitindex_blocks: Bytes, + bitindex_sblocks: Bytes, +} + pub struct AdjacencyListBuilder where F: 'static + FileLoad + FileStore, @@ -778,4 +917,16 @@ mod tests { result ); } + + #[test] + fn adjacencylist_buf_builder_works() { + let adjacencies = [(1, 1), (1, 5), (2, 3), (2, 7), (4, 8)]; + let mut builder = AdjacencyListBufBuilder::new(8); + builder.push_all(adjacencies.iter().copied()); + let buffers = builder.finalize(); + let aj = AdjacencyList::from_buffers(buffers); + + let result: Vec<_> = aj.iter().collect(); + assert_eq!(&adjacencies[..], &result[..]); + } } diff --git a/src/structure/bitarray.rs b/src/structure/bitarray.rs index 43ea1dba..918e37f5 100644 --- a/src/structure/bitarray.rs +++ b/src/structure/bitarray.rs @@ -34,7 +34,7 @@ use super::util; use crate::storage::*; use crate::structure::bititer::BitIter; use byteorder::{BigEndian, ByteOrder}; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::io; use futures::stream::{Stream, StreamExt, TryStreamExt}; use std::{convert::TryFrom, error, fmt}; @@ -200,6 +200,70 @@ impl BitArray { } } +pub struct BitArrayBufBuilder { + /// Destination of the bit array data. + dest: B, + /// Storage for the next word to be written. + current: u64, + /// Number of bits written to the buffer + count: u64, +} + +impl BitArrayBufBuilder { + pub fn new(dest: B) -> BitArrayBufBuilder { + BitArrayBufBuilder { + dest, + current: 0, + count: 0, + } + } + + pub fn push(&mut self, bit: bool) { + // Set the bit in the current word. + if bit { + // Determine the position of the bit to be set from `count`. + let pos = self.count & 0b11_1111; + self.current |= 0x8000_0000_0000_0000 >> pos; + } + + // Advance the bit count. + self.count += 1; + + // Check if the new `count` has reached a word boundary. + if self.count & 0b11_1111 == 0 { + // We have filled `current`, so write it to the destination. + self.dest.put_u64(self.current); + self.current = 0; + } + } + + pub fn push_all>(&mut self, mut iter: I) { + while let Some(bit) = iter.next() { + self.push(bit); + } + } + + fn finalize_data(&mut self) { + if self.count & 0b11_1111 != 0 { + self.dest.put_u64(self.current); + } + } + + pub fn finalize(mut self) -> B { + let count = self.count; + // Write the final data word. + self.finalize_data(); + // Write the control word. + self.dest.put_u64(count); + + self.dest + } + + pub fn count(&self) -> u64 { + self.count + } +} + pub struct BitArrayFileBuilder { /// Destination of the bit array data. dest: W, @@ -317,22 +381,22 @@ pub fn bitarray_stream_blocks(r: R) -> FramedRead(b: &mut B) -> BitArrayBlockIterator { +pub fn bitarray_iter_blocks(b: B) -> BitArrayBlockIterator { BitArrayBlockIterator { buf: b, readahead: None, } } -pub struct BitArrayBlockIterator<'a, B: Buf> { - buf: &'a mut B, +pub struct BitArrayBlockIterator { + buf: B, readahead: Option, } -impl<'a, B: Buf> Iterator for BitArrayBlockIterator<'a, B> { +impl Iterator for BitArrayBlockIterator { type Item = u64; fn next(&mut self) -> Option { - decode_next_bitarray_block(self.buf, &mut self.readahead) + decode_next_bitarray_block(&mut self.buf, &mut self.readahead) } } diff --git a/src/structure/bitindex.rs b/src/structure/bitindex.rs index 76b4c117..760099a9 100644 --- a/src/structure/bitindex.rs +++ b/src/structure/bitindex.rs @@ -437,10 +437,10 @@ pub async fn build_bitindex< Ok(()) } -pub fn build_bitindex_from_block_iter<'a, I: Iterator, B1: BufMut, B2: BufMut>( - blocks_iter: &'a mut I, - blocks: &mut B1, - sblocks: &mut B2, +pub fn build_bitindex_from_block_iter, B1: BufMut, B2: BufMut>( + blocks_iter: I, + blocks: B1, + sblocks: B2, ) { // the following widths are unoptimized, but should always be large enough let mut blocks_builder = @@ -474,9 +474,9 @@ pub fn build_bitindex_from_block_iter<'a, I: Iterator, B1: BufMut, B } pub fn build_bitindex_from_buf( - bitarray: &mut B1, - blocks: &mut B2, - sblocks: &mut B3, + bitarray: B1, + blocks: B2, + sblocks: B3, ) { let mut iter = bitarray_iter_blocks(bitarray); build_bitindex_from_block_iter(&mut iter, blocks, sblocks) diff --git a/src/structure/logarray.rs b/src/structure/logarray.rs index 942925f5..86126497 100644 --- a/src/structure/logarray.rs +++ b/src/structure/logarray.rs @@ -404,9 +404,9 @@ impl LogArray { } /// write a logarray directly to an AsyncWrite -pub struct LogArrayBufBuilder<'a, B: BufMut> { +pub struct LogArrayBufBuilder { /// Destination of the log array data - buf: &'a mut B, + buf: B, /// Bit width of an element width: u8, /// Storage for the next word to be written to the buffer @@ -417,14 +417,14 @@ pub struct LogArrayBufBuilder<'a, B: BufMut> { count: u64, } -impl<'a> LogArrayBufBuilder<'a, BytesMut> { +impl + BufMut> LogArrayBufBuilder { pub fn reserve(&mut self, additional: usize) { self.buf.reserve(additional * self.width as usize / 8); } } -impl<'a, B: BufMut> LogArrayBufBuilder<'a, B> { - pub fn new(buf: &'a mut B, width: u8) -> Self { +impl LogArrayBufBuilder { + pub fn new(buf: B, width: u8) -> Self { Self { buf, width, @@ -492,10 +492,11 @@ impl<'a, B: BufMut> LogArrayBufBuilder<'a, B> { } } - pub fn finalize(mut self) { + pub fn finalize(mut self) -> B { self.finalize_data(); self.write_control_word(); + self.buf } pub(crate) fn finalize_without_control_word(mut self) {