Skip to content

Commit

Permalink
Add server scheduler and 'blocking' feature
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Mar 29, 2019
1 parent 37a5f18 commit 1a393c4
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ csv = { version = "1", optional = true }
chrono = { version = "0.4", optional = true }

[features]
blocking = []
real-time = []
set-semantics = []
csv-source = ["csv", "chrono"]
Expand Down
23 changes: 19 additions & 4 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,25 @@ fn main() {
// polling - should usually be driven completely
// non-blocking (i.e. timeout 0), but higher timeouts can
// be used for debugging or artificial braking
//
// @TODO handle errors
poll.poll(&mut events, Some(Duration::from_millis(0)))
.unwrap();

if server.scheduler.borrow().has_pending() {
let mut scheduler = server.scheduler.borrow_mut();
while let Some(activator) = scheduler.next() {
activator.activate();
}

// We mustn't timeout here, operators are pending!
poll.poll(&mut events, Some(Duration::from_millis(0)))
.expect("failed to poll I/O events");
} else {
#[cfg(not(feature = "blocking"))]
poll.poll(&mut events, Some(Duration::from_millis(0)))
.expect("failed to poll I/O events");

#[cfg(feature = "blocking")]
poll.poll(&mut events, None)
.expect("failed to poll I/O events");
}

for event in events.iter() {
trace!(
Expand Down
2 changes: 1 addition & 1 deletion src/plan/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use timely::progress::Timestamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Threshold;

use crate::binding::{AsBinding, Binding};
use crate::binding::Binding;
use crate::plan::{Dependencies, ImplContext, Implementable};
use crate::{CollectionRelation, Relation, ShutdownHandle, Var, VariableMap};

Expand Down
12 changes: 10 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! Server logic for driving the library via commands.

use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::ops::Sub;
use std::rc::Rc;
use std::time::{Duration, Instant};

use timely::dataflow::{ProbeHandle, Scope};
Expand All @@ -23,6 +25,9 @@ use crate::{
};
use crate::{Aid, Error, Time, TxData, Value};

pub mod scheduler;
use self::scheduler::Scheduler;

/// Server configuration.
#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -143,6 +148,8 @@ where
pub shutdown_handles: HashMap<String, ShutdownHandle>,
/// Probe keeping track of overall dataflow progress.
pub probe: ProbeHandle<T>,
/// Scheduler managing deferred operator activations.
pub scheduler: Rc<RefCell<Scheduler>>,
}

/// Implementation context.
Expand Down Expand Up @@ -213,6 +220,7 @@ where
interests: HashMap::new(),
shutdown_handles: HashMap::new(),
probe: ProbeHandle::new(),
scheduler: Rc::new(RefCell::new(Scheduler::new())),
}
}

