Fix RERUN_FLUSH_NUM_BYTES and data size estimations (#5086)
### What
* Closes #5078

The first commit is the actual fix: the size calculations were
completely wrong for fixed-size arrays and `SmallVec`, leading the
batcher to severely under-count how many bytes it had processed.
That made it effectively ignore `RERUN_FLUSH_NUM_BYTES`, which in turn
led to example .rrds with big batches of data, which stream poorly.
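
For context, a simplified view of the flush check involved (the names are the real ones from `data_table_batcher.rs` below; the surrounding loop and channel plumbing are elided):

```rust
// Every incoming row bumps the running byte count...
acc.pending_num_bytes += row.total_size_bytes();

// ...and the batcher flushes once the configured threshold is crossed.
// If `total_size_bytes()` under-counts (as it did for `[T; N]` and
// `SmallVec` holding non-POD elements), this branch is effectively never taken:
if acc.pending_num_bytes >= config.flush_num_bytes {
    do_flush_all(&mut acc, &tx_table, "bytes");
}
```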

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
  * Using newly built examples: [app.rerun.io](https://app.rerun.io/pr/5086/index.html)
  * Using examples from latest `main` build: [app.rerun.io](https://app.rerun.io/pr/5086/index.html?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
  * Using full set of examples from `nightly` build: [app.rerun.io](https://app.rerun.io/pr/5086/index.html?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/5086)
- [Docs preview](https://rerun.io/preview/1830ed9e7665f8c067fbf3ea9192afff1cc100e8/docs)
- [Examples preview](https://rerun.io/preview/1830ed9e7665f8c067fbf3ea9192afff1cc100e8/examples)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)
emilk authored Feb 7, 2024
1 parent dd18b03 commit e0f51ab
Showing 8 changed files with 86 additions and 45 deletions.
13 changes: 12 additions & 1 deletion crates/re_build_examples/src/rrd.rs
@@ -21,13 +21,24 @@ pub struct Rrd {
 
     #[argh(option, description = "include only examples in this channel")]
     channel: Channel,
+
+    #[argh(option, description = "run only these examples")]
+    examples: Vec<String>,
 }
 
 impl Rrd {
     pub fn run(self) -> anyhow::Result<()> {
         create_dir_all(&self.output_dir)?;
 
-        let examples = self.channel.examples()?;
+        let examples = if self.examples.is_empty() {
+            self.channel.examples()?
+        } else {
+            Channel::Nightly
+                .examples()?
+                .into_iter()
+                .filter(|example| self.examples.contains(&example.name))
+                .collect()
+        };
         let progress = MultiProgress::new();
         let results: Vec<anyhow::Result<PathBuf>> = examples
             .into_par_iter()
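
The new `--examples` flag makes it practical to rebuild just one or two example `.rrd`s while iterating. Note it pulls from `Channel::Nightly` (the superset) and then filters by name; with `argh`, a `Vec<String>` option is passed by repeating the flag, e.g. something like `re_build_examples rrd <output_dir> --examples plots --examples dna` (hypothetical invocation; binary and subcommand spelling assumed).
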
26 changes: 12 additions & 14 deletions crates/re_log_types/src/data_cell.rs
@@ -626,20 +626,18 @@ impl DataCell {
 impl SizeBytes for DataCell {
     #[inline]
     fn heap_size_bytes(&self) -> u64 {
-        (self.inner.size_bytes > 0)
-            .then_some(self.inner.size_bytes)
-            .unwrap_or_else(|| {
-                // NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
-                // the viewer when in release mode.
-                debug_assert!(
-                    false,
-                    "called `DataCell::heap_size_bytes() without computing it first"
-                );
-                re_log::warn_once!(
-                    "called `DataCell::heap_size_bytes() without computing it first"
-                );
-                0
-            })
+        if 0 < self.inner.size_bytes {
+            self.inner.size_bytes
+        } else {
+            // NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
+            // the viewer when in release mode.
+            debug_assert!(
+                false,
+                "called `DataCell::heap_size_bytes() without computing it first"
+            );
+            re_log::warn_once!("called `DataCell::heap_size_bytes() without computing it first");
+            0
+        }
     }
 }
 
41 changes: 27 additions & 14 deletions crates/re_log_types/src/data_table_batcher.rs
@@ -199,13 +199,19 @@ impl DataTableBatcherConfig {
         }
 
         if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
-            new.flush_num_bytes = s
-                .parse()
-                .map_err(|err| DataTableBatcherError::ParseConfig {
-                    name: Self::ENV_FLUSH_NUM_BYTES,
-                    value: s.clone(),
-                    err: Box::new(err),
-                })?;
+            if let Some(num_bytes) = re_format::parse_bytes(&s) {
+                // e.g. "10MB"
+                new.flush_num_bytes = num_bytes.unsigned_abs();
+            } else {
+                // Assume it's just an integer
+                new.flush_num_bytes =
+                    s.parse()
+                        .map_err(|err| DataTableBatcherError::ParseConfig {
+                            name: Self::ENV_FLUSH_NUM_BYTES,
+                            value: s.clone(),
+                            err: Box::new(err),
+                        })?;
+            }
         }
 
         if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
@@ -437,23 +443,20 @@ fn batching_thread(
     struct Accumulator {
         latest: Instant,
         pending_rows: Vec<DataRow>,
-        pending_num_rows: u64,
         pending_num_bytes: u64,
     }
 
     impl Accumulator {
         fn reset(&mut self) {
             self.latest = Instant::now();
             self.pending_rows.clear();
-            self.pending_num_rows = 0;
             self.pending_num_bytes = 0;
         }
     }
 
     let mut acc = Accumulator {
         latest: Instant::now(),
         pending_rows: Default::default(),
-        pending_num_rows: Default::default(),
         pending_num_bytes: Default::default(),
     };

@@ -462,7 +465,6 @@ fn batching_thread(
         // it over the wire…
         row.compute_all_size_bytes();
 
-        acc.pending_num_rows += 1;
         acc.pending_num_bytes += row.total_size_bytes();
         acc.pending_rows.push(row);
     }
@@ -474,7 +476,11 @@ fn batching_thread(
             return;
         }
 
-        re_log::trace!(reason, "flushing tables");
+        re_log::trace!(
+            "Flushing {} rows and {} bytes. Reason: {reason}",
+            rows.len(),
+            re_format::format_bytes(acc.pending_num_bytes as _)
+        );
 
         let table = DataTable::from_rows(TableId::new(), rows.drain(..));
         // TODO(#1981): efficient table sorting here, following the same rules as the store's.
@@ -487,6 +493,13 @@ fn batching_thread(
         acc.reset();
     }
 
+    re_log::trace!(
+        "Flushing every: {:.2}s, {} rows, {}",
+        config.flush_tick.as_secs_f64(),
+        config.flush_num_rows,
+        re_format::format_bytes(config.flush_num_bytes as _),
+    );
+
     use crossbeam::select;
     loop {
         select! {
@@ -505,7 +518,7 @@ fn batching_thread(
                     config(&acc.pending_rows);
                 }
 
-                if acc.pending_num_rows >= config.flush_num_rows {
+                if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                     do_flush_all(&mut acc, &tx_table, "rows");
                 } else if acc.pending_num_bytes >= config.flush_num_bytes {
                     do_flush_all(&mut acc, &tx_table, "bytes");
@@ -519,7 +532,7 @@ fn batching_thread(
                 };
             },
             recv(rx_tick) -> _ => {
-                do_flush_all(&mut acc, &tx_table, "duration");
+                do_flush_all(&mut acc, &tx_table, "tick");
            },
        };
    }
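
With this change, `RERUN_FLUSH_NUM_BYTES` accepts both human-readable sizes and plain integers. A self-contained sketch of the fallback shape (this is not `re_format::parse_bytes` itself; its exact unit handling is an assumption here):

```rust
/// Hypothetical stand-in for the parse-with-fallback logic above.
fn parse_flush_num_bytes(s: &str) -> Option<u64> {
    let s = s.trim();
    // Try a human-readable suffix first (assuming SI megabytes)...
    if let Some(mb) = s.strip_suffix("MB") {
        return mb.trim().parse::<u64>().ok().map(|n| n * 1_000_000);
    }
    // ...otherwise assume it's just an integer byte count.
    s.parse::<u64>().ok()
}

fn main() {
    // RERUN_FLUSH_NUM_BYTES="10MB" and ="10000000" now both configure ~10 MB:
    assert_eq!(parse_flush_num_bytes("10MB"), Some(10_000_000));
    assert_eq!(parse_flush_num_bytes("10000000"), Some(10_000_000));
}
```
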
10 changes: 8 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
@@ -544,8 +544,14 @@ impl RecordingStreamBuilder {
             store_kind,
         };
 
-        let batcher_config = batcher_config
-            .unwrap_or_else(|| DataTableBatcherConfig::from_env().unwrap_or_default());
+        let batcher_config =
+            batcher_config.unwrap_or_else(|| match DataTableBatcherConfig::from_env() {
+                Ok(config) => config,
+                Err(err) => {
+                    re_log::error!("Failed to parse DataTableBatcherConfig from env: {}", err);
+                    DataTableBatcherConfig::default()
+                }
+            });
 
         (enabled, store_info, batcher_config)
     }
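
Design note: the old `unwrap_or_default()` silently discarded a malformed environment variable; the builder now logs the parse error before falling back, while still never panicking in user code. The same pattern in miniature (hypothetical `Config`/`parse_config` names, for illustration only):

```rust
#[derive(Debug, Default)]
struct Config {
    flush_num_bytes: u64,
}

fn parse_config() -> Result<Config, std::num::ParseIntError> {
    // Stand-in for `DataTableBatcherConfig::from_env`:
    let s = std::env::var("FLUSH_NUM_BYTES").unwrap_or_else(|_| "not-a-number".to_owned());
    Ok(Config { flush_num_bytes: s.parse()? })
}

fn main() {
    // Log-and-default instead of silently ignoring the error:
    let config = parse_config().unwrap_or_else(|err| {
        eprintln!("Failed to parse config from env: {err}");
        Config::default()
    });
    println!("using {config:?}");
}
```
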
17 changes: 13 additions & 4 deletions crates/re_types_core/src/size_bytes.rs
@@ -97,7 +97,11 @@ impl<K: SizeBytes, V: SizeBytes, S> SizeBytes for HashMap<K, V, S> {
 impl<T: SizeBytes, const N: usize> SizeBytes for [T; N] {
     #[inline]
     fn heap_size_bytes(&self) -> u64 {
-        0 // it's a const-sized array
+        if T::is_pod() {
+            0 // it's a const-sized array
+        } else {
+            self.iter().map(SizeBytes::heap_size_bytes).sum::<u64>()
+        }
     }
 }
 
@@ -131,10 +135,15 @@ impl<T: SizeBytes, const N: usize> SizeBytes for SmallVec<[T; N]> {
     /// Does not take capacity into account.
     #[inline]
     fn heap_size_bytes(&self) -> u64 {
-        if self.len() < N {
+        if self.len() <= N {
             // The `SmallVec` is still smaller than the threshold so no heap data has been
-            // allocated yet.
-            0
+            // allocated yet, beyond the heap data each element might have.
+
+            if T::is_pod() {
+                0 // early-out
+            } else {
+                self.iter().map(SizeBytes::heap_size_bytes).sum::<u64>()
+            }
         } else {
             // NOTE: It's all on the heap at this point.
             if T::is_pod() {
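
The heart of the fix, demonstrated with a minimal stand-in trait (a sketch, not the real `SizeBytes`; the real impls also keep the `T::is_pod()` fast path shown above):

```rust
trait HeapSize {
    fn heap_size_bytes(&self) -> u64;
}

impl HeapSize for Vec<u8> {
    fn heap_size_bytes(&self) -> u64 {
        self.len() as u64 // every element lives on the heap
    }
}

impl<T: HeapSize, const N: usize> HeapSize for [T; N] {
    fn heap_size_bytes(&self) -> u64 {
        // The array itself is inline ("const-sized"), but each element may
        // still own heap allocations, which is what the old `0` missed:
        self.iter().map(HeapSize::heap_size_bytes).sum()
    }
}

fn main() {
    let arr: [Vec<u8>; 2] = [vec![0; 1024], vec![0; 1024]];
    assert_eq!(arr.heap_size_bytes(), 2048); // previously reported as 0
}
```
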
16 changes: 9 additions & 7 deletions crates/rerun/src/run.rs
@@ -525,17 +525,19 @@ impl PrintCommand {
         } else {
             table.compute_all_size_bytes();
 
-            let column_names = table
-                .columns
-                .keys()
-                .map(|name| name.short_name())
-                .collect_vec();
+            let column_names =
+                table.columns.keys().map(|name| name.short_name()).join(" ");
+
+            let entity_paths = if table.col_entity_path.len() == 1 {
+                format!("{:?}", table.col_entity_path[0])
+            } else {
+                format!("{} different entity paths", table.col_entity_path.len())
+            };
 
             println!(
-                "Table with {} rows ({}). Columns: {:?}",
+                "Table with {} rows ({}) - {entity_paths} - columns: [{column_names}]",
                 table.num_rows(),
                 re_format::format_bytes(table.heap_size_bytes() as _),
-                column_names
             );
         }
     }
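
A small readability change rides along here: `Itertools::join` renders the column names straight into a string instead of `Debug`-printing a `Vec`. A sketch:

```rust
use itertools::Itertools as _;

fn main() {
    let names = ["points", "colors", "radii"];
    // Prints `[points colors radii]` rather than `["points", "colors", "radii"]`:
    println!("[{}]", names.iter().join(" "));
}
```
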
2 changes: 2 additions & 0 deletions examples/python/nuscenes/README.md
@@ -23,3 +23,5 @@ contains lidar data, radar data, color images, and labeled bounding boxes.
 pip install -r examples/python/nuscenes/requirements.txt
 python examples/python/nuscenes/main.py
 ```
+
+Requires at least Python 3.9 to run.
6 changes: 3 additions & 3 deletions examples/python/plots/main.py
@@ -88,13 +88,13 @@ def log_trig() -> None:
     rr.log("trig/sin", rr.SeriesLine(color=[255, 0, 0], name="sin(0.01t)"), timeless=True)
     rr.log("trig/cos", rr.SeriesLine(color=[0, 255, 0], name="cos(0.01t)"), timeless=True)
 
-    for t in range(0, int(tau * 2 * 1000.0)):
+    for t in range(0, int(tau * 2 * 100.0)):
         rr.set_time_sequence("frame_nr", t)
 
-        sin_of_t = sin(float(t) / 1000.0)
+        sin_of_t = sin(float(t) / 100.0)
         rr.log("trig/sin", rr.Scalar(sin_of_t))
 
-        cos_of_t = cos(float(t) / 1000.0)
+        cos_of_t = cos(float(t) / 100.0)
         rr.log("trig/cos", rr.Scalar(cos_of_t))
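
Worth noting why the plots change is safe: before and after, the loop argument `t / scale` sweeps from 0 to roughly 4π, i.e. two full periods of `sin` and `cos`, so the rendered curves are unchanged; the edit just cuts the row count about tenfold (from ~12.6k to ~1.3k rows), which is what shrinks the example `.rrd`.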
