Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly shutdown sources on Drop #12082

Merged
merged 1 commit into from
Apr 27, 2022
Merged

Conversation

guswynn
Copy link
Contributor

@guswynn guswynn commented Apr 26, 2022

Before this pr, sources are timely-worker-local objects, and they return what amounts Rc's or Rc<RefCell>'s of thread-local capabilities. The intent is that if these capabilities are dropped, then the source operators are correctly shutdown. However, now that all sources end with EventPusher's that consolidate all messages into a single tcp boundary. This boundary holds onto a thread-safe token, which the EventPusher's notice is dropped and drop their worker-local tokens. However, there is not guarantee that new data or progress, in either the ok or err streams of a source will happen, which means the trigger for dropping those tokens may never run.

Instead, @frankmcsherry and I decided that we should just upgrade the original source-tokens to be thread-safe. The way we do this is very consistent across multiple locations:

  • Make the core capabilities local variables.
  • Make the returned token a SourceToken, which is a share-able SyncActivator that activates on Drop.
  • Keep an Arc's Weak around in the operator. When upgrading is impossible, that means the token is dropped, and respond by dropping the local capabilities.
    • This code is guaranteed to run because the token drop will activate the operator.

The places we do this are:

  • the core source operator used by sources in mz_storage::source::util
  • import_table in mz_storage::render::sources
    • This one is complex because we had to switch map_in_place to its implementation with some complexity added
    • I tested this by testing that the drop was correctly called when I selected from a table.
  • The "non-source" replay thing in mz_storage::render. I have no idea how to test this one (cc @antiguru @frankmcsherry )

Perf implications

This pr means each source timely step will have at least 2 added atomic CAS's. If this becomes a problem, then we can switch to an atomic flag as opposed to a full Weak::upgrade, which may save us some perf.

Additionally: we may be able to switch back to Rc's someday, if we move off of tcp_boundary and persist-per-worker instead.

Motivation

  • This PR fixes a previously unreported bug.

Sources are not correctly dropped

Tips for reviewer

TimelyDataflow/differential-dataflow#364 is a smaller, more readable change that gives an example of the exact same kind of change we are doing in multiple places here (and in fact, that change is required for the decode_cdcv2 token to work in this pr.

Also, this pr has some code that is very similar copied to many places. While we may avoid bugs by consolidating it into a function, its unclear if there is a clean way to do so, and it may make the code more unreadable.

Testing

  • This PR has adequate test coverage / QA involvement has been duly considered.

Release notes

None

@guswynn guswynn marked this pull request as ready for review April 26, 2022 21:44
@guswynn guswynn requested review from antiguru, frankmcsherry and petrosagg and removed request for antiguru April 26, 2022 21:44
Copy link
Contributor

@frankmcsherry frankmcsherry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This generally looks good to me, except that I am confused why MapInPlaceWithSyncToken is now a thing when it was previously just a map_in_place one-liner.

src/storage/src/render/sources.rs Show resolved Hide resolved
@guswynn guswynn enabled auto-merge (rebase) April 27, 2022 18:18
@guswynn guswynn merged commit b5e7ead into MaterializeInc:main Apr 27, 2022
@guswynn guswynn deleted the synnccc branch April 27, 2022 18:32
benesch added a commit to benesch/materialize that referenced this pull request Jun 1, 2022
Resolve a TODO to read the Debezium transactional metadata source out of
persist, rather than re-rendering the source.

This PR will unblock creating a pod per source (MaterializeInc#12770), but it is
blocked on reverting (MaterializeInc#12082), which is no longer necessary now that TCP
boundary has been removed.
benesch added a commit to benesch/materialize that referenced this pull request Jun 1, 2022
Resolve a TODO to read the Debezium transactional metadata source out of
persist, rather than re-rendering the source.

This PR will unblock creating a pod per source (MaterializeInc#12770), but it is
blocked on reverting (MaterializeInc#12082), which is no longer necessary now that TCP
boundary has been removed.
benesch added a commit to benesch/materialize that referenced this pull request Jun 1, 2022
PR MaterializeInc#12082 converted source tokens to thread-safe `Arc`s to be compatible
with the TCP storage/compute boundary, but since MaterializeInc#12216 replaced the TCP
boundary with persist we can go back to Rcs.
benesch added a commit to benesch/materialize that referenced this pull request Jun 1, 2022
Resolve a TODO to read the Debezium transactional metadata source out of
persist, rather than re-rendering the source.

This PR will unblock creating a pod per source (MaterializeInc#12770), but it is
blocked on reverting (MaterializeInc#12082), which is no longer necessary now that TCP
boundary has been removed.
benesch added a commit to benesch/materialize that referenced this pull request Jun 1, 2022
PR MaterializeInc#12082 converted source tokens to thread-safe `Arc`s to be compatible
with the TCP storage/compute boundary, but since MaterializeInc#12216 replaced the TCP
boundary with persist we can go back to Rcs.
benesch added a commit that referenced this pull request Jun 1, 2022
PR #12082 converted source tokens to thread-safe `Arc`s to be compatible
with the TCP storage/compute boundary, but since #12216 replaced the TCP
boundary with persist we can go back to Rcs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants