diff --git a/op-e2e/actions/interop/interop.go b/op-e2e/actions/interop/interop.go index 4ac2b0cb80ffc..800d8b5a33f40 100644 --- a/op-e2e/actions/interop/interop.go +++ b/op-e2e/actions/interop/interop.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/interop" + "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-supervisor/config" @@ -139,6 +140,11 @@ func (sa *SupervisorActor) SyncCrossSafe(t helpers.Testing, chainID types.ChainI require.NoError(t, sa.backend.SyncCrossSafe(chainID)) } +func (sa *SupervisorActor) SyncFinalizedL1(t helpers.Testing, ref eth.BlockRef) { + sa.backend.SyncFinalizedL1(ref) + require.Equal(t, ref, sa.backend.FinalizedL1()) +} + // worldToDepSet converts a set of chain configs into a dependency-set for the supervisor. func worldToDepSet(t helpers.Testing, worldOutput *interopgen.WorldOutput) *depset.StaticConfigDependencySet { depSetCfg := make(map[types.ChainID]*depset.StaticConfigDependency) diff --git a/op-e2e/actions/interop/interop_test.go b/op-e2e/actions/interop/interop_test.go index a39da702db8c5..810cd31349827 100644 --- a/op-e2e/actions/interop/interop_test.go +++ b/op-e2e/actions/interop/interop_test.go @@ -90,6 +90,7 @@ func TestFullInterop(gt *testing.T) { actors.L1Miner.ActL1FinalizeNext(t) actors.ChainA.Sequencer.ActL1SafeSignal(t) actors.ChainA.Sequencer.ActL1FinalizedSignal(t) + actors.Supervisor.SyncFinalizedL1(t, status.HeadL1) actors.ChainA.Sequencer.ActL2PipelineFull(t) finalizedL2BlockID, err := actors.Supervisor.Finalized(t.Ctx(), actors.ChainA.ChainID) require.NoError(t, err) diff --git a/op-e2e/interop/interop_test.go b/op-e2e/interop/interop_test.go index 81f09c125aa4c..1df24c2b8f31e 100644 --- a/op-e2e/interop/interop_test.go +++ b/op-e2e/interop/interop_test.go @@ -105,6 +105,24 @@ func TestInterop_IsolatedChains(t *testing.T) { setupAndRun(t, config, test) } +// TestInterop_SupervisorFinality tests that the supervisor updates its finality +// It waits for the finalized block to advance past the genesis block. +func TestInterop_SupervisorFinality(t *testing.T) { + test := func(t *testing.T, s2 SuperSystem) { + supervisor := s2.SupervisorClient() + require.Eventually(t, func() bool { + final, err := supervisor.FinalizedL1(context.Background()) + require.NoError(t, err) + return final.Number > 0 + // this test takes about 30 seconds, with a longer Eventually timeout for CI + }, time.Second*60, time.Second, "wait for finalized block to be greater than 0") + } + config := SuperSystemConfig{ + mempoolFiltering: false, + } + setupAndRun(t, config, test) +} + // TestInterop_EmitLogs tests a simple interop scenario // Chains A and B exist, but no messages are sent between them. // A contract is deployed on each chain, and logs are emitted repeatedly. diff --git a/op-node/rollup/interop/interop.go b/op-node/rollup/interop/interop.go index 94fa77a5b3092..8fd5c1b7e2e3d 100644 --- a/op-node/rollup/interop/interop.go +++ b/op-node/rollup/interop/interop.go @@ -32,7 +32,6 @@ type InteropBackend interface { UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.BlockRef) error - UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.L1BlockRef) error } // For testing usage, the backend of the supervisor implements the interface, no need for RPC. @@ -153,12 +152,10 @@ func (d *InteropDeriver) onFinalizedL1(x finality.FinalizeL1Event) { if !d.cfg.IsInterop(x.FinalizedL1.Time) { return } - d.log.Debug("Signaling finalized L1 update to interop backend", "finalized", x.FinalizedL1) - ctx, cancel := context.WithTimeout(d.driverCtx, rpcTimeout) - defer cancel() - if err := d.backend.UpdateFinalizedL1(ctx, d.chainID, x.FinalizedL1); err != nil { - d.log.Warn("Failed to signal finalized L1 block to interop backend", "finalized", x.FinalizedL1, "err", err) - } + // there used to be code here which sent the finalized L1 block to the supervisor + // but the supervisor manages its own finality now + // so we don't need to do anything here besides emit the event. + // New L2 blocks may be ready to finalize now that the backend knows of new L1 finalized info. d.emitter.Emit(engine.RequestFinalizedUpdateEvent{}) } diff --git a/op-node/rollup/interop/interop_test.go b/op-node/rollup/interop/interop_test.go index d4fb87f0dd924..f061406fd5b3e 100644 --- a/op-node/rollup/interop/interop_test.go +++ b/op-node/rollup/interop/interop_test.go @@ -158,7 +158,6 @@ func TestInteropDeriver(t *testing.T) { t.Run("finalized L1 trigger cross-L2 finality check", func(t *testing.T) { emitter.ExpectOnce(engine.RequestFinalizedUpdateEvent{}) finalizedL1 := testutils.RandomBlockRef(rng) - interopBackend.ExpectUpdateFinalizedL1(chainID, finalizedL1, nil) interopDeriver.OnEvent(finality.FinalizeL1Event{ FinalizedL1: finalizedL1, }) diff --git a/op-service/sources/supervisor_client.go b/op-service/sources/supervisor_client.go index d6191b9cfb208..9b7bc71944dd1 100644 --- a/op-service/sources/supervisor_client.go +++ b/op-service/sources/supervisor_client.go @@ -114,6 +114,15 @@ func (cl *SupervisorClient) Finalized(ctx context.Context, chainID types.ChainID return result, err } +func (cl *SupervisorClient) FinalizedL1(ctx context.Context) (eth.BlockRef, error) { + var result eth.BlockRef + err := cl.client.CallContext( + ctx, + &result, + "supervisor_finalizedL1") + return result, err +} + func (cl *SupervisorClient) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (eth.BlockRef, error) { var result eth.BlockRef err := cl.client.CallContext( @@ -144,15 +153,6 @@ func (cl *SupervisorClient) UpdateLocalSafe(ctx context.Context, chainID types.C lastDerived) } -func (cl *SupervisorClient) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalizedL1 eth.L1BlockRef) error { - return cl.client.CallContext( - ctx, - nil, - "supervisor_updateFinalizedL1", - chainID, - finalizedL1) -} - func (cl *SupervisorClient) Close() { cl.client.Close() } diff --git a/op-service/testutils/mock_interop_backend.go b/op-service/testutils/mock_interop_backend.go index af6762c204cd6..9b43d7724f9cd 100644 --- a/op-service/testutils/mock_interop_backend.go +++ b/op-service/testutils/mock_interop_backend.go @@ -94,10 +94,6 @@ func (m *MockInteropBackend) UpdateFinalizedL1(ctx context.Context, chainID type return *result.Get(0).(*error) } -func (m *MockInteropBackend) ExpectUpdateFinalizedL1(chainID types.ChainID, finalized eth.L1BlockRef, err error) { - m.Mock.On("UpdateFinalizedL1", chainID, finalized).Once().Return(&err) -} - func (m *MockInteropBackend) AssertExpectations(t mock.TestingT) { m.Mock.AssertExpectations(t) } diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index 722cabcf3eba2..ff8ed6bfa52ff 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -464,6 +464,10 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI return v.ID(), nil } +func (su *SupervisorBackend) FinalizedL1() eth.BlockRef { + return su.chainDBs.FinalizedL1() +} + func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) { v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived) if err != nil { @@ -492,10 +496,6 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types. return nil } -func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error { - return su.chainDBs.UpdateFinalizedL1(finalized) -} - // Access to synchronous processing for tests // ---------------------------- @@ -523,3 +523,7 @@ func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error { } return ch.ProcessWork() } + +func (su *SupervisorBackend) SyncFinalizedL1(ref eth.BlockRef) { + processors.MaybeUpdateFinalizedL1(context.Background(), su.logger, su.chainDBs, ref) +} diff --git a/op-supervisor/supervisor/backend/db/query.go b/op-supervisor/supervisor/backend/db/query.go index 5897ff3c9242b..5e9503c39ade5 100644 --- a/op-supervisor/supervisor/backend/db/query.go +++ b/op-supervisor/supervisor/backend/db/query.go @@ -148,6 +148,10 @@ func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom types.BlockSea return crossDB.Latest() } +func (db *ChainsDB) FinalizedL1() eth.BlockRef { + return db.finalizedL1.Get() +} + func (db *ChainsDB) Finalized(chainID types.ChainID) (types.BlockSeal, error) { finalizedL1 := db.finalizedL1.Get() if finalizedL1 == (eth.L1BlockRef{}) { diff --git a/op-supervisor/supervisor/backend/mock.go b/op-supervisor/supervisor/backend/mock.go index b40c5209d5ef2..6d36e57513613 100644 --- a/op-supervisor/supervisor/backend/mock.go +++ b/op-supervisor/supervisor/backend/mock.go @@ -62,6 +62,10 @@ func (m *MockBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth return eth.BlockID{}, nil } +func (m *MockBackend) FinalizedL1() eth.BlockRef { + return eth.BlockRef{} +} + func (m *MockBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) { return eth.BlockRef{}, nil } diff --git a/op-supervisor/supervisor/backend/processors/l1_processor.go b/op-supervisor/supervisor/backend/processors/l1_processor.go index 063acd056e0ae..4f024fb599e20 100644 --- a/op-supervisor/supervisor/backend/processors/l1_processor.go +++ b/op-supervisor/supervisor/backend/processors/l1_processor.go @@ -8,23 +8,28 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/log" ) type chainsDB interface { RecordNewL1(ref eth.BlockRef) error LastCommonL1() (types.BlockSeal, error) + FinalizedL1() eth.BlockRef + UpdateFinalizedL1(finalized eth.BlockRef) error } type L1Source interface { L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) + L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) } type L1Processor struct { - log log.Logger - client L1Source - clientMu sync.Mutex - running atomic.Bool + log log.Logger + client L1Source + clientMu sync.RWMutex + running atomic.Bool + finalitySub ethereum.Subscription currentNumber uint64 tickDuration time.Duration @@ -38,11 +43,12 @@ type L1Processor struct { func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor { ctx, cancel := context.WithCancel(context.Background()) + tickDuration := 6 * time.Second return &L1Processor{ client: client, db: cdb, log: log.New("service", "l1-processor"), - tickDuration: 6 * time.Second, + tickDuration: tickDuration, ctx: ctx, cancel: cancel, } @@ -51,7 +57,20 @@ func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor func (p *L1Processor) AttachClient(client L1Source) { p.clientMu.Lock() defer p.clientMu.Unlock() + // unsubscribe from the old client + if p.finalitySub != nil { + p.finalitySub.Unsubscribe() + } + // make the new client the active one p.client = client + // resubscribe to the new client + p.finalitySub = eth.PollBlockChanges( + p.log, + p.client, + p.handleFinalized, + eth.Finalized, + 3*time.Second, + 10*time.Second) } func (p *L1Processor) Start() { @@ -68,6 +87,13 @@ func (p *L1Processor) Start() { } p.wg.Add(1) go p.worker() + p.finalitySub = eth.PollBlockChanges( + p.log, + p.client, + p.handleFinalized, + eth.Finalized, + p.tickDuration, + p.tickDuration) } func (p *L1Processor) Stop() { @@ -103,8 +129,8 @@ func (p *L1Processor) worker() { // if a new block is found, it is recorded in the database and the target number is updated // in the future it will also kick of derivation management for the sync nodes func (p *L1Processor) work() error { - p.clientMu.Lock() - defer p.clientMu.Unlock() + p.clientMu.RLock() + defer p.clientMu.RUnlock() nextNumber := p.currentNumber + 1 ref, err := p.client.L1BlockRefByNumber(p.ctx, nextNumber) if err != nil { @@ -125,3 +151,30 @@ func (p *L1Processor) work() error { p.currentNumber = nextNumber return nil } + +// handleFinalized is called when a new finalized block is received from the L1 chain subscription +// it updates the database with the new finalized block if it is newer than the current one +func (p *L1Processor) handleFinalized(ctx context.Context, sig eth.L1BlockRef) { + MaybeUpdateFinalizedL1(ctx, p.log, p.db, sig) +} + +// MaybeUpdateFinalizedL1 updates the database with the new finalized block if it is newer than the current one +// it is defined outside of the L1Processor so tests can call it directly without having a processor +func MaybeUpdateFinalizedL1(ctx context.Context, logger log.Logger, db chainsDB, ref eth.L1BlockRef) { + // do something with the new block + logger.Debug("Received new Finalized L1 block", "block", ref) + currentFinalized := db.FinalizedL1() + if currentFinalized.Number > ref.Number { + logger.Warn("Finalized block in database is newer than subscribed finalized block", "current", currentFinalized, "new", ref) + return + } + if ref.Number > currentFinalized.Number || currentFinalized == (eth.BlockRef{}) { + // update the database with the new finalized block + if err := db.UpdateFinalizedL1(ref); err != nil { + logger.Warn("Failed to update finalized L1", "err", err) + return + } + logger.Debug("Updated finalized L1 block", "block", ref) + } + +} diff --git a/op-supervisor/supervisor/backend/processors/l1_processor_test.go b/op-supervisor/supervisor/backend/processors/l1_processor_test.go index 143ebb66b74ea..528adf183da05 100644 --- a/op-supervisor/supervisor/backend/processors/l1_processor_test.go +++ b/op-supervisor/supervisor/backend/processors/l1_processor_test.go @@ -14,8 +14,10 @@ import ( ) type mockChainsDB struct { - recordNewL1Fn func(ref eth.BlockRef) error - lastCommonL1Fn func() (types.BlockSeal, error) + recordNewL1Fn func(ref eth.BlockRef) error + lastCommonL1Fn func() (types.BlockSeal, error) + finalizedL1Fn func() eth.BlockRef + updateFinalizedL1Fn func(finalized eth.BlockRef) error } func (m *mockChainsDB) RecordNewL1(ref eth.BlockRef) error { @@ -32,10 +34,28 @@ func (m *mockChainsDB) LastCommonL1() (types.BlockSeal, error) { return types.BlockSeal{}, nil } +func (m *mockChainsDB) FinalizedL1() eth.BlockRef { + if m.finalizedL1Fn != nil { + return m.finalizedL1Fn() + } + return eth.BlockRef{} +} + +func (m *mockChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error { + if m.updateFinalizedL1Fn != nil { + return m.updateFinalizedL1Fn(finalized) + } + return nil +} + type mockL1BlockRefByNumberFetcher struct { l1BlockByNumberFn func() (eth.L1BlockRef, error) } +func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByLabel(context.Context, eth.BlockLabel) (eth.L1BlockRef, error) { + return eth.L1BlockRef{}, nil +} + func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) { if m.l1BlockByNumberFn != nil { return m.l1BlockByNumberFn() @@ -103,5 +123,37 @@ func TestL1Processor(t *testing.T) { }, 10*time.Second, 100*time.Millisecond) }) - + t.Run("Updates L1 Finalized", func(t *testing.T) { + proc := processorForTesting() + proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef { + return eth.BlockRef{Number: 0} + } + proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error { + require.Equal(t, uint64(10), finalized.Number) + return nil + } + proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10}) + }) + t.Run("No L1 Finalized Update for Same Number", func(t *testing.T) { + proc := processorForTesting() + proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef { + return eth.BlockRef{Number: 10} + } + proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error { + require.Fail(t, "should not be called") + return nil + } + proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10}) + }) + t.Run("No L1 Finalized Update When Behind", func(t *testing.T) { + proc := processorForTesting() + proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef { + return eth.BlockRef{Number: 20} + } + proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error { + require.Fail(t, "should not be called") + return nil + } + proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10}) + }) } diff --git a/op-supervisor/supervisor/frontend/frontend.go b/op-supervisor/supervisor/frontend/frontend.go index 6a43e1fedb0f0..b46fecae88c88 100644 --- a/op-supervisor/supervisor/frontend/frontend.go +++ b/op-supervisor/supervisor/frontend/frontend.go @@ -21,12 +21,12 @@ type QueryBackend interface { UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) + FinalizedL1() eth.BlockRef } type UpdatesBackend interface { UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error - UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error } type Backend interface { @@ -67,6 +67,10 @@ func (q *QueryFrontend) Finalized(ctx context.Context, chainID types.ChainID) (e return q.Supervisor.Finalized(ctx, chainID) } +func (q *QueryFrontend) FinalizedL1() eth.BlockRef { + return q.Supervisor.FinalizedL1() +} + func (q *QueryFrontend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) { return q.Supervisor.CrossDerivedFrom(ctx, chainID, derived) } @@ -105,7 +109,3 @@ func (u *UpdatesFrontend) UpdateLocalUnsafe(ctx context.Context, chainID types.C func (u *UpdatesFrontend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error { return u.Supervisor.UpdateLocalSafe(ctx, chainID, derivedFrom, lastDerived) } - -func (u *UpdatesFrontend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error { - return u.Supervisor.UpdateFinalizedL1(ctx, chainID, finalized) -}