Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 110 additions & 12 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ type Engine interface {
SystemConfigL2Fetcher
}

// EngineState provides a read-only interface of the forkchoice state properties of the L2 Engine.
type EngineState interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
}

// EngineControl enables other components to build blocks with the Engine,
// while keeping the forkchoice state and payload-id management internal to
// avoid state inconsistencies between different users of the EngineControl.
type EngineControl interface {
EngineState

// StartPayload requests the engine to start building a block with the given attributes.
// If updateSafe, the resulting block will be marked as a safe block.
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error)
// ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned.
ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error)
// CancelPayload requests the engine to stop building the current block without making it canonical.
// This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources.
CancelPayload(ctx context.Context, force bool) error
// BuildingPayload indicates if a payload is being built, and onto which block it is being built, and whether or not it is a safe payload.
BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool)
}

// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024

Expand Down Expand Up @@ -68,6 +93,10 @@ type EngineQueue struct {
safeHead eth.L2BlockRef
unsafeHead eth.L2BlockRef

buildingOnto eth.L2BlockRef
buildingID eth.PayloadID
buildingSafe bool

// Track when the rollup node changes the forkchoice without engine action,
// e.g. on a reset after a reorg, or after consolidating a block.
// This update may repeat if the engine returns a temporary error.
Expand All @@ -91,6 +120,8 @@ type EngineQueue struct {
l1Fetcher L1Fetcher
}

var _ EngineControl = (*EngineQueue)(nil)

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue {
return &EngineQueue{
Expand Down Expand Up @@ -416,13 +447,11 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
if len(eq.safeAttributes) == 0 {
return nil
}
fc := eth.ForkchoiceState{
HeadBlockHash: eq.safeHead.Hash,
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
}
attrs := eq.safeAttributes[0]
payload, errType, err := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true)
errType, err := eq.StartPayload(ctx, eq.safeHead, attrs, true)
if err == nil {
_, errType, err = eq.ConfirmPayload(ctx)
}
if err != nil {
switch errType {
case BlockInsertTemporaryErr:
Expand Down Expand Up @@ -457,21 +486,89 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
}
}
eq.safeAttributes = eq.safeAttributes[1:]
eq.logSyncProgress("processed safe block derived from L1")

return nil
}

func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
if eq.buildingID != (eth.PayloadID{}) {
eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent)
// TODO: maybe worth it to force-cancel the old payload ID here.
}
fc := eth.ForkchoiceState{
HeadBlockHash: parent.Hash,
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
}
id, errTyp, err := StartPayload(ctx, eq.engine, fc, attrs)
if err != nil {
return errTyp, err
}
eq.buildingID = id
eq.buildingSafe = updateSafe
eq.buildingOnto = parent
return BlockInsertOK, nil
}

func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
if eq.buildingID == (eth.PayloadID{}) {
return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload")
}
if eq.buildingOnto.Hash != eq.unsafeHead.Hash { // E.g. when safe-attributes consolidation fails, it will drop the existing work.
eq.log.Warn("engine is building block that reorgs previous usafe head", "onto", eq.buildingOnto, "unsafe", eq.unsafeHead)
}
fc := eth.ForkchoiceState{
HeadBlockHash: common.Hash{}, // gets overridden
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
}
payload, errTyp, err := ConfirmPayload(ctx, eq.log, eq.engine, fc, eq.buildingID, eq.buildingSafe)
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", eq.buildingOnto, eq.buildingID, errTyp, err)
}
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.safeHead = ref

eq.unsafeHead = ref
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.safeAttributes = eq.safeAttributes[1:]
eq.postProcessSafeL2()
eq.logSyncProgress("processed safe block derived from L1")

if eq.buildingSafe {
eq.safeHead = ref
eq.postProcessSafeL2()
eq.metrics.RecordL2Ref("l2_safe", ref)
}
eq.resetBuildingState()
return payload, BlockInsertOK, nil
}

func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
// the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API
eq.log.Error("cancelling old block sealing job", "payload", eq.buildingID)
_, err := eq.engine.GetPayload(ctx, eq.buildingID)
if err != nil {
eq.log.Error("failed to cancel block building job", "payload", eq.buildingID, "err", err)
if !force {
return err
}
}
eq.resetBuildingState()
return nil
}

func (eq *EngineQueue) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return eq.buildingOnto, eq.buildingID, eq.buildingSafe
}

func (eq *EngineQueue) resetBuildingState() {
eq.buildingID = eth.PayloadID{}
eq.buildingOnto = eth.L2BlockRef{}
eq.buildingSafe = false
}

// ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
Expand Down Expand Up @@ -517,6 +614,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.unsafeHead = unsafe
eq.safeHead = safe
eq.finalized = finalized
eq.resetBuildingState()
eq.needForkchoiceUpdate = true
eq.finalityData = eq.finalityData[:0]
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
Expand Down
13 changes: 0 additions & 13 deletions op-node/rollup/derive/engine_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,6 @@ const (
BlockInsertPayloadErr
)

// InsertHeadBlock creates, executes, and inserts the specified block as the head block.
// It first uses the given FC to start the block creation process and then after the payload is executed,
// sets the FC to the same safe and finalized hashes, but updates the head hash to the new block.
// If updateSafe is true, the head block is considered to be the safe head as well as the head.
// It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid)
func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
id, errTyp, err := StartPayload(ctx, eng, fc, attrs)
if err != nil {
return nil, errTyp, err
}
return ConfirmPayload(ctx, log, eng, fc, id, updateSafe)
}

// StartPayload starts an execution payload building process in the provided Engine, with the given attributes.
// The severity of the error is distinguished to determine whether the same payload attributes may be re-attempted later.
func StartPayload(ctx context.Context, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes) (id eth.PayloadID, errType BlockInsertionErrType, err error) {
Expand Down
21 changes: 21 additions & 0 deletions op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type ResetableStage interface {
}

type EngineQueueStage interface {
EngineControl

FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
Expand Down Expand Up @@ -130,6 +132,25 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
}

func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
}

func (dp *DerivationPipeline) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
return dp.eng.ConfirmPayload(ctx)
}

func (dp *DerivationPipeline) CancelPayload(ctx context.Context, force bool) error {
return dp.eng.CancelPayload(ctx, force)
}

func (dp *DerivationPipeline) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return dp.eng.BuildingPayload()
}

// SetUnsafeHead changes the forkchoice state unsafe head, without changing the engine.
//
// deprecated: use the EngineControl interface instead.
func (dp *DerivationPipeline) SetUnsafeHead(head eth.L2BlockRef) {
dp.eng.SetUnsafeHead(head)
}
Expand Down