Skip to content

Commit

Permalink
Let Layout specify OffsetContainer (#449)
Browse files Browse the repository at this point in the history
* Let Layout specify OffsetContainer

Signed-off-by: Moritz Hoffmann <[email protected]>

* Generalize ReadItem type

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
Co-authored-by: Frank McSherry <[email protected]>
  • Loading branch information
antiguru and frankmcsherry authored Dec 23, 2023
1 parent 30ac63d commit 304ed9d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 34 deletions.
91 changes: 90 additions & 1 deletion src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub use self::ord_neu::OrdValSpine as ValSpine;
pub use self::ord_neu::OrdKeySpine as KeySpine;

use std::borrow::{ToOwned};
use std::cmp::Ordering;

use timely::container::columnation::{Columnation, TimelyStack};
use crate::lattice::Lattice;
Expand Down Expand Up @@ -97,6 +98,8 @@ pub trait Layout {
/// Container for update vals.
type UpdContainer:
for<'a> BatchContainer<PushItem=(<Self::Target as Update>::Time, <Self::Target as Update>::Diff), ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
/// Container for offsets.
type OffsetContainer: BatchContainer<PushItem=usize>;
}

/// A layout that uses vectors
Expand All @@ -113,6 +116,7 @@ where
type KeyContainer = Vec<U::Key>;
type ValContainer = Vec<U::Val>;
type UpdContainer = Vec<(U::Time, U::Diff)>;
type OffsetContainer = OffsetList;
}

/// A layout based on timely stacks
Expand All @@ -131,6 +135,7 @@ where
type KeyContainer = TimelyStack<U::Key>;
type ValContainer = TimelyStack<U::Val>;
type UpdContainer = TimelyStack<(U::Time, U::Diff)>;
type OffsetContainer = OffsetList;
}

/// A type with a preferred container.
Expand Down Expand Up @@ -183,10 +188,13 @@ where
type KeyContainer = K::Container;
type ValContainer = V::Container;
type UpdContainer = Vec<(T, D)>;
type OffsetContainer = OffsetList;
}

use std::convert::TryInto;
use std::ops::Deref;
use abomonation_derive::Abomonation;
use crate::trace::cursor::MyTrait;

/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)]
Expand Down Expand Up @@ -234,6 +242,87 @@ impl OffsetList {
}
}

/// Helper struct to provide `MyTrait` for `Copy` types.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
pub struct Wrapper<T: Copy>(T);

impl<T: Copy> Deref for Wrapper<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper<T> {
type Owned = T;

fn into_owned(self) -> Self::Owned {
self.0
}

fn clone_onto(&self, other: &mut Self::Owned) {
*other = self.0;
}

fn compare(&self, other: &Self::Owned) -> Ordering {
self.0.cmp(other)
}

fn borrow_as(other: &'a Self::Owned) -> Self {
Self(*other)
}
}

impl BatchContainer for OffsetList {
type PushItem = usize;
type ReadItem<'a> = Wrapper<usize>;

fn push(&mut self, item: Self::PushItem) {
self.push(item);
}

fn copy_push(&mut self, item: &Self::PushItem) {
self.push(*item);
}

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.push(item.0);
}

fn copy_slice(&mut self, slice: &[Self::PushItem]) {
for index in slice {
self.push(*index);
}
}

fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for offset in start..end {
self.push(other.index(offset));
}
}

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}

fn reserve(&mut self, _additional: usize) {
// Nop
}

fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::with_capacity(cont1.len() + cont2.len())
}

fn index(&self, index: usize) -> Self::ReadItem<'_> {
Wrapper(self.index(index))
}

fn len(&self) -> usize {
self.len()
}
}

pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2};

