diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..ad990cb79 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -76,7 +76,7 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &mut Handle) -> StreamCore; + fn probe_with(&self, handle: &Handle) -> StreamCore; } impl Probe for StreamCore { @@ -87,14 +87,14 @@ impl Probe for StreamCore { self.probe_with(&mut handle); handle } - fn probe_with(&self, handle: &mut Handle) -> StreamCore { + fn probe_with(&self, handle: &Handle) -> StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); - let shared_frontier = handle.frontier.clone(); + let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; let mut vector = Default::default(); @@ -103,8 +103,10 @@ impl Probe for StreamCore { move |progress| { // surface all frontier changes to the shared frontier. - let mut borrow = shared_frontier.borrow_mut(); - borrow.update_iter(progress.frontiers[0].drain()); + if let Some(shared_frontier) = shared_frontier.upgrade() { + let mut borrow = shared_frontier.borrow_mut(); + borrow.update_iter(progress.frontiers[0].drain()); + } if !started { // discard initial capability.