Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK batching/revamp 2.2: homegrown arrow size estimation routines #2002

Merged
merged 41 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
68d8b0c
version crossbeam at the workspace level
teh-cmc Apr 17, 2023
cb74038
more DataRow size helpers
teh-cmc Apr 26, 2023
a0d9d39
DataTableBatcher
teh-cmc Apr 26, 2023
f46ac72
lints
teh-cmc Apr 26, 2023
5440f76
lints
teh-cmc Apr 26, 2023
c1088c5
self review
teh-cmc Apr 26, 2023
cbf17be
don't expose shutdown to make errors impossible
teh-cmc Apr 26, 2023
e7b42bf
doc
teh-cmc Apr 26, 2023
573de98
backport
teh-cmc Apr 26, 2023
67dc616
backport
teh-cmc Apr 27, 2023
14130c5
introduce RecordingStream
teh-cmc Apr 27, 2023
71a31bd
clean up old stuff from the before time
teh-cmc Apr 27, 2023
649bbe8
self-review
teh-cmc Apr 28, 2023
2b74c3b
ordered data columns in data tables
teh-cmc Apr 28, 2023
34be0a7
tests
teh-cmc Apr 28, 2023
72685fa
even more tests
teh-cmc Apr 28, 2023
067168f
rogue todo
teh-cmc Apr 28, 2023
b8e0065
batching is now a reality
teh-cmc Apr 28, 2023
0e69707
some extra peace of mind
teh-cmc Apr 28, 2023
a7f84c8
revert
teh-cmc Apr 28, 2023
ead5883
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc Apr 28, 2023
3f0ec73
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc Apr 28, 2023
6e348db
lock shenanigans
teh-cmc Apr 28, 2023
a31285b
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc May 3, 2023
ecb7ce5
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc May 3, 2023
d070c08
crash when relying on uncomputed cell sizes in debug
teh-cmc May 3, 2023
f8e74e9
thorough testing of estimated size measurements with unions and batch…
teh-cmc May 3, 2023
da160c5
dealing with all cases of reliance on uncomputed sizes
teh-cmc May 3, 2023
5fc49de
inline upstream's estimated_bytes_size routine and integrate it with …
teh-cmc May 3, 2023
0ba7631
homegrown, batch-compatible arrow size estimations
teh-cmc May 3, 2023
dd3d08f
just format
teh-cmc May 3, 2023
b89443f
missed one
teh-cmc May 3, 2023
8580773
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/2_rust…
teh-cmc May 3, 2023
4af3342
merge shenanigans
teh-cmc May 3, 2023
d1e5c19
address PR comments
teh-cmc May 3, 2023
da8bc6a
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/22_ba…
teh-cmc May 3, 2023
dc3cd57
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/22_bat…
teh-cmc May 4, 2023
8956636
self-review
teh-cmc May 4, 2023
eae05c5
address PR comments
teh-cmc May 4, 2023
6ee6d28
some more 'cargo test --bench' fun times
teh-cmc May 4, 2023
7ab6af4
Merge branch 'main' into cmc/sdk_revamp/22_batching_estimated_size_hell
teh-cmc May 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions crates/re_arrow_store/benches/arrow2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use std::sync::Arc;

use arrow2::{
array::{Array, PrimitiveArray, StructArray, UnionArray},
compute::aggregate::estimated_bytes_size,
};
use arrow2::array::{Array, PrimitiveArray, StructArray, UnionArray};
use criterion::{criterion_group, Criterion};
use itertools::Itertools;
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_some_instances, build_some_point2d, build_some_rects},
external::arrow2_convert::serialize::TryIntoArrow,
DataCell, SerializableComponent,
DataCell, SerializableComponent, SizeBytes as _,
};

