From 41b4d6f3bcc96cd2fdd9b115af9ec43ddd950157 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 5 May 2019 08:04:22 -0400 Subject: [PATCH] log park and unpark --- CHANGELOG.md | 6 ++++++ timely/src/logging.rs | 29 +++++++++++++++++++++++++++++ timely/src/worker.rs | 20 ++++++++++++++++---- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4858ffb1e..9bc8cc29f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## Unreleased + +### Added + +A `Worker` now has a `step_or_park(Option)` method, which instructs the worker to take a step and gives it permission to part the worker thread for at most the supplied timeout if there is no work to perform. A value of `None` implies no timeout (unboundedly parked) whereas a value of `Some(0)` should return immediately. The communication layers are implemented to awaken workers if they receive new communications, and workers should hand out copies of their `Thread` if they want other threads to wake them for other reasons (e.g. queues from threads external to timely). + ## 0.9.0 ### Added diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 4672c3b47..adb8ed620 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -198,6 +198,29 @@ pub struct InputEvent { pub start_stop: StartStop, } +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] +/// Input logic start/stop +pub struct ParkEvent { + /// True when activity begins, false when it stops + pub event: ParkUnpark +} + +impl ParkEvent { + /// Creates a new park event from the supplied duration. + pub fn park(duration: Option) -> Self { ParkEvent { event: ParkUnpark::Park(duration) } } + /// Creates a new unpark event. + pub fn unpark() -> Self { ParkEvent { event: ParkUnpark::Unpark } } +} + +/// Records the starting and stopping of an operator. +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)] +pub enum ParkUnpark { + /// Worker parks. + Park(Option), + /// Worker unparks. + Unpark, +} + #[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)] /// An event in a timely worker pub enum TimelyEvent { @@ -225,6 +248,8 @@ pub enum TimelyEvent { CommChannels(CommChannelsEvent), /// Input event. Input(InputEvent), + /// Park event. + Park(ParkEvent), /// Unstructured event. Text(String), } @@ -276,3 +301,7 @@ impl From for TimelyEvent { impl From for TimelyEvent { fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) } } + +impl From for TimelyEvent { + fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) } +} diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 5d9968862..e40771167 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -147,9 +147,12 @@ impl Worker { /// Performs one step of the computation. /// /// A step gives each dataflow operator a chance to run, and is the - /// main way to ensure that a computation proceeds. This method may - /// park the thread until there is work to perform, with an optional - /// timeout. + /// main way to ensure that a computation proceeds. + /// + /// This method takes an optional timeout and may park the thread until + /// there is work to perform or until this timeout expires. A value of + /// `None` allows the worker to park indefinitely, whereas a value of + /// `Some(Duration::new(0, 0))` will return without parking the thread. /// /// # Examples /// @@ -196,10 +199,19 @@ impl Worker { .borrow_mut() .advance(); - if self.activations.borrow().is_empty() && !self.dataflows.borrow().is_empty() { + // Consider parking only if we have no pending events, some dataflows, and a non-zero duration. + if self.activations.borrow().is_empty() && !self.dataflows.borrow().is_empty() && duration != Some(Duration::new(0,0)) { + + // Log parking and flush log. + self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::park(duration))); + self.logging.borrow_mut().flush(); + self.allocator .borrow() .await_events(duration); + + // Log return from unpark. + self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark())); } else { // Schedule active dataflows.