Skip to content

Commit

Permalink
Fix lurking bug in datastore bucket sorting routines (#3281)
Browse files Browse the repository at this point in the history
_Commit A demonstrates the bug, commit B fixes it and passes the test._

Our public API exposes a well-defined order when logging from a
single-thread, and that order is encoded into our `RowId`s.

As such it is expected that:
1. An incoming non-monotically increasing `RowId` (however it got there)
toggles the dirty bit.
This shouldn't be possible with the current clients since we do not yet
have reordering retries (as far as I remember), but it'll be coming at
some point and we can't trust the clients anyhow.
2. The `RowId` must be used as tie-breaker when sorting the buckets.
We use a stable sort so this shouldn't be an issue as of today, but this
is a nasty bug waiting to happen nonetheless.
  • Loading branch information
teh-cmc authored Sep 12, 2023
1 parent 7c90c5d commit 2fe28df
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 5 deletions.
5 changes: 4 additions & 1 deletion crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,10 @@ impl IndexedBucketInner {
let swaps = {
re_tracing::profile_scope!("swaps");
let mut swaps = (0..col_time.len()).collect::<Vec<_>>();
swaps.sort_by_key(|&i| &col_time[i]);
// NOTE: Within a single timestamp, we must use the Row ID as tie-breaker!
// The Row ID is how we define ordering within a client's thread, and our public APIs
// guarantee that logging order is respected within a single thread!
swaps.sort_by_key(|&i| (&col_time[i], &col_row_id[i]));
swaps
.iter()
.copied()
Expand Down
7 changes: 3 additions & 4 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,9 @@ impl IndexedBucket {

// append time to primary column and update time range appropriately

if let Some(last_time) = col_time.last() {
if time.as_i64() < *last_time {
*is_sorted = false;
}
if let (Some(last_time), Some(last_row_id)) = (col_time.last(), col_row_id.last()) {
// NOTE: Within a single timestamp, we use the Row ID as tie-breaker
*is_sorted &= (*last_time, *last_row_id) <= (time.as_i64(), row.row_id());
}

col_time.push(time.as_i64());
Expand Down
51 changes: 51 additions & 0 deletions crates/re_arrow_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,3 +1088,54 @@ pub fn init_logs() {
re_log::setup_native_logging();
}
}

// ---

#[test]
fn row_id_ordering() {
init_logs();

let mut store = DataStore::new(InstanceKey::name(), Default::default());

let timeline = re_log_types::Timeline::new("frame_nr", re_log_types::TimeType::Sequence);
let ent_path = EntityPath::from("this/that");

let frame1 = TimeInt::from(1);

let (instances1, points1, points2) = (
build_some_instances(3),
build_some_point2d(3),
build_some_point2d(3),
);
let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), &points1]);

let mut row2 =
test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), points2]);
// NOTE: This should always come before `row1` in frame #1.
row2.row_id = re_log_types::RowId::ZERO;

store.insert_row(&row1).unwrap();
store.insert_row(&row2).unwrap();

// NOTE: Don't sort, let's see if the dirtiness bit got flipped.. as it should have!
// store.sort_indices_if_needed();

let (row_id, results) = store
.latest_at(
&re_arrow_store::LatestAtQuery::latest(timeline),
&ent_path,
re_types::components::Point2D::name(),
&[re_types::components::Point2D::name()],
)
.unwrap();

// The results should come from `row1`!

assert_eq!(row1.row_id, row_id);

let points = results[0]
.as_ref()
.unwrap()
.to_native::<re_types::components::Point2D>();
assert_eq!(points1, points);
}

0 comments on commit 2fe28df

Please sign in to comment.