Skip to content

Commit

Permalink
make capture::source's token thread-safe (#364)
Browse files Browse the repository at this point in the history
* make capture::source's token thread-safe

* clean up capability dropping more

* break tuple out into variables
  • Loading branch information
guswynn authored Apr 25, 2022
1 parent 27492f2 commit 8155e02
Showing 1 changed file with 74 additions and 55 deletions.
129 changes: 74 additions & 55 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,24 @@ pub mod source {
use std::cell::RefCell;
use std::hash::Hash;
use std::rc::Rc;
use std::marker::{Send, Sync};
use std::sync::Arc;
use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
use timely::progress::Timestamp;
use timely::scheduling::SyncActivator;
use timely::scheduling::{SyncActivator, activate::SyncActivateOnDrop};

// TODO(guswynn): implement this generally in timely
struct DropActivator {
activator: Arc<SyncActivator>,
}

impl Drop for DropActivator {
fn drop(&mut self) {
// Best effort: failure to activate
// is ignored
let _ = self.activator.activate();
}
}

/// Constructs a stream of updates from a source of messages.
///
Expand All @@ -277,7 +292,7 @@ pub mod source {
pub fn build<G, B, I, D, T, R>(
scope: G,
source_builder: B,
) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
where
G: Scope<Timestamp = T>,
B: FnOnce(SyncActivator) -> I,
Expand Down Expand Up @@ -324,8 +339,7 @@ pub mod source {
// Some message distribution logic depends on the number of workers.
let workers = scope.peers();

// Vector of strong references to capabilities, which can be dropped to terminate the sources.
let mut tokens = Vec::new();
let mut token = None;
// Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
let mut antichain = MutableAntichain::new();
antichain.update_iter(Some((T::minimum(), workers as i64)));
Expand All @@ -337,66 +351,71 @@ pub mod source {
let address = messages_op.operator_info().address;
let activator = scope.sync_activator_for(&address);
let activator2 = scope.activator_for(&address);
let activations = scope.activations();
let drop_activator = Arc::new(SyncActivateOnDrop::new((), scope.sync_activator_for(&address)));
let mut source = source_builder(activator);
let (mut updates_out, updates) = messages_op.new_output();
let (mut progress_out, progress) = messages_op.new_output();
let tokens_mut = &mut tokens;
messages_op.build(move |capabilities| {
messages_op.build(|capabilities| {

// A Weak that communicates whether the returned token has been dropped.
let drop_activator_weak = Arc::downgrade(&drop_activator);

token = Some(drop_activator);

// Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
// First, wrap capabilities in a rc refcell so that they can be downgraded to weak references.
use timely::scheduling::activate::ActivateOnDrop;
let capability_sets = (CapabilitySet::from_elem(capabilities[0].clone()), CapabilitySet::from_elem(capabilities[1].clone()));
let capability_sets = ActivateOnDrop::new(capability_sets, Rc::new(address), activations);
let strong_capabilities = Rc::new(RefCell::new(capability_sets));
let local_capabilities = Rc::downgrade(&strong_capabilities);
tokens_mut.push(strong_capabilities);
let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
// Capture the shared frontier to read out frontier updates to apply.
let local_frontier = shared_frontier.clone();
//
move |_frontiers| {
// First check to ensure that we haven't been terminated by someone dropping our tokens.
if let Some(capabilities) = local_capabilities.upgrade() {
let (updates_caps, progress_caps) = &mut **capabilities.borrow_mut();
// Consult our shared frontier, and ensure capabilities are downgraded to it.
let shared_frontier = local_frontier.borrow();
updates_caps.downgrade(&shared_frontier.frontier());
progress_caps.downgrade(&shared_frontier.frontier());

// Next check to see if we have been terminated by the source being complete.
if !updates_caps.is_empty() && !progress_caps.is_empty() {
let mut updates = updates_out.activate();
let mut progress = progress_out.activate();

// TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
// Specifically, there may not be one capability valid for all updates.
let mut updates_session = updates.session(&updates_caps[0]);
let mut progress_session = progress.session(&progress_caps[0]);

// We presume the iterator will yield if appropriate.
while let Some(message) = source.next() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
if drop_activator_weak.upgrade().is_none() {
// Give up our capabilities
updates_caps.downgrade(&[]);
progress_caps.downgrade(&[]);
// never continue, even if we are (erroneously) activated again.
return;
}

// Consult our shared frontier, and ensure capabilities are downgraded to it.
let shared_frontier = local_frontier.borrow();
updates_caps.downgrade(&shared_frontier.frontier());
progress_caps.downgrade(&shared_frontier.frontier());

// Next check to see if we have been terminated by the source being complete.
if !updates_caps.is_empty() && !progress_caps.is_empty() {
let mut updates = updates_out.activate();
let mut progress = progress_out.activate();

// TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
// Specifically, there may not be one capability valid for all updates.
let mut updates_session = updates.session(&updates_caps[0]);
let mut progress_session = progress.session(&progress_caps[0]);

// We presume the iterator will yield if appropriate.
while let Some(message) = source.next() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
}
Message::Progress(progress) => {
// We must send a copy of each progress message to all workers,
// but we can partition the counts across workers by timestamp.
let mut to_worker = vec![Vec::new(); workers];
for (time, count) in progress.counts {
to_worker[(time.hashed() as usize) % workers]
.push((time, count));
}
Message::Progress(progress) => {
// We must send a copy of each progress message to all workers,
// but we can partition the counts across workers by timestamp.
let mut to_worker = vec![Vec::new(); workers];
for (time, count) in progress.counts {
to_worker[(time.hashed() as usize) % workers]
.push((time, count));
}
for (worker, counts) in to_worker.into_iter().enumerate() {
progress_session.give((
worker,
Progress {
lower: progress.lower.clone(),
upper: progress.upper.clone(),
counts,
},
));
}
for (worker, counts) in to_worker.into_iter().enumerate() {
progress_session.give((
worker,
Progress {
lower: progress.lower.clone(),
upper: progress.upper.clone(),
counts,
},
));
}
}
}
Expand Down Expand Up @@ -558,7 +577,7 @@ pub mod source {
}
});

(Box::new(tokens), changes)
(Box::new(token.unwrap()), changes)
}
}

Expand Down

0 comments on commit 8155e02

Please sign in to comment.