diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index ac0ef7f8f..2235c4df9 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -9,6 +9,7 @@ use timely::dataflow::Scope; use crate::{Collection, ExchangeData, Hashable}; +use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; use crate::Data; @@ -92,14 +93,13 @@ where use crate::collection::AsCollection; self.inner - .unary(Pipeline, "ConsolidateStream", |_cap, _info| { + .unary::, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| { let mut vector = Vec::new(); move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - crate::consolidation::consolidate_updates(&mut vector); - output.session(&time).give_container(&mut vector); + output.session_with_builder(&time).give_container(&mut vector); }) } })