diff --git a/Cargo.toml b/Cargo.toml index ea4bf56..1c87089 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ csv = { version = "1", optional = true } chrono = { version = "0.4", optional = true } [features] +blocking = [] real-time = [] set-semantics = [] csv-source = ["csv", "chrono"] diff --git a/src/bin/server.rs b/src/bin/server.rs index eccdb8f..0eb2638 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -218,10 +218,25 @@ fn main() { // polling - should usually be driven completely // non-blocking (i.e. timeout 0), but higher timeouts can // be used for debugging or artificial braking - // - // @TODO handle errors - poll.poll(&mut events, Some(Duration::from_millis(0))) - .unwrap(); + + if server.scheduler.borrow().has_pending() { + let mut scheduler = server.scheduler.borrow_mut(); + while let Some(activator) = scheduler.next() { + activator.activate(); + } + + // We mustn't timeout here, operators are pending! + poll.poll(&mut events, Some(Duration::from_millis(0))) + .expect("failed to poll I/O events"); + } else { + #[cfg(not(feature = "blocking"))] + poll.poll(&mut events, Some(Duration::from_millis(0))) + .expect("failed to poll I/O events"); + + #[cfg(feature = "blocking")] + poll.poll(&mut events, None) + .expect("failed to poll I/O events"); + } for event in events.iter() { trace!( diff --git a/src/plan/union.rs b/src/plan/union.rs index d867528..a400ae9 100644 --- a/src/plan/union.rs +++ b/src/plan/union.rs @@ -8,7 +8,7 @@ use timely::progress::Timestamp; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::Threshold; -use crate::binding::{AsBinding, Binding}; +use crate::binding::Binding; use crate::plan::{Dependencies, ImplContext, Implementable}; use crate::{CollectionRelation, Relation, ShutdownHandle, Var, VariableMap}; diff --git a/src/server/mod.rs b/src/server/mod.rs index 5a0f675..23b2f2f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,8 +1,10 @@ //! Server logic for driving the library via commands. +use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::ops::Sub; +use std::rc::Rc; use std::time::{Duration, Instant}; use timely::dataflow::{ProbeHandle, Scope}; @@ -23,6 +25,9 @@ use crate::{ }; use crate::{Aid, Error, Time, TxData, Value}; +pub mod scheduler; +use self::scheduler::Scheduler; + /// Server configuration. #[derive(Clone, Debug)] pub struct Config { @@ -143,6 +148,8 @@ where pub shutdown_handles: HashMap, /// Probe keeping track of overall dataflow progress. pub probe: ProbeHandle, + /// Scheduler managing deferred operator activations. + pub scheduler: Rc>, } /// Implementation context. @@ -213,6 +220,7 @@ where interests: HashMap::new(), shutdown_handles: HashMap::new(), probe: ProbeHandle::new(), + scheduler: Rc::new(RefCell::new(Scheduler::new())), } } @@ -374,7 +382,7 @@ impl Server { source: Source, scope: &mut S, ) -> Result<(), Error> { - let mut attribute_streams = source.source(scope, self.t0); + let mut attribute_streams = source.source(scope, self.t0, Rc::downgrade(&self.scheduler)); for (aid, datoms) in attribute_streams.drain() { self.context.internal.create_source(&aid, &datoms)?; @@ -409,7 +417,7 @@ impl Server { source: Source, scope: &mut S, ) -> Result<(), Error> { - let mut attribute_streams = source.source(scope, self.t0); + let mut attribute_streams = source.source(scope, self.t0, Rc::downgrade(&self.scheduler)); for (aid, datoms) in attribute_streams.drain() { self.context.internal.create_source(&aid, &datoms)?; diff --git a/src/server/scheduler.rs b/src/server/scheduler.rs new file mode 100644 index 0000000..93ea742 --- /dev/null +++ b/src/server/scheduler.rs @@ -0,0 +1,92 @@ +//! Timer-based management of operator activators. + +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::time::{Duration, Instant}; + +use timely::scheduling::activate::Activator; + +/// A scheduler allows polling sources to defer triggering their +/// activators, in case they do not have work available. This reduces +/// time spent polling infrequently updated sources and allows us to +/// (optionally) block for input from within the event loop without +/// unnecessarily delaying sources that have run out of fuel during +/// the current step. +pub struct Scheduler { + activator_queue: BinaryHeap, +} + +impl Scheduler { + /// Creates a new, empty scheduler. + pub fn new() -> Self { + Scheduler { + activator_queue: BinaryHeap::new(), + } + } + + /// Returns true whenever an activator is queued and ready to be + /// scheduled. + pub fn has_pending(&self) -> bool { + if let Some(ref timed_activator) = self.activator_queue.peek() { + if Instant::now() <= timed_activator.at { + true + } else { + false + } + } else { + false + } + } + + /// Schedule activation at the specified instant. No hard + /// guarantees on when the activator will actually be triggered. + pub fn schedule_at(&mut self, at: Instant, activator: Activator) { + self.activator_queue.push(TimedActivator { at, activator }); + } + + /// Schedule activation after the specified duration. No hard + /// guarantees on when the activator will actually be triggered. + pub fn schedule_after(&mut self, after: Duration, activator: Activator) { + self.activator_queue.push(TimedActivator { + at: Instant::now() + after, + activator, + }); + } +} + +impl Iterator for Scheduler { + type Item = Activator; + fn next(&mut self) -> Option { + if self.has_pending() { + Some(self.activator_queue.pop().unwrap().activator) + } else { + None + } + } +} + +struct TimedActivator { + pub at: Instant, + pub activator: Activator, +} + +// We want the activator_queue to act like a min-heap. +impl Ord for TimedActivator { + fn cmp(&self, other: &TimedActivator) -> Ordering { + other.at.cmp(&self.at) + } +} + +impl PartialOrd for TimedActivator { + fn partial_cmp(&self, other: &TimedActivator) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for TimedActivator { + fn eq(&self, other: &TimedActivator) -> bool { + self.at.eq(&other.at) + } +} + +impl Eq for TimedActivator {} diff --git a/src/sources/csv_file.rs b/src/sources/csv_file.rs index 643aa5c..f9dfa9a 100644 --- a/src/sources/csv_file.rs +++ b/src/sources/csv_file.rs @@ -1,6 +1,8 @@ //! Operator and utilities to source data from csv files. +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Weak; use std::time::{Duration, Instant}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; @@ -8,6 +10,7 @@ use timely::dataflow::{Scope, Stream}; use chrono::DateTime; +use crate::server::scheduler::Scheduler; use crate::sources::Sourceable; use crate::{Aid, Eid, Value}; @@ -38,6 +41,7 @@ impl Sourceable for CsvFile { &self, scope: &mut S, t0: Instant, + scheduler: Weak>, ) -> HashMap> { let filename = self.path.clone(); diff --git a/src/sources/differential_logging.rs b/src/sources/differential_logging.rs index cd091cb..f8d11c0 100644 --- a/src/sources/differential_logging.rs +++ b/src/sources/differential_logging.rs @@ -1,8 +1,9 @@ //! Operator and utilities to source data from the underlying //! Differential logging streams. +use std::cell::RefCell; use std::collections::HashMap; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use std::time::{Duration, Instant}; use timely::dataflow::channels::pact::Pipeline; @@ -14,6 +15,7 @@ use timely::logging::BatchLogger; use differential_dataflow::logging::DifferentialEvent; +use crate::server::scheduler::Scheduler; use crate::sources::Sourceable; use crate::{Aid, Value}; use Value::{Eid, Number}; @@ -30,6 +32,7 @@ impl Sourceable for DifferentialLogging { &self, scope: &mut S, _t0: Instant, + _scheduler: Weak>, ) -> HashMap> { let events = Rc::new(EventLink::new()); let mut logger = BatchLogger::new(events.clone()); diff --git a/src/sources/json_file.rs b/src/sources/json_file.rs index fd451b3..94dc3a9 100644 --- a/src/sources/json_file.rs +++ b/src/sources/json_file.rs @@ -1,10 +1,12 @@ //! Operator and utilities to source data from plain files containing //! arbitrary json structures. +use std::cell::RefCell; use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader}; use std::path::Path; +use std::rc::Weak; use std::time::{Duration, Instant}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; @@ -12,6 +14,7 @@ use timely::dataflow::{Scope, Stream}; // use sources::json_file::flate2::read::GzDecoder; +use crate::server::scheduler::Scheduler; use crate::sources::Sourceable; use crate::{Aid, Eid, Value}; use Value::{Bool, Number}; @@ -30,6 +33,7 @@ impl Sourceable for JsonFile { &self, scope: &mut S, t0: Instant, + _scheduler: Weak>, ) -> HashMap> { let filename = self.path.clone(); diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 43378e2..36b3cc9 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -1,6 +1,8 @@ //! Types and operators to work with external data sources. +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Weak; use std::time::{Duration, Instant}; use timely::dataflow::{Scope, Stream}; @@ -9,6 +11,7 @@ use timely::progress::Timestamp; use differential_dataflow::lattice::Lattice; +use crate::server::scheduler::Scheduler; use crate::{Aid, Value}; #[cfg(feature = "csv-source")] @@ -32,6 +35,7 @@ where &self, scope: &mut S, t0: Instant, + scheduler: Weak>, ) -> HashMap>; } @@ -55,12 +59,13 @@ impl Sourceable for Source { &self, scope: &mut S, t0: Instant, + scheduler: Weak>, ) -> HashMap> { match *self { - Source::TimelyLogging(ref source) => source.source(scope, t0), - Source::DifferentialLogging(ref source) => source.source(scope, t0), + Source::TimelyLogging(ref source) => source.source(scope, t0, scheduler), + Source::DifferentialLogging(ref source) => source.source(scope, t0, scheduler), #[cfg(feature = "csv-source")] - Source::CsvFile(ref source) => source.source(scope, t0), + Source::CsvFile(ref source) => source.source(scope, t0, scheduler), _ => unimplemented!(), } } @@ -71,6 +76,7 @@ impl Sourceable for Source { &self, _scope: &mut S, _t0: Instant, + _scheduler: Weak>, ) -> HashMap> { match *self { // Source::TimelyLogging(ref source) => source.source(scope, t0), diff --git a/src/sources/timely_logging.rs b/src/sources/timely_logging.rs index 7805dd2..7083766 100644 --- a/src/sources/timely_logging.rs +++ b/src/sources/timely_logging.rs @@ -1,8 +1,9 @@ //! Operator and utilities to source data from the underlying Timely //! logging streams. +use std::cell::RefCell; use std::collections::HashMap; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use std::time::{Duration, Instant}; use timely::dataflow::channels::pact::Pipeline; @@ -12,6 +13,7 @@ use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::{Scope, Stream}; use timely::logging::{BatchLogger, TimelyEvent}; +use crate::server::scheduler::Scheduler; use crate::sources::Sourceable; use crate::{Aid, Value}; use Value::{Bool, Eid}; @@ -28,6 +30,7 @@ impl Sourceable for TimelyLogging { &self, scope: &mut S, _t0: Instant, + _scheduler: Weak>, ) -> HashMap> { let events = Rc::new(EventLink::new()); let mut logger = BatchLogger::new(events.clone());