SDK batching/revamp 2.2: homegrown arrow size estimation routines (#2002)

* version crossbeam at the workspace level

* more DataRow size helpers

* DataTableBatcher

* lints

* lints

* self review

* don't expose shutdown to make errors impossible

* doc

* backport

* backport

* introduce RecordingStream

* clean up old stuff from the before time

* self-review

* ordered data columns in data tables

* tests

* even more tests

* rogue todo

* batching is now a reality

* some extra peace of mind

* revert

* lock shenanigans

* crash when relying on uncomputed cell sizes in debug

* thorough testing of estimated size measurements with unions and batching involved

* dealing with all cases of reliance on uncomputed sizes

* inline upstream's estimated_bytes_size routine and integrate it with SizeBytes

* homegrown, batch-compatible arrow size estimations (see the sketch after this list)

* just format

* missed one

* merge shenanigans

* address PR comments

* self-review

* address PR comments

* some more 'cargo test --bench' fun times
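
The running theme of these commits is swapping arrow2's `estimated_bytes_size()` for a homegrown, cacheable `SizeBytes` trait, which every diff below touches in some way. As a reading aid, here is a minimal sketch of that pattern, inferred from the names visible in this commit (`heap_size_bytes`, `total_size_bytes`); the actual trait in `re_log_types` may differ in detail.

/// A minimal sketch of a composable size-measurement trait in the style of `SizeBytes`.
pub trait SizeBytes {
    /// Total size in bytes: the shallow stack part plus everything owned on the heap.
    fn total_size_bytes(&self) -> u64 {
        std::mem::size_of_val(self) as u64 + self.heap_size_bytes()
    }

    /// Heap-allocated size in bytes, excluding the shallow size of `self` itself.
    fn heap_size_bytes(&self) -> u64;
}

impl SizeBytes for u64 {
    fn heap_size_bytes(&self) -> u64 {
        0 // plain-old-data: nothing on the heap
    }
}

impl<T: SizeBytes> SizeBytes for Vec<T> {
    fn heap_size_bytes(&self) -> u64 {
        // The elements' shallow parts live in the Vec's heap buffer, so each
        // element contributes its *total* size here.
        self.iter().map(|x| x.total_size_bytes()).sum()
    }
}

Unlike a one-shot estimate, sizes structured this way can be computed once per cell and cached, which is what makes the batching work below viable.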
teh-cmc authored May 4, 2023
1 parent 141d717 commit 5a32f6e
Showing 16 changed files with 936 additions and 245 deletions.
11 changes: 4 additions & 7 deletions crates/re_arrow_store/benches/arrow2.rs
@@ -5,17 +5,14 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::sync::Arc;
 
-use arrow2::{
-    array::{Array, PrimitiveArray, StructArray, UnionArray},
-    compute::aggregate::estimated_bytes_size,
-};
+use arrow2::array::{Array, PrimitiveArray, StructArray, UnionArray};
 use criterion::{criterion_group, Criterion};
 use itertools::Itertools;
 use re_log_types::{
     component_types::{InstanceKey, Point2D, Rect2D},
     datagen::{build_some_instances, build_some_point2d, build_some_rects},
     external::arrow2_convert::serialize::TryIntoArrow,
-    DataCell, SerializableComponent,
+    DataCell, SerializableComponent, SizeBytes as _,
 };
 
 // ---
@@ -109,15 +106,15 @@ fn erased_clone(c: &mut Criterion) {
 
     let total_size_bytes = arrays
         .iter()
-        .map(|array| estimated_bytes_size(&**array) as u64)
+        .map(|array| array.total_size_bytes())
         .sum::<u64>();
     assert!(total_size_bytes as usize >= NUM_ROWS * NUM_INSTANCES * std::mem::size_of::<T>());
 
     group.bench_function("array", |b| {
         b.iter(|| {
             let sz = arrays
                 .iter()
-                .map(|array| estimated_bytes_size(&**array) as u64)
+                .map(|array| array.total_size_bytes())
                 .sum::<u64>();
             assert_eq!(total_size_bytes, sz);
             sz
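
Both changed call sites above move from arrow2's free function to the new trait method. For reference, a self-contained sketch of what the removed lines were doing; the array contents are made up, and `estimated_bytes_size` requires arrow2's `compute_aggregate` feature (which this commit drops from `re_log_types`'s Cargo.toml below):

use arrow2::array::{Array, UInt64Array};
use arrow2::compute::aggregate::estimated_bytes_size;

fn main() {
    // Stand-ins for the benchmark's arrays.
    let arrays: Vec<Box<dyn Array>> = vec![
        UInt64Array::from_vec(vec![42; 1_000]).boxed(),
        UInt64Array::from_vec(vec![666; 1_000]).boxed(),
    ];

    // The upstream estimate, exactly as on the removed lines above.
    let total_size_bytes = arrays
        .iter()
        .map(|array| estimated_bytes_size(&**array) as u64)
        .sum::<u64>();

    // 2 arrays * 1_000 u64s * 8 bytes, with no validity bitmaps allocated.
    assert_eq!(16_000, total_size_bytes);
}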
9 changes: 7 additions & 2 deletions crates/re_arrow_store/benches/data_store.rs
@@ -269,8 +269,7 @@ fn gc(c: &mut Criterion) {
         (NUM_INSTANCES * NUM_ROWS) as _,
     ));
 
-    let mut table = build_table(NUM_INSTANCES as usize, false);
-    table.compute_all_size_bytes();
+    let table = build_table(NUM_INSTANCES as usize, false);
 
     // Default config
     group.bench_function("default", |b| {
@@ -325,6 +324,12 @@ fn build_table(n: usize, packed: bool) -> DataTable {
         table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
     }
 
+    // NOTE: Using unsized cells will crash in debug mode, and benchmarks are run for 1 iteration,
+    // in debug mode, by the standard test harness.
+    if cfg!(debug_assertions) {
+        table.compute_all_size_bytes();
+    }
+
     table
 }
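
The NOTE added above matters because `cargo test` also runs every benchmark once, in debug mode, where the new debug assertion on unsized cells fires. A toy sketch of the guard; `Table` is a stand-in for the real `DataTable`, not its API:

// Stand-in for a table with a lazily computed, cached size.
struct Table {
    size_bytes: u64,
}

impl Table {
    fn compute_all_size_bytes(&mut self) {
        self.size_bytes = 42; // placeholder for walking every cell in the table
    }
}

fn build_table() -> Table {
    let mut table = Table { size_bytes: 0 };
    // `cfg!(debug_assertions)` evaluates to a plain boolean: release builds skip
    // the work entirely, while debug builds (including `cargo test --benches`)
    // pay the cost up front and so never trip the uncomputed-size assertion.
    if cfg!(debug_assertions) {
        table.compute_all_size_bytes();
    }
    table
}

fn main() {
    let table = build_table();
    assert!(table.size_bytes == 0 || table.size_bytes == 42);
}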
12 changes: 4 additions & 8 deletions crates/re_arrow_store/src/test_util.rs
@@ -6,26 +6,22 @@ use crate::{DataStore, DataStoreConfig};
 #[macro_export]
 macro_rules! test_row {
     ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {{
-        let mut row = ::re_log_types::DataRow::from_cells1(
+        ::re_log_types::DataRow::from_cells1_sized(
             ::re_log_types::RowId::random(),
             $entity.clone(),
             $frames,
             $n,
             $c0,
-        );
-        row.compute_all_size_bytes();
-        row
+        )
     }};
     ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {{
-        let mut row = ::re_log_types::DataRow::from_cells2(
+        ::re_log_types::DataRow::from_cells2_sized(
             ::re_log_types::RowId::random(),
             $entity.clone(),
             $frames,
             $n,
             ($c0, $c1),
-        );
-        row.compute_all_size_bytes();
-        row
+        )
     }};
 }
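
The `_sized` suffix introduced here (and used again in internals.rs below) denotes constructors that eagerly compute the row's cached byte size, so the old compute-after-construct dance disappears. A toy sketch of the convention; `Row` is a stand-in for `re_log_types::DataRow`, and the constructor bodies are placeholders:

struct Row {
    size_bytes: u64,
}

impl Row {
    /// Plain constructor: the cached size starts out uncomputed (zero).
    fn from_cells1() -> Self {
        Row { size_bytes: 0 }
    }

    /// `_sized` variant: same construction, but the size cache is filled eagerly,
    /// so later reads never hit the uncomputed-size debug assertion.
    fn from_cells1_sized() -> Self {
        let mut row = Self::from_cells1();
        row.compute_all_size_bytes();
        row
    }

    fn compute_all_size_bytes(&mut self) {
        self.size_bytes = 42; // placeholder for summing the sizes of the row's cells
    }
}

fn main() {
    assert_eq!(42, Row::from_cells1_sized().size_bytes);
}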
6 changes: 3 additions & 3 deletions crates/re_arrow_store/tests/internals.rs
@@ -52,7 +52,7 @@ fn pathological_bucket_topology() {
 
         let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]);
         for _ in 0..num {
-            let row = DataRow::from_cells1(
+            let row = DataRow::from_cells1_sized(
                 RowId::random(),
                 ent_path.clone(),
                 timepoint.clone(),
@@ -61,7 +61,7 @@ fn pathological_bucket_topology() {
             );
             store_forward.insert_row(&row).unwrap();
 
-            let row = DataRow::from_cells1(
+            let row = DataRow::from_cells1_sized(
                 RowId::random(),
                 ent_path.clone(),
                 timepoint.clone(),
@@ -83,7 +83,7 @@ fn pathological_bucket_topology() {
         let rows = range
             .map(|frame_nr| {
                 let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]);
-                DataRow::from_cells1(
+                DataRow::from_cells1_sized(
                     RowId::random(),
                     ent_path.clone(),
                     timepoint,
2 changes: 0 additions & 2 deletions crates/re_log_types/Cargo.toml
@@ -57,9 +57,7 @@ array-init = "2.1.0"
 arrow2 = { workspace = true, features = [
     "io_ipc",
     "io_print",
-    "compute_aggregate",
     "compute_concatenate",
-    "compute_aggregate",
 ] }
 arrow2_convert.workspace = true
 bytemuck = "1.11"
193 changes: 15 additions & 178 deletions crates/re_log_types/src/data_cell.rs
@@ -505,7 +505,7 @@ impl std::fmt::Display for DataCell {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.write_fmt(format_args!(
             "DataCell({})",
-            re_format::format_bytes(self.total_size_bytes() as _)
+            re_format::format_bytes(self.inner.size_bytes as _)
         ))?;
         re_format::arrow::format_table(
             // NOTE: wrap in a ListArray so that it looks more cell-like (i.e. single row)
@@ -546,6 +546,12 @@ impl SizeBytes for DataCell {
         (self.inner.size_bytes > 0)
             .then_some(self.inner.size_bytes)
             .unwrap_or_else(|| {
+                // NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
+                // the viewer when in release mode.
+                debug_assert!(
+                    false,
+                    "called `DataCell::heap_size_bytes()` without computing it first"
+                );
                 re_log::warn_once!(
                     "called `DataCell::heap_size_bytes()` without computing it first"
                 );
@@ -573,11 +579,11 @@ impl DataCellInner {
             return;
         }
 
-        let values: &dyn arrow2::array::Array = values.as_ref();
         *size_bytes = name.total_size_bytes()
             + size_bytes.total_size_bytes()
             + values.data_type().total_size_bytes()
-            + std::mem::size_of_val(values) as u64
-            + arrow2::compute::aggregate::estimated_bytes_size(&**values) as u64;
+            + values.total_size_bytes();
     }
 }

@@ -587,7 +593,8 @@ fn data_cell_sizes() {
     use arrow2::array::UInt64Array;
 
     // not computed
-    {
+    // NOTE: Unsized cells are illegal in debug mode and will flat out crash.
+    if !cfg!(debug_assertions) {
         let cell = DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed());
         assert_eq!(0, cell.heap_size_bytes());
         assert_eq!(0, cell.heap_size_bytes());
@@ -599,8 +606,8 @@ fn data_cell_sizes() {
             DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed());
         cell.compute_size_bytes();
 
-        assert_eq!(112, cell.heap_size_bytes());
-        assert_eq!(112, cell.heap_size_bytes());
+        assert_eq!(216, cell.heap_size_bytes());
+        assert_eq!(216, cell.heap_size_bytes());
     }
// anything else
Expand All @@ -612,177 +619,7 @@ fn data_cell_sizes() {
cell.compute_size_bytes();

// zero-sized + 3x u64s
assert_eq!(136, cell.heap_size_bytes());
assert_eq!(136, cell.heap_size_bytes());
}
}

// This test exists because the documentation and online discussions revolving around
// arrow2's `estimated_bytes_size()` function indicate that there's a lot of limitations and
// edge cases to be aware of.
//
// Also, it's just plain hard to be sure that the answer you get is the answer you're looking
// for with these kinds of tools. When in doubt.. test everything we're going to need from it.
//
// In many ways, this is a specification of what we mean when we ask "what's the size of this
// Arrow array?".
#[test]
#[allow(clippy::from_iter_instead_of_collect)]
fn test_arrow_estimated_size_bytes() {
use arrow2::{
array::{Array, Float64Array, ListArray, StructArray, UInt64Array, Utf8Array},
compute::aggregate::estimated_bytes_size,
datatypes::{DataType, Field},
offset::Offsets,
};

// empty primitive array
{
let data = vec![];
let array = UInt64Array::from_vec(data.clone()).boxed();
let sz = estimated_bytes_size(&*array);
assert_eq!(0, sz);
assert_eq!(std::mem::size_of_val(data.as_slice()), sz);
}

// simple primitive array
{
let data = vec![42u64; 100];
let array = UInt64Array::from_vec(data.clone()).boxed();
assert_eq!(
std::mem::size_of_val(data.as_slice()),
estimated_bytes_size(&*array)
);
}

// utf8 strings array
{
let data = vec![Some("some very, very, very long string indeed"); 100];
let array = Utf8Array::<i32>::from(data.clone()).to_boxed();

let raw_size_bytes = data
.iter()
// headers + bodies!
.map(|s| std::mem::size_of_val(s) + std::mem::size_of_val(s.unwrap().as_bytes()))
.sum::<usize>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(5600, raw_size_bytes);
assert_eq!(4404, arrow_size_bytes); // smaller because validity bitmaps instead of opts
}

// simple primitive list array
{
let data = std::iter::repeat(vec![42u64; 100])
.take(50)
.collect::<Vec<_>>();
let array = {
let array_flattened =
UInt64Array::from_vec(data.clone().into_iter().flatten().collect()).boxed();

ListArray::<i32>::new(
ListArray::<i32>::default_datatype(DataType::UInt64),
Offsets::try_from_lengths(std::iter::repeat(50).take(50))
.unwrap()
.into(),
array_flattened,
None,
)
.boxed()
};

let raw_size_bytes = data
.iter()
// headers + bodies!
.map(|s| std::mem::size_of_val(s) + std::mem::size_of_val(s.as_slice()))
.sum::<usize>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(41200, raw_size_bytes);
assert_eq!(40200, arrow_size_bytes); // smaller because smaller inner headers
}

// compound type array
{
#[derive(Clone, Copy)]
struct Point {
x: f64,
y: f64,
}

impl Default for Point {
fn default() -> Self {
Self { x: 42.0, y: 666.0 }
}
}

let data = vec![Point::default(); 100];
let array = {
let x = Float64Array::from_vec(data.iter().map(|p| p.x).collect()).boxed();
let y = Float64Array::from_vec(data.iter().map(|p| p.y).collect()).boxed();
let fields = vec![
Field::new("x", DataType::Float64, false),
Field::new("y", DataType::Float64, false),
];
StructArray::new(DataType::Struct(fields), vec![x, y], None).boxed()
};

let raw_size_bytes = std::mem::size_of_val(data.as_slice());
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(1600, raw_size_bytes);
assert_eq!(1600, arrow_size_bytes);
}

// compound type list array
{
#[derive(Clone, Copy)]
struct Point {
x: f64,
y: f64,
}

impl Default for Point {
fn default() -> Self {
Self { x: 42.0, y: 666.0 }
}
}

let data = std::iter::repeat(vec![Point::default(); 100])
.take(50)
.collect::<Vec<_>>();
let array: Box<dyn Array> = {
let array = {
let x =
Float64Array::from_vec(data.iter().flatten().map(|p| p.x).collect()).boxed();
let y =
Float64Array::from_vec(data.iter().flatten().map(|p| p.y).collect()).boxed();
let fields = vec![
Field::new("x", DataType::Float64, false),
Field::new("y", DataType::Float64, false),
];
StructArray::new(DataType::Struct(fields), vec![x, y], None)
};

ListArray::<i32>::new(
ListArray::<i32>::default_datatype(array.data_type().clone()),
Offsets::try_from_lengths(std::iter::repeat(50).take(50))
.unwrap()
.into(),
array.boxed(),
None,
)
.boxed()
};

let raw_size_bytes = data
.iter()
// headers + bodies!
.map(|s| std::mem::size_of_val(s) + std::mem::size_of_val(s.as_slice()))
.sum::<usize>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(81200, raw_size_bytes);
assert_eq!(80200, arrow_size_bytes); // smaller because smaller inner headers
assert_eq!(240, cell.heap_size_bytes());
assert_eq!(240, cell.heap_size_bytes());
}
}
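
The `SizeBytes for DataCell` hunk earlier in this file turns a silent misuse into a loud one: reading a cached size before computing it now crashes in debug builds and only warns in release. A condensed sketch of that read path; the zero fallback at the end is an assumption, since the real fallback is truncated out of this diff:

fn cached_size_or_complain(size_bytes: u64) -> u64 {
    if size_bytes > 0 {
        return size_bytes;
    }
    // Relying on unsized cells is always a mistake, but it isn't worth crashing
    // in release mode: `debug_assert!` compiles to a no-op there, leaving only
    // the warning below.
    debug_assert!(false, "called heap_size_bytes() without computing it first");
    eprintln!("warning: called heap_size_bytes() without computing it first");
    0 // assumed placeholder; the real impl's fallback isn't shown in this diff
}

fn main() {
    assert_eq!(216, cached_size_or_complain(216));
}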
