DNM: A merge batcher that gracefully handles non-ready data #463
base: master
Conversation
Teaches the merge batcher to extract ready times from the existing chains, maintaining the chain invariant after extracting data. This reduces the effort to maintain data that is not yet ready: a frontier per chain block lets us efficiently decide whether a block needs to be inspected at all. Signed-off-by: Moritz Hoffmann <[email protected]>
Force-pushed 7fb5413 to 1d6a5e5
```diff
@@ -151,18 +113,20 @@ where
 struct MergeSorter<D, T, R> {
     /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
-    queue: Vec<Vec<Vec<(D, T, R)>>>,
+    queue: Vec<Vec<(Antichain<T>, Vec<(D, T, R)>)>>,
```
Leave a comment here about the role of the antichain relative to the vector of updates.
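To illustrate the reviewer's point, here is a minimal, hypothetical sketch (not the PR's actual types) of what such a comment would describe: each block of updates carries an antichain that lower-bounds every time in the block. Assuming a totally ordered time like `u64`, the `Antichain<T>` degenerates to a single minimum.

```rust
// Hypothetical stand-in for the PR's (Antichain<T>, Vec<(D, T, R)>) pairs,
// simplified to totally ordered u64 times so the antichain is one minimum.
type Update = (String, u64, i64); // (data, time, diff)

/// A block of updates tagged with a lower bound of its times.
struct Block {
    frontier: u64, // stands in for Antichain<T>: no time in `updates` is below it
    updates: Vec<Update>,
}

impl Block {
    fn new(updates: Vec<Update>) -> Self {
        // The frontier is maintained as the minimum time present in the block.
        let frontier = updates.iter().map(|u| u.1).min().unwrap_or(u64::MAX);
        Block { frontier, updates }
    }

    /// A block can hold ready data only if its frontier is not beyond `upper`,
    /// so one comparison decides whether the block must be inspected.
    fn may_contain_ready(&self, upper: u64) -> bool {
        self.frontier < upper
    }
}

fn main() {
    let b = Block::new(vec![("a".into(), 5, 1), ("b".into(), 9, 1)]);
    assert!(b.may_contain_ready(6)); // time 5 is ready relative to upper 6
    assert!(!b.may_contain_ready(5)); // no time below 5: skip the block entirely
    println!("frontier={}", b.frontier);
}
```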
```rust
self.lower.clone(),
upper.clone(),
Antichain::from_elem(T::minimum()),
```
Consider (not in this PR) switching these to references.
```rust
stash: Vec<Vec<(D, T, R)>>,
pending: Vec<(D, T, R)>,
```
What is this?
```diff
-const BUFFER_SIZE_BYTES: usize = 1 << 13;
+const BUFFER_SIZE_BYTES: usize = 64 << 10;
```
Discussed, but future todo: extract this into something wrapping our buffers so that they can express an opinion without the MergeBatcher needing to be up to date on the opinions.
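One way to read this todo, as a hedged sketch with invented names (`OpinionatedBuffer`, `BUDGET_BYTES` are assumptions, not the PR's API): the buffer type itself owns the opinion about its preferred capacity, and the batcher only asks.

```rust
// Hypothetical wrapper: the buffer, not MergeBatcher, decides its byte budget.
struct OpinionatedBuffer<T> {
    inner: Vec<T>,
}

impl<T> OpinionatedBuffer<T> {
    /// The opinion lives here; callers never hard-code a size.
    const BUDGET_BYTES: usize = 64 << 10;

    /// Preferred element count derived from the byte budget and element size.
    fn preferred_capacity() -> usize {
        // At least one element, even for very large T (or zero-sized T).
        std::cmp::max(1, Self::BUDGET_BYTES / std::mem::size_of::<T>().max(1))
    }

    fn new() -> Self {
        OpinionatedBuffer { inner: Vec::with_capacity(Self::preferred_capacity()) }
    }

    fn push(&mut self, item: T) {
        self.inner.push(item);
    }

    fn is_full(&self) -> bool {
        self.inner.len() >= Self::preferred_capacity()
    }
}

fn main() {
    // 24-byte (u64, u64, i64) triples: 65536 / 24 = 2730 elements.
    let mut buf: OpinionatedBuffer<(u64, u64, i64)> = OpinionatedBuffer::new();
    buf.push((1, 2, 3));
    assert_eq!(OpinionatedBuffer::<(u64, u64, i64)>::preferred_capacity(), 2730);
    assert!(!buf.is_full());
    println!("capacity={}", OpinionatedBuffer::<(u64, u64, i64)>::preferred_capacity());
}
```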
```diff
@@ -179,81 +143,235 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
             operator_id,
             queue: Vec::new(),
             stash: Vec::new(),
+            pending: Vec::new(),
```
Add a note that ideally all of these start at zero capacity, so that if they are not used (e.g. on a zero volume channel, like an error path) they do not allocate. This means that we have to check the capacity elsewhere.
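A minimal sketch of the suggested pattern (names hypothetical): start every buffer with zero capacity so a batcher on a zero-volume channel never allocates, and move the capacity check to the push path.

```rust
// Sketch: Vec::new() performs no allocation, so unused channels stay at
// capacity 0; the real capacity is established lazily, at first use.
struct LazyBuffers {
    pending: Vec<(u64, u64, i64)>,
}

impl LazyBuffers {
    fn new() -> Self {
        // Zero capacity at construction: an error-path channel costs nothing.
        LazyBuffers { pending: Vec::new() }
    }

    fn push(&mut self, update: (u64, u64, i64)) {
        // The capacity check happens here, not in the constructor.
        if self.pending.capacity() == 0 {
            self.pending.reserve(1024);
        }
        self.pending.push(update);
    }
}

fn main() {
    let mut b = LazyBuffers::new();
    assert_eq!(b.pending.capacity(), 0); // no allocation for unused channels
    b.push((1, 2, 3));
    assert!(b.pending.capacity() >= 1024); // allocated only once data arrived
    println!("cap={}", b.pending.capacity());
}
```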
```rust
// Walk all chains, separate ready data from data to keep.
for mut chain in std::mem::take(&mut self.queue).drain(..) {
    let mut block_list = Vec::default();
```
Said aloud: "could this be `ship_list`?". Maybe not, but the name `block_list` is not very specific in this context (many lists of blocks here).
```rust
// Iterate block, sorting items into ship and keep
for datum in block.drain(..) {
    if upper.less_equal(&datum.1) {
        frontier.insert_ref(&datum.1);
```
This appears to be the only (?) place we update `frontier`, even though when we ship none of the updates (the `else` case below) we still want to reflect those times in the overall frontier.
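To make the concern concrete, here is a hedged sketch of the extraction loop (simplified to totally ordered `u64` times, so a frontier is just a minimum; `partition` is an invented name) in which every kept, not-yet-ready time is folded into the frontier, not only the times on the shipped branch.

```rust
type Update = (String, u64, i64); // (data, time, diff)

/// Split a block into shippable updates (time before `upper`) and kept
/// updates, returning the lower bound of all kept times as the frontier.
fn partition(block: Vec<Update>, upper: u64) -> (Vec<Update>, Vec<Update>, u64) {
    let mut ship = Vec::new();
    let mut keep = Vec::new();
    let mut frontier = u64::MAX; // lower bound of all *kept* times
    for datum in block {
        if datum.1 >= upper {
            // Not yet ready: keep it, and always record its time in the
            // frontier, regardless of whether anything else ships.
            frontier = frontier.min(datum.1);
            keep.push(datum);
        } else {
            ship.push(datum);
        }
    }
    (ship, keep, frontier)
}

fn main() {
    let block = vec![("a".into(), 3, 1), ("b".into(), 7, 1), ("c".into(), 9, -1)];
    let (ship, keep, frontier) = partition(block, 5);
    assert_eq!(ship.len(), 1); // only time 3 is ready
    assert_eq!(keep.len(), 2); // times 7 and 9 are kept
    assert_eq!(frontier, 7); // the kept times are reflected in the frontier
    println!("frontier={frontier}");
}
```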
```rust
            }
            keep_list.push((block_frontier, block));
        }
    }
```
This is potentially a good moment, as we go, to perform the "adjacent blocks" compaction. That would allow us to return memory to `self.empty()` eagerly, and have it available as we loop. Waiting until `maintain()` is not wrong, but it seems like it may cause memory to spike during `extract_into` and return down only after `maintain()`.
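A sketch of that as-we-go compaction, under assumed names (`push_compacting`, `BLOCK_CAP`, and the `stash` standing in for `self.empty()` are all illustrative, not the PR's code): when the last two kept blocks fit into one, merge them immediately and hand the freed allocation back.

```rust
const BLOCK_CAP: usize = 8; // assumed block capacity for the sketch

/// Push a kept block, compacting with its predecessor when both fit in one
/// block; the emptied Vec is recycled into `stash` eagerly, as we loop.
fn push_compacting(keep: &mut Vec<Vec<u64>>, stash: &mut Vec<Vec<u64>>, block: Vec<u64>) {
    if let Some(last) = keep.last_mut() {
        if last.len() + block.len() <= BLOCK_CAP {
            let mut block = block;
            last.append(&mut block); // moves contents, leaves `block` empty
            stash.push(block); // return the allocation now, not in maintain()
            return;
        }
    }
    keep.push(block);
}

fn main() {
    let (mut keep, mut stash) = (Vec::new(), Vec::new());
    push_compacting(&mut keep, &mut stash, vec![1, 2, 3]);
    push_compacting(&mut keep, &mut stash, vec![4, 5]); // compacts into the first
    push_compacting(&mut keep, &mut stash, vec![6, 7, 8, 9]); // would overflow: new block
    assert_eq!(keep, vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9]]);
    assert_eq!(stash.len(), 1); // one allocation recycled eagerly
    println!("blocks={} stash={}", keep.len(), stash.len());
}
```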
```rust
while ship_list.len() > 1 {
    let list1 = ship_list.pop().unwrap();
    let list2 = ship_list.pop().unwrap();
    ship_list.push(self.merge_by(list1, list2));
```
No reason these need to be in the same geometric order as the held updates. There is the opportunity to resort them, or to use a binary heap to continually merge the smallest chains. No strong feelings, because it seems unlikely to be adversarially selected.
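The binary-heap variant could look like the following hedged sketch (chains reduced to sorted `Vec<u64>`, `merge_by` reimplemented as a plain two-way merge; none of this is the PR's code): a min-heap keyed on chain length always merges the two smallest chains first.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Two-way merge of sorted chains (stand-in for MergeSorter::merge_by).
fn merge_by(a: Vec<u64>, b: Vec<u64>) -> Vec<u64> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] { out.push(a[i]); i += 1; } else { out.push(b[j]); j += 1; }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

/// Continually merge the two smallest chains, via a min-heap on length.
fn merge_all(chains: Vec<Vec<u64>>) -> Vec<u64> {
    // Reverse turns the max-heap into a min-heap; length is the primary key.
    let mut heap: BinaryHeap<Reverse<(usize, Vec<u64>)>> =
        chains.into_iter().map(|c| Reverse((c.len(), c))).collect();
    while heap.len() > 1 {
        let Reverse((_, a)) = heap.pop().unwrap();
        let Reverse((_, b)) = heap.pop().unwrap();
        let merged = merge_by(a, b);
        heap.push(Reverse((merged.len(), merged)));
    }
    heap.pop().map(|Reverse((_, c))| c).unwrap_or_default()
}

fn main() {
    let chains = vec![vec![1, 4, 9], vec![2], vec![3, 5], vec![0, 6, 7, 8]];
    let merged = merge_all(chains);
    assert_eq!(merged, (0..=9).collect::<Vec<u64>>());
    println!("{merged:?}");
}
```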
```rust
}
```
```rust
impl<D, T, R> MergeSorter<D, T, R>
{
```
???
We figured that the approach taken in this PR changes what we report as the next lower bound of data to be extracted. This can become a problem when all data cancels out, and it can cause an unknown amount of additional work for the rest of the system, because it needs to maintain more capabilities and might need to ask for more data more times. We don't have an immediate solution for this problem, but there are some options:
|
This PR shows how to implement a merge batcher that is smart about reconsidering data that's in advance of the current frontier. It does the following things:
This should have the potential to reduce the amount of work for outstanding data from $O(n)$, where $n$ is the number of records in the merge batcher, to $O(n/1024)$, by considering only the block itself, but not the data it contains.
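The claimed reduction can be sketched as follows (an illustration under assumptions: blocks of 1024 records, totally ordered `u64` times, invented names): skipping a block costs one frontier comparison, so only blocks whose frontier admits ready data pay the per-record cost.

```rust
const BLOCK: usize = 1024; // assumed block size

struct Block {
    frontier: u64, // minimum time in the block (stand-in for Antichain<T>)
    updates: Vec<u64>,
}

/// Count ready times, inspecting a block's records only when its frontier
/// says it may contain something before `upper`.
fn ready_count(blocks: &[Block], upper: u64, inspected: &mut usize) -> usize {
    let mut ready = 0;
    for b in blocks {
        if b.frontier >= upper {
            continue; // one comparison skips all 1024 records
        }
        for &t in &b.updates {
            *inspected += 1;
            if t < upper { ready += 1; }
        }
    }
    ready
}

fn main() {
    // 4 blocks of 1024 records; only the first block has any ready times.
    let blocks: Vec<Block> = (0..4u64).map(|i| {
        let updates: Vec<u64> = (0..BLOCK as u64).map(|j| i * 1000 + j).collect();
        Block { frontier: *updates.iter().min().unwrap(), updates }
    }).collect();
    let mut inspected = 0;
    let ready = ready_count(&blocks, 1000, &mut inspected);
    assert_eq!(inspected, 1024); // one block opened, not all 4096 records
    assert_eq!(ready, 1000);
    println!("ready={ready} inspected={inspected}");
}
```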
I am sorry for the formatting noise which originates from copying this code from DD to Mz and back again :/