// ---
Expand Down Expand Up @@ -109,15 +106,15 @@ fn erased_clone(c: &mut Criterion) {

let total_size_bytes = arrays
.iter()
.map(|array| estimated_bytes_size(&**array) as u64)
.map(|array| array.total_size_bytes())
.sum::<u64>();
assert!(total_size_bytes as usize >= NUM_ROWS * NUM_INSTANCES * std::mem::size_of::<T>());

group.bench_function("array", |b| {
b.iter(|| {
let sz = arrays
.iter()
.map(|array| estimated_bytes_size(&**array) as u64)
.map(|array| array.total_size_bytes())
.sum::<u64>();
assert_eq!(total_size_bytes, sz);
sz
Expand Down
9 changes: 7 additions & 2 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ fn gc(c: &mut Criterion) {
(NUM_INSTANCES * NUM_ROWS) as _,
));

let mut table = build_table(NUM_INSTANCES as usize, false);
table.compute_all_size_bytes();
let table = build_table(NUM_INSTANCES as usize, false);

// Default config
group.bench_function("default", |b| {
Expand Down Expand Up @@ -325,6 +324,12 @@ fn build_table(n: usize, packed: bool) -> DataTable {
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
}

// NOTE: Using unsized cells will crash in debug mode, and benchmarks are run for 1 iteration,
// in debug mode, by the standard test harness.
if cfg!(debug_assertions) {
table.compute_all_size_bytes();
}

table
}

Expand Down
12 changes: 4 additions & 8 deletions crates/re_arrow_store/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,22 @@ use crate::{DataStore, DataStoreConfig};
#[macro_export]
macro_rules! test_row {
($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {{
let mut row = ::re_log_types::DataRow::from_cells1(
::re_log_types::DataRow::from_cells1_sized(
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
$c0,
);
row.compute_all_size_bytes();
row
)
}};
($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {{
let mut row = ::re_log_types::DataRow::from_cells2(
::re_log_types::DataRow::from_cells2_sized(
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
($c0, $c1),
);
row.compute_all_size_bytes();
row
)
}};
}

Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/tests/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn pathological_bucket_topology() {

let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]);
for _ in 0..num {
let row = DataRow::from_cells1(
let row = DataRow::from_cells1_sized(
RowId::random(),
ent_path.clone(),
timepoint.clone(),
Expand All @@ -61,7 +61,7 @@ fn pathological_bucket_topology() {
);
store_forward.insert_row(&row).unwrap();

let row = DataRow::from_cells1(
let row = DataRow::from_cells1_sized(
RowId::random(),
ent_path.clone(),
timepoint.clone(),
Expand All @@ -83,7 +83,7 @@ fn pathological_bucket_topology() {
let rows = range
.map(|frame_nr| {
let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]);
DataRow::from_cells1(
DataRow::from_cells1_sized(
RowId::random(),
ent_path.clone(),
timepoint,
Expand Down
2 changes: 0 additions & 2 deletions crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ array-init = "2.1.0"
arrow2 = { workspace = true, features = [
"io_ipc",
"io_print",
"compute_aggregate",
"compute_concatenate",
"compute_aggregate",
] }
arrow2_convert.workspace = true
bytemuck = "1.11"
Expand Down
193 changes: 15 additions & 178 deletions crates/re_log_types/src/data_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl std::fmt::Display for DataCell {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"DataCell({})",
re_format::format_bytes(self.total_size_bytes() as _)
re_format::format_bytes(self.inner.size_bytes as _)
))?;
re_format::arrow::format_table(
// NOTE: wrap in a ListArray so that it looks more cell-like (i.e. single row)
Expand Down Expand Up @@ -546,6 +546,12 @@ impl SizeBytes for DataCell {
(self.inner.size_bytes > 0)
.then_some(self.inner.size_bytes)
.unwrap_or_else(|| {
// NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
// the viewer when in release mode.
debug_assert!(
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
false,
"called `DataCell::heap_size_bytes() without computing it first"
);
re_log::warn_once!(
"called `DataCell::heap_size_bytes() without computing it first"
);
Expand Down Expand Up @@ -573,11 +579,11 @@ impl DataCellInner {
return;
}

let values: &dyn arrow2::array::Array = values.as_ref();
*size_bytes = name.total_size_bytes()
+ size_bytes.total_size_bytes()
+ values.data_type().total_size_bytes()
+ std::mem::size_of_val(values) as u64
+ arrow2::compute::aggregate::estimated_bytes_size(&**values) as u64;
+ values.total_size_bytes();
}
}

Expand All @@ -587,7 +593,8 @@ fn data_cell_sizes() {
use arrow2::array::UInt64Array;

// not computed
{
// NOTE: Unsized cells are illegal in debug mode and will flat out crash.
if !cfg!(debug_assertions) {
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
let cell = DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed());
assert_eq!(0, cell.heap_size_bytes());
assert_eq!(0, cell.heap_size_bytes());
Expand All @@ -599,8 +606,8 @@ fn data_cell_sizes() {
DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed());
cell.compute_size_bytes();

assert_eq!(112, cell.heap_size_bytes());
assert_eq!(112, cell.heap_size_bytes());
assert_eq!(216, cell.heap_size_bytes());
assert_eq!(216, cell.heap_size_bytes());
}

// anything else
Expand All @@ -612,177 +619,7 @@ fn data_cell_sizes() {
cell.compute_size_bytes();

// zero-sized + 3x u64s
assert_eq!(136, cell.heap_size_bytes());
assert_eq!(136, cell.heap_size_bytes());
}
}

// This test exists because the documentation and online discussions revolving around
// arrow2's `estimated_bytes_size()` function indicate that there's a lot of limitations and
// edge cases to be aware of.
//
// Also, it's just plain hard to be sure that the answer you get is the answer you're looking
// for with these kinds of tools. When in doubt.. test everything we're going to need from it.
//
// In many ways, this is a specification of what we mean when we ask "what's the size of this
// Arrow array?".
#[test]
#[allow(clippy::from_iter_instead_of_collect)]
fn test_arrow_estimated_size_bytes() {
use arrow2::{
array::{Array, Float64Array, ListArray, StructArray, UInt64Array, Utf8Array},
compute::aggregate::estimated_bytes_size,
datatypes::{DataType, Field},
offset::Offsets,
};

// empty primitive array
{
let data = vec![];
let array = UInt64Array::from_vec(data.clone()).boxed();
let sz = estimated_bytes_size(&*array);
assert_eq!(0, sz);
assert_eq!(std::mem::size_of_val(data.as_slice()), sz);
}

// simple primitive array
{
let data = vec![42u64; 100];
let array = UInt64Array::from_vec(data.clone()).boxed();
assert_eq!(
std::mem::size_of_val(data.as_slice()),
estimated_bytes_size(&*array)
);
}

// utf8 strings array
{
let data = vec![Some("some very, very, very long string indeed"); 100];
let array = Utf8Array::<i32>::from(data.clone()).to_boxed();

let raw_size_bytes = data
.iter()
// headers + bodies!
.map(|s| std::mem::size_of_val(s) + std::mem::size_of_val(s.unwrap().as_bytes()))
.sum::<usize>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(5600, raw_size_bytes);
assert_eq!(4404, arrow_size_bytes); // smaller because validity bitmaps instead of opts
}

// simple primitive list array
{
let data = std::iter::repeat(vec![42u64; 100])
.take(50)
.collect::<Vec<_>>();
let array = {
let array_flattened =
UInt64Array::from_vec(data.clone().into_iter().flatten().collect()).boxed();

ListArray::<i32>::new(
ListArray::<i32>::default_datatype(DataType::UInt64),
Offsets::try_from_lengths(std::iter::repeat(50).take(50))
.unwrap()
.into(),
array_flattened,
None,
)
.boxed()
};

let raw_size_bytes = data
.iter()
// headers + bodies!
.map(|s| std::mem::size_of_val(s) + std::mem::size_of_val(s.as_slice()))
.sum::<usize>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(41200, raw_size_bytes);
assert_eq!(40200, arrow_size_bytes); // smaller because smaller inner headers
}

// compound type array
{
#[derive(Clone, Copy)]
struct Point {
x: f64,
y: f64,
}

impl Default for Point {
fn default() -> Self {
Self { x: 42.0, y: 666.0 }
}
}

let data = vec![Point::default(); 100];
let array = {
let x = Float64Array::from_vec(data.iter().map(|p| p.x).collect()).boxed();
let y = Float64Array::from_vec(data.iter().map(|p| p.y).collect()).boxed();
let fields = vec![
Field::new("x", DataType::Float64, false),
Field::new("y", DataType::Float64, false),
];
StructArray::new(DataType::Struct(fields), vec![x, y], None).boxed()
};

let raw_size_bytes = std::mem::size_of_val(data.as_slice());
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(1600, raw_size_bytes);
assert_eq!(1600, arrow_size_bytes);
}

// compound type list array
{
#[derive(Clone, Copy)]
struct Point {
x: f64,
y: f64,
}

impl Default for Point {
fn default() -> Self {
Self { x: 42.0, y: 666.0 }
}
}

let data = std::iter::repeat(vec![Point::default(); 100])
.take(50)
.collect::<Vec<_>>();
let array: Box<dyn Array> = {
let array = {
let x =
Float64Array::from_vec(data.iter().flatten().map(|p| p.x).collect()).boxed();
let y =
Float64Array::from_vec(data.iter().flatten().map(|p| p.y).collect()).boxed();
let fields = vec![
Field::new("x", DataType::Float64, false),
Field::new("y", DataType::Float64, false),
];
StructArray::new(DataType::Struct(fields), vec![x, y], None)
};

ListArray::<i32>::new(
ListArray::<i32>::default_datatype(array.data_type().clone()),
Offsets::try_from_lengths(std::iter::repeat(50).take(50))
.unwrap()
.into(),
array.boxed(),
None,
)
.boxed()
};

let raw_size_bytes = data
.iter()
// headers + bodies!
.map(|s| std::mem::size_of_val(s) + std::mem::size_of_val(s.as_slice()))
.sum::<usize>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(81200, raw_size_bytes);
assert_eq!(80200, arrow_size_bytes); // smaller because smaller inner headers
assert_eq!(240, cell.heap_size_bytes());
assert_eq!(240, cell.heap_size_bytes());
}
}
Loading