datastore: stabilize dataframe sorts (#1549)
* store_polars: fixed panic edge case

* keep those polar sorts as stable as we can

* improve content sort

* improve column sort
teh-cmc authored Mar 15, 2023
1 parent c62aa91 commit 05cd517
Showing 1 changed file with 59 additions and 18 deletions.
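
The "stabilize" in the title is about determinism: `to_dataframe` flattens the whole store into one polars `DataFrame`, and that view should come out identical for identical store contents. As a rough illustration of why sort stability matters, here is a minimal, hypothetical std-Rust sketch (not code from this commit): a stable sort keeps tied rows in their incoming order, an unstable one promises nothing for ties.

    fn main() {
        // Hypothetical rows: (sort key, payload). Two pairs of rows tie on the key.
        let mut rows = vec![(1, "point_a"), (0, "rect"), (1, "point_b"), (0, "arrow")];

        // A stable sort (Vec::sort_by_key) keeps tied rows in their incoming order,
        // so the output is the same on every run.
        rows.sort_by_key(|(key, _)| *key);
        assert_eq!(rows, vec![(0, "rect"), (0, "arrow"), (1, "point_a"), (1, "point_b")]);

        // An unstable sort (Vec::sort_unstable_by_key) leaves the relative order of
        // tied rows unspecified, which is what makes snapshot-style output flaky.
        rows.sort_unstable_by_key(|(key, _)| *key);
    }

The diff below does not swap sort algorithms; it reduces the room for ambiguity instead, by sorting on more explicit keys (timeless flag, insert id, then every sortable content column) and by arranging columns from the store's own timeline set.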
crates/re_arrow_store/src/store_polars.rs (77 changes: 59 additions & 18 deletions)
@@ -1,3 +1,5 @@
+use std::collections::BTreeSet;
+
 use arrow2::{
     array::{new_empty_array, Array, BooleanArray, ListArray, UInt64Array, Utf8Array},
     bitmap::Bitmap,
@@ -23,7 +25,7 @@ impl DataStore {
     pub fn to_dataframe(&self) -> DataFrame {
         crate::profile_function!();
 
-        const IS_TIMELESS_COL: &str = "_is_timeless";
+        const TIMELESS_COL: &str = "_is_timeless";
 
         let timeless_dfs = self.timeless_indices.values().map(|index| {
             let ent_path = index.ent_path.clone();
@@ -34,7 +36,7 @@ impl DataStore {
             // Add a column where every row is a boolean true (timeless)
             let timeless = {
                 let timeless = BooleanArray::from(vec![Some(true); num_rows]).boxed();
-                new_infallible_series(IS_TIMELESS_COL, timeless.as_ref(), num_rows)
+                new_infallible_series(TIMELESS_COL, timeless.as_ref(), num_rows)
             };
             let df = df.with_column(timeless).unwrap(); // cannot fail

@@ -89,25 +91,56 @@ impl DataStore {
             })
             .collect();

+        // Some internal functions of `polars` will panic if everything's empty: early exit.
+        if dfs.iter().all(|df| df.is_empty()) {
+            return DataFrame::empty();
+        }
+
         // Concatenate all indices together.
         //
         // This has to be done diagonally since these indices refer to different entities with
         // potentially wildly different sets of components and lengths.
-        let df = diag_concat_df(dfs.as_slice())
-            // TODO(cmc): is there any way this can fail in this case?
-            .unwrap();
+        //
+        // NOTE: The only way this can fail in this case is if all these frames are empty, because
+        // the store itself is empty, which we check just above.
+        let df = diag_concat_df(dfs.as_slice()).unwrap();
+
+        // Arrange the columns in the order that makes the most sense as a user.
+        let timelines: BTreeSet<&str> = self
+            .indices
+            .keys()
+            .map(|(timeline, _)| timeline.name().as_str())
+            .collect();
+        let df = sort_df_columns(&df, self.config.store_insert_ids, &timelines);

-        let df = sort_df_columns(&df, self.config.store_insert_ids);
+        let has_timeless = df.column(TIMELESS_COL).is_ok();
+        let insert_id_col = DataStore::insert_id_key().as_str();

-        let has_insert_ids = df.column(DataStore::insert_id_key().as_str()).is_ok();
-        let has_timeless = df.column(IS_TIMELESS_COL).is_ok();
+        const ASCENDING: bool = false;
+        const DESCENDING: bool = true;

+        // Now we want to sort based on _the contents_ of the columns, and we need to make sure
+        // we do so in as stable a way as possible given our constraints: we cannot actually sort
+        // the component columns themselves as they are internally lists of their own.
         let (sort_cols, sort_orders): (Vec<_>, Vec<_>) = [
-            has_timeless.then_some((IS_TIMELESS_COL, true)),
-            has_insert_ids.then_some((DataStore::insert_id_key().as_str(), false)),
+            df.column(TIMELESS_COL)
+                .is_ok()
+                .then_some((TIMELESS_COL, DESCENDING)),
+            df.column(insert_id_col)
+                .is_ok()
+                .then_some((insert_id_col, ASCENDING)),
         ]
         .into_iter()
         .flatten()
+        // NOTE: Already properly arranged above, and already contains insert_id if needed.
+        .chain(
+            df.get_column_names()
+                .into_iter()
+                .filter(|col| *col != TIMELESS_COL) // we handle this one separately
+                .filter(|col| *col != insert_id_col) // we handle this one separately
+                .filter(|col| df.column(col).unwrap().list().is_err()) // lists cannot be sorted
+                .map(|col| (col, ASCENDING)),
+        )
         .unzip();

         let df = if !sort_cols.is_empty() {
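
Two details in the hunk above are worth unpacking. The early exit handles the "panic edge case" from the commit message: per the new comment, some polars internals panic when every input frame is empty, so an empty store now short-circuits to `DataFrame::empty()`. The diagonal concatenation is what lets entities with completely different component sets share one frame: the result carries the union of all input columns, and each input is padded with nulls for the columns it lacks. A stand-alone sketch of that padding behaviour, with toy types and made-up column names rather than the actual polars implementation:

    use std::collections::{BTreeMap, BTreeSet};

    // A toy "dataframe": column name -> column values (all columns the same length).
    type Frame = BTreeMap<&'static str, Vec<Option<&'static str>>>;

    // Diagonal concatenation: the output has the union of all columns, and every
    // input contributes `None` for the columns it does not have.
    fn diag_concat(frames: &[Frame]) -> Frame {
        let all_columns: BTreeSet<&str> = frames.iter().flat_map(|f| f.keys().copied()).collect();
        let mut out = Frame::new();
        for frame in frames {
            let num_rows = frame.values().next().map_or(0, Vec::len);
            for &col in &all_columns {
                let column = out.entry(col).or_default();
                match frame.get(col) {
                    Some(values) => column.extend(values.iter().copied()),
                    None => column.extend(std::iter::repeat(None).take(num_rows)),
                }
            }
        }
        out
    }

    fn main() {
        let points = Frame::from([
            ("entity", vec![Some("cam/points")]),
            ("rerun.point2d", vec![Some("[0.0, 1.0]")]),
        ]);
        let rects = Frame::from([
            ("entity", vec![Some("cam/bbox")]),
            ("rerun.rect2d", vec![Some("[0, 0, 4, 3]")]),
        ]);

        let df = diag_concat(&[points, rects]);
        assert_eq!(df.len(), 3); // entity, rerun.point2d, rerun.rect2d
        // The rect column now exists for the points row too, as a null.
        assert_eq!(df["rerun.rect2d"], vec![None, Some("[0, 0, 4, 3]")]);
    }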
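The `(sort_cols, sort_orders)` pair built at the end of the hunk is the "improve content sort" part: a lexicographic key list with the timeless marker first (descending, so timeless rows surface at the top), then the insert id (ascending), then every remaining sortable column as a tie-breaker, while list-typed component columns are skipped because a list cannot serve as a sort key. A reduced sketch of that key-building pattern over plain tuples; the column names are made up, and the real code asks the dataframe itself which columns are lists:

    fn main() {
        const ASCENDING: bool = false;
        const DESCENDING: bool = true;

        // (column name, is it a list column?), already arranged in the desired column order.
        let columns = [
            ("insert_id", false),
            ("frame_nr", false),
            ("log_time", false),
            ("entity", false),
            ("rerun.point2d", true), // one list of instances per row: not a usable sort key
        ];

        let has_timeless = true; // pretend the `_is_timeless` marker column is present
        let has_insert_ids = true;

        let (sort_cols, sort_orders): (Vec<_>, Vec<_>) = [
            has_timeless.then_some(("_is_timeless", DESCENDING)),
            has_insert_ids.then_some(("insert_id", ASCENDING)),
        ]
        .into_iter()
        .flatten()
        // Every other sortable column just breaks the remaining ties, in ascending order.
        .chain(
            columns
                .iter()
                .filter(|&&(name, is_list)| !is_list && name != "insert_id")
                .map(|&(name, _)| (name, ASCENDING)),
        )
        .unzip();

        assert_eq!(sort_cols, vec!["_is_timeless", "insert_id", "frame_nr", "log_time", "entity"]);
        assert_eq!(sort_orders, vec![DESCENDING, ASCENDING, ASCENDING, ASCENDING, ASCENDING]);
    }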
@@ -117,7 +150,7 @@ impl DataStore {
         };

         if has_timeless {
-            df.drop(IS_TIMELESS_COL).unwrap()
+            df.drop(TIMELESS_COL).unwrap()
         } else {
             df
         }
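
This final hunk of `to_dataframe` closes the loop on the `_is_timeless` marker: the column only exists so that timeless rows can take part in the content sort (descending, i.e. timeless rows first), and it is dropped again before the dataframe is returned. The same mark-sort-drop pattern over plain structs, stripped of polars (field names are illustrative only):

    struct Row {
        is_timeless: bool, // temporary marker, never exposed to the caller
        insert_id: u64,
        data: &'static str,
    }

    fn main() {
        let mut rows = vec![
            Row { is_timeless: false, insert_id: 2, data: "frame 2" },
            Row { is_timeless: true, insert_id: 0, data: "annotation context" },
            Row { is_timeless: false, insert_id: 1, data: "frame 1" },
        ];

        // Sort on the marker first (descending: `true` before `false`), then on insert id.
        // Vec::sort_by is a stable sort, so anything still tied keeps its incoming order.
        rows.sort_by(|a, b| {
            b.is_timeless
                .cmp(&a.is_timeless)
                .then(a.insert_id.cmp(&b.insert_id))
        });
        assert!(rows[0].is_timeless); // the timeless row floated to the top

        // The marker has done its job; the real code now drops the `_is_timeless` column.
        let report: Vec<&str> = rows.iter().map(|row| row.data).collect();
        assert_eq!(report, vec!["annotation context", "frame 1", "frame 2"]);
    }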
@@ -308,8 +341,13 @@ fn new_infallible_series(name: &str, data: &dyn Array, len: usize) -> Series {
 // - insert ID comes first if it's available,
 // - followed by lexically sorted timelines,
 // - followed by the entity path,
-// - and finally all components in lexical order.
-fn sort_df_columns(df: &DataFrame, store_insert_ids: bool) -> DataFrame {
+// - followed by native components (i.e. "rerun.XXX") in lexical order,
+// - and finally extension components (i.e. "ext.XXX") in lexical order.
+fn sort_df_columns(
+    df: &DataFrame,
+    store_insert_ids: bool,
+    timelines: &BTreeSet<&str>,
+) -> DataFrame {
     crate::profile_function!();

     let columns: Vec<_> = {
@@ -325,25 +363,28 @@ fn sort_df_columns(df: &DataFrame, store_insert_ids: bool) -> DataFrame {
             );
         }

-        let timelines = all
+        let timelines = timelines.iter().copied().map(Some).collect::<Vec<_>>();
+
+        let native_components = all
             .iter()
             .copied()
-            .filter(|name| !name.starts_with("rerun."))
+            .filter(|name| name.starts_with("rerun."))
             .map(Some)
             .collect::<Vec<_>>();

-        let components = all
+        let extension_components = all
             .iter()
             .copied()
-            .filter(|name| name.starts_with("rerun."))
+            .filter(|name| name.starts_with("ext."))
             .map(Some)
             .collect::<Vec<_>>();

         [
             vec![store_insert_ids.then(|| DataStore::insert_id_key().as_str())],
             timelines,
             vec![Some("entity")],
-            components,
+            native_components,
+            extension_components,
         ]
         .into_iter()
         .flatten() // flatten vectors
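
For the "improve column sort" half of the commit: this function only decides column order (the content sort above is separate), and the timelines now come from the store's own `BTreeSet` passed in by `to_dataframe` instead of being guessed as "whatever does not start with `rerun.`"; that is also what makes room for the new `ext.*` extension components. The resulting precedence is: insert id, timelines (lexical), entity path, native `rerun.*` components, extension `ext.*` components. A small stand-alone sketch of that precedence with made-up column names:

    use std::collections::BTreeSet;

    fn main() {
        // Column names as they might come out of the diagonal concatenation.
        let mut all = vec![
            "rerun.rect2d",
            "entity",
            "frame_nr",
            "ext.confidence",
            "log_time",
            "rerun.point2d",
            "insert_id",
        ];
        all.sort_unstable(); // lexical order; names are unique, so stability is irrelevant here

        // The store knows its timelines; a BTreeSet keeps them lexically sorted.
        let timelines = BTreeSet::from(["frame_nr", "log_time"]);
        let store_insert_ids = true;

        let native_components: Vec<_> =
            all.iter().copied().filter(|c| c.starts_with("rerun.")).collect();
        let extension_components: Vec<_> =
            all.iter().copied().filter(|c| c.starts_with("ext.")).collect();

        // Insert id, then timelines, then the entity path, then rerun.* and ext.* components.
        let ordered: Vec<&str> = store_insert_ids
            .then_some("insert_id")
            .into_iter()
            .chain(timelines.iter().copied())
            .chain(std::iter::once("entity"))
            .chain(native_components)
            .chain(extension_components)
            .collect();

        assert_eq!(
            ordered,
            vec![
                "insert_id",
                "frame_nr",
                "log_time",
                "entity",
                "rerun.point2d",
                "rerun.rect2d",
                "ext.confidence",
            ]
        );
    }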