Expand Down Expand Up @@ -374,7 +382,7 @@ impl<Token: Hash> Server<u64, Token> {
source: Source,
scope: &mut S,
) -> Result<(), Error> {
let mut attribute_streams = source.source(scope, self.t0);
let mut attribute_streams = source.source(scope, self.t0, Rc::downgrade(&self.scheduler));

for (aid, datoms) in attribute_streams.drain() {
self.context.internal.create_source(&aid, &datoms)?;
Expand Down Expand Up @@ -409,7 +417,7 @@ impl<Token: Hash> Server<Duration, Token> {
source: Source,
scope: &mut S,
) -> Result<(), Error> {
let mut attribute_streams = source.source(scope, self.t0);
let mut attribute_streams = source.source(scope, self.t0, Rc::downgrade(&self.scheduler));

for (aid, datoms) in attribute_streams.drain() {
self.context.internal.create_source(&aid, &datoms)?;
Expand Down
92 changes: 92 additions & 0 deletions src/server/scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Timer-based management of operator activators.

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

use timely::scheduling::activate::Activator;

/// A scheduler allows polling sources to defer triggering their
/// activators, in case they do not have work available. This reduces
/// time spent polling infrequently updated sources and allows us to
/// (optionally) block for input from within the event loop without
/// unnecessarily delaying sources that have run out of fuel during
/// the current step.
pub struct Scheduler {
    // Pending activations ordered by their scheduled `Instant`.
    // `TimedActivator`'s `Ord` is reversed, so this max-heap acts as
    // a min-heap: the earliest-due activation sits at the top.
    activator_queue: BinaryHeap<TimedActivator>,
}

impl Scheduler {
    /// Creates a new, empty scheduler.
    pub fn new() -> Self {
        Scheduler {
            activator_queue: BinaryHeap::new(),
        }
    }

    /// Returns true whenever an activator is queued and ready to be
    /// scheduled, i.e. its scheduled instant has been reached.
    ///
    /// The event loop uses this to decide whether it may block on I/O
    /// or must keep draining due activations.
    pub fn has_pending(&self) -> bool {
        match self.activator_queue.peek() {
            // Fixed: was `Instant::now() <= at`, which reported
            // "pending" for activations still in the future and
            // "idle" for overdue ones — early activations plus an
            // indefinite block exactly when work was due.
            Some(timed_activator) => Instant::now() >= timed_activator.at,
            None => false,
        }
    }

    /// Schedule activation at the specified instant. No hard
    /// guarantees on when the activator will actually be triggered.
    ///
    /// NOTE(review): a blocking event loop polling with no timeout can
    /// still oversleep a future deadline; consider bounding the poll
    /// timeout by the earliest scheduled instant — TODO confirm.
    pub fn schedule_at(&mut self, at: Instant, activator: Activator) {
        self.activator_queue.push(TimedActivator { at, activator });
    }

    /// Schedule activation after the specified duration. No hard
    /// guarantees on when the activator will actually be triggered.
    pub fn schedule_after(&mut self, after: Duration, activator: Activator) {
        self.activator_queue.push(TimedActivator {
            at: Instant::now() + after,
            activator,
        });
    }
}

impl Iterator for Scheduler {
    type Item = Activator;

    /// Yields the next queued activator that is ready to run, or
    /// `None` once no queued activation is pending.
    fn next(&mut self) -> Option<Activator> {
        if !self.has_pending() {
            return None;
        }
        self.activator_queue.pop().map(|timed| timed.activator)
    }
}

/// An `Activator` tagged with the `Instant` at which it should fire.
struct TimedActivator {
    pub at: Instant,
    pub activator: Activator,
}

// `BinaryHeap` is a max-heap; reversing the natural order on `at`
// makes `activator_queue` behave as a min-heap, so the earliest
// scheduled activation surfaces first.
impl Ord for TimedActivator {
    fn cmp(&self, other: &TimedActivator) -> Ordering {
        self.at.cmp(&other.at).reverse()
    }
}

impl PartialOrd for TimedActivator {
    fn partial_cmp(&self, other: &TimedActivator) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Equality considers only the scheduled instant, consistent with the
// `Ord` implementation above; the activator payload is ignored.
impl PartialEq for TimedActivator {
    fn eq(&self, other: &TimedActivator) -> bool {
        self.at == other.at
    }
}

impl Eq for TimedActivator {}
4 changes: 4 additions & 0 deletions src/sources/csv_file.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
//! Operator and utilities to source data from csv files.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Weak;
use std::time::{Duration, Instant};

use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};

use chrono::DateTime;

use crate::server::scheduler::Scheduler;
use crate::sources::Sourceable;
use crate::{Aid, Eid, Value};

Expand Down Expand Up @@ -38,6 +41,7 @@ impl Sourceable<Duration> for CsvFile {
&self,
scope: &mut S,
t0: Instant,
scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), Duration, isize)>> {
let filename = self.path.clone();

Expand Down
5 changes: 4 additions & 1 deletion src/sources/differential_logging.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Operator and utilities to source data from the underlying
//! Differential logging streams.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::rc::{Rc, Weak};
use std::time::{Duration, Instant};

use timely::dataflow::channels::pact::Pipeline;
Expand All @@ -14,6 +15,7 @@ use timely::logging::BatchLogger;

use differential_dataflow::logging::DifferentialEvent;

use crate::server::scheduler::Scheduler;
use crate::sources::Sourceable;
use crate::{Aid, Value};
use Value::{Eid, Number};
Expand All @@ -30,6 +32,7 @@ impl Sourceable<Duration> for DifferentialLogging {
&self,
scope: &mut S,
_t0: Instant,
_scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), Duration, isize)>> {
let events = Rc::new(EventLink::new());
let mut logger = BatchLogger::new(events.clone());
Expand Down
4 changes: 4 additions & 0 deletions src/sources/json_file.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! Operator and utilities to source data from plain files containing
//! arbitrary json structures.

use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::rc::Weak;
use std::time::{Duration, Instant};

use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};

// use sources::json_file::flate2::read::GzDecoder;

use crate::server::scheduler::Scheduler;
use crate::sources::Sourceable;
use crate::{Aid, Eid, Value};
use Value::{Bool, Number};
Expand All @@ -30,6 +33,7 @@ impl Sourceable<Duration> for JsonFile {
&self,
scope: &mut S,
t0: Instant,
_scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), Duration, isize)>> {
let filename = self.path.clone();

Expand Down
12 changes: 9 additions & 3 deletions src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Types and operators to work with external data sources.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Weak;
use std::time::{Duration, Instant};

use timely::dataflow::{Scope, Stream};
Expand All @@ -9,6 +11,7 @@ use timely::progress::Timestamp;

use differential_dataflow::lattice::Lattice;

use crate::server::scheduler::Scheduler;
use crate::{Aid, Value};

#[cfg(feature = "csv-source")]
Expand All @@ -32,6 +35,7 @@ where
&self,
scope: &mut S,
t0: Instant,
scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), T, isize)>>;
}

