Skip to content

Commit

Permalink
Probe only retains weak handle to Rc (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
antiguru committed Feb 5, 2024
1 parent 64be92b commit 86b3d69
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions timely/src/dataflow/operators/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub trait Probe<G: Scope, D: Container> {
/// }
/// }).unwrap();
/// ```
fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> StreamCore<G, D>;
fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, D>;
}

impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
Expand All @@ -87,14 +87,14 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
self.probe_with(&mut handle);
handle
}
fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> StreamCore<G, D> {
fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, D> {

let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
let (tee, stream) = builder.new_output();
let mut output = PushBuffer::new(PushCounter::new(tee));

let shared_frontier = handle.frontier.clone();
let shared_frontier = Rc::downgrade(&handle.frontier);
let mut started = false;

let mut vector = Default::default();
Expand All @@ -103,8 +103,10 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
move |progress| {

// surface all frontier changes to the shared frontier.
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(progress.frontiers[0].drain());
if let Some(shared_frontier) = shared_frontier.upgrade() {
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(progress.frontiers[0].drain());
}

if !started {
// discard initial capability.
Expand Down

0 comments on commit 86b3d69

Please sign in to comment.