Skip to content

Commit

Permalink
Rework domain probing and advancing
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Apr 28, 2019
1 parent 823a1bc commit f09ec28
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 86 deletions.
238 changes: 157 additions & 81 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,38 @@ use crate::{AttributeConfig, CollectionIndex, InputSemantics, RelationConfig, Re
mod unordered_session;
use unordered_session::UnorderedSession;

/// A domain manages attributes (and their inputs) that share a
/// timestamp semantics (e.g. come from the same logical source).
/// A domain manages attributes that share a timestamp semantics. Each
/// attribute within a domain can be either fed from an external
/// system, or from user transactions. The former are referred to as
/// *sourced*, the latter as *transactable* attributes.
///
/// Both types of input must make sure not to block overall domain
/// progress, s.t. results can be revealed and traces can be
/// compacted. For attributes with an opinion on time, users and
/// source operators are required to regularly downgrade their
/// capabilities. As they do so, the domain frontier advances.
///
/// Some attributes do not care about time. Such attributes want their
/// information to be immediately available to all
/// queries. Conceptually, they want all their inputs to happen at
/// t0. This is however not a practical solution, because holding
/// capabilities for t0 in perpetuity completely stalls monotemporal
/// domains and prevents trace compaction in multitemporal ones. We
/// refer to this type of attributes as *timeless*. Instead, timeless
/// attributes must be automatically advanced in lockstep with a
/// high-watermark of all timeful domain inputs. This ensures that
/// they will never block overall progress.
pub struct Domain<T: Timestamp + Lattice> {
/// The current timestamp.
/// The current input epoch.
now_at: T,
/// Input handles to attributes in this domain.
input_sessions: HashMap<String, UnorderedSession<T, (Value, Value), isize>>,
/// The probe keeping track of source progress in this domain.
domain_probe: ProbeHandle<T>,
/// Maintaining the number of probed sources allows us to
/// distinguish between a domain without sources, and one where
/// sources have ceased producing inputs.
probed_source_count: usize,
/// Configurations for attributes in this domain.
pub attributes: HashMap<Aid, AttributeConfig>,
/// Forward attribute indices eid -> v.
Expand All @@ -51,6 +74,7 @@ where
now_at: start_at,
input_sessions: HashMap::new(),
domain_probe: ProbeHandle::new(),
probed_source_count: 0,
attributes: HashMap::new(),
forward: HashMap::new(),
reverse: HashMap::new(),
Expand Down Expand Up @@ -140,6 +164,7 @@ where
let source_pairs = if config.timeless {
pairs.to_owned()
} else {
self.probed_source_count += 1;
pairs.probe_with(&mut self.domain_probe)
};

Expand Down Expand Up @@ -188,100 +213,63 @@ where
}
}

/// @DEPRECATED in favor of advance_by
pub fn advance_to(&mut self, next: T) -> Result<(), Error> {
/// Advances the domain to the current domain frontier, thus
/// allowing traces to compact. All domain input handles are
/// forwarded up to the frontier, so as not to stall progress.
pub fn advance(&mut self) -> Result<(), Error> {
let mut frontier = self
.domain_probe
.with_frontier(|frontier| (*frontier).to_vec());

if !AntichainRef::new(&frontier).less_equal(self.epoch()) {
// Input handles have fallen behind the sources and need
// to be advanced, such as not to block progress.

if frontier.is_empty() {
// @TODO strictly speaking we'd have to drop input
// handles here, but we can't distinguish whether the
// domain never even had sources to begin with.

// self.input_sessions.clear();
} else {
let max = frontier.drain(..).max().unwrap();
self.advance_epoch(max)?;
}
}

self.advance_traces(&frontier)
}

/// Advances the domain epoch. The domain epoch can be in advance
/// of or lag behind the domain frontier. It is used by timeless
/// attributes to avoid stalling timeful inputs.
pub fn advance_epoch(&mut self, next: T) -> Result<(), Error> {
if !self.now_at.less_equal(&next) {
// We can't rewind time.
Err(Error::conflict(format!(
"Domain is at {:?}, you attempted to rewind to {:?}.",
&self.now_at, &next
)))
} else if !self.now_at.eq(&next) {
trace!(
"Advancing domain to {:?} ({} attributes, {} handles)",
next,
self.attributes.len(),
self.input_sessions.len()
);

self.now_at = next.clone();
trace!("Advancing domain epoch to {:?} ", next);

for handle in self.input_sessions.values_mut() {
handle.advance_to(next.clone());
handle.flush();
}

for (aid, config) in self.attributes.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let frontier = &[next.rewind(trace_slack.clone().into())];

let forward_index = self.forward.get_mut(aid).unwrap_or_else(|| {
panic!("Configuration available for unknown attribute {}", aid)
});

forward_index.advance_by(frontier);
forward_index.distinguish_since(frontier);

let reverse_index = self.reverse.get_mut(aid).unwrap_or_else(|| {
panic!("Configuration available for unknown attribute {}", aid)
});

reverse_index.advance_by(frontier);
reverse_index.distinguish_since(frontier);
}
}

for (name, config) in self.relations.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let frontier = &[next.rewind(trace_slack.clone().into())];

let trace = self.arrangements.get_mut(name).unwrap_or_else(|| {
panic!("Configuration available for unknown relation {}", name)
});

trace.advance_by(frontier);
trace.distinguish_since(frontier);
}
}
self.now_at = next;