Expand All @@ -55,12 +59,13 @@ impl Sourceable<Duration> for Source {
&self,
scope: &mut S,
t0: Instant,
scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), Duration, isize)>> {
match *self {
Source::TimelyLogging(ref source) => source.source(scope, t0),
Source::DifferentialLogging(ref source) => source.source(scope, t0),
Source::TimelyLogging(ref source) => source.source(scope, t0, scheduler),
Source::DifferentialLogging(ref source) => source.source(scope, t0, scheduler),
#[cfg(feature = "csv-source")]
Source::CsvFile(ref source) => source.source(scope, t0),
Source::CsvFile(ref source) => source.source(scope, t0, scheduler),
_ => unimplemented!(),
}
}
Expand All @@ -71,6 +76,7 @@ impl Sourceable<u64> for Source {
&self,
_scope: &mut S,
_t0: Instant,
_scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), u64, isize)>> {
match *self {
// Source::TimelyLogging(ref source) => source.source(scope, t0),
Expand Down
5 changes: 4 additions & 1 deletion src/sources/timely_logging.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Operator and utilities to source data from the underlying Timely
//! logging streams.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::rc::{Rc, Weak};
use std::time::{Duration, Instant};

use timely::dataflow::channels::pact::Pipeline;
Expand All @@ -12,6 +13,7 @@ use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::logging::{BatchLogger, TimelyEvent};

use crate::server::scheduler::Scheduler;
use crate::sources::Sourceable;
use crate::{Aid, Value};
use Value::{Bool, Eid};
Expand All @@ -28,6 +30,7 @@ impl Sourceable<Duration> for TimelyLogging {
&self,
scope: &mut S,
_t0: Instant,
_scheduler: Weak<RefCell<Scheduler>>,
) -> HashMap<Aid, Stream<S, ((Value, Value), Duration, isize)>> {
let events = Rc::new(EventLink::new());
let mut logger = BatchLogger::new(events.clone());
Expand Down

0 comments on commit 1a393c4

Please sign in to comment.