
SDK batching/revamp 2.2: homegrown arrow size estimation routines #2002

Merged · 41 commits · May 4, 2023

Changes from 4 commits
Commits
68d8b0c
version crossbeam at the workspace level
teh-cmc Apr 17, 2023
cb74038
more DataRow size helpers
teh-cmc Apr 26, 2023
a0d9d39
DataTableBatcher
teh-cmc Apr 26, 2023
f46ac72
lints
teh-cmc Apr 26, 2023
5440f76
lints
teh-cmc Apr 26, 2023
c1088c5
self review
teh-cmc Apr 26, 2023
cbf17be
don't expose shutdown to make errors impossible
teh-cmc Apr 26, 2023
e7b42bf
doc
teh-cmc Apr 26, 2023
573de98
backport
teh-cmc Apr 26, 2023
67dc616
backport
teh-cmc Apr 27, 2023
14130c5
introduce RecordingStream
teh-cmc Apr 27, 2023
71a31bd
clean up old stuff from the before time
teh-cmc Apr 27, 2023
649bbe8
self-review
teh-cmc Apr 28, 2023
2b74c3b
ordered data columns in data tables
teh-cmc Apr 28, 2023
34be0a7
tests
teh-cmc Apr 28, 2023
72685fa
even more tests
teh-cmc Apr 28, 2023
067168f
rogue todo
teh-cmc Apr 28, 2023
b8e0065
batching is now a reality
teh-cmc Apr 28, 2023
0e69707
some extra peace of mind
teh-cmc Apr 28, 2023
a7f84c8
revert
teh-cmc Apr 28, 2023
ead5883
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc Apr 28, 2023
3f0ec73
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc Apr 28, 2023
6e348db
lock shenanigans
teh-cmc Apr 28, 2023
a31285b
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc May 3, 2023
ecb7ce5
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc May 3, 2023
d070c08
crash when relying on uncomputed cell sizes in debug
teh-cmc May 3, 2023
f8e74e9
thorough testing of estimated size measurements with unions and batch…
teh-cmc May 3, 2023
da160c5
dealing with all cases of reliance on uncomputed sizes
teh-cmc May 3, 2023
5fc49de
inline upstream's estimated_bytes_size routine and integrate it with …
teh-cmc May 3, 2023
0ba7631
homegrown, batch-compatible arrow size estimations
teh-cmc May 3, 2023
dd3d08f
just format
teh-cmc May 3, 2023
b89443f
missed one
teh-cmc May 3, 2023
8580773
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/2_rust…
teh-cmc May 3, 2023
4af3342
merge shenanigans
teh-cmc May 3, 2023
d1e5c19
address PR comments
teh-cmc May 3, 2023
da8bc6a
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/22_ba…
teh-cmc May 3, 2023
dc3cd57
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/22_bat…
teh-cmc May 4, 2023
8956636
self-review
teh-cmc May 4, 2023
eae05c5
address PR comments
teh-cmc May 4, 2023
6ee6d28
some more 'cargo test --bench' fun times
teh-cmc May 4, 2023
7ab6af4
Merge branch 'main' into cmc/sdk_revamp/22_batching_estimated_size_hell
teh-cmc May 4, 2023
12 changes: 7 additions & 5 deletions crates/re_arrow_store/benches/data_store.rs
@@ -269,8 +269,7 @@ fn gc(c: &mut Criterion) {
(NUM_INSTANCES * NUM_ROWS) as _,
));

let mut table = build_table(NUM_INSTANCES as usize, false);
table.compute_all_size_bytes();
let table = build_table(NUM_INSTANCES as usize, false);

// Default config
group.bench_function("default", |b| {
@@ -318,16 +317,19 @@ fn build_table(n: usize, packed: bool) -> DataTable {
)
}),
);
if cfg!(debug_assertions) {
table.compute_all_size_bytes();
}

// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
}

// NOTE: Using unsized cells will crash in debug mode, and the standard test harness
// runs benchmarks for 1 iteration in debug mode.
if cfg!(debug_assertions) {
table.compute_all_size_bytes();
}

table
}

12 changes: 4 additions & 8 deletions crates/re_arrow_store/src/test_util.rs
@@ -6,26 +6,22 @@ use crate::{DataStore, DataStoreConfig};
#[macro_export]
macro_rules! test_row {
($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {{
let mut row = ::re_log_types::DataRow::from_cells1(
::re_log_types::DataRow::from_cells1_sized(
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
$c0,
);
row.compute_all_size_bytes();
row
)
}};
($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {{
let mut row = ::re_log_types::DataRow::from_cells2(
::re_log_types::DataRow::from_cells2_sized(
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
($c0, $c1),
);
row.compute_all_size_bytes();
row
)
}};
}

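For reference, a hypothetical call site for the updated macro, borrowing `build_frame_nr` and `build_some_instances` from the tests further down; the values are made up:

// Hypothetical usage: the macro now yields a row whose cell sizes are already
// computed, since it delegates to `from_cells1_sized`.
let row = test_row!(ent_path @ [build_frame_nr(42.into())] => 2; [build_some_instances(2)]);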
12 changes: 4 additions & 8 deletions crates/re_arrow_store/tests/internals.rs
@@ -52,24 +52,22 @@ fn pathological_bucket_topology() {

let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]);
for _ in 0..num {
let mut row = DataRow::from_cells1(
let row = DataRow::from_cells1_sized(
RowId::random(),
ent_path.clone(),
timepoint.clone(),
num_instances,
build_some_instances(num_instances as _),
);
row.compute_all_size_bytes();
store_forward.insert_row(&row).unwrap();

let mut row = DataRow::from_cells1(
let row = DataRow::from_cells1_sized(
RowId::random(),
ent_path.clone(),
timepoint.clone(),
num_instances,
build_some_instances(num_instances as _),
);
row.compute_all_size_bytes();
store_backward.insert_row(&row).unwrap();
}
}
@@ -85,15 +83,13 @@ fn pathological_bucket_topology() {
let rows = range
.map(|frame_nr| {
let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]);
let mut row = DataRow::from_cells1(
DataRow::from_cells1_sized(
RowId::random(),
ent_path.clone(),
timepoint,
num_instances,
build_some_instances(num_instances as _),
);
row.compute_all_size_bytes();
row
)
})
.collect::<Vec<_>>();

3 changes: 3 additions & 0 deletions crates/re_log_types/src/data_cell.rs
@@ -546,6 +546,8 @@ impl SizeBytes for DataCell {
(self.inner.size_bytes > 0)
.then_some(self.inner.size_bytes)
.unwrap_or_else(|| {
// NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
// the viewer in release mode.
debug_assert!(
false,
"called `DataCell::heap_size_bytes()` without computing it first"
@@ -591,6 +593,7 @@ fn data_cell_sizes() {
use arrow2::array::UInt64Array;

// not computed
// NOTE: Unsized cells are illegal in debug mode and will flat out crash.
if !cfg!(debug_assertions) {
let cell = DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed());
assert_eq!(0, cell.heap_size_bytes());
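The pattern above — compute once, cache, debug-assert on uncached reads — can be sketched independently of the DataCell internals; all names in this sketch are illustrative:

// Minimal sketch of the cached-size pattern (illustrative names, not the
// actual DataCell internals).
struct SizedPayload {
    data: Vec<u8>,
    size_bytes: u64, // 0 means "not computed yet"
}

impl SizedPayload {
    /// Walks the payload once and caches the result; this is the costly step.
    fn compute_all_size_bytes(&mut self) {
        self.size_bytes = self.data.len() as u64;
    }

    /// Returns the cached size. Relying on an uncomputed size is a bug, but
    /// only worth crashing over in debug builds.
    fn heap_size_bytes(&self) -> u64 {
        if self.size_bytes > 0 {
            self.size_bytes
        } else {
            debug_assert!(false, "called heap_size_bytes() without computing it first");
            0
        }
    }
}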
106 changes: 106 additions & 0 deletions crates/re_log_types/src/data_row.rs
@@ -316,6 +316,24 @@ impl DataRow {
Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap()
}

/// A helper that combines [`Self::from_cells`] with [`Self::compute_all_size_bytes`].
///
/// See their respective documentation for more information.
///
/// Beware: this is costly!
pub fn from_cells_sized(
row_id: RowId,
timepoint: impl Into<TimePoint>,
entity_path: impl Into<EntityPath>,
num_instances: u32,
cells: impl IntoIterator<Item = DataCell>,
) -> Self {
let mut this =
Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap();
this.compute_all_size_bytes();
this
}

/// Turns the `DataRow` into a single-row [`DataTable`].
#[inline]
pub fn into_table(self) -> DataTable {
@@ -428,6 +446,32 @@ impl DataRow {
)
}

/// A helper that combines [`Self::from_cells1`] with [`Self::compute_all_size_bytes`].
///
/// See their respective documentation for more information.
///
/// Beware: this is costly!
pub fn from_cells1_sized<C0>(
row_id: RowId,
entity_path: impl Into<EntityPath>,
timepoint: impl Into<TimePoint>,
num_instances: u32,
into_cells: C0,
) -> DataRow
where
C0: Into<DataCell>,
{
let mut this = Self::from_cells(
row_id,
timepoint.into(),
entity_path.into(),
num_instances,
[into_cells.into()],
);
this.compute_all_size_bytes();
this
}

pub fn try_from_cells1<C0>(
row_id: RowId,
entity_path: impl Into<EntityPath>,
@@ -471,6 +515,36 @@ impl DataRow {
)
}

/// A helper that combines [`Self::from_cells2`] with [`Self::compute_all_size_bytes`].
///
/// See their respective documentation for more information.
///
/// Beware: this is costly!
pub fn from_cells2_sized<C0, C1>(
row_id: RowId,
entity_path: impl Into<EntityPath>,
timepoint: impl Into<TimePoint>,
num_instances: u32,
into_cells: (C0, C1),
) -> DataRow
where
C0: Into<DataCell>,
C1: Into<DataCell>,
{
let mut this = Self::from_cells(
row_id,
timepoint.into(),
entity_path.into(),
num_instances,
[
into_cells.0.into(), //
into_cells.1.into(), //
],
);
this.compute_all_size_bytes();
this
}

pub fn try_from_cells2<C0, C1>(
row_id: RowId,
entity_path: impl Into<EntityPath>,
@@ -521,6 +595,38 @@ impl DataRow {
)
}

/// A helper that combines [`Self::from_cells3`] with [`Self::compute_all_size_bytes`].
///
/// See their respective documentation for more information.
///
/// Beware: this is costly!
pub fn from_cells3_sized<C0, C1, C2>(
row_id: RowId,
entity_path: impl Into<EntityPath>,
timepoint: impl Into<TimePoint>,
num_instances: u32,
into_cells: (C0, C1, C2),
) -> DataRow
where
C0: Into<DataCell>,
C1: Into<DataCell>,
C2: Into<DataCell>,
{
let mut this = Self::from_cells(
row_id,
timepoint.into(),
entity_path.into(),
num_instances,
[
into_cells.0.into(), //
into_cells.1.into(), //
into_cells.2.into(), //
],
);
this.compute_all_size_bytes();
this
}

pub fn try_from_cells3<C0, C1, C2>(
row_id: RowId,
entity_path: impl Into<EntityPath>,
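The net effect of these `_sized` constructors at call sites, sketched with placeholder arguments:

// Before: two steps, and forgetting the second one yields unsized cells,
// which crash in debug mode.
let mut row = DataRow::from_cells1(row_id, entity_path, timepoint, num_instances, cells);
row.compute_all_size_bytes();

// After: one step, sizes always computed (at a cost -- see the doc comments).
let row = DataRow::from_cells1_sized(row_id, entity_path, timepoint, num_instances, cells);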
4 changes: 2 additions & 2 deletions crates/re_log_types/src/data_table.rs
@@ -1317,7 +1317,7 @@ fn data_table_sizes_unions() {
);
assert_eq!(num_bytes, table.heap_size_bytes());

let err_margin = (num_bytes as f64 * 0.05) as u64;
let err_margin = (num_bytes as f64 * 0.01) as u64;
let num_bytes_min = num_bytes;
let num_bytes_max = num_bytes + err_margin;

@@ -1359,7 +1359,7 @@ fn data_table_sizes_unions() {
.as_slice(),
),
10_000, // num_rows
27_250_064, // expected_num_bytes
27_230_064, // expected_num_bytes
);

// dense union (varying)
18 changes: 7 additions & 11 deletions crates/re_log_types/src/data_table_batcher.rs
@@ -512,8 +512,7 @@ mod tests {
let batcher = DataTableBatcher::new(DataTableBatcherConfig::NEVER).unwrap();
let tables = batcher.tables();

let mut table = create_table();
table.compute_all_size_bytes();
let table = create_table();
let rows = table.to_rows().collect_vec();

for _ in 0..3 {
@@ -551,9 +550,7 @@

#[test]
fn num_bytes_trigger() {
let mut table = create_table();
table.compute_all_size_bytes();

let table = create_table();
let rows = table.to_rows().collect_vec();
let flush_duration = std::time::Duration::from_millis(50);
let flush_num_bytes = rows
@@ -615,9 +612,7 @@

#[test]
fn num_rows_trigger() {
let mut table = create_table();
table.compute_all_size_bytes();

let table = create_table();
let rows = table.to_rows().collect_vec();
let flush_duration = std::time::Duration::from_millis(50);
let flush_num_rows = rows.len() as u64 - 1;
@@ -675,8 +670,7 @@

#[test]
fn duration_trigger() {
let mut table = create_table();
table.compute_all_size_bytes();
let table = create_table();
let rows = table.to_rows().collect_vec();

let flush_duration = Duration::from_millis(50);
@@ -791,6 +785,8 @@
)
};

DataTable::from_rows(TableId::ZERO, [row0, row1, row2])
let mut table = DataTable::from_rows(TableId::ZERO, [row0, row1, row2]);
table.compute_all_size_bytes();
table
}
}
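For intuition, the row- and byte-count triggers exercised by these tests amount to a threshold check on pending data; a minimal sketch, with illustrative names rather than the actual `DataTableBatcher` internals:

// Minimal sketch of the size-based flush triggers (illustrative names, not
// the actual DataTableBatcher internals).
struct PendingBatch {
    num_rows: u64,
    num_bytes: u64,
}

struct FlushConfig {
    flush_num_rows: u64,
    flush_num_bytes: u64,
}

impl PendingBatch {
    /// True when either size-based trigger fires; the duration-based trigger
    /// is driven separately by a tick timer.
    fn should_flush(&self, config: &FlushConfig) -> bool {
        self.num_rows >= config.flush_num_rows || self.num_bytes >= config.flush_num_bytes
    }
}

Note that the byte-count trigger is exactly why this PR needs reliable size estimation: the batcher can only compare `num_bytes` against a threshold if every incoming row already knows its own size.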
30 changes: 22 additions & 8 deletions crates/re_log_types/src/size_bytes.rs
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, HashMap};

use arrow2::datatypes::{DataType, Field};
use nohash_hasher::IntSet;
use smallvec::SmallVec;

// ---
@@ -291,6 +292,7 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
LargeBinary => dyn_binary!(array, BinaryArray<i64>, i64),
Utf8 => dyn_binary!(array, Utf8Array<i32>, i32),
LargeUtf8 => dyn_binary!(array, Utf8Array<i64>, i64),
// NOTE: Diverges from upstream.
List | LargeList => {
let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();

@@ -320,6 +322,7 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
.sum::<usize>()
+ validity_size(array.validity())
}
// NOTE: Diverges from upstream.
Union => {
let array = array.as_any().downcast_ref::<UnionArray>().unwrap();

@@ -349,14 +352,25 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
}

let fields = array.fields();
let values_start = offsets[0] as usize;
let values_end = offsets[offsets.len() - 1] as usize;
array
.types()
.iter()
.copied()
.map(|ty| {
fields.get(ty as usize).map_or(0, |x| {
let types: IntSet<_> = array.types().iter().copied().collect();
types
.into_iter()
.map(|cur_ty| {
let mut indices = array
.types()
.iter()
.enumerate()
.filter_map(|(idx, &ty)| (ty == cur_ty).then_some(idx));

let idx_start = indices.next().unwrap_or_default();
let mut idx_end = idx_start;
for idx in indices {
idx_end = idx;
}
Comment on lines +366 to +369 (reviewer, Member):

Suggested change
-let mut idx_end = idx_start;
-for idx in indices {
-idx_end = idx;
-}
+let idx_end = indices.last().unwrap_or_default();

teh-cmc (Member, Author) replied on May 4, 2023:

Yeah... I didn't go there because it very much looks like this is some magic O(1) function, which feels like I'm betraying the reader...
let values_start = offsets[idx_start] as usize;
let values_end = offsets[idx_end] as usize;
fields.get(cur_ty as usize).map_or(0, |x| {
estimated_bytes_size(
x.slice(values_start, values_end - values_start).as_ref(),
)
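The crux of the divergence from upstream: in a dense union, each child array must be sliced to the offset range its own type id actually references, instead of slicing every child by the global first/last offsets. A standalone sketch of that per-type range scan, using plain slices instead of arrow2 buffers:

// Standalone sketch of the per-type offset-range computation from the diff
// above, operating on plain slices rather than arrow2 buffers.
use std::collections::HashSet;

/// For each distinct type id, finds the offset range [values_start, values_end]
/// spanned by its entries, mirroring the idx_start/idx_end scan above.
fn per_type_ranges(types: &[i8], offsets: &[i32]) -> Vec<(i8, usize, usize)> {
    let distinct: HashSet<i8> = types.iter().copied().collect();
    distinct
        .into_iter()
        .map(|cur_ty| {
            // Indices of all entries that carry this type id.
            let mut indices = types
                .iter()
                .enumerate()
                .filter_map(|(idx, &ty)| (ty == cur_ty).then_some(idx));
            let idx_start = indices.next().unwrap_or_default();
            // `last()` walks the remaining iterator, just like the loop above.
            let idx_end = indices.last().unwrap_or(idx_start);
            (
                cur_ty,
                offsets[idx_start] as usize,
                offsets[idx_end] as usize,
            )
        })
        .collect()
}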
2 changes: 1 addition & 1 deletion crates/re_query/Cargo.toml
@@ -31,7 +31,7 @@ re_format.workspace = true
re_log_types = { workspace = true, features = ["arrow_datagen"] }

# External dependencies:
arrow2 = { workspace = true, features = ["compute_concatenate"] }
arrow2.workspace = true
document-features = "0.2"
itertools = { workspace = true }
thiserror.workspace = true