Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Demonstrate columnar stuff #586

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ edition.workspace = true
columnation = { git = "https://github.com/frankmcsherry/columnation" }
flatcontainer = "0.5"
serde = { version = "1.0", features = ["derive"] }
# columnar = { path = "../../columnar" }
columnar = { git = "https://github.com/frankmcsherry/columnar" }
88 changes: 88 additions & 0 deletions container/src/columnar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! Present a columnar container as a timely container.

use serde::{Serialize, Deserialize};

pub use columnar::*;
use columnar::common::IterOwn;

use crate::{Container, SizableContainer, PushInto};

/// A container based on a `columnar` store.
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct Columnar<C> {
store: C,
}

impl<C: Len + Clear + Clone + Default + 'static> Container for Columnar<C>
where
for<'a> &'a C: columnar::Index,
{
fn len(&self) -> usize { self.store.len() }
fn clear(&mut self) { self.store.clear() }

type ItemRef<'a> = <&'a C as Index>::Ref where Self: 'a;
type Iter<'a> = IterOwn<&'a C>;
fn iter<'a>(&'a self) -> Self::Iter<'a> { (&self.store).into_iter() }

type Item<'a> = <&'a C as Index>::Ref where Self: 'a;
type DrainIter<'a> = IterOwn<&'a C>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { (&self.store).into_iter() }
}

impl<C: Len + Clear + Clone + Default + 'static> SizableContainer for Columnar<C>
where
for<'a> &'a C: columnar::Index,
{
fn capacity(&self) -> usize { 1024 }
fn preferred_capacity() -> usize { 1024 }
fn reserve(&mut self, _additional: usize) { }
}

impl<C: columnar::Push<T>, T> PushInto<T> for Columnar<C> {
#[inline]
fn push_into(&mut self, item: T) {
self.store.push(item);
}
}


use columnar::bytes::{AsBytes, FromBytes, serialization::decode};

/// A container based on a columnar store, encoded in aligned bytes.
#[derive(Clone, Default)]
pub struct ColumnarBytes<B, C> {
bytes: B,
phantom: std::marker::PhantomData<C>,
}

impl<B: std::ops::Deref<Target = [u64]> + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> Container for ColumnarBytes<B, C>
where
for<'a> C::Borrowed<'a> : Len + Clear + Index,
{
fn len(&self) -> usize {
<C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(&self.bytes)).len()
}
// Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do.
fn clear(&mut self) { unimplemented!() }

type ItemRef<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
<C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(&self.bytes)).into_iter()
}

type Item<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
<C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(&self.bytes)).into_iter()
}
}

impl<B: std::ops::Deref<Target = [u64]> + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> SizableContainer for ColumnarBytes<B, C>
where
for<'a> C::Borrowed<'a> : Len + Clear + Index,
{
fn capacity(&self) -> usize { 1024 }
fn preferred_capacity() -> usize { 1024 }
fn reserve(&mut self, _additional: usize) { }
}
1 change: 1 addition & 0 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::VecDeque;

pub mod columnation;
pub mod flatcontainer;
pub mod columnar;

/// A container transferring data through dataflow edges
///
Expand Down
98 changes: 98 additions & 0 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Wordcount based on flatcontainer.

use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::container::columnar::Columnar,
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};

fn main() {

use timely_container::columnar::Strings;
type Container = Columnar<(Strings, Vec<i64>)>;

// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let mut input = <InputHandleCore<_, CapacityContainerBuilder<Container>>>::new();
let mut probe = ProbeHandle::new();

// create a new input, exchange data, and inspect its output
worker.dataflow::<usize, _, _>(|scope| {
input
.to_stream(scope)
.unary(
Pipeline,
"Split",
|_cap, _info| {
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session(&time);
for (text, diff) in data.iter().flat_map(|(text, diff)| {
text.split_whitespace().map(move |s| (s, diff))
}) {
session.give((text, diff));
}
}
}
},
)
.container::<Container>()
.unary_frontier(
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
"WordCount",
|_capability, _info| {
let mut queues = HashMap::new();
let mut counts = HashMap::new();

move |input, output| {
while let Some((time, data)) = input.next() {
queues
.entry(time.retain())
.or_insert(Vec::new())
.push(data.take());
}

for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for batch in val.drain(..) {
for (word, diff) in batch.iter() {
let total =
if let Some(count) = counts.get_mut(word) {
*count += diff;
*count
}
else {
counts.insert(word.to_string(), *diff);
*diff
};
session.give((word, total));
}
}
}
}

queues.retain(|_key, val| !val.is_empty());
}
},
)
.container::<Container>()
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&mut probe);
});

// introduce data and watch!
for round in 0..10 {
input.send(("flat container", 1));
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
}
}
})
.unwrap();
}
Loading