diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 3de3eef2216fe..20f02ba7eba4a 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -33,6 +33,31 @@ type Engine interface { SystemConfigL2Fetcher } +// EngineState provides a read-only interface of the forkchoice state properties of the L2 Engine. +type EngineState interface { + Finalized() eth.L2BlockRef + UnsafeL2Head() eth.L2BlockRef + SafeL2Head() eth.L2BlockRef +} + +// EngineControl enables other components to build blocks with the Engine, +// while keeping the forkchoice state and payload-id management internal to +// avoid state inconsistencies between different users of the EngineControl. +type EngineControl interface { + EngineState + + // StartPayload requests the engine to start building a block with the given attributes. + // If updateSafe, the resulting block will be marked as a safe block. + StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) + // ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned. + ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) + // CancelPayload requests the engine to stop building the current block without making it canonical. + // This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources. + CancelPayload(ctx context.Context, force bool) error + // BuildingPayload indicates if a payload is being built, and onto which block it is being built, and whether or not it is a safe payload. + BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) +} + // Max memory used for buffering unsafe payloads const maxUnsafePayloadsMemory = 500 * 1024 * 1024 @@ -68,6 +93,10 @@ type EngineQueue struct { safeHead eth.L2BlockRef unsafeHead eth.L2BlockRef + buildingOnto eth.L2BlockRef + buildingID eth.PayloadID + buildingSafe bool + // Track when the rollup node changes the forkchoice without engine action, // e.g. on a reset after a reorg, or after consolidating a block. // This update may repeat if the engine returns a temporary error. @@ -91,6 +120,8 @@ type EngineQueue struct { l1Fetcher L1Fetcher } +var _ EngineControl = (*EngineQueue)(nil) + // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { return &EngineQueue{ @@ -416,13 +447,11 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { if len(eq.safeAttributes) == 0 { return nil } - fc := eth.ForkchoiceState{ - HeadBlockHash: eq.safeHead.Hash, - SafeBlockHash: eq.safeHead.Hash, - FinalizedBlockHash: eq.finalized.Hash, - } attrs := eq.safeAttributes[0] - payload, errType, err := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true) + errType, err := eq.StartPayload(ctx, eq.safeHead, attrs, true) + if err == nil { + _, errType, err = eq.ConfirmPayload(ctx) + } if err != nil { switch errType { case BlockInsertTemporaryErr: @@ -457,21 +486,89 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)) } } + eq.safeAttributes = eq.safeAttributes[1:] + eq.logSyncProgress("processed safe block derived from L1") + + return nil +} + +func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { + if eq.buildingID != (eth.PayloadID{}) { + eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent) + // TODO: maybe worth it to force-cancel the old payload ID here. + } + fc := eth.ForkchoiceState{ + HeadBlockHash: parent.Hash, + SafeBlockHash: eq.safeHead.Hash, + FinalizedBlockHash: eq.finalized.Hash, + } + id, errTyp, err := StartPayload(ctx, eq.engine, fc, attrs) + if err != nil { + return errTyp, err + } + eq.buildingID = id + eq.buildingSafe = updateSafe + eq.buildingOnto = parent + return BlockInsertOK, nil +} + +func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) { + if eq.buildingID == (eth.PayloadID{}) { + return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload") + } + if eq.buildingOnto.Hash != eq.unsafeHead.Hash { // E.g. when safe-attributes consolidation fails, it will drop the existing work. + eq.log.Warn("engine is building block that reorgs previous usafe head", "onto", eq.buildingOnto, "unsafe", eq.unsafeHead) + } + fc := eth.ForkchoiceState{ + HeadBlockHash: common.Hash{}, // gets overridden + SafeBlockHash: eq.safeHead.Hash, + FinalizedBlockHash: eq.finalized.Hash, + } + payload, errTyp, err := ConfirmPayload(ctx, eq.log, eq.engine, fc, eq.buildingID, eq.buildingSafe) + if err != nil { + return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", eq.buildingOnto, eq.buildingID, errTyp, err) + } ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis) if err != nil { - return NewTemporaryError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err)) + return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err)) } - eq.safeHead = ref + eq.unsafeHead = ref - eq.metrics.RecordL2Ref("l2_safe", ref) eq.metrics.RecordL2Ref("l2_unsafe", ref) - eq.safeAttributes = eq.safeAttributes[1:] - eq.postProcessSafeL2() - eq.logSyncProgress("processed safe block derived from L1") + if eq.buildingSafe { + eq.safeHead = ref + eq.postProcessSafeL2() + eq.metrics.RecordL2Ref("l2_safe", ref) + } + eq.resetBuildingState() + return payload, BlockInsertOK, nil +} + +func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error { + // the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API + eq.log.Error("cancelling old block sealing job", "payload", eq.buildingID) + _, err := eq.engine.GetPayload(ctx, eq.buildingID) + if err != nil { + eq.log.Error("failed to cancel block building job", "payload", eq.buildingID, "err", err) + if !force { + return err + } + } + eq.resetBuildingState() return nil } +func (eq *EngineQueue) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) { + return eq.buildingOnto, eq.buildingID, eq.buildingSafe +} + +func (eq *EngineQueue) resetBuildingState() { + eq.buildingID = eth.PayloadID{} + eq.buildingOnto = eth.L2BlockRef{} + eq.buildingSafe = false +} + // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error { @@ -517,6 +614,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System eq.unsafeHead = unsafe eq.safeHead = safe eq.finalized = finalized + eq.resetBuildingState() eq.needForkchoiceUpdate = true eq.finalityData = eq.finalityData[:0] // note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. diff --git a/op-node/rollup/derive/engine_update.go b/op-node/rollup/derive/engine_update.go index 2bbfe7ed93935..527768b09e774 100644 --- a/op-node/rollup/derive/engine_update.go +++ b/op-node/rollup/derive/engine_update.go @@ -79,19 +79,6 @@ const ( BlockInsertPayloadErr ) -// InsertHeadBlock creates, executes, and inserts the specified block as the head block. -// It first uses the given FC to start the block creation process and then after the payload is executed, -// sets the FC to the same safe and finalized hashes, but updates the head hash to the new block. -// If updateSafe is true, the head block is considered to be the safe head as well as the head. -// It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid) -func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) { - id, errTyp, err := StartPayload(ctx, eng, fc, attrs) - if err != nil { - return nil, errTyp, err - } - return ConfirmPayload(ctx, log, eng, fc, id, updateSafe) -} - // StartPayload starts an execution payload building process in the provided Engine, with the given attributes. // The severity of the error is distinguished to determine whether the same payload attributes may be re-attempted later. func StartPayload(ctx context.Context, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes) (id eth.PayloadID, errType BlockInsertionErrType, err error) { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 3030185162cb0..de4c2ccd5fd55 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -31,6 +31,8 @@ type ResetableStage interface { } type EngineQueueStage interface { + EngineControl + FinalizedL1() eth.L1BlockRef Finalized() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef @@ -130,6 +132,25 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef { return dp.eng.UnsafeL2Head() } +func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { + return dp.eng.StartPayload(ctx, parent, attrs, updateSafe) +} + +func (dp *DerivationPipeline) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) { + return dp.eng.ConfirmPayload(ctx) +} + +func (dp *DerivationPipeline) CancelPayload(ctx context.Context, force bool) error { + return dp.eng.CancelPayload(ctx, force) +} + +func (dp *DerivationPipeline) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) { + return dp.eng.BuildingPayload() +} + +// SetUnsafeHead changes the forkchoice state unsafe head, without changing the engine. +// +// deprecated: use the EngineControl interface instead. func (dp *DerivationPipeline) SetUnsafeHead(head eth.L2BlockRef) { dp.eng.SetUnsafeHead(head) }