Skip to content

Commit

Permalink
Datastore revamp 2: serialization & formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 12, 2023
1 parent 8b0aa2e commit 9b54077
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 63 deletions.
5 changes: 1 addition & 4 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ re_log.workspace = true

# External dependencies:
ahash.workspace = true
arrow2 = { workspace = true, features = [
"compute_concatenate",
"compute_aggregate",
] }
arrow2 = { workspace = true, features = ["compute_concatenate"] }
arrow2_convert.workspace = true
document-features = "0.2"
indent = "0.1"
Expand Down
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
mod arrow_util;
mod store;
mod store_arrow;
mod store_format;
mod store_gc;
mod store_read;
Expand Down
37 changes: 16 additions & 21 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;

use ahash::HashMap;
use arrow2::array::Int64Array;
use arrow2::datatypes::{DataType, TimeUnit};
use arrow2::datatypes::DataType;
use smallvec::SmallVec;

use nohash_hasher::{IntMap, IntSet};
Expand Down Expand Up @@ -272,8 +271,7 @@ fn datastore_internal_repr() {
store.insert_table(&temporal).unwrap();

store.sanity_check().unwrap();
// TODO(#1619): bring back formatting
// eprintln!("{store}");
eprintln!("{store}");
}

// --- Temporal ---
Expand All @@ -283,7 +281,13 @@ fn datastore_internal_repr() {
///
/// See also [`IndexedBucket`].
///
// TODO(#1619): show internal structure once formatting is back
/// Run the following command to display a visualization of the store's internal datastructures and
/// better understand how everything fits together:
/// ```text
/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
/// ```
//
// TODO(#1524): inline visualization once it's back to a manageable state
#[derive(Debug)]
pub struct IndexedTable {
/// The timeline this table operates in, for debugging purposes.
Expand Down Expand Up @@ -432,26 +436,17 @@ impl Default for IndexedBucketInner {
}
}

impl IndexedBucket {
/// Returns a (name, [`Int64Array`]) with a logical type matching the timeline.
// TODO(cmc): should be defined in `DataTable` serialization stuff
pub fn times(&self) -> (String, Int64Array) {
crate::profile_function!();

let times = Int64Array::from_slice(self.inner.read().col_time.as_slice());
let logical_type = match self.timeline.typ() {
re_log_types::TimeType::Time => DataType::Timestamp(TimeUnit::Nanosecond, None),
re_log_types::TimeType::Sequence => DataType::Int64,
};
(self.timeline.name().to_string(), times.to(logical_type))
}
}

// --- Timeless ---

/// The timeless specialization of an [`IndexedTable`].
///
// TODO(#1619): show internal structure once formatting is back
/// Run the following command to display a visualization of the store's internal datastructures and
/// better understand how everything fits together:
/// ```text
/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
/// ```
//
// TODO(#1524): inline visualization once it's back to a manageable state
#[derive(Debug)]
pub struct PersistentIndexedTable {
/// The entity this table is related to, for debugging purposes.
Expand Down
197 changes: 197 additions & 0 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use std::collections::BTreeMap;

use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
use nohash_hasher::IntMap;
use re_log_types::{
ComponentName, DataCellColumn, DataTable, DataTableResult, RowId, Timeline, COLUMN_INSERT_ID,
COLUMN_NUM_INSTANCES, COLUMN_ROW_ID,
};

use crate::store::{IndexedBucket, IndexedBucketInner, PersistentIndexedTable};

// ---

impl IndexedBucket {
/// Serializes the entire bucket into an arrow payload and schema.
///
/// Column order:
/// - `insert_id`
/// - `row_id`
/// - `time`
/// - `num_instances`
/// - `$cluster_key`
/// - rest of component columns in ascending lexical order
pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
crate::profile_function!();

let Self {
timeline,
cluster_key,
inner,
} = self;

let IndexedBucketInner {
is_sorted: _,
time_range: _,
col_time,
col_insert_id,
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = &*inner.read();

serialize(
cluster_key,
Some((*timeline, col_time)),
col_insert_id,
col_row_id,
col_num_instances,
columns,
)
}
}

impl PersistentIndexedTable {
/// Serializes the entire table into an arrow payload and schema.
///
/// Column order:
/// - `insert_id`
/// - `row_id`
/// - `time`
/// - `num_instances`
/// - `$cluster_key`
/// - rest of component columns in ascending lexical order
pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
crate::profile_function!();

let Self {
ent_path: _,
cluster_key,
col_insert_id,
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

serialize(
cluster_key,
None,
col_insert_id,
col_row_id,
col_num_instances,
columns,
)
}
}

// ---

fn serialize(
cluster_key: &ComponentName,
col_time: Option<(Timeline, &[i64])>,
col_insert_id: &[u64],
col_row_id: &[RowId],
col_num_instances: &[u32],
table: &IntMap<ComponentName, DataCellColumn>,
) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
crate::profile_function!();

let mut schema = Schema::default();
let mut columns = Vec::new();

{
let (control_schema, control_columns) =
serialize_control_columns(col_time, col_insert_id, col_row_id, col_num_instances)?;
schema.fields.extend(control_schema.fields);
schema.metadata.extend(control_schema.metadata);
columns.extend(control_columns.into_iter());
}

{
let (data_schema, data_columns) = serialize_data_columns(cluster_key, table)?;
schema.fields.extend(data_schema.fields);
schema.metadata.extend(data_schema.metadata);
columns.extend(data_columns.into_iter());
}

Ok((schema, Chunk::new(columns)))
}

fn serialize_control_columns(
col_time: Option<(Timeline, &[i64])>,
col_insert_id: &[u64],
col_row_id: &[RowId],
col_num_instances: &[u32],
) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
crate::profile_function!();

let mut schema = Schema::default();
let mut columns = Vec::new();

// NOTE: ordering is taken into account!
// - insert_id
// - row_id
// - time
// - num_instances

let (insert_id_field, insert_id_column) =
DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?;
schema.fields.push(insert_id_field);
columns.push(insert_id_column);

let (row_id_field, row_id_column) =
DataTable::serialize_control_column(COLUMN_ROW_ID, col_row_id)?;
schema.fields.push(row_id_field);
columns.push(row_id_column);

if let Some((timeline, col_time)) = col_time {
let (time_field, time_column) = DataTable::serialize_primitive_column(
timeline.name(),
col_time,
timeline.datatype().into(),
)?;
schema.fields.push(time_field);
columns.push(time_column);
}

let (num_instances_field, num_instances_column) =
DataTable::serialize_primitive_column(COLUMN_NUM_INSTANCES, col_num_instances, None)?;
schema.fields.push(num_instances_field);
columns.push(num_instances_column);

Ok((schema, columns))
}

