# Fix wrong RowId order of logged data (#4658)
### What
Rerun is designed to handle out-of-order ingestion, but at a performance cost.

Usually that performance hit is very small, but when logging to the
exact same timestamp many times, we hit a corner case of the data store
where we get a huge bucket (see
#4415). If the data arrives
out-of-order, this means an expensive re-sort of the huge bucket on each
ingestion.
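
To make the corner case concrete, here is a minimal illustrative sketch (not the actual store code, and with plain `u64` in place of the real 128-bit `RowId`s) of why a single huge bucket punishes out-of-order arrivals:

```rust
/// Illustrative only: in-order appends keep the bucket sorted for free,
/// while an out-of-order arrival forces a re-sort of the entire bucket.
fn insert(bucket: &mut Vec<u64>, row_id: u64) {
    let in_order = bucket.last().map_or(true, |&last| last <= row_id);
    bucket.push(row_id);
    if !in_order {
        bucket.sort_unstable(); // O(n log n) — on *every* out-of-order insert
    }
}

fn main() {
    let mut bucket = Vec::new();
    for row_id in [1, 2, 4, 3] {
        insert(&mut bucket, row_id); // `3` arriving after `4` triggers the re-sort
    }
    assert_eq!(bucket, vec![1, 2, 3, 4]);
}
```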

Usually such out-of-order log rows should only happen in multi-threaded
applications, but due to a bug they happened almost always. This PR fixes
that bug and adds a regression test for it. It thus alleviates the
problem in #4415, but does not
fix it for genuinely out-of-order multi-threaded applications.

~~I introduced the bug in #4502 after the 0.11 release.~~ **EDIT:** no, it
was pre-existing!

<img
src="https://github.com/rerun-io/rerun/assets/1148717/fde1f973-deec-46be-b6cc-b671968bf633"
width="250">

### Checklist
* [x] I have read and agree to the [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using newly built examples:
[app.rerun.io](https://app.rerun.io/pr/4658/index.html)
* Using examples from latest `main` build:
[app.rerun.io](https://app.rerun.io/pr/4658/index.html?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[app.rerun.io](https://app.rerun.io/pr/4658/index.html?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set so as to maximize their
usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/4658)
- [Docs preview](https://rerun.io/preview/b58459b740b693d86be9e9b3e846d37bf565f060/docs)
- [Examples preview](https://rerun.io/preview/b58459b740b693d86be9e9b3e846d37bf565f060/examples)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)
emilk authored Jan 3, 2024
1 parent 17e64f6 commit 18aac95

Showing 5 changed files with 109 additions and 22 deletions.

#### `crates/re_log_types/src/arrow_msg.rs` (+2 −2)

```diff
@@ -39,15 +39,15 @@ where

 impl ArrowChunkReleaseCallback {
     #[inline]
-    pub fn as_ptr(&self) -> *const () {
+    fn as_ptr(&self) -> *const () {
         Arc::as_ptr(&self.0).cast::<()>()
     }
 }

 impl PartialEq for ArrowChunkReleaseCallback {
     #[inline]
     fn eq(&self, other: &Self) -> bool {
-        std::ptr::eq(self.as_ptr(), other.as_ptr())
+        Arc::ptr_eq(&self.0, &other.0)
     }
 }
```
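
The `PartialEq` change swaps a raw data-pointer comparison for `Arc::ptr_eq`, which directly asks the question the callback type cares about: do two handles share one allocation? A standalone sketch of those semantics (generic closures, not the actual rerun types):

```rust
use std::sync::Arc;

fn main() {
    let a: Arc<dyn Fn() + Send + Sync> = Arc::new(|| {});
    let b = Arc::clone(&a); // same allocation as `a`
    let c: Arc<dyn Fn() + Send + Sync> = Arc::new(|| {});

    assert!(Arc::ptr_eq(&a, &b)); // clones of one Arc compare equal
    assert!(!Arc::ptr_eq(&a, &c)); // independent allocations do not
}
```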

#### `crates/re_log_types/src/data_table_batcher.rs` (+66 −8)

```diff
@@ -32,12 +32,67 @@ pub enum DataTableBatcherError {

 pub type DataTableBatcherResult<T> = Result<T, DataTableBatcherError>;

+/// Callbacks you can install on the [`DataTableBatcher`].
+#[derive(Clone, Default)]
+pub struct BatcherHooks {
+    /// Called when a new row arrives.
+    ///
+    /// The callback is given the slice of all rows not yet batched,
+    /// including the new one.
+    ///
+    /// Used for testing.
+    #[allow(clippy::type_complexity)]
+    pub on_insert: Option<Arc<dyn Fn(&[DataRow]) + Send + Sync>>,
+
+    /// Callback to be run when an Arrow `Chunk` goes out of scope.
+    ///
+    /// See [`crate::ArrowChunkReleaseCallback`] for more information.
+    pub on_release: Option<crate::ArrowChunkReleaseCallback>,
+}
+
+impl BatcherHooks {
+    pub const NONE: Self = Self {
+        on_insert: None,
+        on_release: None,
+    };
+}
+
+impl PartialEq for BatcherHooks {
+    fn eq(&self, other: &Self) -> bool {
+        let Self {
+            on_insert,
+            on_release,
+        } = self;
+
+        let on_insert_eq = match (on_insert, &other.on_insert) {
+            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
+            (None, None) => true,
+            _ => false,
+        };
+
+        on_insert_eq && on_release == &other.on_release
+    }
+}
+
+impl std::fmt::Debug for BatcherHooks {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Self {
+            on_insert,
+            on_release,
+        } = self;
+        f.debug_struct("BatcherHooks")
+            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
+            .field("on_release", &on_release)
+            .finish()
+    }
+}
+
 // ---

 /// Defines the different thresholds of the associated [`DataTableBatcher`].
 ///
 /// See [`Self::default`] and [`Self::from_env`].
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct DataTableBatcherConfig {
     /// Duration of the periodic tick.
     //
@@ -65,10 +120,8 @@ pub struct DataTableBatcherConfig {
     /// Unbounded if left unspecified.
     pub max_tables_in_flight: Option<u64>,

-    /// Callback to be run when an Arrow `Chunk` goes out of scope.
-    ///
-    /// See [`crate::ArrowChunkReleaseCallback`] for more information.
-    pub on_release: Option<crate::ArrowChunkReleaseCallback>,
+    /// Callbacks you can install on the [`DataTableBatcher`].
+    pub hooks: BatcherHooks,
 }

 impl Default for DataTableBatcherConfig {
@@ -85,7 +138,7 @@ impl DataTableBatcherConfig {
         flush_num_rows: u64::MAX,
         max_commands_in_flight: None,
         max_tables_in_flight: None,
-        on_release: None,
+        hooks: BatcherHooks::NONE,
     };

     /// Always flushes ASAP.
@@ -95,7 +148,7 @@ impl DataTableBatcherConfig {
         flush_num_rows: 0,
         max_commands_in_flight: None,
         max_tables_in_flight: None,
-        on_release: None,
+        hooks: BatcherHooks::NONE,
     };

     /// Never flushes unless manually told to.
@@ -105,7 +158,7 @@ impl DataTableBatcherConfig {
         flush_num_rows: u64::MAX,
         max_commands_in_flight: None,
         max_tables_in_flight: None,
-        on_release: None,
+        hooks: BatcherHooks::NONE,
     };

     /// Environment variable to configure [`Self::flush_tick`].
@@ -447,6 +500,11 @@ fn batching_thread(
             match cmd {
                 Command::AppendRow(row) => {
                     do_push_row(&mut acc, row);
+
+                    if let Some(on_insert) = config.hooks.on_insert.as_ref() {
+                        on_insert(&acc.pending_rows);
+                    }
+
                     if acc.pending_num_rows >= config.flush_num_rows {
                         do_flush_all(&mut acc, &tx_table, "rows");
                     } else if acc.pending_num_bytes >= config.flush_num_bytes {
```
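
For orientation, a minimal usage sketch of the new hook (assuming `BatcherHooks` and `DataTableBatcherConfig` are importable from `re_log_types` as this diff suggests; the callback body is illustrative):

```rust
use std::sync::Arc;

use re_log_types::{BatcherHooks, DataTableBatcherConfig};

fn config_with_insert_hook() -> DataTableBatcherConfig {
    let mut config = DataTableBatcherConfig::NEVER;
    config.hooks = BatcherHooks {
        // Called on every row that reaches the batcher, with the full
        // slice of rows that are still pending (not yet batched).
        on_insert: Some(Arc::new(|rows| eprintln!("{} row(s) pending", rows.len()))),
        on_release: None,
    };
    config
}
```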

#### `crates/re_sdk/src/recording_stream.rs` (+10 −10)

```diff
@@ -611,7 +611,7 @@ impl RecordingStreamInner {
         batcher_config: DataTableBatcherConfig,
         sink: Box<dyn LogSink>,
     ) -> RecordingStreamResult<Self> {
-        let on_release = batcher_config.on_release.clone();
+        let on_release = batcher_config.hooks.on_release.clone();
         let batcher = DataTableBatcher::new(batcher_config)?;

         {
@@ -912,29 +912,29 @@ impl RecordingStream {
         // internal clock.
         let timepoint = TimePoint::timeless();

-        let instanced = if instanced.is_empty() {
+        // TODO(#1893): unsplit splats once new data cells are in
+        let splatted = if splatted.is_empty() {
             None
         } else {
+            splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
             Some(DataRow::from_cells(
                 row_id,
                 timepoint.clone(),
                 ent_path.clone(),
-                num_instances as _,
-                instanced,
+                1,
+                splatted,
             )?)
         };

-        // TODO(#1893): unsplit splats once new data cells are in
-        let splatted = if splatted.is_empty() {
+        let instanced = if instanced.is_empty() {
             None
         } else {
-            splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
             Some(DataRow::from_cells(
-                row_id.incremented_by(1), // we need a unique RowId from what is used for the instanced data
+                row_id.incremented_by(1), // we need a unique RowId from what is used for the splatted data
                 timepoint,
                 ent_path,
-                1,
-                splatted,
+                num_instances as _,
+                instanced,
             )?)
         };
```
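
This hunk is the actual bug fix: the splatted row now carries the base `RowId`, while the instanced row takes `row_id.incremented_by(1)`. Presumably the splatted row is also the one submitted to the batcher first (the submission site is not shown in this diff), so the old assignment made rows arrive with descending `RowId`s. A toy model of the before/after, with plain `u64` in place of the real 128-bit `RowId`:

```rust
fn is_in_order(arrival_order: &[u64]) -> bool {
    arrival_order.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    let base = 100_u64;

    // Before: instanced got `base`, splatted got `base + 1`,
    // but the splatted row arrived at the batcher first.
    assert!(!is_in_order(&[base + 1, base]));

    // After: the first-submitted (splatted) row gets the smaller id.
    assert!(is_in_order(&[base, base + 1]));
}
```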

#### `crates/rerun/tests/rerun_tests.rs` (new file, +29)

```rust
/// Regression test for checking that `RowId`s are generated in-order (when single-threaded).
///
/// Out-of-order row IDs are technically fine, but can cause unnecessary performance issues.
///
/// See for instance <https://github.com/rerun-io/rerun/issues/4415>.
#[test]
fn test_row_id_order() {
    let mut batcher_config = rerun::log::DataTableBatcherConfig::NEVER;
    batcher_config.hooks.on_insert = Some(std::sync::Arc::new(|rows| {
        if let [.., penultimate, ultimate] = rows {
            assert!(
                penultimate.row_id() <= ultimate.row_id(),
                "Rows arrived at the batcher out-of-order"
            );
        }
    }));
    let (rec, _mem_storage) = rerun::RecordingStreamBuilder::new("rerun_example_test")
        .batcher_config(batcher_config)
        .memory()
        .unwrap();

    for _ in 0..10 {
        rec.log(
            "foo",
            &rerun::Points2D::new([(1.0, 2.0), (3.0, 4.0)]).with_radii([1.0]),
        )
        .unwrap();
    }
}
```
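
Two details of the test's construction are worth spelling out: `NEVER` keeps the batcher from flushing on its own, so `pending_rows` grows across all ten `log` calls and the hook can always compare the newest row against its predecessor; and the single radius shared by two points presumably forces the splatted code path whose `RowId` assignment the previous file fixed. Checking only the last two rows on each insert is equivalent to checking the whole slice, as this small sketch shows:

```rust
// Incremental pairwise checks add up to a whole-slice ordering check.
fn main() {
    let row_ids = [1_u64, 2, 3, 5, 8];
    for prefix_len in 2..=row_ids.len() {
        let rows = &row_ids[..prefix_len]; // what the hook sees on each insert
        if let [.., penultimate, ultimate] = rows {
            assert!(penultimate <= ultimate);
        }
    }
    // ...which implies the full slice is sorted:
    assert!(row_ids.windows(2).all(|w| w[0] <= w[1]));
}
```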

#### `rerun_py/src/python_bridge.rs` (+2 −2)

```diff
@@ -242,7 +242,7 @@ fn new_recording(
     let on_release = |chunk| {
         GARBAGE_QUEUE.0.send(chunk).ok();
     };
-    batcher_config.on_release = Some(on_release.into());
+    batcher_config.hooks.on_release = Some(on_release.into());

     let recording = RecordingStreamBuilder::new(application_id)
         .batcher_config(batcher_config)
@@ -299,7 +299,7 @@ fn new_blueprint(
     let on_release = |chunk| {
         GARBAGE_QUEUE.0.send(chunk).ok();
     };
-    batcher_config.on_release = Some(on_release.into());
+    batcher_config.hooks.on_release = Some(on_release.into());

     let blueprint = RecordingStreamBuilder::new(application_id)
         .batcher_config(batcher_config)
```
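
The `on_release` callback here feeds a static `GARBAGE_QUEUE`: released Arrow chunks are not dropped inline but shipped through a channel to be freed elsewhere (the reason for deferring is outside this diff). A generic, self-contained sketch of that deferred-drop pattern, with simplified types standing in for the real ones:

```rust
use std::sync::mpsc;

fn main() {
    // Stand-in for GARBAGE_QUEUE: a channel whose receiver is drained later.
    let (garbage_tx, garbage_rx) = mpsc::channel::<Vec<u8>>();

    // Stand-in for the release callback: defer the drop instead of doing it here.
    let on_release = move |chunk: Vec<u8>| {
        garbage_tx.send(chunk).ok();
    };
    on_release(vec![0u8; 1024]);

    // Elsewhere (e.g. on a thread where blocking is safe/cheap),
    // the queued chunks are actually deallocated:
    while let Ok(chunk) = garbage_rx.try_recv() {
        drop(chunk);
    }
}
```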
