Skip to content

Commit

Permalink
Allow custom exertion logic (#411)
Browse files Browse the repository at this point in the history
* Allow custom exertion logic

* Extract logic to config parameter

* Correct rescheduling logic
  • Loading branch information
frankmcsherry authored Nov 9, 2023
1 parent 6406abe commit 0673ecd
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 65 deletions.
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,17 @@ impl Config {
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
if let Some(effort) = options.idle_merge_effort {
config.set("differential/idle_merge_effort".to_string(), effort);
config.set::<trace::ExertionLogic>(
"differential/default_exert_logic".to_string(),
std::sync::Arc::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
}),
);
}
}
20 changes: 8 additions & 12 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;

use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::filter::{TraceFilter, BatchFilter};
Expand Down Expand Up @@ -563,15 +563,13 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let (activator, effort) =
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
let activator = Some(self.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If there is default exertion logic set, install it.
if let Some(exert_logic) = self.inner.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}
else {
(None, None)
};

let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

*reader = Some(reader_local);
Expand Down Expand Up @@ -672,9 +670,7 @@ where
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
23 changes: 9 additions & 14 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ use timely::dataflow::operators::Capability;

use ::{ExchangeData, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, Cursor};
use trace::{self, Trace, TraceReader, Batch, Cursor};

use trace::Builder;

Expand Down Expand Up @@ -165,20 +165,17 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Establish compaction effort to apply even without updates.
let (activator, effort) =
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(stream.scope().activator_for(&info.address[..])), Some(effort))
}
else {
(None, None)
};

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let activator = Some(stream.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}

let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
// Capture the reader outside the builder scope.
*reader = Some(reader_local.clone());
Expand Down Expand Up @@ -334,9 +331,7 @@ where
reader_local.set_physical_compaction(prev_frontier.borrow());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ where
}

/// Exerts merge effort, even without additional updates.
pub fn exert(&mut self, fuel: &mut isize) {
pub fn exert(&mut self) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert(fuel);
trace.borrow_mut().trace.exert();
}
}

Expand Down
21 changes: 8 additions & 13 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use timely::dataflow::operators::Capability;

use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
use lattice::Lattice;
use trace::{Batch, BatchReader, Cursor, Trace, Builder};
use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic};
use trace::cursor::CursorList;
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
Expand Down Expand Up @@ -351,17 +351,14 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Determine if we should regularly exert the trace maintenance machinery,
// and with what amount of effort each time.
let (activator, effort) =
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort))
let activator = Some(self.stream.scope().activator_for(&operator_info.address[..]));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If there is default exert logic set, install it.
if let Some(exert_logic) = self.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
empty.set_exert_logic(exert_logic);
}
else {
(None, None)
};

let empty = T2::new(operator_info.clone(), logger.clone(), activator);

let mut source_trace = self.trace.clone();

let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
Expand Down Expand Up @@ -629,9 +626,7 @@ where
}

// Exert trace maintenance if we have been so requested.
if let Some(mut fuel) = effort.clone() {
output_writer.exert(&mut fuel);
}
output_writer.exert();
}
}
)
Expand Down
51 changes: 29 additions & 22 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use std::fmt::Debug;
use ::logging::Logger;
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Batch, BatchReader, Trace, TraceReader};
use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use trace::cursor::{Cursor, CursorList};
use trace::Merger;

Expand All @@ -97,6 +97,8 @@ pub struct Spine<B: Batch> where B::Time: Lattice+Ord, B::R: Semigroup {
upper: Antichain<B::Time>,
effort: usize,
activator: Option<timely::scheduling::activate::Activator>,
/// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
exert_logic: ExertionLogic,
}

impl<B> TraceReader for Spine<B>
Expand Down Expand Up @@ -264,22 +266,21 @@ where

/// Apply some amount of effort to trace maintenance.
///
/// The units of effort are updates, and the method should be
/// thought of as analogous to inserting as many empty updates,
/// where the trace is permitted to perform proportionate work.
fn exert(&mut self, effort: &mut isize) {
/// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set.
fn exert(&mut self) {
// If there is work to be done, ...
self.tidy_layers();
if !self.reduced() {
// Determine whether we should apply effort independent of updates.
if let Some(effort) = self.exert_effort() {

// If any merges exist, we can directly call `apply_fuel`.
if self.merging.iter().any(|b| b.is_double()) {
self.apply_fuel(effort);
self.apply_fuel(&mut (effort as isize));
}
// Otherwise, we'll need to introduce fake updates to move merges along.
else {
// Introduce an empty batch with roughly *effort number of virtual updates.
let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize;
let level = effort.next_power_of_two().trailing_zeros() as usize;
self.introduce_batch(None, level);
}
// We were not in reduced form, so let's check again in the future.
Expand All @@ -289,6 +290,10 @@ where
}
}

fn set_exert_logic(&mut self, logic: ExertionLogic) {
self.exert_logic = logic;
}

// Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
// merging the batch. This means it is a good time to perform amortized work proportional
// to the size of batch.
Expand Down Expand Up @@ -388,19 +393,20 @@ where
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
B::R: Semigroup,
{
/// True iff there is at most one non-empty batch in `self.merging`.
/// Determine the amount of effort we should exert in the absence of updates.
///
/// When true, there is no maintenance work to perform in the trace, other than compaction.
/// We do not yet have logic in place to determine if compaction would improve a trace, so
/// for now we are ignoring that.
fn reduced(&self) -> bool {
let mut non_empty = 0;
for index in 0 .. self.merging.len() {
if self.merging[index].is_double() { return false; }
if self.merging[index].len() > 0 { non_empty += 1; }
if non_empty > 1 { return false; }
}
true
/// This method prepares an iterator over batches, including the level, count, and length of each layer.
/// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply.
fn exert_effort(&self) -> Option<usize> {
(self.exert_logic)(
Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| {
match batch {
MergeState::Vacant => (index, 0, 0),
MergeState::Single(_) => (index, 1, batch.len()),
MergeState::Double(_) => (index, 2, batch.len()),
}
}))
)
}

/// Describes the merge progress of layers in the trace.
Expand Down Expand Up @@ -443,6 +449,7 @@ where
upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
effort,
activator,
exert_logic: std::sync::Arc::new(|_batches| None),
}
}

Expand Down Expand Up @@ -482,8 +489,8 @@ where
}
}

// Having performed all of our work, if more than one batch remains reschedule ourself.
if !self.reduced() {
// Having performed all of our work, if we should perform more work reschedule ourselves.
if self.exert_effort().is_some() {
if let Some(activator) = &self.activator {
activator.activate();
}
Expand Down
14 changes: 12 additions & 2 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use timely::progress::Timestamp;
pub use self::cursor::Cursor;
pub use self::description::Description;

/// A type used to express how much effort a trace should exert even in the absence of updates.
pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>+Send+Sync>;

// The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and
// values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
Expand Down Expand Up @@ -208,8 +211,15 @@ where <Self as TraceReader>::Batch: Batch {
activator: Option<timely::scheduling::activate::Activator>,
) -> Self;

/// Exert merge effort, even without updates.
fn exert(&mut self, effort: &mut isize);
/// Exert merge effort, even without updates.
fn exert(&mut self);

/// Sets the logic for exertion in the absence of updates.
///
/// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
/// indicating the level, the number of batches, and their total length in updates. It should return a number of
/// updates to perform, or `None` if no work is required.
fn set_exert_logic(&mut self, logic: ExertionLogic);

/// Introduces a batch of updates to the trace.
///
Expand Down

0 comments on commit 0673ecd

Please sign in to comment.