fn serialize_data_columns(
cluster_key: &ComponentName,
table: &IntMap<ComponentName, DataCellColumn>,
) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
crate::profile_function!();

let mut schema = Schema::default();
let mut columns = Vec::new();

// NOTE: ordering is taken into account!
let mut table: BTreeMap<_, _> = table.iter().collect();

// Cluster column first and foremost!
//
// NOTE: cannot fail, the cluster key _has_ to be there by definition
let cluster_column = table.remove(&cluster_key).unwrap();
{
let (field, column) =
DataTable::serialize_data_column(cluster_key.as_str(), cluster_column)?;
schema.fields.push(field);
columns.push(column);
}

for (component, column) in table {
let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?;
schema.fields.push(field);
columns.push(column);
}

Ok((schema, columns))
}
38 changes: 18 additions & 20 deletions crates/re_arrow_store/src/store_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,15 @@ impl std::fmt::Display for IndexedBucket {
};
f.write_fmt(format_args!("{time_range}\n"))?;

// TODO(#1619): bring back formatting
// let (schema, columns) = self.serialize().map_err(|err| {
// re_log::error_once!("couldn't display indexed bucket: {err}");
// std::fmt::Error
// })?;
// re_format::arrow::format_table(
// columns.columns(),
// schema.fields.iter().map(|field| field.name.as_str()),
// )
// .fmt(f)?;
let (schema, columns) = self.serialize().map_err(|err| {
re_log::error_once!("couldn't display indexed bucket: {err}");
std::fmt::Error
})?;
re_format::arrow::format_table(
columns.columns(),
schema.fields.iter().map(|field| field.name.as_str()),
)
.fmt(f)?;

writeln!(f)
}
Expand Down Expand Up @@ -168,16 +167,15 @@ impl std::fmt::Display for PersistentIndexedTable {
format_number(self.total_rows() as _),
))?;

// TODO(#1619): bring back formatting
// let (schema, columns) = self.serialize().map_err(|err| {
// re_log::error_once!("couldn't display timeless indexed table: {err}");
// std::fmt::Error
// })?;
// re_format::arrow::format_table(
// columns.columns(),
// schema.fields.iter().map(|field| field.name.as_str()),
// )
// .fmt(f)?;
let (schema, columns) = self.serialize().map_err(|err| {
re_log::error_once!("couldn't display timeless indexed table: {err}");
std::fmt::Error
})?;
re_format::arrow::format_table(
columns.columns(),
schema.fields.iter().map(|field| field.name.as_str()),
)
.fmt(f)?;

writeln!(f)
}
Expand Down
17 changes: 11 additions & 6 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use arrow2::{
offset::Offsets,
};
use polars_core::{functions::diag_concat_df, prelude::*};
use re_log_types::{ComponentName, DataCell};
use re_log_types::{ComponentName, DataCell, DataTable};

use crate::{
store::InsertIdVec, ArrayExt, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner,
Expand Down Expand Up @@ -209,20 +209,25 @@ impl IndexedBucket {
pub fn to_dataframe(&self, store: &DataStore, config: &DataStoreConfig) -> DataFrame {
crate::profile_function!();

let (_, times) = self.times();
let num_rows = times.len();

let IndexedBucketInner {
is_sorted: _,
time_range: _,
col_time: _,
col_time,
col_insert_id,
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = &*self.inner.read();

let (_, times) = DataTable::serialize_primitive_column(
self.timeline.name(),
col_time,
self.timeline.datatype().into(),
)
.unwrap();
let num_rows = times.len();

let insert_ids = config
.store_insert_ids
.then(|| insert_ids_as_series(&col_insert_id));
Expand All @@ -234,7 +239,7 @@ impl IndexedBucket {
// One column for the time index.
Some(new_infallible_series(
self.timeline.name().as_str(),
&times,
&*times,
num_rows,
)),
]
Expand Down
Loading

0 comments on commit 9b54077

Please sign in to comment.