Skip to content

Commit

Permalink
Probe only retains weak handle to Rc
Browse files Browse the repository at this point in the history
The probe operator writes into a shared frontier, but it does so even if
there is no other reference to the shared frontier. This change adjusts
the behavior to only write to the shared frontier while there exists
another reference.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Feb 5, 2024
1 parent 64be92b commit 90c7e9f
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions timely/src/dataflow/operators/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub trait Probe<G: Scope, D: Container> {
/// }
/// }).unwrap();
/// ```
fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> StreamCore<G, D>;
fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, D>;
}

impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
Expand All @@ -87,14 +87,14 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
self.probe_with(&mut handle);
handle
}
fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> StreamCore<G, D> {
fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, D> {

let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
let (tee, stream) = builder.new_output();
let mut output = PushBuffer::new(PushCounter::new(tee));

let shared_frontier = handle.frontier.clone();
let shared_frontier = Rc::downgrade(&handle.frontier);
let mut started = false;

let mut vector = Default::default();
Expand All @@ -103,8 +103,10 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
move |progress| {

// surface all frontier changes to the shared frontier.
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(progress.frontiers[0].drain());
if let Some(shared_frontier) = shared_frontier.upgrade() {
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(progress.frontiers[0].drain());
}

if !started {
// discard initial capability.
Expand Down

0 comments on commit 90c7e9f

Please sign in to comment.