Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robust join implementation #307

Merged
merged 2 commits into from
Feb 23, 2021
Merged

Robust join implementation #307

merged 2 commits into from
Feb 23, 2021

Conversation

frankmcsherry
Copy link
Member

@frankmcsherry frankmcsherry commented Feb 19, 2021

This PR changes the implementation of join to be more robust in the presence of pre-compacted arrangements.

In particular, https://github.com/MaterializeInc/database-issues/issues/1780 was at its core about join receiving pre-compacted arrangements, misunderstanding the physical compaction frontier, mis-SETTING the physical compaction frontier, and then believing it was getting results that it was not correctly receiving. A giant tire fire, really.

The problem was that it was not carefully tracking the relationship between received batches and the physical compaction frontier. This PR attempts to fix that by being substantially more clear about the invariants that should be checked at operator construction time, and then maintained as the operator executes. They are not very complicated, and boil down to:

for each trace, the physical compaction frontier should be less or equal to the completed trace frontier.

That is, we shouldn't physically compact past the timestamps we have completed, which makes some sense because such a handle to a trace is not especially useful (you cannot subset out any of the existing data, under the current rules for extracting out subsets of data). Join in particular wants to be able to subset out from a trace the data that it has received in batches, and if it cannot reliably do that .. well the implementation we have now doesn't work, nor will this new one (just, it will more aggressively not work).

cc @ruchirK

@frankmcsherry
Copy link
Member Author

@ryzhyk if you happen to have a test suite hanging around to point at this branch I'd be interested! We'll hammer on it at Materialize too, but you do different things than we do (e.g. multi-dimensional timestamps in your joins). The only intended change is that on pre-compacted arrangements it should be less wrong, and it may be that this hasn't been an issue for you, but we'll probably aim to land this or something like it.

@ryzhyk
Copy link
Contributor

ryzhyk commented Feb 19, 2021

@ryzhyk if you happen to have a test suite hanging around to point at this branch I'd be interested! We'll hammer on it at Materialize too, but you do different things than we do (e.g. multi-dimensional timestamps in your joins). The only intended change is that on pre-compacted arrangements it should be less wrong, and it may be that this hasn't been an issue for you, but we'll probably aim to land this or something like it.

Sure, I'll get on it. Is it possible to characterize the scenarios where the bug shows up and how it manifests outside of DD?

@frankmcsherry
Copy link
Member Author

Yeah, the main thing is that if you import an Arranged into a dataflow, where the underlying trace has undergone physical compaction, then .. under circumstances that now seem easy to reproduce (in Materialize) the compacted representation of the trace confuses join who produces double the output it should have (it takes in inputs A and B, and produces A x B in response to A and A x B in response to B).

It's a bit tricky to isolate, as it seems to reliably happen in the second execution of a Materialize query, and I haven't exactly figured out how to make it happen, but I can detail what does happen once it gets in that state.

@frankmcsherry
Copy link
Member Author

In maybe more detail: join starts up with the compacted trace, and in the first execution finds no inputs, but nonetheless learns that [0, x) is empty. It "accepts" this range and advances its internal bounds, and indicates to the trace that it will only need to be able to get access to the trace from x onward. However, the trace is perhaps compacted past x at this point. That should be an error there (and in this PR is now a debug_assert!) as you should not be able to roll back the physical compaction frontier. Instead, it overwrote the compaction frontier (it succeeded in rolling it back) and the next time around the loop asked for data up to that point, and got all the data instead (as the confused trace had more data, and yet saw that the request lined up with its new physical compaction frontier).

It's hard to explain without wincing at the sketchiness of it all. The new code is meant to be a lot more clear about stating, checking, and maintaining invariants.

@ryzhyk
Copy link
Contributor

ryzhyk commented Feb 19, 2021

Our tests are passing, but then they did not catch the bug in the first place...

@frankmcsherry
Copy link
Member Author

This has passed some amount of randomized stress testing at Materialize, so once we get a review on it I think we can land it (not erroring is good, but it should also land in a form that is more easy to understand than previously).

Copy link
Contributor

@ruchirK ruchirK left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it makes sense! I don't fully understand everything this code is doing and I didn't take a close look to try to establish that the old / new versions of the code do the exact same stuff but I think the invariants we are trying to establish make sense. I left a few comments around places that seemed confusing.

src/operators/join.rs Show resolved Hide resolved
src/operators/join.rs Outdated Show resolved Hide resolved
src/operators/join.rs Show resolved Hide resolved
src/operators/join.rs Show resolved Hide resolved
src/operators/join.rs Outdated Show resolved Hide resolved
trace1.advance_upper(acknowledged1);
trace1.set_physical_compaction(acknowledged1.borrow());
}
trace1.advance_upper(&mut acknowledged1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand how set_logical_compaction, advance_upper and set_physical_compaction interact (specifically the first two seem very overlapping?). Haven't read the docs on these so this is a bit of a lazy question - this could be a nice place for a few more comments

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some more comments!

@frankmcsherry
Copy link
Member Author

I've restructured the code a bit, with substantially more comments, which I think resolve all the issues except perhaps the last comment about "how all these things interact". The comments are meant to address that, but I'd be up for any feedback on whether they do or do not do this.

// Allow `trace1` to physically compact up to the upper bound of batches we
// have received in its input (`input1`). We will not require a cursor that
// is not beyond this bound.
trace1.set_physical_compaction(acknowledged1.borrow());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does acknowledged1 need to be <= input2.frontier() / is it always because of some other invariant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, they are generally unrelated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants