Skip to content

Commit

Permalink
Builder::with_capacity enrichment
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 24, 2023
1 parent 0dded6b commit 49ba02f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/hashable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::hash::Hasher;
/// can take advantage of the smaller size.
pub trait Hashable {
/// The type of the output value.
type Output: Into<u64>+Copy;
type Output: Into<u64>+Copy+Ord;
/// A well-distributed integer derived from the data.
fn hashed(&self) -> Self::Output;
}
Expand Down
29 changes: 27 additions & 2 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,36 @@ impl<U: Update> Batcher for MergeBatcher<U> {
#[inline(never)]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {

let mut builder = B::new();

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);

// Determine the number of distinct keys, values, and updates,
// and form a builder pre-sized for these numbers.
let mut builder = {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in merged.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
}
else if p_val != val {
vals += 1;
}
upds += 1;
}
prev_keyval = Some((key, val));
}
}
}
B::with_capacity(keys, vals, upds)
};

let mut kept = Vec::new();
let mut keep = Vec::new();

Expand Down
29 changes: 27 additions & 2 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,36 @@ where
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {

let mut builder = B::new();

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);

// Determine the number of distinct keys, values, and updates,
// and form a builder pre-sized for these numbers.
let mut builder = {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in merged.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
}
else if p_val != val {
vals += 1;
}
upds += 1;
}
prev_keyval = Some((key, val));
}
}
}
B::with_capacity(keys, vals, upds)
};

let mut kept = Vec::new();
let mut keep = TimelyStack::default();

Expand Down
19 changes: 4 additions & 15 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,9 @@ where
type Time = <L::Target as Update>::Time;
type Output = OrdValBatch<L>;

fn new() -> Self {
fn with_capacity(_keys: usize, _vals: usize, upds: usize) -> Self {
OrdValBuilder {
builder: <KVTDBuilder<L>>::new(),
}
}
fn with_capacity(cap: usize) -> Self {
OrdValBuilder {
builder: <KVTDBuilder<L> as TupleBuilder>::with_capacity(cap),
builder: <KVTDBuilder<L> as TupleBuilder>::with_capacity(upds),
}
}

Expand Down Expand Up @@ -628,15 +623,9 @@ where
type Time = <L::Target as Update>::Time;
type Output = OrdKeyBatch<L>;

fn new() -> Self {
OrdKeyBuilder {
builder: <KTDBuilder<L>>::new(),
}
}

fn with_capacity(cap: usize) -> Self {
fn with_capacity(_keys: usize, _vals: usize, upds: usize) -> Self {
OrdKeyBuilder {
builder: <KTDBuilder<L> as TupleBuilder>::with_capacity(cap),
builder: <KTDBuilder<L> as TupleBuilder>::with_capacity(upds),
}
}

Expand Down
13 changes: 6 additions & 7 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,16 +509,15 @@ mod val_batch {
type Time = <L::Target as Update>::Time;
type Output = OrdValBatch<L>;

fn new() -> Self { Self::with_capacity(0) }
fn with_capacity(cap: usize) -> Self {
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
// We don't introduce zero offsets as they will be introduced by the first `push` call.
Self {
result: OrdValStorage {
keys: L::KeyContainer::with_capacity(cap),
keys_offs: Vec::with_capacity(cap),
vals: L::ValContainer::with_capacity(cap),
vals_offs: Vec::with_capacity(cap),
updates: L::UpdContainer::with_capacity(cap),
keys: L::KeyContainer::with_capacity(keys),
keys_offs: Vec::with_capacity(keys + 1),
vals: L::ValContainer::with_capacity(vals),
vals_offs: Vec::with_capacity(vals + 1),
updates: L::UpdContainer::with_capacity(upds),
},
singleton: None,
singletons: 0,
Expand Down
19 changes: 11 additions & 8 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub trait Batcher {
}

/// Functionality for building batches from ordered update sequences.
pub trait Builder {
pub trait Builder: Sized {
/// Input item type.
type Item;
/// Timestamp type.
Expand All @@ -323,9 +323,14 @@ pub trait Builder {
type Output;

/// Allocates an empty builder.
fn new() -> Self;
/// Allocates an empty builder with some capacity.
fn with_capacity(cap: usize) -> Self;
///
/// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
// #[deprecated]
fn new() -> Self { Self::with_capacity(0, 0, 0) }
/// Allocates an empty builder with capacity for the specified keys, values, and updates.
///
/// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
/// Adds an element to the batch.
///
/// The default implementation uses `self.copy` with references to the owned arguments.
Expand Down Expand Up @@ -443,8 +448,7 @@ pub mod rc_blanket_impls {
type Item = B::Item;
type Time = B::Time;
type Output = Rc<B::Output>;
fn new() -> Self { RcBuilder { builder: B::new() } }
fn with_capacity(cap: usize) -> Self { RcBuilder { builder: B::with_capacity(cap) } }
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
Expand Down Expand Up @@ -550,8 +554,7 @@ pub mod abomonated_blanket_impls {
type Item = B::Item;
type Time = B::Time;
type Output = Abomonated<B::Output, Vec<u8>>;
fn new() -> Self { AbomonatedBuilder { builder: B::new() } }
fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(cap) } }
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output {
Expand Down

0 comments on commit 49ba02f

Please sign in to comment.