SDK batching/revamp 1: impl DataTableBatcher #1980

Conversation
Some small comments and maybe a deadlock but the overall structure looks good. Nice standalone clean PR!
    pub flush_tick: Duration,

    /// Flush if the accumulated payload has a size in bytes equal or greater than this.
    pub flush_num_bytes: u64,
There's a subtle distinction between "batch size" and "flush threshold" -- I suspect they are related but it's not entirely explicit here. A bit more explanation could be helpful.
I'm not sure what you're referring to? What's "batch size"? There's no config with that name 🤔
No, but this thing is a "Batcher" and so it produces batches and those batches have a size. In particular the thing I think I was curious about was whether the batch would have a size > flush_num_bytes (I believe the answer to this is yes), but I wanted to be explicit that when we flush, all of the outstanding bytes (including those above the flush threshold) will end up in the batch.
Ah yes, that's what the "equal or greater than this" alludes to. I'll make it extra clear.
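To make the semantics concrete: the threshold is a trigger, not a cap. A minimal sketch of the accumulate-then-drain behavior being discussed, with illustrative names (`Accumulator`, `push`) that are not the actual rerun API:

```rust
/// Hypothetical accumulator: `flush_num_bytes` only decides *when* to
/// flush; the resulting batch contains everything pending, so a batch
/// can be larger than the threshold.
pub struct Accumulator {
    pub pending_num_bytes: u64,
    pub flush_num_bytes: u64,
}

impl Accumulator {
    /// Pushes a row's bytes; returns the full batch size if this push
    /// crossed the flush threshold.
    pub fn push(&mut self, row_num_bytes: u64) -> Option<u64> {
        self.pending_num_bytes += row_num_bytes;
        if self.pending_num_bytes >= self.flush_num_bytes {
            // Drain everything that's pending, including the bytes above
            // the threshold -- nothing is left behind or split.
            Some(std::mem::take(&mut self.pending_num_bytes))
        } else {
            None
        }
    }
}
```

With a 100-byte threshold, pushing two 60-byte rows yields a single 120-byte batch on the second push, matching the "equal or greater" wording.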
    let cmds_to_tables_handle = {
        const NAME: &str = "DataTableBatcher::cmds_to_tables";
        std::thread::Builder::new()
Out of scope for this PR but I'm a bit unsure myself how we should be deciding when we use tokio vs when we just spawn threads.
My general motto is: if you can avoid async, avoid async.
    fn drop(&mut self) {
        // NOTE: The command channel is private, if we're here, nothing is currently capable of
        // sending data down the pipeline.
        self.tx_cmds.send(Command::Shutdown).ok();
I think we want to drop self.rx_tables before we call this send, to avoid a deadlock? Basically, if we are using bounded channels and rx_tables isn't being drained by any listeners, then tx_table.send(table) could be blocking the batching thread, preventing it from ever processing the Shutdown. Dropping rx_tables first should at least cause tx_table to error; or, if some other outstanding thread is still holding a receiver, then at least it isn't our problem.
Yeah, the docs mention this:

    /// Shutting down cannot ever block, unless the output channel is bounded and happens to be full
    /// (see [DataTableBatcherConfig::max_tables_in_flight]).
My thought process being that if the user deliberately configures the channel sizes to be bounded, then they should expect that the system can and will block at any (inconvenient) time if they don't consume as needed.
Eh, I guess we can be extra polite...
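The blocking behavior under discussion can be reproduced with a tiny sketch using std::sync::mpsc (the real code uses different channel types, so this is only the shape of the idea, with illustrative variable names):

```rust
// On a full bounded channel, `send` blocks until the receiver drains it --
// unless the receiving end has been dropped, in which case it fails fast.
// Hence the suggestion: drop rx_tables before sending Shutdown.
use std::sync::mpsc::sync_channel;

fn shutdown_does_not_block() -> bool {
    let (tx_table, rx_tables) = sync_channel::<u32>(1);
    tx_table.send(1).unwrap(); // the bounded channel is now full

    // Without this, another `send` from the batching thread would block
    // forever, and the Shutdown command would never get processed.
    drop(rx_tables);

    // With the receiver gone, `send` errors out immediately instead of
    // blocking, so shutdown can proceed.
    tx_table.send(2).is_err()
}
```

This is why dropping the receiver first turns a potential deadlock into a recoverable send error.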
    // --- Subscribe to tables ---

    /// Returns a _shared_ channel in which are sent the batched [`DataTable`]s.
Any particular reason to make this shared? Is there value in a new receiver jumping in mid-stream? I worry about the usefulness of the data in that case.
The channel is already mpmc by nature, so I didn't see the point of jumping through extra hoops to turn it back into an mpsc one. Also, parallel consumers might come in handy at some point.
    ) -> bool {
        // TODO(#1760): now that we're re-doing this here, it really is a massive waste not to send
        // it over the wire...
        row.compute_all_size_bytes();
How expensive is this? Wasn't this a bottle-neck before?
It was and still is and that's what's so nice about it: it's now done in a background thread on the clients...
But now we still need to send that information to the server to make things really fantastic.
it's now done in a background thread on the clients
We should be careful about that though... we don't want to make this a bottleneck or a CPU drain on the clients either.
It really isn't: it's extremely costly compared to the rest of the operations that we do on the very fast paths in the store, but it's still orders of magnitude faster than most of what goes on on the client... especially if it's a Python client 😒
    acc.pending_num_rows >= config.flush_num_rows
        || acc.pending_num_bytes >= config.flush_num_bytes
Rather than returning a bool, it seems like it would be clearer to call do_flush_all here.
I find it clearer to be able to see all flush triggers in the main loop
In that case why not move the check into the main loop as well?
    do_push_row(&mut acc, row);
    if acc.pending_num_rows >= config.flush_num_rows
        || acc.pending_num_bytes >= config.flush_num_bytes
    {
        do_flush_all(&mut acc, &tx_table, "bytes|rows");
        acc.reset();
    }
But, at the very least, add a comment to do_push_row indicating that it returns a bool with the assumption that the caller will flush the data if it returns true?
Moving the check sounds good to me 👍
This PR implements DataTableBatcher, which... batches DataTables.
Not used anywhere yet, just the type itself.

- DataTableBatcher #1980
- Session with RecordingContext #1983
- clock example for Rust #2000
- PythonSession #1985

Part of #1619

Related:
- DataCell's size (& other metadata) over the wire #1760

Future work:
- DataTable::sort shared with DataStore #1981