Skip to content

Commit

Permalink
Add Domain tests and fix advance edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Apr 28, 2019
1 parent 688efa7 commit 491554a
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 117 deletions.
125 changes: 34 additions & 91 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,33 @@ where
/// allowing traces to compact. All domain input handles are
/// forwarded up to the frontier, so as not to stall progress.
pub fn advance(&mut self) -> Result<(), Error> {
let mut frontier = self
.domain_probe
.with_frontier(|frontier| (*frontier).to_vec());

if !AntichainRef::new(&frontier).less_equal(self.epoch()) {
// Input handles have fallen behind the sources and need
// to be advanced, such as not to block progress.
if self.probed_source_count() == 0 {
// No sources registered.
self.advance_traces(&[self.epoch().clone()])
} else {
let frontier = self
.domain_probe
.with_frontier(|frontier| (*frontier).to_vec());

if frontier.is_empty() {
// @TODO strictly speaking we'd have to drop input
// handles here, but we can't distinguish whether the
// domain never even had sources to begin with.

// self.input_sessions.clear();
self.advance_traces(&[])
} else {
let max = frontier.drain(..).max().unwrap();
self.advance_epoch(max)?;
if !AntichainRef::new(&frontier).less_equal(self.epoch()) {
// Input handles have fallen behind the sources and need
// to be advanced, such as not to block progress.

let max = frontier.iter().max().unwrap().clone();
self.advance_epoch(max)?;
}

self.advance_traces(&frontier)
}
}

self.advance_traces(&frontier)
}

/// Advances the domain epoch. The domain epoch can be in advance
Expand Down Expand Up @@ -326,87 +332,24 @@ where
pub fn probed_source_count(&self) -> usize {
self.probed_source_count
}
}

#[cfg(test)]
mod tests {
use super::Domain;
use crate::{AttributeConfig, InputSemantics};

#[test]
fn test_advance_epoch() {
let mut domain = Domain::<u64>::new(0);
assert_eq!(domain.epoch(), &0);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(0).is_err());
assert_eq!(domain.epoch(), &1);
}

#[test]
fn test_advance_without_sources() {
timely::execute_directly(move |worker| {
worker.dataflow::<u64, _, _>(|scope| {
let mut domain = Domain::<u64>::new(0);

assert_eq!(domain.epoch(), &0);

domain
.create_transactable_attribute(
"test",
AttributeConfig::tx_time(InputSemantics::Raw),
scope,
)
.unwrap();

assert_eq!(domain.epoch(), &0);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);

assert!(domain.advance_epoch(1).is_ok());
assert_eq!(domain.epoch(), &1);
/// Returns true iff the frontier dominates all domain inputs.
pub fn dominates(&self, frontier: AntichainRef<T>) -> bool {
// We must distinguish the scenario where the internal domain
// has no sources from one where all its internal sources have
// dropped their capabilities. We do this by checking the
// probed_source_count of the domain.

assert!(domain.advance_epoch(0).is_err());
assert_eq!(domain.epoch(), &1);
});
});
if self.probed_source_count() == 0 {
frontier.less_than(self.epoch())
} else {
if frontier.is_empty() {
false
} else {
self.domain_probe().with_frontier(|domain_frontier| {
domain_frontier.iter().all(|t| frontier.less_than(t))
})
}
}
}

// #[test]
// fn test_advance_with_sources() {
// timely::execute_directly(move |worker| {
// worker.dataflow::<u64, _, _>(|scope| {
// let mut domain = Domain::<u64>::new(0);

// domain.create_transactable_attribute(
// "test",
// AttributeConfig::tx_time(InputSemantics::Raw),
// scope,
// ).unwrap();

// domain.create_transactable_attribute(
// "test",
// AttributeConfig::tx_time(InputSemantics::Raw),
// scope,
// ).unwrap();

// assert_eq!(domain.epoch(), &0);

// assert!(domain.advance_epoch(1).is_ok());
// assert_eq!(domain.epoch(), &1);

// assert!(domain.advance_epoch(1).is_ok());
// assert_eq!(domain.epoch(), &1);

// assert!(domain.advance_epoch(0).is_err());
// assert_eq!(domain.epoch(), &1);
// });
// });
// }
}
31 changes: 5 additions & 26 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,33 +446,12 @@ where
}

/// Returns true iff the probe is behind any input handle. Mostly
/// used as a convenience method during testing.
/// used as a convenience method during testing. Using this within
/// `step_while` is not safe in general and might lead to stalls.
pub fn is_any_outdated(&self) -> bool {
// We must distinguish the scenario where the internal domain
// has no sources from one where all its internal sources have
// dropped their capabilities. We do this by checking the
// probed_source_count of the domain.

if self.probe.less_than(self.context.internal.epoch()) {
true
} else if self.context.internal.probed_source_count() > 0 {
self.probe.with_frontier(|out_frontier| {
if out_frontier.is_empty() {
false
} else {
self.context
.internal
.domain_probe()
.with_frontier(|in_frontier| {
out_frontier
.iter()
.any(|t_out| in_frontier.iter().all(|t_in| t_out.less_than(t_in)))
})
}
})
} else {
false
}
self.probe.with_frontier(|out_frontier| {
self.context.internal.dominates(out_frontier)
})
}

/// Helper for registering, publishing, and indicating interest in
Expand Down
Loading

0 comments on commit 491554a

Please sign in to comment.