Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in size estimation of array buffers #2991

Merged
merged 15 commits into from
Aug 16, 2023
Merged
10 changes: 8 additions & 2 deletions crates/re_log_types/src/data_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1231,10 +1231,16 @@ impl DataTable {
let c1_bytes = cell_to_bytes(c1.clone());
let c2_bytes = cell_to_bytes(c2.clone());
emilk marked this conversation as resolved.
Show resolved Hide resolved

size_mismatches.push(format!(
"Cell size is {} vs {} bytes",
c1_bytes.len(),
c2_bytes.len()
));

size_mismatches.push(
similar_asserts::SimpleDiff::from_str(
&format!("{c1_bytes:?}"),
&format!("{c2_bytes:?}"),
&format!("{c1_bytes:#?}"),
&format!("{c2_bytes:#?}"),
"cell1_ipc",
"cell2_ipc",
)
Expand Down
117 changes: 75 additions & 42 deletions crates/re_types/src/size_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::{BTreeMap, HashMap};

use arrow2::datatypes::{DataType, Field};
use nohash_hasher::IntSet;
use smallvec::SmallVec;

// ---
Expand Down Expand Up @@ -255,6 +254,7 @@ fn validity_size(validity: Option<&Bitmap>) -> usize {
}

/// Returns the total (heap) allocated size of the array in bytes.
///
/// # Implementation
/// This estimation is the sum of the size of its buffers, validity, including nested arrays.
/// Multiple arrays may share buffers and bitmaps. Therefore, the size of 2 arrays is not the
Expand All @@ -266,8 +266,13 @@ fn validity_size(validity: Option<&Bitmap>) -> usize {
///
/// FFI buffers are included in this estimation.
fn estimated_bytes_size(array: &dyn Array) -> usize {
// NOTE: `.len()` is the number of elements in an arrow2 buffer
// no matter WHAT the documentation says.
// See https://github.com/jorgecarleitao/arrow2/issues/1430

#[allow(clippy::enum_glob_use)]
use PhysicalType::*;

match array.data_type().to_physical_type() {
Null => 0,
Boolean => {
Expand Down Expand Up @@ -326,14 +331,9 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
Union => {
let array = array.as_any().downcast_ref::<UnionArray>().unwrap();

let types = array.types().len() * std::mem::size_of::<i8>();
let offsets = array
.offsets()
.as_ref()
.map(|x| x.len() * std::mem::size_of::<i32>())
.unwrap_or_default();
let types_size = array.types().len() * std::mem::size_of::<i8>();

let fields = if let Some(offsets) = array.offsets() {
if let Some(offsets) = array.offsets() {
// https://arrow.apache.org/docs/format/Columnar.html#dense-union:
//
// Dense union represents a mixed-type array with 5 bytes of overhead for each
Expand All @@ -347,36 +347,46 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
// The respective offsets for each child value array must be in
// order / increasing.

if offsets.is_empty() {
return 0;
/// The range of offsets into a child array observed for a given union type id.
///
/// Used to estimate how much of a dense union's child array is actually
/// referenced, so shared/sliced buffers are not over-counted.
#[derive(Debug)]
struct Range {
/// Inclusive lower bound — the smallest offset seen for this type id.
min: i32,

/// Inclusive upper bound — the largest offset seen for this type id.
max: i32,
}

let fields = array.fields();
let types: IntSet<_> = array.types().iter().copied().collect();
types
.into_iter()
.map(|cur_ty| {
let mut indices = array
.types()
.iter()
.enumerate()
.filter_map(|(idx, &ty)| (ty == cur_ty).then_some(idx));

let idx_start = indices.next().unwrap_or_default();
let mut idx_end = idx_start;
for idx in indices {
idx_end = idx;
}

let values_start = offsets[idx_start] as usize;
let values_end = offsets[idx_end] as usize;
fields.get(cur_ty as usize).map_or(0, |x| {
estimated_bytes_size(
x.sliced(values_start, values_end - values_start).as_ref(),
)
// The range of offsets for a given type id.
let mut type_ranges: BTreeMap<i8, Range> = Default::default();

debug_assert_eq!(array.types().len(), offsets.len());
for (&type_id, &offset) in array.types().iter().zip(offsets.iter()) {
// Offsets are monotonically increasing
type_ranges
.entry(type_id)
.and_modify(|range| {
range.max = offset;
})
})
.sum::<usize>()
.or_insert(Range {
min: offset,
max: offset,
});
}

let mut fields_size = 0;
for (type_id, range) in type_ranges {
if let Some(field) = array.fields().get(type_id as usize) {
let len = range.max - range.min + 1; // range is inclusive
fields_size += estimated_bytes_size(
field.sliced(range.min as usize, len as usize).as_ref(),
);
}
}

let offsets_size = offsets.len() * std::mem::size_of::<i32>();

types_size + offsets_size + fields_size
} else {
// https://arrow.apache.org/docs/format/Columnar.html#sparse-union:
//
Expand All @@ -391,14 +401,14 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
// - Equal-length arrays can be interpreted as a union by only defining the types
// array.

array
let num_elems = array.types().len();
let fields_size = array
.fields()
.iter()
.map(|x| estimated_bytes_size(x.sliced(0, array.len()).as_ref()))
.sum::<usize>()
};

types + offsets + fields
.map(|x| estimated_bytes_size(x.sliced(0, num_elems).as_ref()))
.sum::<usize>();
types_size + fields_size
}
}
Dictionary(key_type) => match_integer_type!(key_type, |$T| {
let array = array
Expand Down Expand Up @@ -429,9 +439,11 @@ fn estimated_bytes_size(array: &dyn Array) -> usize {
fn test_arrow_estimated_size_bytes() {
use arrow2::{
array::{Array, Float64Array, ListArray, StructArray, UInt64Array, Utf8Array},
datatypes::{DataType, Field},
buffer::Buffer,
datatypes::{DataType, Field, UnionMode},
offset::Offsets,
};
use std::mem::size_of;

// empty primitive array
{
Expand Down Expand Up @@ -582,4 +594,25 @@ fn test_arrow_estimated_size_bytes() {
assert_eq!(81200, raw_size_bytes);
assert_eq!(80204, arrow_size_bytes); // smaller because smaller inner headers
}

// Dense union, `enum { i(i32), f(f64) }`
{
let fields = vec![
Field::new("i", DataType::Int32, false),
Field::new("f", DataType::Float64, false),
];
let data_type = DataType::Union(fields, Some(vec![0i32, 1i32]), UnionMode::Dense);
let types = Buffer::<i8>::from(vec![0i8, 0i8, 1i8, 0i8, 1i8]);
let fields = vec![
PrimitiveArray::<i32>::from_vec(vec![0, 1, 2]).boxed(),
PrimitiveArray::<f64>::from_vec(vec![0.0, 1.0]).boxed(),
];
let offsets = vec![0, 1, 0, 2, 1];
let array = UnionArray::new(data_type, types, fields, Some(offsets.into())).boxed();

let raw_size_bytes = 5 + 3 * size_of::<i32>() + 2 * size_of::<f64>() + 5 * size_of::<i32>();
let arrow_size_bytes = estimated_bytes_size(&*array);

assert_eq!(raw_size_bytes, arrow_size_bytes);
}
}
16 changes: 8 additions & 8 deletions scripts/ci/run_e2e_roundtrip_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def main() -> None:
print("----------------------------------------------------------")
print("Building rerun-sdk for Python…")
start_time = time.time()
returncode = subprocess.Popen(["just", "py-build"], env=build_env).wait()
returncode = subprocess.Popen(["just", "py-build", "--quiet"], env=build_env).wait()
assert returncode == 0, f"Python rerun-sdk build failed with exit code {returncode}"
elapsed = time.time() - start_time
print(f"rerun-sdk for Python built in {elapsed:.1f} seconds")
Expand All @@ -73,7 +73,7 @@ def main() -> None:
if args.release:
build_type = "Release"
configure_args = ["cmake", f"-DCMAKE_BUILD_TYPE={build_type}", "-DCMAKE_COMPILE_WARNING_AS_ERROR=ON", ".."]
print(subprocess.list2cmdline(configure_args))
print("> ${subprocess.list2cmdline(configure_args)}")
returncode = subprocess.Popen(
configure_args,
env=build_env,
Expand Down Expand Up @@ -117,7 +117,7 @@ def run_roundtrip_python(arch: str) -> str:

cmd = [python_executable, main_path, "--save", output_path]

print(cmd)
print(f"\n> {subprocess.list2cmdline(cmd)}")
roundtrip_process = subprocess.Popen(cmd, env=roundtrip_env())
returncode = roundtrip_process.wait(timeout=30)
assert returncode == 0, f"python roundtrip process exited with error code {returncode}"
Expand All @@ -129,7 +129,7 @@ def run_roundtrip_rust(arch: str, release: bool, target: str | None, target_dir:
project_name = f"roundtrip_{arch}"
output_path = f"tests/rust/roundtrips/{arch}/out.rrd"

cmd = ["cargo", "r", "-p", project_name]
cmd = ["cargo", "run", "--quiet", "-p", project_name]

if target is not None:
cmd += ["--target", target]
Expand All @@ -142,7 +142,7 @@ def run_roundtrip_rust(arch: str, release: bool, target: str | None, target_dir:

cmd += ["--", "--save", output_path]

print(cmd)
print(f"\n> {subprocess.list2cmdline(cmd)}")
roundtrip_process = subprocess.Popen(cmd, env=roundtrip_env())
returncode = roundtrip_process.wait(timeout=12000)
assert returncode == 0, f"rust roundtrip process exited with error code {returncode}"
Expand All @@ -157,7 +157,7 @@ def run_roundtrip_cpp(arch: str, release: bool) -> str:
cmake_build(target_name, release)

cmd = [f"./build/tests/cpp/roundtrips/{target_name}", output_path]
print(cmd)
print(f"\n> {subprocess.list2cmdline(cmd)}")
roundtrip_process = subprocess.Popen(cmd, env=roundtrip_env())
returncode = roundtrip_process.wait(timeout=12000)
assert returncode == 0, f"cpp roundtrip process exited with error code {returncode}"
Expand All @@ -181,7 +181,7 @@ def cmake_build(target: str, release: bool) -> None:
"--parallel",
str(multiprocessing.cpu_count()),
]
print(subprocess.list2cmdline(build_process_args))
print(f"\n> {subprocess.list2cmdline(build_process_args)}")
result = subprocess.run(build_process_args, cwd="build")

assert result.returncode == 0, f"cmake build of {target} exited with error code {result.returncode}"
Expand All @@ -193,7 +193,7 @@ def run_comparison(rrd0_path: str, rrd1_path: str, full_dump: bool) -> None:
cmd += ["--full-dump"]
cmd += [rrd0_path, rrd1_path]

print(cmd)
print(f"\n> {subprocess.list2cmdline(cmd)}")
comparison_process = subprocess.Popen(cmd, env=roundtrip_env())
returncode = comparison_process.wait(timeout=30)
assert returncode == 0, f"comparison process exited with error code {returncode}"
Expand Down