Commit 476103f: columnar timepoints

teh-cmc committed Apr 4, 2023
1 parent 1946683

Showing 4 changed files with 172 additions and 213 deletions.

crates/re_log_types/src/data_table.rs: 178 changes (155 additions & 23 deletions)
@@ -1,11 +1,13 @@
use std::collections::BTreeMap;

use ahash::HashMap;
use itertools::Itertools as _;
use nohash_hasher::{IntMap, IntSet};
use smallvec::SmallVec;

use crate::{
ArrowMsg, ComponentName, DataCell, DataCellError, DataRow, DataRowError, EntityPath, MsgId,
TimePoint,
TimePoint, Timeline,
};

// ---
@@ -15,6 +17,11 @@ pub enum DataTableError {
#[error("Trying to deserialize data that is missing a column present in the schema: {0:?}")]
MissingColumn(String),

#[error(
"Trying to deserialize time column data with invalid datatype: {name:?} ({datatype:#?})"
)]
NotATimeColumn { name: String, datatype: DataType },

#[error("Trying to deserialize column data that doesn't contain any ListArrays: {0:?}")]
NotAColumn(String),

@@ -37,6 +44,7 @@ pub type DataTableResult<T> = ::std::result::Result<T, DataTableError>;
// ---

type RowIdVec = SmallVec<[MsgId; 4]>;
type TimeOptVec = SmallVec<[Option<i64>; 4]>;
type TimePointVec = SmallVec<[TimePoint; 4]>;
type EntityPathVec = SmallVec<[EntityPath; 4]>;
type NumInstancesVec = SmallVec<[u32; 4]>;
@@ -229,8 +237,11 @@ pub struct DataTable {
/// The entire column of `RowId`s.
pub row_id: RowIdVec,

/// The entire column of [`TimePoint`]s.
pub timepoint: TimePointVec,
/// All the rows for all the time columns.
///
/// The times are optional since not all rows are guaranteed to have a timestamp for every
/// single timeline (though it is highly likely to be the case in practice).
pub col_timelines: BTreeMap<Timeline, TimeOptVec>,
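// A minimal sketch of this layout, assuming a hypothetical two-row table
// logged on a sequence timeline `frame_nr` and a temporal timeline
// `log_time`, where row 0 carries no `log_time`:
//
//   row_id:        [RowId#1,   RowId#2   ]
//   col_timelines: {
//       "frame_nr": [Some(41), Some(42)  ],
//       "log_time": [None,     Some(1001)],
//   }
//
// One optionally-sparse column per timeline, each aligned with `row_id`.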

/// The entire column of [`EntityPath`]s.
pub entity_path: EntityPathVec,
@@ -251,7 +262,7 @@ impl DataTable {
Self {
table_id,
row_id: Default::default(),
timepoint: Default::default(),
col_timelines: Default::default(),
entity_path: Default::default(),
num_instances: Default::default(),
columns: Default::default(),
@@ -287,6 +298,24 @@ impl DataTable {
})
.multiunzip();

// All time columns.
let mut col_timelines: BTreeMap<Timeline, TimeOptVec> = BTreeMap::default();
for (i, timepoint) in timepoint.iter().enumerate() {
for (timeline, time) in timepoint.iter() {
match col_timelines.entry(*timeline) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry
.insert(smallvec::smallvec![None; i])
.push(Some(time.as_i64()));
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
entry.push(Some(time.as_i64()));
}
}
}

// Handle potential sparseness: pad the columns of any timelines that this
// row doesn't cover, so that every time column stays aligned with `row_id`.
for col_time in col_timelines.values_mut() {
if col_time.len() == i {
col_time.push(None);
}
}
}
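// Worked through the sketch above (hypothetical data): `log_time` is first
// seen at i=1, so its vacant entry is seeded with `smallvec![None; 1]` to
// backfill row 0 before `Some(1001)` is pushed:
//
//   i=0: frame_nr -> [Some(41)]
//   i=1: frame_nr -> [Some(41), Some(42)]
//        log_time -> [None,     Some(1001)]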

// Pre-allocate all columns (one per component).
let mut columns = IntMap::default();
for component in components {
@@ -314,7 +343,7 @@ Self {
Self {
table_id,
row_id,
timepoint,
col_timelines,
entity_path,
num_instances,
columns,
@@ -335,7 +364,7 @@ impl DataTable {
let Self {
table_id: _,
row_id,
timepoint,
col_timelines,
entity_path,
num_instances,
columns,
@@ -348,7 +377,14 @@

DataRow::from_cells(
row_id[i],
timepoint[i].clone(),
TimePoint::from(
col_timelines
.iter()
.filter_map(|(timeline, times)| {
times[i].map(|time| (*timeline, time.into()))
})
.collect::<BTreeMap<_, _>>(),
),
entity_path[i].clone(),
num_instances[i],
cells,
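// With the columns sketched above, row 1 re-assembles into the timepoint
// `{"frame_nr": 42, "log_time": 1001}`, while row 0 yields `{"frame_nr": 41}`
// alone, since its null `log_time` cell is filtered out.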
@@ -360,19 +396,23 @@
/// and returns the corresponding [`TimePoint`].
#[inline]
pub fn timepoint_max(&self) -> TimePoint {
self.timepoint
.iter()
.fold(TimePoint::timeless(), |acc, tp| acc.union_max(tp))
let mut timepoint = TimePoint::timeless();
for (timeline, col_time) in &self.col_timelines {
if let Some(time) = col_time.iter().flatten().max().copied() {
timepoint.insert(*timeline, time.into());
}
}
timepoint
}
}
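// Usage sketch on the hypothetical table above: `timepoint_max()` returns
// `{"frame_nr": 42, "log_time": 1001}`, i.e. the per-timeline maximum across
// all rows, with nulls skipped by the `flatten()`.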

// --- Serialization ---

use arrow2::{
array::{Array, ListArray},
array::{Array, ListArray, PrimitiveArray},
bitmap::Bitmap,
chunk::Chunk,
datatypes::{DataType, Field, Schema},
datatypes::{DataType, Field, Schema, TimeUnit},
offset::Offsets,
};
use arrow2_convert::{
@@ -383,13 +423,13 @@ use arrow2_convert::{
// TODO(#1696): Those names should come from the datatypes themselves.

pub const COLUMN_ROW_ID: &str = "rerun.row_id";
pub const COLUMN_TIMEPOINT: &str = "rerun.timepoint";
pub const COLUMN_ENTITY_PATH: &str = "rerun.entity_path";
pub const COLUMN_NUM_INSTANCES: &str = "rerun.num_instances";

pub const METADATA_KIND: &str = "rerun.kind";
pub const METADATA_KIND_DATA: &str = "data";
pub const METADATA_KIND_CONTROL: &str = "control";
pub const METADATA_KIND_TIME: &str = "time";
pub const METADATA_TABLE_ID: &str = "rerun.table_id";

impl DataTable {
@@ -400,6 +440,8 @@ impl DataTable {
/// * Control columns are those that drive the behavior of the storage systems.
/// They are always present, always dense, and always deserialized upon reception by the
/// server.
/// Internally, time columns are handled separately from the rest of the control columns,
/// although they are control columns in and of themselves.
/// * Data columns are the ones that hold component data.
/// They are optional, potentially sparse, and never deserialized on the server-side (not by
/// the storage systems, at least).
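///
/// As a rough illustration (component name hypothetical), a serialized table
/// carries fields tagged like so:
///
/// | field                 | `rerun.kind` |
/// |-----------------------|--------------|
/// | `frame_nr`            | `time`       |
/// | `log_time`            | `time`       |
/// | `rerun.row_id`        | `control`    |
/// | `rerun.entity_path`   | `control`    |
/// | `rerun.num_instances` | `control`    |
/// | `rerun.point2d`       | `data`       |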
@@ -409,6 +451,13 @@ impl DataTable {
let mut schema = Schema::default();
let mut columns = Vec::new();

{
let (time_schema, time_columns) = self.serialize_time_columns();
schema.fields.extend(time_schema.fields);
schema.metadata.extend(time_schema.metadata);
columns.extend(time_columns);
}

{
let (control_schema, control_columns) = self.serialize_control_columns()?;
schema.fields.extend(control_schema.fields);
@@ -426,6 +475,45 @@
Ok((schema, Chunk::new(columns)))
}

/// Serializes all time columns into an arrow payload and schema.
fn serialize_time_columns(&self) -> (Schema, Vec<Box<dyn Array>>) {
crate::profile_function!();

fn serialize_time_column(
timeline: Timeline,
times: &TimeOptVec,
) -> (Field, Box<dyn Array>) {
let data = PrimitiveArray::from(times.as_slice()).to(timeline.datatype());

let field = Field::new(timeline.name().as_str(), data.data_type().clone(), false)
.with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_TIME.to_owned())].into());

(field, data.boxed())
}

let Self {
table_id,
row_id: _,
col_timelines,
entity_path: _,
num_instances: _,
columns: _,
} = self;

let mut schema = Schema::default();
let mut columns = Vec::new();

for (timeline, col_time) in col_timelines {
let (time_field, time_column) = serialize_time_column(*timeline, col_time);
schema.fields.push(time_field);
columns.push(time_column);
}

schema.metadata = [(METADATA_TABLE_ID.into(), table_id.to_string())].into();

(schema, columns)
}
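// Sketch of a single column going through `serialize_time_column`
// (hypothetical values; per `Timeline::datatype`, sequence timelines
// serialize as `Int64` and temporal ones as `Timestamp(Nanosecond, None)`):
//
// let times: TimeOptVec = smallvec::smallvec![Some(41), Some(42), None];
// let (field, array) = serialize_time_column(Timeline::new_sequence("frame_nr"), &times);
// assert_eq!(field.data_type, DataType::Int64);
// assert_eq!(array.len(), 3); // one slot per row, nulls included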

/// Serializes all control columns into an arrow payload and schema.
///
/// Control columns are those that drive the behavior of the storage systems.
@@ -476,7 +564,7 @@ impl DataTable {
let Self {
table_id,
row_id,
timepoint,
col_timelines: _,
entity_path,
num_instances,
columns: _,
@@ -489,11 +577,6 @@ impl DataTable {
schema.fields.push(row_id_field);
columns.push(row_id_column);

let (timepoint_field, timepoint_column) =
serialize_dense_column(COLUMN_TIMEPOINT, timepoint)?;
schema.fields.push(timepoint_field);
columns.push(timepoint_column);

let (entity_path_field, entity_path_column) =
serialize_dense_column(COLUMN_ENTITY_PATH, entity_path)?;
schema.fields.push(entity_path_field);
@@ -520,7 +603,7 @@ impl DataTable {
let Self {
table_id: _,
row_id: _,
timepoint: _,
col_timelines: _,
entity_path: _,
num_instances: _,
columns: table,
@@ -603,6 +686,28 @@ impl DataTable {
) -> DataTableResult<Self> {
crate::profile_function!();

// --- Time ---

let col_timelines: DataTableResult<_> = schema
.fields
.iter()
.enumerate()
.filter_map(|(i, field)| {
field.metadata.get(METADATA_KIND).and_then(|kind| {
(kind == METADATA_KIND_TIME).then_some((field.name.as_str(), i))
})
})
.map(|(name, index)| {
chunk
.get(index)
.ok_or(DataTableError::MissingColumn(name.to_owned()))
.and_then(|column| Self::deserialize_time_column(name, &**column))
})
.collect();
let col_timelines = col_timelines?;

// --- Control ---

let control_indices: HashMap<&str, usize> = schema
.fields
.iter()
@@ -623,14 +728,14 @@
// NOTE: the unwrappings cannot fail since control_index() makes sure the index is valid
let row_id =
(&**chunk.get(control_index(COLUMN_ROW_ID)?).unwrap()).try_into_collection()?;
let timepoint =
(&**chunk.get(control_index(COLUMN_TIMEPOINT)?).unwrap()).try_into_collection()?;
let entity_path =
(&**chunk.get(control_index(COLUMN_ENTITY_PATH)?).unwrap()).try_into_collection()?;
// TODO(#1712): This is unnecessarily slow...
let num_instances =
(&**chunk.get(control_index(COLUMN_NUM_INSTANCES)?).unwrap()).try_into_collection()?;

// --- Components ---

let columns: DataTableResult<_> = schema
.fields
.iter()
@@ -656,13 +761,40 @@
Ok(Self {
table_id,
row_id,
timepoint,
col_timelines,
entity_path,
num_instances,
columns,
})
}

/// Deserializes a sparse time column.
fn deserialize_time_column(
name: &str,
column: &dyn Array,
) -> DataTableResult<(Timeline, TimeOptVec)> {
// See also [`Timeline::datatype`]
let timeline = match column.data_type().to_logical_type() {
DataType::Int64 => Timeline::new_sequence(name),
DataType::Timestamp(TimeUnit::Nanosecond, None) => Timeline::new_temporal(name),
_ => {
return Err(DataTableError::NotATimeColumn {
name: name.into(),
datatype: column.data_type().clone(),
})
}
};

let col_time = column
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
// NOTE: cannot fail, datatype checked above
.unwrap();
let col_time: TimeOptVec = col_time.into_iter().map(|time| time.copied()).collect();

Ok((timeline, col_time))
}
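// Round-trip sketch (hypothetical values): an `Int64` column deserializes
// back into a sequence timeline with the same optional times.
//
// let array = PrimitiveArray::from(vec![Some(41i64), None]);
// let (timeline, times) = DataTable::deserialize_time_column("frame_nr", &array)?;
// assert_eq!(timeline, Timeline::new_sequence("frame_nr"));
// assert_eq!(times.as_slice(), [Some(41), None].as_slice());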

/// Deserializes a sparse data column.
fn deserialize_data_column(
component: ComponentName,
(Diff for the remaining 3 changed files not shown.)
