Skip to content

Commit

Permalink
agent: Avoid temporary allocations by keeping a temporary buffer
Browse files Browse the repository at this point in the history
This commit changes the TraceAgent to keep a buffer for temporary
antichains around, which is used for setting the logical and physical
compaction. The upside is that it avoids temporary allocations, but
on the downside it keeps another antichain around.

This should be fine, unless there is a defective use case where the size
of the antichain grows significantly and the potentially unused memory will
not be reclaimed anymore.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Sep 10, 2021
1 parent f9bb685 commit a2d1d60
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ where
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,
temp_antichain: Antichain<Tr::Time>,

operator: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<::logging::Logger>,
Expand All @@ -57,19 +58,21 @@ where
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
// This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
// Instead, it determines the joint consequences of both guarantees and moves forward with that.
let new_frontier = crate::lattice::antichain_join(&self.logical_compaction.borrow()[..], &frontier[..]);
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), new_frontier.borrow());
self.logical_compaction = new_frontier;
crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
self.temp_antichain.clear();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.logical_compaction.borrow()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
// This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
// Instead, it determines the joint consequences of both guarantees and moves forward with that.
let new_frontier = crate::lattice::antichain_join(&self.physical_compaction.borrow()[..], &frontier[..]);
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), new_frontier.borrow());
self.physical_compaction = new_frontier;
crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
self.temp_antichain.clear();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.physical_compaction.borrow()
Expand Down Expand Up @@ -105,6 +108,7 @@ where
queues: Rc::downgrade(&queues),
logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
temp_antichain: Antichain::new(),
operator,
logging,
};
Expand Down Expand Up @@ -544,6 +548,7 @@ where
physical_compaction: self.physical_compaction.clone(),
operator: self.operator.clone(),
logging: self.logging.clone(),
temp_antichain: Antichain::new(),
}
}
}
Expand Down

0 comments on commit a2d1d60

Please sign in to comment.