diff --git a/op-e2e/actions/l1_replica.go b/op-e2e/actions/l1_replica.go index 5a68543f8922b..ac74d3f10da8d 100644 --- a/op-e2e/actions/l1_replica.go +++ b/op-e2e/actions/l1_replica.go @@ -177,8 +177,16 @@ func (s *L1Replica) L1Client(t Testing, cfg *rollup.Config) *sources.L1Client { // ActL1FinalizeNext finalizes the next block, which must be marked as safe before doing so (see ActL1SafeNext). func (s *L1Replica) ActL1FinalizeNext(t Testing) { safe := s.l1Chain.CurrentSafeBlock() - finalizedNum := s.l1Chain.CurrentFinalizedBlock().NumberU64() - if safe.NumberU64() <= finalizedNum { + safeNum := uint64(0) + if safe != nil { + safeNum = safe.NumberU64() + } + finalized := s.l1Chain.CurrentFinalizedBlock() + finalizedNum := uint64(0) + if finalized != nil { + finalizedNum = finalized.NumberU64() + } + if safeNum <= finalizedNum { t.InvalidAction("need to move forward safe block before moving finalized block") return } @@ -192,7 +200,11 @@ func (s *L1Replica) ActL1FinalizeNext(t Testing) { // ActL1SafeNext marks the next unsafe block as safe. func (s *L1Replica) ActL1SafeNext(t Testing) { safe := s.l1Chain.CurrentSafeBlock() - next := s.l1Chain.GetBlockByNumber(safe.NumberU64() + 1) + safeNum := uint64(0) + if safe != nil { + safeNum = safe.NumberU64() + } + next := s.l1Chain.GetBlockByNumber(safeNum + 1) if next == nil { t.InvalidAction("if head of chain is marked as safe then there's no next block") return diff --git a/op-e2e/actions/l2_batcher.go b/op-e2e/actions/l2_batcher.go index f9d6da4ab7ed1..09487690993e8 100644 --- a/op-e2e/actions/l2_batcher.go +++ b/op-e2e/actions/l2_batcher.go @@ -7,15 +7,16 @@ import ( "io" "math/big" - "github.com/ethereum-optimism/optimism/op-node/eth" - "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" ) type SyncStatusAPI interface { @@ -149,8 +150,8 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing) { data.WriteByte(derive.DerivationVersion0) // subtract one, to account for the version byte if err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF { + s.l2ChannelOut = nil s.l2Submitting = false - // there may still be some data to submit } else if err != nil { s.l2Submitting = false t.Fatalf("failed to output channel data to frame: %v", err) diff --git a/op-e2e/actions/l2_batcher_test.go b/op-e2e/actions/l2_batcher_test.go index 114057e170831..c1fef6127c2d8 100644 --- a/op-e2e/actions/l2_batcher_test.go +++ b/op-e2e/actions/l2_batcher_test.go @@ -4,12 +4,14 @@ import ( "math/big" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/stretchr/testify/require" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/testlog" ) @@ -87,3 +89,108 @@ func TestBatcher(gt *testing.T) { require.False(t, isPending) require.NotNil(t, vTx) } + +func TestL2Finalization(gt *testing.T) { + t := NewDefaultTesting(gt) + dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) + sd := e2eutils.Setup(t, dp, defaultAlloc) + log := testlog.Logger(t, log.LvlDebug) + miner, engine, sequencer := setupSequencerTest(t, sd, log) + + sequencer.ActL2PipelineFull(t) + + // build an empty L1 block (#1), mark it as justified + miner.ActEmptyBlock(t) + miner.ActL1SafeNext(t) // #0 -> #1 + + // sequencer builds L2 chain, up to and including a block that has the new L1 block as origin + sequencer.ActL1HeadSignal(t) + sequencer.ActBuildToL1Head(t) + + sequencer.ActL2PipelineFull(t) + sequencer.ActL1SafeSignal(t) + require.Equal(t, uint64(1), sequencer.SyncStatus().SafeL1.Number) + + // build another L1 block (#2), mark it as justified. And mark previous justified as finalized. + miner.ActEmptyBlock(t) + miner.ActL1SafeNext(t) // #1 -> #2 + miner.ActL1FinalizeNext(t) // #0 -> #1 + sequencer.ActL1HeadSignal(t) + sequencer.ActBuildToL1Head(t) + + // continue to build L2 chain referencing the new L1 blocks + sequencer.ActL2PipelineFull(t) + sequencer.ActL1FinalizedSignal(t) + sequencer.ActL1SafeSignal(t) + require.Equal(t, uint64(2), sequencer.SyncStatus().SafeL1.Number) + require.Equal(t, uint64(1), sequencer.SyncStatus().FinalizedL1.Number) + require.Equal(t, uint64(0), sequencer.SyncStatus().FinalizedL2.Number, "L2 block has to be included on L1 before it can be finalized") + + batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ + MinL1TxSize: 0, + MaxL1TxSize: 128_000, + BatcherKey: dp.Secrets.Batcher, + }, sequencer.RollupClient(), miner.EthClient(), engine.EthClient()) + + heightToSubmit := sequencer.SyncStatus().UnsafeL2.Number + + batcher.ActSubmitAll(t) + // confirm batch on L1, block #3 + miner.ActL1StartBlock(12)(t) + miner.ActL1IncludeTx(dp.Addresses.Batcher)(t) + miner.ActL1EndBlock(t) + + // read the batch + sequencer.ActL2PipelineFull(t) + require.Equal(t, uint64(0), sequencer.SyncStatus().FinalizedL2.Number, "Batch must be included in finalized part of L1 chain for L2 block to finalize") + + // build some more L2 blocks, so there is an unsafe part again that hasn't been submitted yet + sequencer.ActL1HeadSignal(t) + sequencer.ActBuildToL1Head(t) + + // submit those blocks too, block #4 + batcher.ActSubmitAll(t) + miner.ActL1StartBlock(12)(t) + miner.ActL1IncludeTx(dp.Addresses.Batcher)(t) + miner.ActL1EndBlock(t) + + // add some more L1 blocks #5, #6 + miner.ActEmptyBlock(t) + miner.ActEmptyBlock(t) + + // and more unsafe L2 blocks + sequencer.ActL1HeadSignal(t) + sequencer.ActBuildToL1Head(t) + + // move safe/finalize markers: finalize the L1 chain block with the first batch, but not the second + miner.ActL1SafeNext(t) // #2 -> #3 + miner.ActL1SafeNext(t) // #3 -> #4 + miner.ActL1FinalizeNext(t) // #1 -> #2 + miner.ActL1FinalizeNext(t) // #2 -> #3 + + sequencer.ActL2PipelineFull(t) + sequencer.ActL1FinalizedSignal(t) + sequencer.ActL1SafeSignal(t) + sequencer.ActL1HeadSignal(t) + require.Equal(t, uint64(6), sequencer.SyncStatus().HeadL1.Number) + require.Equal(t, uint64(4), sequencer.SyncStatus().SafeL1.Number) + require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number) + require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "finalized L2 blocks in first batch") + + // need to act with the engine on the signals still + sequencer.ActL2PipelineFull(t) + + engCl := engine.EngineClient(t, sd.RollupCfg) + engBlock, err := engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized) + require.NoError(t, err) + require.Equal(t, heightToSubmit, engBlock.Number, "engine finalizes what rollup node finalizes") + + // Now try to finalize block 4, but with a bad/malicious alternative hash. + // If we get this false signal, we shouldn't finalize the L2 chain. + altBlock4 := sequencer.SyncStatus().SafeL1 + altBlock4.Hash = common.HexToHash("0xdead") + sequencer.derivation.Finalize(altBlock4) + sequencer.ActL2PipelineFull(t) + require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number) + require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "unknown/bad finalized L1 blocks are ignored") +} diff --git a/op-e2e/actions/l2_engine.go b/op-e2e/actions/l2_engine.go index c2757e9dffde1..c91499ddfbf94 100644 --- a/op-e2e/actions/l2_engine.go +++ b/op-e2e/actions/l2_engine.go @@ -175,7 +175,7 @@ func (e *L2Engine) ActL2IncludeTx(from common.Address) Action { e.l2GasPool, e.l2BuildingState, e.l2BuildingHeader, tx, &e.l2BuildingHeader.GasUsed, *e.l2Chain.GetVMConfig()) if err != nil { e.l2TxFailed = append(e.l2TxFailed, tx) - t.Fatalf("failed to apply transaction to L1 block (tx %d): %v", len(e.l2Transactions), err) + t.Fatalf("failed to apply transaction to L2 block (tx %d): %v", len(e.l2Transactions), err) } e.l2Receipts = append(e.l2Receipts, receipt) e.l2Transactions = append(e.l2Transactions, tx) diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index d6871cd6a56a3..de741bf74d404 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -118,13 +118,14 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { func (s *L2Verifier) SyncStatus() *eth.SyncStatus { return ð.SyncStatus{ - CurrentL1: s.derivation.Origin(), - HeadL1: s.l1State.L1Head(), - SafeL1: s.l1State.L1Safe(), - FinalizedL1: s.l1State.L1Finalized(), - UnsafeL2: s.L2Unsafe(), - SafeL2: s.L2Safe(), - FinalizedL2: s.L2Finalized(), + CurrentL1: s.derivation.Origin(), + CurrentL1Finalized: s.derivation.FinalizedL1(), + HeadL1: s.l1State.L1Head(), + SafeL1: s.l1State.L1Safe(), + FinalizedL1: s.l1State.L1Finalized(), + UnsafeL2: s.L2Unsafe(), + SafeL2: s.L2Safe(), + FinalizedL2: s.L2Finalized(), } } @@ -160,15 +161,16 @@ func (s *L2Verifier) ActL1HeadSignal(t Testing) { } func (s *L2Verifier) ActL1SafeSignal(t Testing) { - head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe) + safe, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe) require.NoError(t, err) - s.l1State.HandleNewL1SafeBlock(head) + s.l1State.HandleNewL1SafeBlock(safe) } func (s *L2Verifier) ActL1FinalizedSignal(t Testing) { - head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized) + finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized) require.NoError(t, err) - s.l1State.HandleNewL1FinalizedBlock(head) + s.l1State.HandleNewL1FinalizedBlock(finalized) + s.derivation.Finalize(finalized) } // ActL2PipelineStep runs one iteration of the L2 derivation pipeline diff --git a/op-node/eth/sync_status.go b/op-node/eth/sync_status.go index d9a2c124c5e6d..cb180ae2572b5 100644 --- a/op-node/eth/sync_status.go +++ b/op-node/eth/sync_status.go @@ -3,10 +3,18 @@ package eth // SyncStatus is a snapshot of the driver. // Values may be zeroed if not yet initialized. type SyncStatus struct { - // CurrentL1 is the block that the derivation process is currently at, - // this may not be fully derived into L2 data yet. + // CurrentL1 is the L1 block that the derivation process is currently at in the inner-most stage. + // This may not be fully derived into L2 data yet. + // The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block. // If the node is synced, this matches the HeadL1, minus the verifier confirmation distance. CurrentL1 L1BlockRef `json:"current_l1"` + // CurrentL1Finalized is the L1 block that the derivation process is currently accepting as finalized + // in the inner-most stage, + // This may not be fully derived into L2 data yet. + // The finalized L2 blocks were produced/included fully from the L1 chain up to and including this L1 block. + // This may lag behind the FinalizedL1 when the FinalizedL1 could not yet be verified + // to be canonical w.r.t. the currently derived L2 chain. It may be zeroed if no block could be verified yet. + CurrentL1Finalized L1BlockRef `json:"current_l1_finalized"` // HeadL1 is the perceived head of the L1 chain, no confirmation distance. // The head is not guaranteed to build on the other L1 sync status fields, // as the node may be in progress of resetting to adapt to a L1 reorg. diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 212976a1b8277..1446839ada433 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -72,7 +72,7 @@ type EngineQueue struct { // This update may repeat if the engine returns a temporary error. needForkchoiceUpdate bool - finalizedL1 eth.BlockID + finalizedL1 eth.L1BlockRef safeAttributes []*eth.PayloadAttributes unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps @@ -83,7 +83,7 @@ type EngineQueue struct { engine Engine prev NextAttributesProvider - origin eth.L1BlockRef // only used for pipeline resets + origin eth.L1BlockRef // updated on resets, and whenever we read from the previous stage. metrics Metrics l1Fetcher L1Fetcher @@ -106,6 +106,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M } } +// Origin identifies the L1 chain (incl.) that included and/or produced all the safe L2 blocks. func (eq *EngineQueue) Origin() eth.L1BlockRef { return eq.origin } @@ -134,9 +135,28 @@ func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) { eq.safeAttributes = append(eq.safeAttributes, attributes) } -func (eq *EngineQueue) Finalize(l1Origin eth.BlockID) { - eq.finalizedL1 = l1Origin - eq.tryFinalizeL2() +func (eq *EngineQueue) Finalize(l1Origin eth.L1BlockRef) { + if l1Origin.Number < eq.finalizedL1.Number { + eq.log.Error("ignoring old L1 finalized block signal! Is the L1 provider corrupted?", "prev_finalized_l1", eq.finalizedL1, "signaled_finalized_l1", l1Origin) + return + } + // Perform a safety check: the L1 finalization signal is only accepted if we previously processed the L1 block. + // This prevents a corrupt L1 provider from tricking us in recognizing a L1 block inconsistent with the L1 chain we are on. + // Missing a finality signal due to empty buffer is fine, it will finalize when the buffer is filled again. + for _, fd := range eq.finalityData { + if fd.L1Block == l1Origin.ID() { + eq.finalizedL1 = l1Origin + eq.tryFinalizeL2() + return + } + } + eq.log.Warn("ignoring finalization signal for unknown L1 block, waiting for new L1 blocks in buffer", "prev_finalized_l1", eq.finalizedL1, "signaled_finalized_l1", l1Origin) +} + +// FinalizedL1 identifies the L1 chain (incl.) that included and/or produced all the finalized L2 blocks. +// This may return a zeroed ID if no finalization signals have been seen yet. +func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef { + return eq.finalizedL1 } func (eq *EngineQueue) Finalized() eth.L2BlockRef { @@ -167,6 +187,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error { } outOfData := false if len(eq.safeAttributes) == 0 { + eq.origin = eq.prev.Origin() if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF { outOfData = true } else if err != nil { @@ -191,7 +212,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error { // and then marks the latest fully derived L2 block from this as finalized, // or defaults to the current finalized L2 block. func (eq *EngineQueue) tryFinalizeL2() { - if eq.finalizedL1 == (eth.BlockID{}) { + if eq.finalizedL1 == (eth.L1BlockRef{}) { return // if no L1 information is finalized yet, then skip this } // default to keep the same finalized block @@ -200,6 +221,7 @@ func (eq *EngineQueue) tryFinalizeL2() { for _, fd := range eq.finalityData { if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number { finalizedL2 = fd.L2Block + eq.needForkchoiceUpdate = true } } eq.finalized = finalizedL2 @@ -214,11 +236,11 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) } // remember the last L2 block that we fully derived from the given finality data - if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number { + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number { // append entry for new L1 block eq.finalityData = append(eq.finalityData, FinalityData{ L2Block: eq.safeHead, - L1Block: eq.prev.Origin().ID(), + L1Block: eq.origin.ID(), }) } else { // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry @@ -233,7 +255,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, "l2_time", eq.unsafeHead.Time, - "l1_derived", eq.prev.Origin(), + "l1_derived", eq.origin, ) } diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index bc09bad092b4d..e6b22f43560ed 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -250,7 +250,7 @@ func TestEngineQueue_Finalize(t *testing.T) { eq.postProcessSafeL2() // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) - eq.Finalize(refD.ID()) + eq.Finalize(refD) require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 18c12702aa2cb..2fc60838701ae 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -5,9 +5,10 @@ import ( "fmt" "io" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum/go-ethereum/log" ) type Metrics interface { @@ -30,13 +31,14 @@ type ResetableStage interface { } type EngineQueueStage interface { + FinalizedL1() eth.L1BlockRef Finalized() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef Origin() eth.L1BlockRef SetUnsafeHead(head eth.L2BlockRef) - Finalize(l1Origin eth.BlockID) + Finalize(l1Origin eth.L1BlockRef) AddSafeAttributes(attributes *eth.PayloadAttributes) AddUnsafePayload(payload *eth.ExecutionPayload) Step(context.Context) error @@ -97,14 +99,22 @@ func (dp *DerivationPipeline) Reset() { dp.resetting = 0 } +// Origin is the L1 block of the inner-most stage of the derivation pipeline, +// i.e. the L1 chain up to and including this point included and/or produced all the safe L2 blocks. func (dp *DerivationPipeline) Origin() eth.L1BlockRef { return dp.eng.Origin() } -func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) { +func (dp *DerivationPipeline) Finalize(l1Origin eth.L1BlockRef) { dp.eng.Finalize(l1Origin) } +// FinalizedL1 is the L1 finalization of the inner-most stage of the derivation pipeline, +// i.e. the L1 chain up to and including this point included and/or produced all the finalized L2 blocks. +func (dp *DerivationPipeline) FinalizedL1() eth.L1BlockRef { + return dp.eng.FinalizedL1() +} + func (dp *DerivationPipeline) Finalized() eth.L2BlockRef { return dp.eng.Finalized() } diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 8fac03ffb7eaa..d9cda4ba58d9c 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -39,6 +39,7 @@ type L2Chain interface { derive.Engine L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error) L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error) + L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error) } type DerivationPipeline interface { @@ -46,7 +47,8 @@ type DerivationPipeline interface { Step(ctx context.Context) error SetUnsafeHead(head eth.L2BlockRef) AddUnsafePayload(payload *eth.ExecutionPayload) - Finalize(ref eth.BlockID) + Finalize(ref eth.L1BlockRef) + FinalizedL1() eth.L1BlockRef Finalized() eth.L2BlockRef SafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef @@ -92,7 +94,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne l1State: l1State, derivation: derivationPipeline, idleDerivation: false, - syncStatusReq: make(chan chan eth.SyncStatus, 10), + stateReq: make(chan chan struct{}), forceReset: make(chan chan struct{}, 10), config: cfg, driverConfig: driverCfg, diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index ed46fe02fc77e..36325c675b12c 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -30,8 +30,8 @@ type Driver struct { // When the derivation pipeline is waiting for new data to do anything idleDerivation bool - // Requests for sync status. Synchronized with event loop to avoid reading an inconsistent sync status. - syncStatusReq chan chan eth.SyncStatus + // Requests to block the event loop for synchronous execution to avoid reading an inconsistent state + stateReq chan chan struct{} // Upon receiving a channel in this channel, the derivation pipeline is forced to be reset. // It tells the caller that the reset occurred by closing the passed in channel. @@ -300,7 +300,7 @@ func (s *Driver) eventLoop() { // no step, justified L1 information does not do anything for L2 derivation or status case newL1Finalized := <-s.l1FinalizedSig: s.l1State.HandleNewL1FinalizedBlock(newL1Finalized) - s.derivation.Finalize(newL1Finalized.ID()) + s.derivation.Finalize(newL1Finalized) reqStep() // we may be able to mark more L2 data as finalized now case <-delayedStepReq: delayedStepReq = nil @@ -342,16 +342,8 @@ func (s *Driver) eventLoop() { stepAttempts = 0 reqStep() // continue with the next step if we can } - case respCh := <-s.syncStatusReq: - respCh <- eth.SyncStatus{ - CurrentL1: s.derivation.Origin(), - HeadL1: s.l1State.L1Head(), - SafeL1: s.l1State.L1Safe(), - FinalizedL1: s.l1State.L1Finalized(), - UnsafeL2: s.derivation.UnsafeL2Head(), - SafeL2: s.derivation.SafeL2Head(), - FinalizedL2: s.derivation.Finalized(), - } + case respCh := <-s.stateReq: + respCh <- struct{}{} case respCh := <-s.forceReset: s.log.Warn("Derivation pipeline is manually reset") s.derivation.Reset() @@ -381,18 +373,48 @@ func (s *Driver) ResetDerivationPipeline(ctx context.Context) error { } } +// syncStatus returns the current sync status, and should only be called synchronously with +// the driver event loop to avoid retrieval of an inconsistent status. +func (s *Driver) syncStatus() *eth.SyncStatus { + return ð.SyncStatus{ + CurrentL1: s.derivation.Origin(), + CurrentL1Finalized: s.derivation.FinalizedL1(), + HeadL1: s.l1State.L1Head(), + SafeL1: s.l1State.L1Safe(), + FinalizedL1: s.l1State.L1Finalized(), + UnsafeL2: s.derivation.UnsafeL2Head(), + SafeL2: s.derivation.SafeL2Head(), + FinalizedL2: s.derivation.Finalized(), + } +} + +// SyncStatus blocks the driver event loop and captures the syncing status. +// If the event loop is too busy and the context expires, a context error is returned. func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { - respCh := make(chan eth.SyncStatus, 1) + wait := make(chan struct{}) select { + case s.stateReq <- wait: + resp := s.syncStatus() + <-wait + return resp, nil case <-ctx.Done(): return nil, ctx.Err() - case s.syncStatusReq <- respCh: - select { - case <-ctx.Done(): - return nil, ctx.Err() - case resp := <-respCh: - return &resp, nil - } + } +} + +// BlockRefWithStatus blocks the driver event loop and captures the syncing status, +// along with an L2 block reference by number consistent with that same status. +// If the event loop is too busy and the context expires, a context error is returned. +func (s *Driver) BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2BlockRef, *eth.SyncStatus, error) { + wait := make(chan struct{}) + select { + case s.stateReq <- wait: + resp := s.syncStatus() + ref, err := s.l2.L2BlockRefByNumber(ctx, num) + <-wait + return ref, resp, err + case <-ctx.Done(): + return eth.L2BlockRef{}, nil, ctx.Err() } }