-
Notifications
You must be signed in to change notification settings - Fork 373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace Session
with RecordingContext
#1983
Conversation
RecordingContext
& revamp Rust SDKRecordingContext
& revamp Rust SDK
fb1ea44
to
36dc901
Compare
RecordingContext
& revamp Rust SDKRecordingContext
/ sunset Rust's Session
ac0ce2e
to
a3ed552
Compare
eba29b7
to
67dc616
Compare
a3ed552
to
ac1fac2
Compare
RecordingContext
/ sunset Rust's Session
RecordingContext
/ sunset Session
ac1fac2
to
14130c5
Compare
@@ -61,7 +61,7 @@ impl EntityDb { | |||
let mut table = DataTable::from_arrow_msg(msg)?; | |||
table.compute_all_size_bytes(); | |||
|
|||
// TODO(#1619): batch all of this | |||
// TODO(cmc): batch all of this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oooh.... even more performance gains to be had?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we only batch during transport at the moment, everything else happens on a row-by-row basis.
Though I'm not sure it's ever going to be worth it in this instance: implementing the write path in terms of individual rows is so flexible, simple and easy to extend... and things are already so fast even with just the transport-level batching...
/// Only applies to sinks that maintain a backlog. | ||
#[inline] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that all sinks implement this but don't support it strikes me as awkward. It is possible to be explicit about tracking when our Sink is a BufferedSink and only implement drain_backlog there?
At a minimum maybe signature should be Option<Vec<LogMsg>>
to disambiguate between "The sink doesn't support a backlog" and "The backlog was empty."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean sure we could return an option but I don't see how that'd be an improvement? We can always add it later on if we ever need to disambiguate between no backlog
vs. empty backlog
, but there's no such need at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we ever need to disambiguate between no backlog vs. empty backlog
Yeah, we can talk about this more concretely in a future context. I'm thinking about cases like: "I tried to use a Tcp sink, but the tcp end-point wasn't there, so now instead I try to switch my sink to a FileSink, expecting it to drain the backlog of buffered tcp messages, but even though the tcp sink told me there was no backlog, I actually just lost my data"
/// | ||
/// This can be used to then construct a [`RecordingStream`] manually using | ||
/// [`RecordingStream::new`]. | ||
pub fn finalize(self) -> (bool, RecordingInfo, DataTableBatcherConfig) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logically, returning enabled
from the RecordingStreamBuilder feels a bit out of place to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the fact that this doesn't return a RecordingStream at all is even more unexpected... maybe this struct should be named something like RecordstringStreamConstructionArgs (but shorter)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know, I'm not sure adding yet another type will help... The doc already makes it pretty clear what's going on here.
Also this code is just a copy-paste of the old SessionBuilder::finalize
code which never really gave us any problems 🤷
pub enum RecordingStreamError { | ||
/// Error within the underlying file sink. | ||
#[error("Failed to create the underlying file sink: {0}")] | ||
FileSink(#[from] re_log_encoding::FileSinkError), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to fix this now but curious how we would make this Sink-agnostic for extensibility reasons in the future? Maybe we don't need extensible anyways so I'm over-thinking it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like
/// Error within the underlying file sink.
#[error("Failed to create the underlying file sink: {0}")]
Sink(#[from] Box<dyn std::error::Error + Send + Sync>),
would do the job, wouldn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, something along those lines.
Updated with some (hopefully) clarifications |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Co-authored-by: Jeremy Leibs <[email protected]>
RecordingContext
/ sunset Session
Session
with RecordingContext
Session
has been replaced byRecordingStream
, andSessionBuilder
withRecordingStreamBuilder
.This introduces
RecordStream
, which we've all talked about in leeeeeeength this afternoon so I won't dwell on it too much.A
RecordStream
is in charge of the entire data pipeline for a single recording, and comes with well-defined ordering, durability and multithreading guarantees/semantics.RecordStream
replaces the previousSession
stuff in the Rust SDK, and will become the basis of the Python SDK too in the next PR.Try it!
api_demo
on row at a time:api_demo
in one big single packet:TODO:
DataTableBatcher
#1980Session
withRecordingContext
#1983clock
example for Rust #2000PythonSession
#1985On top of #1980
Part of #1619
Future work:
BeginRecordingMsg
is a misnomer #1996