Skip to content

Commit

Permalink
Update interactive
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 11, 2024
1 parent 6ac17bb commit 014c0c8
Showing 1 changed file with 2 additions and 9 deletions.
11 changes: 2 additions & 9 deletions interactive/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ where
let (mut park_out, park) = demux.new_output();
let (mut text_out, text) = demux.new_output();

let mut demux_buffer = Vec::new();

demux.build(move |_capability| {

move |_frontiers| {
Expand All @@ -71,8 +69,6 @@ where

input.for_each(|time, data| {

data.swap(&mut demux_buffer);

let mut operates_session = operates.session(&time);
let mut channels_session = channels.session(&time);
let mut schedule_session = schedule.session(&time);
Expand All @@ -81,7 +77,7 @@ where
let mut park_session = park.session(&time);
let mut text_session = text.session(&time);

for (time, _worker, datum) in demux_buffer.drain(..) {
for (time, _worker, datum) in data.drain(..) {

// Round time up to next multiple of `granularity_ns`.
let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
Expand Down Expand Up @@ -235,8 +231,6 @@ where
let (mut batch_out, batch) = demux.new_output();
let (mut merge_out, merge) = demux.new_output();

let mut demux_buffer = Vec::new();

demux.build(move |_capability| {

move |_frontiers| {
Expand All @@ -246,11 +240,10 @@ where

input.for_each(|time, data| {

data.swap(&mut demux_buffer);
let mut batch_session = batch.session(&time);
let mut merge_session = merge.session(&time);

for (time, _worker, datum) in demux_buffer.drain(..) {
for (time, _worker, datum) in data.drain(..) {

// Round time up to next multiple of `granularity_ns`.
let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
Expand Down

0 comments on commit 014c0c8

Please sign in to comment.