
Remove the progress API from the derivation pipeline#3506

Closed
trianglesphere wants to merge 8 commits into develop from jg/invert_pipeline_v2

Conversation

@trianglesphere
Contributor

Description

This is a stack of PRs to incrementally refactor the derivation pipeline with the goal of removing the progress API. It does so by recognizing that the progress API is used for two different things: 1) recognizing when a stage will not get any more data from a lower-level stage, and 2) tracking where the stage is. There is a third function: advancing the L1 origin of the pipeline through a complex interplay between the progress and step APIs.

The new API is as follows:

  • Each stage retains a reference to the previous stage
  • Each stage asks the previous stage for more data. If a stage has no more data, it returns io.EOF; if it may produce more data later but has none right now, it returns derive.NotEnoughData.
  • Once the final stage (the engine queue) returns io.EOF, the pipeline advances the L1 traversal stage
  • There is a single origin for the entire pipeline. It is set to the origin required by the channel bank. As such, the batch queue needs to reject some batches that it pulls from earlier in the pipeline.

The goal of this refactor (removing the progress API) plus defining a new API between stages is to make it easier to test each stage in isolation, and to make it easier to incrementally improve each stage in the future.

Tests

Every commit in this sequence passes the op-e2e tests; however, no effort was made to make the unit tests pass.

Additional context

Each commit in this stack is meant to be reviewable as its own PR. I've combined them here to show what each PR is driving towards.

@changeset-bot

changeset-bot bot commented Sep 19, 2022

⚠️ No Changeset found

Latest commit: d33db24

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@mergify
Contributor

mergify bot commented Sep 19, 2022

This PR changes implementation code, but doesn't include a changeset. Did you forget to add one?

@trianglesphere trianglesphere changed the title Jg/invert pipeline v2 Pipeline Refactor Sep 19, 2022
@trianglesphere trianglesphere changed the title Pipeline Refactor Remove the progress API from the derivation pipeline Sep 20, 2022
```go
func NewCalldataSourceImpl(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, block eth.BlockID) *CalldataSourceImpl {
	_, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash)
	if err != nil {
```
Contributor

Couldn't this get stuck in an infinite loop if the block hash doesn't exist? Perhaps we want to check for ethereum.NotFound.

Contributor Author

Yeah, it can. This PR serves as an example of the structure rather than being ready to go, but this specific code will need to be beefed up before being merged.

Contributor

@protolambda protolambda left a comment


Looks pretty good! I like how the explicit L1 traversal means the io.EOF of stages can now be regarded as having fully read the current origin, instead of immediately traversing further. And it looks much more like an io.Reader interface, which is nice.

I'm more hesitant on the Reset logic: making sure we reproduce the same buffer-state in each stage is very important to keep nodes that started syncing at different points actually in sync. E.g. the channel bank may emit bad data while it hasn't fully rebuilt its own buffer, and the next stages may be polluted with that inconsistent data. E.g. a batch may have been encoded in a data tx that would have been pruned if the channel bank was full, instead of the artificial empty buffer. That batch should not get buffered/processed, but dropped. Keeping track of origins per stage aimed to avoid these cross-stage interactions, dropping inconsistent data as soon as possible, while a stage is getting consistent again after resetting it back.

It would help if we can write tests for the resetting, or include in the specs how we make sure we can sync from any older point by using the reset + sequence window properties. Let me know if/how I can help with that.

And I don't want to scope-creep this work, but it would be nice to discuss the possible DoS vectors, maybe in a separate PR: making sure that e.g. the ChannelBank can never grow too large is difficult, but maybe less so if we think about that in this refactor.

Comment on lines +38 to +39

```go
	_, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash)
	if err != nil {
```
Contributor


There should be a difference between temporary RPC errors and persistent not-found type of errors. Creating a calldata source with no data (if it doesn't error, but just returns nothing?) may result in accidental missed data, which could become a divergence. IMHO leaving the calldata as-is, and putting it in a separate PR or reducing changes is nice, since it's not quite the same as a derivation stage.
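The temporary-vs-persistent distinction suggested here could be sketched as below. This is illustrative only: `classify`, `ErrTemporary`, and the local `ErrNotFound` sentinel are made-up names (go-ethereum's real sentinel is `ethereum.NotFound`), and the real code would wire this into the fetch path rather than classify strings.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound stands in for go-ethereum's ethereum.NotFound sentinel;
// ErrTemporary represents a transient RPC failure. Both names are
// illustrative, not from the PR.
var (
	ErrNotFound  = errors.New("not found")
	ErrTemporary = errors.New("temporary rpc error")
)

// classify sketches the distinction the review asks for: a persistent
// not-found error should surface to the caller rather than be retried
// blindly, while a temporary error can be retried on the next step.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotFound):
		return "persistent: surface error"
	default:
		return "temporary: retry on next step"
	}
}

func main() {
	fmt.Println(classify(nil))          // ok
	fmt.Println(classify(ErrNotFound))  // persistent: surface error
	fmt.Println(classify(ErrTemporary)) // temporary: retry on next step
}
```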

@trianglesphere
Contributor Author

> I'm more hesitant on the Reset logic: making sure we reproduce the same buffer-state in each stage is very important to keep nodes that started syncing at different points actually in sync

The only stage that actually prunes data is the channel bank. For that stage, it is important that the order of operations (insert, remove, prune) remains the same and that we start far enough back that we end up in the same state as if we had been running from the start.

The other important stage is the batch queue. Here we probably want to be careful not to include extra batches, but my suspicion is that a mix of batches ending up here is more acceptable because of the batch pruning rules.

@trianglesphere
Contributor Author

> it would be nice to discuss the possible DoS vectors

What new DoS vectors are you worried about here?

This provides a new set of objects which provide the following API guarantees:
- Opening a data source for a new L1 block never errors.
- If fetching the transactions failed, the error surfaces in the calls to `Next`.

This greatly simplifies usage of this object when constructing new data. If the object did not keep track of its internal state, the external users of this API would have to track that state in a more complex way.

The L1 Retrieval stage is now responsible for pulling data from the L1 Traversal stage. In addition, the pipeline is responsible for advancing the state of the L1 Traversal stage.

The L1 Traversal stage only provides access to the current L1 block once - it pretends to be a queue that is consumed from.

This makes the L1 Retrieval stage purely pull-based. This commit required large modifications to the channel bank stage in order for the channel bank to maintain its own progress.

This again requires a fair amount of changes to channel_in_reader.go for the channel-in reader to maintain its progress state.

Like the rest of the changes, this also required modifications to the next stage (the batch queue) so that it can manage the progress API. This commit required even more changes than usual: I changed the pipeline to reset to a common starting point, and now use the L2SafeHead block to filter out adding batches & L1 blocks to the batch queue.

The attributes queue actually needed fairly few modifications to work with the progress API. The logic of switching the batch queue over was a bit more complex because the batch queue is very stateful, but still not the worst.

The progress API is very nearly removed from the engine queue stage.

Now that the engine queue is the only step stage, it is easy to consolidate the different loops inside the derivation pipeline.
@trianglesphere
Contributor Author

Closing this PR to do this incrementally. The first PR is #3532
