Skip to content

Commit

Permalink
log park and unpark
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed May 5, 2019
1 parent 768b9ee commit 41b4d6f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## Unreleased

### Added

A `Worker` now has a `step_or_park(Option<Duration>)` method, which instructs the worker to take a step and gives it permission to part the worker thread for at most the supplied timeout if there is no work to perform. A value of `None` implies no timeout (unboundedly parked) whereas a value of `Some(0)` should return immediately. The communication layers are implemented to awaken workers if they receive new communications, and workers should hand out copies of their `Thread` if they want other threads to wake them for other reasons (e.g. queues from threads external to timely).

## 0.9.0

### Added
Expand Down
29 changes: 29 additions & 0 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,29 @@ pub struct InputEvent {
pub start_stop: StartStop,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Input logic start/stop
pub struct ParkEvent {
/// True when activity begins, false when it stops
pub event: ParkUnpark
}

impl ParkEvent {
/// Creates a new park event from the supplied duration.
pub fn park(duration: Option<Duration>) -> Self { ParkEvent { event: ParkUnpark::Park(duration) } }
/// Creates a new unpark event.
pub fn unpark() -> Self { ParkEvent { event: ParkUnpark::Unpark } }
}

/// Records the starting and stopping of an operator.
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
pub enum ParkUnpark {
/// Worker parks.
Park(Option<Duration>),
/// Worker unparks.
Unpark,
}

#[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// An event in a timely worker
pub enum TimelyEvent {
Expand Down Expand Up @@ -225,6 +248,8 @@ pub enum TimelyEvent {
CommChannels(CommChannelsEvent),
/// Input event.
Input(InputEvent),
/// Park event.
Park(ParkEvent),
/// Unstructured event.
Text(String),
}
Expand Down Expand Up @@ -276,3 +301,7 @@ impl From<CommChannelsEvent> for TimelyEvent {
impl From<InputEvent> for TimelyEvent {
fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) }
}

impl From<ParkEvent> for TimelyEvent {
fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) }
}
20 changes: 16 additions & 4 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ impl<A: Allocate> Worker<A> {
/// Performs one step of the computation.
///
/// A step gives each dataflow operator a chance to run, and is the
/// main way to ensure that a computation proceeds. This method may
/// park the thread until there is work to perform, with an optional
/// timeout.
/// main way to ensure that a computation proceeds.
///
/// This method takes an optional timeout and may park the thread until
/// there is work to perform or until this timeout expires. A value of
/// `None` allows the worker to park indefinitely, whereas a value of
/// `Some(Duration::new(0, 0))` will return without parking the thread.
///
/// # Examples
///
Expand Down Expand Up @@ -196,10 +199,19 @@ impl<A: Allocate> Worker<A> {
.borrow_mut()
.advance();

if self.activations.borrow().is_empty() && !self.dataflows.borrow().is_empty() {
// Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
if self.activations.borrow().is_empty() && !self.dataflows.borrow().is_empty() && duration != Some(Duration::new(0,0)) {

// Log parking and flush log.
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::park(duration)));
self.logging.borrow_mut().flush();

self.allocator
.borrow()
.await_events(duration);

// Log return from unpark.
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
}
else { // Schedule active dataflows.

Expand Down

0 comments on commit 41b4d6f

Please sign in to comment.