Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: Remove Container::Item #538

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions container/src/columnation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,6 @@ mod container {
use crate::columnation::{Columnation, TimelyStack};

impl<T: Columnation + 'static> Container for TimelyStack<T> {
type Item = T;

fn len(&self) -> usize {
self.local.len()
}
Expand All @@ -316,6 +314,7 @@ mod container {
}

impl<T: Columnation + 'static> PushPartitioned for TimelyStack<T> {
type Item = T;
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
I: FnMut(&Self::Item) -> usize,
Expand Down
13 changes: 4 additions & 9 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ pub mod columnation;
/// is efficient (which is not necessarily the case when deriving `Clone`.)
/// TODO: Don't require `Container: Clone`
pub trait Container: Default + Clone + 'static {
/// The type of elements this container holds.
type Item;

/// The number of elements in this container
///
/// The length of a container must be consistent between sending and receiving it.
Expand All @@ -41,8 +38,6 @@ pub trait Container: Default + Clone + 'static {
}

impl<T: Clone + 'static> Container for Vec<T> {
type Item = T;

fn len(&self) -> usize {
Vec::len(self)
}
Expand All @@ -64,8 +59,6 @@ mod rc {
use crate::Container;

impl<T: Container> Container for Rc<T> {
type Item = T::Item;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}
Expand Down Expand Up @@ -95,8 +88,6 @@ mod arc {
use crate::Container;

impl<T: Container> Container for Arc<T> {
type Item = T::Item;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}
Expand All @@ -122,6 +113,9 @@ mod arc {

/// A container that can partition itself into pieces.
pub trait PushPartitioned: Container {
/// Type of item to distribute among containers.
type Item;

/// Partition and push this container.
///
/// Drain all elements from `self`, and use the function `index` to determine which `buffer` to
Expand All @@ -133,6 +127,7 @@ pub trait PushPartitioned: Container {
}

impl<T: Clone + 'static> PushPartitioned for Vec<T> {
type Item = T;
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
I: FnMut(&Self::Item) -> usize,
Expand Down
2 changes: 0 additions & 2 deletions mdbook/src/chapter_5/chapter_5_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ impl<T, E> Default for ResultContainer<T, E> {
}

impl<T: Clone + 'static, E: Clone + 'static> Container for ResultContainer<T, E> {
type Item = Result<T, E>;

fn len(&self) -> usize {
match self {
ResultContainer::Ok(data) => data.len(),
Expand Down
20 changes: 10 additions & 10 deletions timely/src/dataflow/operators/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::operators::generic::Operator;

/// Methods to inspect records and batches of records on a stream.
pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
pub trait Inspect<G: Scope, C: Container, I>: InspectCore<G, C> + Sized {
/// Runs a supplied closure on each observed data element.
///
/// # Examples
Expand All @@ -21,7 +21,7 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self {
fn inspect(&self, mut func: impl FnMut(&I)+'static) -> Self {
self.inspect_batch(move |_, data| {
for datum in data.iter() { func(datum); }
})
Expand All @@ -38,7 +38,7 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
/// });
/// ```
fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self {
fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &I)+'static) -> Self {
self.inspect_batch(move |time, data| {
for datum in data.iter() {
func(&time, &datum);
Expand All @@ -57,7 +57,7 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
/// });
/// ```
fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self {
fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[I])+'static) -> Self {
self.inspect_core(move |event| {
if let Ok((time, data)) = event {
func(time, data);
Expand All @@ -84,25 +84,25 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// });
/// });
/// ```
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static;
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[I]), &[G::Timestamp]>)+'static;
}

impl<G: Scope, D: Data> Inspect<G, Vec<D>> for StreamCore<G, Vec<D>> {
impl<G: Scope, D: Data> Inspect<G, Vec<D>, D> for StreamCore<G, Vec<D>> {
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
}
}

impl<G: Scope, D: Data+Columnation> Inspect<G, TimelyStack<D>> for StreamCore<G, TimelyStack<D>> {
impl<G: Scope, D: Data+Columnation> Inspect<G, TimelyStack<D>, D> for StreamCore<G, TimelyStack<D>> {
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
}
}

impl<G: Scope, C: Container> Inspect<G, Rc<C>> for StreamCore<G, Rc<C>>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the semantics for inspecting Rc<C>, and I'm not sure we use this. It might be relevant for consuming streams of differential updates at some point, but at the moment it's Vec<Rc<Batch>> and not Rc<Container<Batch>> for some unspecified container.

where C: AsRef<[C::Item]>
impl<G: Scope, C: Container> Inspect<G, Rc<C>, C> for StreamCore<G, Rc<C>>
where C: AsRef<[C]>
{
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static {
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C]), &[G::Timestamp]>) + 'static {
self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref()))))
}
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/reclock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ pub trait Reclock<S: Scope> {
/// assert_eq!(extracted[1], (5, vec![4,5]));
/// assert_eq!(extracted[2], (8, vec![6,7,8]));
/// ```
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> Self;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only looking for a progress stream, but requires a container because we have no other way to express its expectations.

fn reclock<TC: Container>(&self, clock: &StreamCore<S, TC>) -> Self;
}

impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C> {
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {
fn reclock<TC: Container>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {

let mut stash = vec![];

Expand Down
Loading