Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastore: stabilize dataframe sorts #1549

Merged
merged 4 commits into from
Mar 15, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl DataStore {
pub fn to_dataframe(&self) -> DataFrame {
crate::profile_function!();

const IS_TIMELESS_COL: &str = "_is_timeless";
const TIMELESS_COL: &str = "_is_timeless";

let timeless_dfs = self.timeless_indices.values().map(|index| {
let ent_path = index.ent_path.clone();
Expand All @@ -34,7 +34,7 @@ impl DataStore {
// Add a column where every row is a boolean true (timeless)
let timeless = {
let timeless = BooleanArray::from(vec![Some(true); num_rows]).boxed();
new_infallible_series(IS_TIMELESS_COL, timeless.as_ref(), num_rows)
new_infallible_series(TIMELESS_COL, timeless.as_ref(), num_rows)
};
let df = df.with_column(timeless).unwrap(); // cannot fail

Expand Down Expand Up @@ -89,25 +89,48 @@ impl DataStore {
})
.collect();

// Some internal functions of `polars` will panic if everything's empty: early exit.
if dfs.iter().all(|df| df.is_empty()) {
return DataFrame::empty();
}

// Concatenate all indices together.
//
// This has to be done diagonally since these indices refer to different entities with
// potentially wildly different sets of components and lengths.
let df = diag_concat_df(dfs.as_slice())
// TODO(cmc): is there any way this can fail in this case?
.unwrap();
//
// NOTE: The only way this can fail in this case is if all these frames are empty, because
// the store itself is empty, which we check just above.
let df = diag_concat_df(dfs.as_slice()).unwrap();

// Arrange the columns in the order that makes the most sense as a user.
let df = sort_df_columns(&df, self.config.store_insert_ids);

let has_insert_ids = df.column(DataStore::insert_id_key().as_str()).is_ok();
let has_timeless = df.column(IS_TIMELESS_COL).is_ok();
let has_timeless = df.column(TIMELESS_COL).is_ok();
let insert_id_col = DataStore::insert_id_key().as_str();

// Now we want to sort _the contents_ of the columns, and we need to make sure we do so in
// as stable a way as possible given our constraints: we cannot actually sort the
// component columns themselves as they are internally lists of their own.
let (sort_cols, sort_orders): (Vec<_>, Vec<_>) = [
has_timeless.then_some((IS_TIMELESS_COL, true)),
has_insert_ids.then_some((DataStore::insert_id_key().as_str(), false)),
df.column(TIMELESS_COL)
.is_ok()
.then_some((TIMELESS_COL, true)), // descending
df.column(insert_id_col)
.is_ok()
.then_some((insert_id_col, false)), // ascending
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
]
.into_iter()
.flatten()
// NOTE: Already properly arranged above, and already contains insert_id if needed.
.chain(
df.get_column_names()
.into_iter()
.filter(|col| *col != TIMELESS_COL) // we handle this one separately
.filter(|col| *col != insert_id_col) // we handle this one separately
.filter(|col| !col.starts_with("rerun.")) // lists cannot be sorted
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
.map(|col| (col, false)), // ascending order
)
.unzip();

let df = if !sort_cols.is_empty() {
Expand All @@ -117,7 +140,7 @@ impl DataStore {
};

if has_timeless {
df.drop(IS_TIMELESS_COL).unwrap()
df.drop(TIMELESS_COL).unwrap()
} else {
df
}
Expand Down