Skip to content

Commit

Permalink
Performance improvements (#91)
Browse files Browse the repository at this point in the history
* memory profiling and some memory footprint improvements

see https://github.com/frankmcsherry/differential-dataflow/issues/113

- a new Variable implementation by Frank that reduces the cost of
recursive variables by 1/3
- use distinct_total instead of distinct where safe
- don't create recursive variables for non-recursive collections
- use 32-bit timestamp for nested scopes

* differential update; barrier synchronization for worker threads

- differential moved to tag-based release model
- use barrier synchronization to block worker threads instead of idling
when there is no work to do
  • Loading branch information
ryzhyk committed Sep 17, 2018
1 parent da31ed7 commit b717dcc
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 151 deletions.
3 changes: 3 additions & 0 deletions rust/cmd_parser/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum Command {
Commit,
Rollback,
Timestamp,
Profile,
Dump(Option<String>),
Exit,
Echo(String),
Expand Down Expand Up @@ -59,6 +60,7 @@ named!(pub parse_command<&[u8], Command>,
upd: alt!(do_parse!(apply!(sym,"start") >> apply!(sym,";") >> (Command::Start)) |
do_parse!(apply!(sym,"commit") >> apply!(sym,";") >> (Command::Commit)) |
do_parse!(apply!(sym,"timestamp") >> apply!(sym,";") >> (Command::Timestamp)) |
do_parse!(apply!(sym,"profile") >> apply!(sym,";") >> (Command::Profile)) |
do_parse!(apply!(sym,"dump") >>
rel: opt!(identifier) >>
apply!(sym,";") >>
Expand All @@ -81,6 +83,7 @@ fn test_command() {
assert_eq!(parse_command(br"start;") , Ok((&br""[..], Command::Start)));
assert_eq!(parse_command(br"commit;") , Ok((&br""[..], Command::Commit)));
assert_eq!(parse_command(br"timestamp;"), Ok((&br""[..], Command::Timestamp)));
assert_eq!(parse_command(br"profile;") , Ok((&br""[..], Command::Profile)));
assert_eq!(parse_command(br"dump;") , Ok((&br""[..], Command::Dump(None))));
assert_eq!(parse_command(br"dump Tab;") , Ok((&br""[..], Command::Dump(Some("Tab".to_string())))));
assert_eq!(parse_command(br"exit;") , Ok((&br""[..], Command::Exit)));
Expand Down
4 changes: 2 additions & 2 deletions rust/differential_datalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ git="https://github.com/frankmcsherry/graph-map.git"

[dependencies.differential-dataflow]
git="https://github.com/frankmcsherry/differential-dataflow.git"
rev="578f48bb9a8d03fb36d9791e1532f4b619edd95c"
tag="v0.7.0"

[dependencies.cmd_parser]
path = "../cmd_parser"
Expand All @@ -20,7 +20,7 @@ itertools="^0.6"

[dependencies]
abomonation= { git = "https://github.com/frankmcsherry/abomonation" }
timely = { git = "https://github.com/frankmcsherry/timely-dataflow" }
timely = { git = "https://github.com/frankmcsherry/timely-dataflow", tag = "v0.7.0" }
fnv="1.0.2"
num = "0.2"
serde = "1.0.14"
Expand Down
1 change: 1 addition & 0 deletions rust/differential_datalog/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ extern crate fnv;

extern crate cmd_parser;

mod profile;
mod variable;
#[cfg(test)]
mod test;
Expand Down
109 changes: 109 additions & 0 deletions rust/differential_datalog/profile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! Memory profile of a DDlog program.

use fnv::FnvHashMap;
use std::time::Duration;
use std::fmt;
use std::cell::RefCell;
use std::cmp::max;
use differential_dataflow::logging::DifferentialEvent;
use timely::logging::TimelyEvent;
use timely::logging::OperatesEvent;

thread_local! {
pub static PROF_CONTEXT: RefCell<String> = RefCell::new("".to_string());
}

pub fn set_prof_context(s: &str) {
PROF_CONTEXT.with(|ctx| *ctx.borrow_mut() = s.to_string());
}

pub fn get_prof_context() -> String {
PROF_CONTEXT.with(|ctx| ctx.borrow().to_string())
}

pub fn with_prof_context<T, F: FnOnce()->T>(s: &str, f: F) -> T {
set_prof_context(s);
let res = f();
set_prof_context("");
res
}


/* Profiling information message sent by worker to profiling thread
*/
pub enum ProfMsg {
TimelyMessage(Vec<((Duration, usize, TimelyEvent), String)>),
DifferentialMessage(Vec<(Duration, usize, DifferentialEvent)>)
}

#[derive(Clone)]
pub struct Profile {
names: FnvHashMap<usize, String>,
sizes: FnvHashMap<usize, isize>,
peak_sizes: FnvHashMap<usize, isize>
}

impl fmt::Display for Profile {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let res1: Result<(), fmt::Error> = self.sizes.iter().map(|(operator, size)| {
let name = self.names.get(operator).map(|s|s.as_ref()).unwrap_or("???");
write!(f, "current size of {} {}: {}\n", name, operator, size)
}).collect();
res1.and(
self.peak_sizes.iter().map(|(operator, size)| {
let name = self.names.get(operator).map(|s|s.as_ref()).unwrap_or("???");
write!(f, "peak size of {} {}: {}\n", name, operator, size)
}).collect())
}
}

impl Profile {
pub fn new() -> Profile {
Profile{
names: FnvHashMap::default(),
sizes: FnvHashMap::default(),
peak_sizes: FnvHashMap::default()
}
}

pub fn update(&mut self, msg: &ProfMsg) {
match msg {
ProfMsg::TimelyMessage(msg) => self.handle_timely(msg),
ProfMsg::DifferentialMessage(msg) => self.handle_differential(msg)
}
}

fn handle_timely(&mut self, msg: &Vec<((Duration, usize, TimelyEvent), String)>) {
for ((_, _, event), ctx) in msg.iter() {
match event {
TimelyEvent::Operates(OperatesEvent{id, addr:_, name}) => {
self.names.insert(*id, ctx.clone() + "." + name);
},
_ => ()
}
}
}

fn handle_differential(&mut self, msg: &Vec<(Duration, usize, DifferentialEvent)>) {
//eprintln!("profiling message: {:?}", msg);
for (_, _, event) in msg.iter() {
match event {
DifferentialEvent::Batch(x) => {
let size = self.sizes.entry(x.operator).or_insert(0);
*size += x.length as isize;
let peak = self.peak_sizes.entry(x.operator).or_insert(0);
*peak = max(*peak, *size);
},
DifferentialEvent::Merge(m) => {
if let Some(complete) = m.complete {
let size = self.sizes.entry(m.operator).or_insert(0);
*size += (complete as isize) - (m.length1 + m.length2) as isize;
let peak = self.peak_sizes.entry(m.operator).or_insert(0);
*peak = max(*peak, *size);
}
},
_ => (),
}
}
}
}
Loading

0 comments on commit b717dcc

Please sign in to comment.