Skip to content

Commit

Permalink
avoid multiple borrows in join (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Mar 4, 2021
1 parent cdb9b70 commit afa4413
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,22 +364,29 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
// TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));

// We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
// on both traces at the same time, as they could be the same trace and this would panic.
let mut batch2_cursors = Vec::new();
trace2.map_batches(|batch2| {
acknowledged2.clone_from(batch2.upper());
batch2_cursors.push((batch2.cursor(), batch2.clone()));
});
// At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
// iterating through batches and capturing the upper bound. This is a great moment to assert that
// `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
// TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));

// Load up deferred work using trace2 cursors and batches captured just above.
for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
// It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
let batch2_cursor = batch2.cursor();
// We could downgrade the capability here, but doing so is a bit complicated mathematically.
// TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
// in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
// that property.
todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
});
// At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
// iterating through batches and capturing the upper bound. This is a great moment to assert that
// `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
// TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));
}

// Droppable handles to shared trace data structures.
let mut trace1_option = Some(trace1);
Expand Down

0 comments on commit afa4413

Please sign in to comment.