Datastore revamp 2: serialization & formatting #1735

Merged on Apr 12, 2023 (1 commit)
crates/re_arrow_store/Cargo.toml (1 addition, 4 deletions)

@@ -38,10 +38,7 @@ re_log.workspace = true

 # External dependencies:
 ahash.workspace = true
-arrow2 = { workspace = true, features = [
-    "compute_concatenate",
-    "compute_aggregate",
-] }
+arrow2 = { workspace = true, features = ["compute_concatenate"] }
 arrow2_convert.workspace = true
 document-features = "0.2"
 indent = "0.1"
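Context for the feature trim (an illustrative sketch against arrow2's public API, not code from this PR): the retained `compute_concatenate` feature gates arrow2's `concatenate` kernel, while the dropped `compute_aggregate` feature gated aggregation kernels this crate no longer uses.

```rust
use arrow2::array::{Array, Int64Array};
use arrow2::compute::concatenate::concatenate;

fn main() -> arrow2::error::Result<()> {
    // This kernel is what the `compute_concatenate` feature gates.
    let a = Int64Array::from_slice([1i64, 2]);
    let b = Int64Array::from_slice([3i64]);
    let merged: Box<dyn Array> = concatenate(&[&a, &b])?;
    assert_eq!(merged.len(), 3);
    Ok(())
}
```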
crates/re_arrow_store/src/lib.rs (1 addition)

@@ -16,6 +16,7 @@

 mod arrow_util;
 mod store;
+mod store_arrow;
 mod store_format;
 mod store_gc;
 mod store_read;
crates/re_arrow_store/src/store.rs (16 additions, 21 deletions)

@@ -2,8 +2,7 @@ use std::collections::BTreeMap;
 use std::sync::atomic::AtomicU64;

 use ahash::HashMap;
-use arrow2::array::Int64Array;
-use arrow2::datatypes::{DataType, TimeUnit};
+use arrow2::datatypes::DataType;
 use smallvec::SmallVec;

 use nohash_hasher::{IntMap, IntSet};

@@ -272,8 +271,7 @@ fn datastore_internal_repr() {
     store.insert_table(&temporal).unwrap();

     store.sanity_check().unwrap();
-    // TODO(#1619): bring back formatting
-    // eprintln!("{store}");
+    eprintln!("{store}");
 }

 // --- Temporal ---

@@ -283,7 +281,13 @@ fn datastore_internal_repr() {
 ///
 /// See also [`IndexedBucket`].
 ///
-// TODO(#1619): show internal structure once formatting is back
+/// Run the following command to display a visualization of the store's internal data structures and
+/// better understand how everything fits together:
+/// ```text
+/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
+/// ```
+//
+// TODO(#1524): inline visualization once it's back to a manageable state
 #[derive(Debug)]
 pub struct IndexedTable {
     /// The timeline this table operates in, for debugging purposes.

@@ -432,26 +436,17 @@ impl Default for IndexedBucketInner {
     }
 }

-impl IndexedBucket {
-    /// Returns a (name, [`Int64Array`]) with a logical type matching the timeline.
-    // TODO(cmc): should be defined in `DataTable` serialization stuff
-    pub fn times(&self) -> (String, Int64Array) {
-        crate::profile_function!();
-
-        let times = Int64Array::from_slice(self.inner.read().col_time.as_slice());
-        let logical_type = match self.timeline.typ() {
-            re_log_types::TimeType::Time => DataType::Timestamp(TimeUnit::Nanosecond, None),
-            re_log_types::TimeType::Sequence => DataType::Int64,
-        };
-        (self.timeline.name().to_string(), times.to(logical_type))
-    }
-}
-
 // --- Timeless ---

 /// The timeless specialization of an [`IndexedTable`].
 ///
-// TODO(#1619): show internal structure once formatting is back
+/// Run the following command to display a visualization of the store's internal data structures and
+/// better understand how everything fits together:
+/// ```text
+/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
+/// ```
+//
+// TODO(#1524): inline visualization once it's back to a manageable state
 #[derive(Debug)]
 pub struct PersistentIndexedTable {
     /// The entity this table is related to, for debugging purposes.
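Aside on the removed `times()` helper: it baked the timeline-to-Arrow logical-type mapping into the bucket itself; after this PR the same mapping is presumably supplied by `Timeline::datatype()` inside the new serialization path (see `store_arrow.rs` below). A self-contained sketch of that mapping, with `time_column_datatype` being a hypothetical name:

```rust
use arrow2::datatypes::{DataType, TimeUnit};
use re_log_types::TimeType;

// Hypothetical free function reproducing the mapping from the removed helper.
fn time_column_datatype(typ: TimeType) -> DataType {
    match typ {
        // Wall-clock timelines serialize as nanosecond timestamps...
        TimeType::Time => DataType::Timestamp(TimeUnit::Nanosecond, None),
        // ...while sequence timelines remain plain 64-bit integers.
        TimeType::Sequence => DataType::Int64,
    }
}
```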
crates/re_arrow_store/src/store_arrow.rs (new file, 197 additions)

@@ -0,0 +1,197 @@
use std::collections::BTreeMap;

use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
use nohash_hasher::IntMap;
use re_log_types::{
    ComponentName, DataCellColumn, DataTable, DataTableResult, RowId, Timeline, COLUMN_INSERT_ID,
    COLUMN_NUM_INSTANCES, COLUMN_ROW_ID,
};

use crate::store::{IndexedBucket, IndexedBucketInner, PersistentIndexedTable};

// ---

impl IndexedBucket {
    /// Serializes the entire bucket into an arrow payload and schema.
    ///
    /// Column order:
    /// - `insert_id`
    /// - `row_id`
    /// - `time`
    /// - `num_instances`
    /// - `$cluster_key`
    /// - rest of component columns in ascending lexical order
    pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
        crate::profile_function!();

        let Self {
            timeline,
            cluster_key,
            inner,
        } = self;

        let IndexedBucketInner {
            is_sorted: _,
            time_range: _,
            col_time,
            col_insert_id,
            col_row_id,
            col_num_instances,
            columns,
            total_size_bytes: _,
        } = &*inner.read();

        serialize(
            cluster_key,
            Some((*timeline, col_time)),
            col_insert_id,
            col_row_id,
            col_num_instances,
            columns,
        )
    }
}

impl PersistentIndexedTable {
    /// Serializes the entire table into an arrow payload and schema.
    ///
    /// Column order:
    /// - `insert_id`
    /// - `row_id`
    /// - `time`
    /// - `num_instances`
    /// - `$cluster_key`
    /// - rest of component columns in ascending lexical order
    pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
        crate::profile_function!();

        let Self {
            ent_path: _,
            cluster_key,
            col_insert_id,
            col_row_id,
            col_num_instances,
            columns,
            total_size_bytes: _,
        } = self;

        serialize(
            cluster_key,
            None,
            col_insert_id,
            col_row_id,
            col_num_instances,
            columns,
        )
    }
}

// ---

fn serialize(
    cluster_key: &ComponentName,
    col_time: Option<(Timeline, &[i64])>,
    col_insert_id: &[u64],
    col_row_id: &[RowId],
    col_num_instances: &[u32],
    table: &IntMap<ComponentName, DataCellColumn>,
) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
    crate::profile_function!();

    let mut schema = Schema::default();
    let mut columns = Vec::new();

    {
        let (control_schema, control_columns) =
            serialize_control_columns(col_time, col_insert_id, col_row_id, col_num_instances)?;
        schema.fields.extend(control_schema.fields);
        schema.metadata.extend(control_schema.metadata);
        columns.extend(control_columns.into_iter());
    }

    {
        let (data_schema, data_columns) = serialize_data_columns(cluster_key, table)?;
        schema.fields.extend(data_schema.fields);
        schema.metadata.extend(data_schema.metadata);
        columns.extend(data_columns.into_iter());
    }

    Ok((schema, Chunk::new(columns)))
}

fn serialize_control_columns(
    col_time: Option<(Timeline, &[i64])>,
    col_insert_id: &[u64],
    col_row_id: &[RowId],
    col_num_instances: &[u32],
) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
    crate::profile_function!();

    let mut schema = Schema::default();
    let mut columns = Vec::new();

    // NOTE: ordering is taken into account!
    // - insert_id
    // - row_id
    // - time
    // - num_instances

    let (insert_id_field, insert_id_column) =
        DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?;
    schema.fields.push(insert_id_field);
    columns.push(insert_id_column);

    let (row_id_field, row_id_column) =
        DataTable::serialize_control_column(COLUMN_ROW_ID, col_row_id)?;
    schema.fields.push(row_id_field);
    columns.push(row_id_column);

    if let Some((timeline, col_time)) = col_time {
        let (time_field, time_column) = DataTable::serialize_primitive_column(
            timeline.name(),
            col_time,
            timeline.datatype().into(),
        )?;
        schema.fields.push(time_field);
        columns.push(time_column);
    }

    let (num_instances_field, num_instances_column) =
        DataTable::serialize_primitive_column(COLUMN_NUM_INSTANCES, col_num_instances, None)?;
    schema.fields.push(num_instances_field);
    columns.push(num_instances_column);

    Ok((schema, columns))
}

fn serialize_data_columns(
    cluster_key: &ComponentName,
    table: &IntMap<ComponentName, DataCellColumn>,
) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
    crate::profile_function!();

    let mut schema = Schema::default();
    let mut columns = Vec::new();

    // NOTE: ordering is taken into account!
    let mut table: BTreeMap<_, _> = table.iter().collect();

    // Cluster column first and foremost!
    //
    // NOTE: cannot fail, the cluster key _has_ to be there by definition
    let cluster_column = table.remove(&cluster_key).unwrap();
    {
        let (field, column) =
            DataTable::serialize_data_column(cluster_key.as_str(), cluster_column)?;
        schema.fields.push(field);
        columns.push(column);
    }

    for (component, column) in table {
        let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?;
        schema.fields.push(field);
        columns.push(column);
    }

    Ok((schema, columns))
}
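For orientation, a minimal sketch (not part of the diff) of how crate-internal code might consume the new `serialize` API, mirroring the `store_format.rs` changes below; `print_bucket` is a hypothetical helper and assumes an `IndexedBucket` obtained from store internals:

```rust
use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};

use crate::store::IndexedBucket;

/// Hypothetical helper: serialize a bucket and pretty-print its contents.
fn print_bucket(bucket: &IndexedBucket) -> re_log_types::DataTableResult<()> {
    // Columns arrive in the documented order: insert_id, row_id, time,
    // num_instances, the cluster key, then remaining components lexically.
    let (schema, columns): (Schema, Chunk<Box<dyn Array>>) = bucket.serialize()?;
    let table = re_format::arrow::format_table(
        columns.columns(),
        schema.fields.iter().map(|field| field.name.as_str()),
    );
    println!("{table}");
    Ok(())
}
```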
crates/re_arrow_store/src/store_format.rs (18 additions, 20 deletions)

@@ -130,16 +130,15 @@ impl std::fmt::Display for IndexedBucket {
         };
         f.write_fmt(format_args!("{time_range}\n"))?;

-        // TODO(#1619): bring back formatting
-        // let (schema, columns) = self.serialize().map_err(|err| {
-        //     re_log::error_once!("couldn't display indexed bucket: {err}");
-        //     std::fmt::Error
-        // })?;
-        // re_format::arrow::format_table(
-        //     columns.columns(),
-        //     schema.fields.iter().map(|field| field.name.as_str()),
-        // )
-        // .fmt(f)?;
+        let (schema, columns) = self.serialize().map_err(|err| {
+            re_log::error_once!("couldn't display indexed bucket: {err}");
+            std::fmt::Error
+        })?;
+        re_format::arrow::format_table(
+            columns.columns(),
+            schema.fields.iter().map(|field| field.name.as_str()),
+        )
+        .fmt(f)?;

         writeln!(f)
     }

@@ -168,16 +167,15 @@ impl std::fmt::Display for PersistentIndexedTable {
             format_number(self.total_rows() as _),
         ))?;

-        // TODO(#1619): bring back formatting
-        // let (schema, columns) = self.serialize().map_err(|err| {
-        //     re_log::error_once!("couldn't display timeless indexed table: {err}");
-        //     std::fmt::Error
-        // })?;
-        // re_format::arrow::format_table(
-        //     columns.columns(),
-        //     schema.fields.iter().map(|field| field.name.as_str()),
-        // )
-        // .fmt(f)?;
+        let (schema, columns) = self.serialize().map_err(|err| {
+            re_log::error_once!("couldn't display timeless indexed table: {err}");
+            std::fmt::Error
+        })?;
+        re_format::arrow::format_table(
+            columns.columns(),
+            schema.fields.iter().map(|field| field.name.as_str()),
+        )
+        .fmt(f)?;

         writeln!(f)
     }
crates/re_arrow_store/src/store_polars.rs (11 additions, 6 deletions)

@@ -9,7 +9,7 @@ use arrow2::{
     offset::Offsets,
 };
 use polars_core::{functions::diag_concat_df, prelude::*};
-use re_log_types::{ComponentName, DataCell};
+use re_log_types::{ComponentName, DataCell, DataTable};

 use crate::{
     store::InsertIdVec, ArrayExt, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner,

@@ -209,20 +209,25 @@ impl IndexedBucket {
     pub fn to_dataframe(&self, store: &DataStore, config: &DataStoreConfig) -> DataFrame {
         crate::profile_function!();

-        let (_, times) = self.times();
-        let num_rows = times.len();
-
         let IndexedBucketInner {
             is_sorted: _,
             time_range: _,
-            col_time: _,
+            col_time,
             col_insert_id,
             col_row_id,
             col_num_instances,
             columns,
             total_size_bytes: _,
         } = &*self.inner.read();

+        let (_, times) = DataTable::serialize_primitive_column(
+            self.timeline.name(),
+            col_time,
+            self.timeline.datatype().into(),
+        )
+        .unwrap();
+        let num_rows = times.len();
+
         let insert_ids = config
             .store_insert_ids
             .then(|| insert_ids_as_series(&col_insert_id));

@@ -234,7 +239,7 @@
             // One column for the time index.
             Some(new_infallible_series(
                 self.timeline.name().as_str(),
-                &times,
+                &*times,
                 num_rows,
             )),
         ]