/// Containers for data that resemble `Vec<T>`, with leaner implementations.
Expand All @@ -245,7 +334,7 @@ pub mod containers {
use crate::trace::MyTrait;

/// A general-purpose container resembling `Vec<T>`.
pub trait BatchContainer: Default + 'static {
pub trait BatchContainer: 'static {
/// The type of contained item.
///
/// The container only supplies references to the item, so it needn't be sized.
Expand Down
42 changes: 23 additions & 19 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ mod val_batch {
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, OffsetList};
use crate::trace::implementations::BatchContainer;
use crate::trace::cursor::MyTrait;

use super::{Layout, Update};
Expand All @@ -83,7 +83,7 @@ mod val_batch {
/// Offsets used to provide indexes from keys to values.
///
/// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
pub keys_offs: OffsetList,
pub keys_offs: L::OffsetContainer,
/// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
pub vals: L::ValContainer,
/// Offsets used to provide indexes from values to updates.
Expand All @@ -94,20 +94,20 @@ mod val_batch {
/// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
///
/// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
pub vals_offs: OffsetList,
pub vals_offs: L::OffsetContainer,
/// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
pub updates: L::UpdContainer,
}

impl<L: Layout> OrdValStorage<L> {
/// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
fn values_for_key(&self, index: usize) -> (usize, usize) {
(self.keys_offs.index(index), self.keys_offs.index(index+1))
(self.keys_offs.index(index).into_owned(), self.keys_offs.index(index+1).into_owned())
}
/// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
fn updates_for_value(&self, index: usize) -> (usize, usize) {
let mut lower = self.vals_offs.index(index);
let upper = self.vals_offs.index(index+1);
let mut lower = self.vals_offs.index(index).into_owned();
let upper = self.vals_offs.index(index+1).into_owned();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
Expand Down Expand Up @@ -206,14 +206,17 @@ mod val_batch {

let mut storage = OrdValStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
};

storage.keys_offs.push(0);
storage.vals_offs.push(0);
// Mark explicit types because type inference fails to resolve it.
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.push(0);

OrdValMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -546,9 +549,9 @@ mod val_batch {
Self {
result: OrdValStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: OffsetList::with_capacity(keys + 1),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
vals: L::ValContainer::with_capacity(vals),
vals_offs: OffsetList::with_capacity(vals + 1),
vals_offs: L::OffsetContainer::with_capacity(vals + 1),
updates: L::UpdContainer::with_capacity(upds),
},
singleton: None,
Expand Down Expand Up @@ -636,7 +639,7 @@ mod key_batch {
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, OffsetList};
use crate::trace::implementations::BatchContainer;
use crate::trace::cursor::MyTrait;

use super::{Layout, Update};
Expand All @@ -654,16 +657,16 @@ mod key_batch {
/// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
///
/// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
pub keys_offs: OffsetList,
pub keys_offs: L::OffsetContainer,
/// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
pub updates: L::UpdContainer,
}

impl<L: Layout> OrdKeyStorage<L> {
/// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
fn updates_for_key(&self, index: usize) -> (usize, usize) {
let mut lower = self.keys_offs.index(index);
let upper = self.keys_offs.index(index+1);
let mut lower = self.keys_offs.index(index).into_owned();
let upper = self.keys_offs.index(index+1).into_owned();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
Expand Down Expand Up @@ -763,11 +766,12 @@ mod key_batch {

let mut storage = OrdKeyStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
};

storage.keys_offs.push(0);
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);

OrdKeyMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -1011,7 +1015,7 @@ mod key_batch {
Self {
result: OrdKeyStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: OffsetList::with_capacity(keys + 1),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
updates: L::UpdContainer::with_capacity(upds),
},
singleton: None,
Expand Down
31 changes: 17 additions & 14 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ mod val_batch {
use crate::hashable::Hashable;

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, OffsetList};
use crate::trace::implementations::BatchContainer;
use crate::trace::cursor::MyTrait;

use super::{Layout, Update, HashOrdered};
Expand Down Expand Up @@ -122,7 +122,7 @@ mod val_batch {
/// Offsets used to provide indexes from keys to values.
///
/// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
pub keys_offs: OffsetList,
pub keys_offs: L::OffsetContainer,
/// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
pub vals: L::ValContainer,
/// Offsets used to provide indexes from values to updates.
Expand All @@ -133,7 +133,7 @@ mod val_batch {
/// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
///
/// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
pub vals_offs: OffsetList,
pub vals_offs: L::OffsetContainer,
/// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
pub updates: L::UpdContainer,
}
Expand All @@ -144,16 +144,16 @@ mod val_batch {
{
/// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
fn values_for_key(&self, index: usize) -> (usize, usize) {
let lower = self.keys_offs.index(index);
let upper = self.keys_offs.index(index+1);
let lower = self.keys_offs.index(index).into_owned();
let upper = self.keys_offs.index(index+1).into_owned();
// Looking up values for an invalid key indicates something is wrong.
assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
(lower, upper)
}
/// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
fn updates_for_value(&self, index: usize) -> (usize, usize) {
let mut lower = self.vals_offs.index(index);
let upper = self.vals_offs.index(index+1);
let mut lower = self.vals_offs.index(index).into_owned();
let upper = self.vals_offs.index(index+1).into_owned();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
Expand All @@ -178,7 +178,7 @@ mod val_batch {
// push additional blank entries in.
while self.keys.len() < desired {
// We insert a default (dummy) key and repeat the offset to indicate this.
let current_offset = self.keys_offs.index(self.keys.len());
let current_offset = self.keys_offs.index(self.keys.len()).into_owned();
self.keys.push(Default::default());
self.keys_offs.push(current_offset);
}
Expand Down Expand Up @@ -339,17 +339,20 @@ mod val_batch {

let mut storage = RhhValStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
key_count: 0,
key_capacity: rhh_cap,
divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
};

storage.keys_offs.push(0);
storage.vals_offs.push(0);
// Mark explicit types because type inference fails to resolve it.
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.push(0);

RhhValMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -746,9 +749,9 @@ mod val_batch {
Self {
result: RhhValStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: OffsetList::with_capacity(keys + 1),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
vals: L::ValContainer::with_capacity(vals),
vals_offs: OffsetList::with_capacity(vals + 1),
vals_offs: L::OffsetContainer::with_capacity(vals + 1),
updates: L::UpdContainer::with_capacity(upds),
key_count: 0,
key_capacity: rhh_capacity,
Expand Down

0 comments on commit 304ed9d

Please sign in to comment.