Ok(())
} else {
Ok(())
}
}

/// Advances all handles of the domain to its current frontier.
pub fn advance_domain_to_source(&mut self) -> Result<(), Error> {
let frontier = self
.domain_probe
.with_frontier(|frontier| (*frontier).to_vec());
self.advance_by(&frontier)
}

/// Advances the domain up to the specified frontier. Advances all
/// traces accordingly, depending on their configured slack.
pub fn advance_by(&mut self, frontier: &[T]) -> Result<(), Error> {
/// Advances domain traces up to the specified frontier minus
/// their configured slack.
pub fn advance_traces(&mut self, frontier: &[T]) -> Result<(), Error> {
let frontier = AntichainRef::new(frontier);

if !frontier.less_equal(&self.now_at) {
// Input handles have fallen behind the sources and need
// to be advanced, such as not to block progress.
if frontier.is_empty() {
self.input_sessions.clear();
} else if frontier.len() == 1 {
for handle in self.input_sessions.values_mut() {
handle.advance_to(frontier[0].clone());
handle.flush();
}
self.now_at = frontier[0].clone();
} else {
// @TODO This can only happen with partially ordered
// domain timestamps (e.g. bitemporal mode). We will
// worry about when it happens.
unimplemented!();
}
}

for (aid, config) in self.attributes.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let slacking_frontier = frontier
Expand Down Expand Up @@ -324,13 +312,101 @@ where
Ok(())
}

/// Reports the current timestamp.
pub fn time(&self) -> &T {
&self.now_at
}

/// Returns a handle to the domain's input probe.
pub fn domain_probe(&self) -> &ProbeHandle<T> {
&self.domain_probe
}

/// Reports the current input epoch.
pub fn epoch(&self) -> &T {
&self.now_at
}

/// Reports the number of probed (timeful) sources in the domain.
pub fn probed_source_count(&self) -> usize {
self.probed_source_count
}
}

#[cfg(test)]
mod tests {
use super::Domain;
use crate::{AttributeConfig, InputSemantics};

#[test]
fn test_advance_epoch() {
let mut domain = Domain::<u64>::new(0);
assert_eq!(domain.epoch(), &0);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(0).is_err());
assert_eq!(domain.epoch(), &1);
}

#[test]
fn test_advance_without_sources() {
timely::execute_directly(move |worker| {
worker.dataflow::<u64, _, _>(|scope| {
let mut domain = Domain::<u64>::new(0);

assert_eq!(domain.epoch(), &0);

domain
.create_transactable_attribute(
"test",
AttributeConfig::tx_time(InputSemantics::Raw),
scope,
)
.unwrap();

assert_eq!(domain.epoch(), &0);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(0).is_err());
assert_eq!(domain.epoch(), &1);
});
});
}

// #[test]
// fn test_advance_with_sources() {
// timely::execute_directly(move |worker| {
// worker.dataflow::<u64, _, _>(|scope| {
// let mut domain = Domain::<u64>::new(0);

// domain.create_transactable_attribute(
// "test",
// AttributeConfig::tx_time(InputSemantics::Raw),
// scope,
// ).unwrap();

// domain.create_transactable_attribute(
// "test",
// AttributeConfig::tx_time(InputSemantics::Raw),
// scope,
// ).unwrap();

// assert_eq!(domain.epoch(), &0);

// assert!(domain.advance_epoch(1).is_ok());
// assert_eq!(domain.epoch(), &1);

// assert!(domain.advance_epoch(1).is_ok());
// assert_eq!(domain.epoch(), &1);

// assert!(domain.advance_epoch(0).is_err());
// assert_eq!(domain.epoch(), &1);
// });
// });
// }
}
30 changes: 25 additions & 5 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ where
/// Handle an AdvanceDomain request.
pub fn advance_domain(&mut self, name: Option<String>, next: T) -> Result<(), Error> {
match name {
None => self.context.internal.advance_to(next),
None => self.context.internal.advance_epoch(next),
Some(_) => Err(Error::unsupported("Named domains are not yet supported.")),
}
}
Expand Down Expand Up @@ -448,11 +448,31 @@ where
/// Returns true iff the probe is behind any input handle. Mostly
/// used as a convenience method during testing.
pub fn is_any_outdated(&self) -> bool {
if self.probe.less_than(self.context.internal.time()) {
return true;
// We must distinguish the scenario where the internal domain
// has no sources from one where all its internal sources have
// dropped their capabilities. We do this by checking the
// probed_source_count of the domain.

if self.probe.less_than(self.context.internal.epoch()) {
true
} else if self.context.internal.probed_source_count() > 0 {
self.probe.with_frontier(|out_frontier| {
if out_frontier.is_empty() {
false
} else {
self.context
.internal
.domain_probe()
.with_frontier(|in_frontier| {
out_frontier
.iter()
.any(|t_out| in_frontier.iter().all(|t_in| t_out.less_than(t_in)))
})
}
})
} else {
false
}

false
}

/// Helper for registering, publishing, and indicating interest in
Expand Down

0 comments on commit f09ec28

Please sign in to comment.