Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Columnar timepoints in data tables and during transport #1767

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 154 additions & 23 deletions crates/re_log_types/src/data_table.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::BTreeMap;

use ahash::HashMap;
use itertools::Itertools as _;
use nohash_hasher::{IntMap, IntSet};
use smallvec::SmallVec;

use crate::{
ArrowMsg, ComponentName, DataCell, DataCellError, DataRow, DataRowError, EntityPath, MsgId,
TimePoint,
TimePoint, Timeline,
};

// ---
Expand All @@ -15,6 +17,11 @@ pub enum DataTableError {
#[error("Trying to deserialize data that is missing a column present in the schema: {0:?}")]
MissingColumn(String),

#[error(
"Trying to deserialize time column data with invalid datatype: {name:?} ({datatype:#?})"
)]
NotATimeColumn { name: String, datatype: DataType },

#[error("Trying to deserialize column data that doesn't contain any ListArrays: {0:?}")]
NotAColumn(String),

Expand All @@ -37,6 +44,7 @@ pub type DataTableResult<T> = ::std::result::Result<T, DataTableError>;
// ---

type RowIdVec = SmallVec<[MsgId; 4]>;
type TimeOptVec = SmallVec<[Option<i64>; 4]>;
type TimePointVec = SmallVec<[TimePoint; 4]>;
type EntityPathVec = SmallVec<[EntityPath; 4]>;
type NumInstancesVec = SmallVec<[u32; 4]>;
Expand Down Expand Up @@ -229,8 +237,11 @@ pub struct DataTable {
/// The entire column of `RowId`s.
pub row_id: RowIdVec,

/// The entire column of [`TimePoint`]s.
pub timepoint: TimePointVec,
/// All the rows for all the time columns.
///
/// The times are optional since not all rows are guaranteed to have a timestamp for every
/// single timeline (though it is highly likely to be the case in practice).
pub col_timelines: BTreeMap<Timeline, TimeOptVec>,

/// The entire column of [`EntityPath`]s.
pub entity_path: EntityPathVec,
Expand All @@ -251,7 +262,7 @@ impl DataTable {
Self {
table_id,
row_id: Default::default(),
timepoint: Default::default(),
col_timelines: Default::default(),
entity_path: Default::default(),
num_instances: Default::default(),
columns: Default::default(),
Expand Down Expand Up @@ -287,6 +298,24 @@ impl DataTable {
})
.multiunzip();

// All time columns.
//
// Every column must end up with exactly one slot per row, so that indexing a time column
// with a row index always refers to that row. Rows that carry no time for a given timeline
// are therefore represented by an explicit `None`.
let mut col_timelines: BTreeMap<Timeline, TimeOptVec> = BTreeMap::default();
for (i, timepoint) in timepoint.iter().enumerate() {
    for (timeline, time) in timepoint.iter() {
        match col_timelines.entry(*timeline) {
            std::collections::btree_map::Entry::Vacant(entry) => {
                // First time this timeline shows up: back-fill all the previous rows.
                entry
                    .insert(smallvec::smallvec![None; i])
                    .push(Some(time.as_i64()));
            }
            std::collections::btree_map::Entry::Occupied(mut entry) => {
                entry.get_mut().push(Some(time.as_i64()));
            }
        }
    }

    // Handle sparseness: a timeline that is already known but absent from this row did not
    // get anything pushed above, and is thus still only `i` entries long. Pad it so that
    // all the time columns stay aligned with the row index.
    for col_time in col_timelines.values_mut() {
        if col_time.len() <= i {
            col_time.push(None);
        }
    }
}

// Pre-allocate all columns (one per component).
let mut columns = IntMap::default();
for component in components {
Expand Down Expand Up @@ -314,7 +343,7 @@ impl DataTable {
Self {
table_id,
row_id,
timepoint,
col_timelines,
entity_path,
num_instances,
columns,
Expand All @@ -335,7 +364,7 @@ impl DataTable {
let Self {
table_id: _,
row_id,
timepoint,
col_timelines,
entity_path,
num_instances,
columns,
Expand All @@ -348,7 +377,14 @@ impl DataTable {

DataRow::from_cells(
row_id[i],
timepoint[i].clone(),
TimePoint::from(
col_timelines
.iter()
.filter_map(|(timeline, times)| {
times[i].map(|time| (*timeline, time.into()))
})
.collect::<BTreeMap<_, _>>(),
),
entity_path[i].clone(),
num_instances[i],
cells,
Expand All @@ -360,19 +396,23 @@ impl DataTable {
/// and returns the corresponding [`TimePoint`].
#[inline]
pub fn timepoint_max(&self) -> TimePoint {
    // Start from a timeless timepoint and only register the timelines for which at least
    // one row actually carries a time.
    let mut timepoint = TimePoint::timeless();
    for (timeline, col_time) in &self.col_timelines {
        // `flatten` skips the rows that have no time (`None`) on this timeline; a column
        // made only of `None`s contributes nothing to the result.
        if let Some(time) = col_time.iter().flatten().copied().max() {
            timepoint.insert(*timeline, time.into());
        }
    }
    timepoint
}
}

// --- Serialization ---

use arrow2::{
array::{Array, ListArray},
array::{Array, ListArray, PrimitiveArray},
bitmap::Bitmap,
chunk::Chunk,
datatypes::{DataType, Field, Schema},
datatypes::{DataType, Field, Schema, TimeUnit},
offset::Offsets,
};
use arrow2_convert::{
Expand All @@ -383,13 +423,13 @@ use arrow2_convert::{
// TODO(#1696): Those names should come from the datatypes themselves.

pub const COLUMN_ROW_ID: &str = "rerun.row_id";
pub const COLUMN_TIMEPOINT: &str = "rerun.timepoint";
pub const COLUMN_ENTITY_PATH: &str = "rerun.entity_path";
pub const COLUMN_NUM_INSTANCES: &str = "rerun.num_instances";

pub const METADATA_KIND: &str = "rerun.kind";
pub const METADATA_KIND_DATA: &str = "data";
pub const METADATA_KIND_CONTROL: &str = "control";
pub const METADATA_KIND_TIME: &str = "time";
pub const METADATA_TABLE_ID: &str = "rerun.table_id";

impl DataTable {
Expand All @@ -400,6 +440,9 @@ impl DataTable {
/// * Control columns are those that drive the behavior of the storage systems.
/// They are always present, always dense, and always deserialized upon reception by the
/// server.
/// Internally, time columns are (de)serialized separately from the rest of the control
/// columns for efficiency/QOL concerns: that doesn't change the fact that they are control
/// columns all the same!
/// * Data columns are the ones that hold component data.
/// They are optional, potentially sparse, and never deserialized on the server-side (not by
/// the storage systems, at least).
Expand All @@ -409,6 +452,13 @@ impl DataTable {
let mut schema = Schema::default();
let mut columns = Vec::new();

{
let (control_schema, control_columns) = self.serialize_time_columns();
schema.fields.extend(control_schema.fields);
schema.metadata.extend(control_schema.metadata);
columns.extend(control_columns.into_iter());
}

{
let (control_schema, control_columns) = self.serialize_control_columns()?;
schema.fields.extend(control_schema.fields);
Expand All @@ -426,6 +476,43 @@ impl DataTable {
Ok((schema, Chunk::new(columns)))
}

/// Builds the arrow schema and the arrow arrays for every time column of the table.
///
/// One field/array pair is emitted per entry of `col_timelines`, following the iteration
/// order of that map. Every emitted field carries the `METADATA_KIND_TIME` kind in its
/// metadata.
fn serialize_time_columns(&self) -> (Schema, Vec<Box<dyn Array>>) {
    crate::profile_function!();

    let mut schema = Schema::default();
    let mut columns: Vec<Box<dyn Array>> = Vec::new();

    for (timeline, times) in &self.col_timelines {
        // Build the raw `i64` array first, then cast it to the timeline's own datatype.
        let data = PrimitiveArray::from(times.as_slice()).to(timeline.datatype());

        // NOTE(review): the field is declared non-nullable even though the times are
        // `Option`s -- confirm whether that is intended.
        let field = Field::new(timeline.name().as_str(), data.data_type().clone(), false)
            .with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_TIME.to_owned())].into());

        schema.fields.push(field);
        columns.push(data.boxed());
    }

    (schema, columns)
}

/// Serializes all control columns into an arrow payload and schema.
///
/// Control columns are those that drive the behavior of the storage systems.
Expand Down Expand Up @@ -476,7 +563,7 @@ impl DataTable {
let Self {
table_id,
row_id,
timepoint,
col_timelines: _,
entity_path,
num_instances,
columns: _,
Expand All @@ -489,11 +576,6 @@ impl DataTable {
schema.fields.push(row_id_field);
columns.push(row_id_column);

let (timepoint_field, timepoint_column) =
serialize_dense_column(COLUMN_TIMEPOINT, timepoint)?;
schema.fields.push(timepoint_field);
columns.push(timepoint_column);

let (entity_path_field, entity_path_column) =
serialize_dense_column(COLUMN_ENTITY_PATH, entity_path)?;
schema.fields.push(entity_path_field);
Expand All @@ -520,7 +602,7 @@ impl DataTable {
let Self {
table_id: _,
row_id: _,
timepoint: _,
col_timelines: _,
entity_path: _,
num_instances: _,
columns: table,
Expand Down Expand Up @@ -603,6 +685,28 @@ impl DataTable {
) -> DataTableResult<Self> {
crate::profile_function!();

// --- Time ---

let col_timelines: DataTableResult<_> = schema
.fields
.iter()
.enumerate()
.filter_map(|(i, field)| {
field.metadata.get(METADATA_KIND).and_then(|kind| {
(kind == METADATA_KIND_TIME).then_some((field.name.as_str(), i))
})
})
.map(|(name, index)| {
chunk
.get(index)
.ok_or(DataTableError::MissingColumn(name.to_owned()))
.and_then(|column| Self::deserialize_time_column(name, &**column))
})
.collect();
let col_timelines = col_timelines?;

// --- Control ---

let control_indices: HashMap<&str, usize> = schema
.fields
.iter()
Expand All @@ -623,14 +727,14 @@ impl DataTable {
// NOTE: the unwrappings cannot fail since control_index() makes sure the index is valid
let row_id =
(&**chunk.get(control_index(COLUMN_ROW_ID)?).unwrap()).try_into_collection()?;
let timepoint =
(&**chunk.get(control_index(COLUMN_TIMEPOINT)?).unwrap()).try_into_collection()?;
let entity_path =
(&**chunk.get(control_index(COLUMN_ENTITY_PATH)?).unwrap()).try_into_collection()?;
// TODO(#1712): This is unnecessarily slow...
let num_instances =
(&**chunk.get(control_index(COLUMN_NUM_INSTANCES)?).unwrap()).try_into_collection()?;

// --- Components ---

let columns: DataTableResult<_> = schema
.fields
.iter()
Expand All @@ -656,13 +760,40 @@ impl DataTable {
Ok(Self {
table_id,
row_id,
timepoint,
col_timelines,
entity_path,
num_instances,
columns,
})
}

/// Turns a single arrow time column back into its timeline and (optional) times.
///
/// The kind of timeline is derived from the logical datatype of the column: `Int64` maps to a
/// sequence timeline, nanosecond timestamps without timezone map to a temporal timeline.
///
/// # Errors
///
/// Returns [`DataTableError::NotATimeColumn`] for any other datatype.
fn deserialize_time_column(
    name: &str,
    column: &dyn Array,
) -> DataTableResult<(Timeline, TimeOptVec)> {
    let datatype = column.data_type();

    // See also [`Timeline::datatype`]
    let timeline = match datatype.to_logical_type() {
        DataType::Int64 => Timeline::new_sequence(name),
        DataType::Timestamp(TimeUnit::Nanosecond, None) => Timeline::new_temporal(name),
        _ => {
            return Err(DataTableError::NotATimeColumn {
                name: name.to_owned(),
                datatype: datatype.clone(),
            });
        }
    };

    // NOTE: cannot fail, datatype checked above
    let times = column
        .as_any()
        .downcast_ref::<PrimitiveArray<i64>>()
        .unwrap();

    // Null slots become `None`, valid slots are copied out as `Some(time)`.
    let col_time: TimeOptVec = times.iter().map(|time| time.copied()).collect();

    Ok((timeline, col_time))
}

/// Deserializes a sparse data column.
fn deserialize_data_column(
component: ComponentName,
Expand Down
Loading