Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions op-e2e/actions/interop/interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/interop"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
Expand Down Expand Up @@ -139,6 +140,11 @@ func (sa *SupervisorActor) SyncCrossSafe(t helpers.Testing, chainID types.ChainI
require.NoError(t, sa.backend.SyncCrossSafe(chainID))
}

func (sa *SupervisorActor) SyncFinalizedL1(t helpers.Testing, ref eth.BlockRef) {
sa.backend.SyncFinalizedL1(ref)
require.Equal(t, ref, sa.backend.FinalizedL1())
}

// worldToDepSet converts a set of chain configs into a dependency-set for the supervisor.
func worldToDepSet(t helpers.Testing, worldOutput *interopgen.WorldOutput) *depset.StaticConfigDependencySet {
depSetCfg := make(map[types.ChainID]*depset.StaticConfigDependency)
Expand Down
1 change: 1 addition & 0 deletions op-e2e/actions/interop/interop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func TestFullInterop(gt *testing.T) {
actors.L1Miner.ActL1FinalizeNext(t)
actors.ChainA.Sequencer.ActL1SafeSignal(t)
actors.ChainA.Sequencer.ActL1FinalizedSignal(t)
actors.Supervisor.SyncFinalizedL1(t, status.HeadL1)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
finalizedL2BlockID, err := actors.Supervisor.Finalized(t.Ctx(), actors.ChainA.ChainID)
require.NoError(t, err)
Expand Down
18 changes: 18 additions & 0 deletions op-e2e/interop/interop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ func TestInterop_IsolatedChains(t *testing.T) {
setupAndRun(t, config, test)
}

// TestInterop_SupervisorFinality tests that the supervisor updates its finality
// It waits for the finalized block to advance past the genesis block.
func TestInterop_SupervisorFinality(t *testing.T) {
test := func(t *testing.T, s2 SuperSystem) {
supervisor := s2.SupervisorClient()
require.Eventually(t, func() bool {
final, err := supervisor.FinalizedL1(context.Background())
require.NoError(t, err)
return final.Number > 0
// this test takes about 30 seconds, with a longer Eventually timeout for CI
}, time.Second*60, time.Second, "wait for finalized block to be greater than 0")
}
config := SuperSystemConfig{
mempoolFiltering: false,
}
setupAndRun(t, config, test)
}

// TestInterop_EmitLogs tests a simple interop scenario
// Chains A and B exist, but no messages are sent between them.
// A contract is deployed on each chain, and logs are emitted repeatedly.
Expand Down
11 changes: 4 additions & 7 deletions op-node/rollup/interop/interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type InteropBackend interface {

UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error
UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.BlockRef) error
UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.L1BlockRef) error
}

// For testing usage, the backend of the supervisor implements the interface, no need for RPC.
Expand Down Expand Up @@ -153,12 +152,10 @@ func (d *InteropDeriver) onFinalizedL1(x finality.FinalizeL1Event) {
if !d.cfg.IsInterop(x.FinalizedL1.Time) {
return
}
d.log.Debug("Signaling finalized L1 update to interop backend", "finalized", x.FinalizedL1)
ctx, cancel := context.WithTimeout(d.driverCtx, rpcTimeout)
defer cancel()
if err := d.backend.UpdateFinalizedL1(ctx, d.chainID, x.FinalizedL1); err != nil {
d.log.Warn("Failed to signal finalized L1 block to interop backend", "finalized", x.FinalizedL1, "err", err)
}
// there used to be code here which sent the finalized L1 block to the supervisor
// but the supervisor manages its own finality now
// so we don't need to do anything here besides emit the event.

// New L2 blocks may be ready to finalize now that the backend knows of new L1 finalized info.
d.emitter.Emit(engine.RequestFinalizedUpdateEvent{})
}
Expand Down
1 change: 0 additions & 1 deletion op-node/rollup/interop/interop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func TestInteropDeriver(t *testing.T) {
t.Run("finalized L1 trigger cross-L2 finality check", func(t *testing.T) {
emitter.ExpectOnce(engine.RequestFinalizedUpdateEvent{})
finalizedL1 := testutils.RandomBlockRef(rng)
interopBackend.ExpectUpdateFinalizedL1(chainID, finalizedL1, nil)
interopDeriver.OnEvent(finality.FinalizeL1Event{
FinalizedL1: finalizedL1,
})
Expand Down
18 changes: 9 additions & 9 deletions op-service/sources/supervisor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ func (cl *SupervisorClient) Finalized(ctx context.Context, chainID types.ChainID
return result, err
}

func (cl *SupervisorClient) FinalizedL1(ctx context.Context) (eth.BlockRef, error) {
var result eth.BlockRef
err := cl.client.CallContext(
ctx,
&result,
"supervisor_finalizedL1")
return result, err
}

func (cl *SupervisorClient) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (eth.BlockRef, error) {
var result eth.BlockRef
err := cl.client.CallContext(
Expand Down Expand Up @@ -144,15 +153,6 @@ func (cl *SupervisorClient) UpdateLocalSafe(ctx context.Context, chainID types.C
lastDerived)
}

func (cl *SupervisorClient) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalizedL1 eth.L1BlockRef) error {
return cl.client.CallContext(
ctx,
nil,
"supervisor_updateFinalizedL1",
chainID,
finalizedL1)
}

func (cl *SupervisorClient) Close() {
cl.client.Close()
}
4 changes: 0 additions & 4 deletions op-service/testutils/mock_interop_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ func (m *MockInteropBackend) UpdateFinalizedL1(ctx context.Context, chainID type
return *result.Get(0).(*error)
}

func (m *MockInteropBackend) ExpectUpdateFinalizedL1(chainID types.ChainID, finalized eth.L1BlockRef, err error) {
m.Mock.On("UpdateFinalizedL1", chainID, finalized).Once().Return(&err)
}

func (m *MockInteropBackend) AssertExpectations(t mock.TestingT) {
m.Mock.AssertExpectations(t)
}
12 changes: 8 additions & 4 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI
return v.ID(), nil
}

func (su *SupervisorBackend) FinalizedL1() eth.BlockRef {
return su.chainDBs.FinalizedL1()
}

func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
if err != nil {
Expand Down Expand Up @@ -492,10 +496,6 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
return nil
}

func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
return su.chainDBs.UpdateFinalizedL1(finalized)
}

// Access to synchronous processing for tests
// ----------------------------

Expand Down Expand Up @@ -523,3 +523,7 @@ func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
}
return ch.ProcessWork()
}

func (su *SupervisorBackend) SyncFinalizedL1(ref eth.BlockRef) {
processors.MaybeUpdateFinalizedL1(context.Background(), su.logger, su.chainDBs, ref)
}
4 changes: 4 additions & 0 deletions op-supervisor/supervisor/backend/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom types.BlockSea
return crossDB.Latest()
}

func (db *ChainsDB) FinalizedL1() eth.BlockRef {
return db.finalizedL1.Get()
}

func (db *ChainsDB) Finalized(chainID types.ChainID) (types.BlockSeal, error) {
finalizedL1 := db.finalizedL1.Get()
if finalizedL1 == (eth.L1BlockRef{}) {
Expand Down
4 changes: 4 additions & 0 deletions op-supervisor/supervisor/backend/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (m *MockBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth
return eth.BlockID{}, nil
}

func (m *MockBackend) FinalizedL1() eth.BlockRef {
return eth.BlockRef{}
}

func (m *MockBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
return eth.BlockRef{}, nil
}
Expand Down
67 changes: 60 additions & 7 deletions op-supervisor/supervisor/backend/processors/l1_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,28 @@ import (

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
)

type chainsDB interface {
RecordNewL1(ref eth.BlockRef) error
LastCommonL1() (types.BlockSeal, error)
FinalizedL1() eth.BlockRef
UpdateFinalizedL1(finalized eth.BlockRef) error
}

type L1Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
}

type L1Processor struct {
log log.Logger
client L1Source
clientMu sync.Mutex
running atomic.Bool
log log.Logger
client L1Source
clientMu sync.RWMutex
running atomic.Bool
finalitySub ethereum.Subscription

currentNumber uint64
tickDuration time.Duration
Expand All @@ -38,11 +43,12 @@ type L1Processor struct {

func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor {
ctx, cancel := context.WithCancel(context.Background())
tickDuration := 6 * time.Second
return &L1Processor{
client: client,
db: cdb,
log: log.New("service", "l1-processor"),
tickDuration: 6 * time.Second,
tickDuration: tickDuration,
ctx: ctx,
cancel: cancel,
}
Expand All @@ -51,7 +57,20 @@ func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor
func (p *L1Processor) AttachClient(client L1Source) {
p.clientMu.Lock()
defer p.clientMu.Unlock()
// unsubscribe from the old client
if p.finalitySub != nil {
p.finalitySub.Unsubscribe()
}
// make the new client the active one
p.client = client
// resubscribe to the new client
p.finalitySub = eth.PollBlockChanges(
p.log,
p.client,
p.handleFinalized,
eth.Finalized,
3*time.Second,
10*time.Second)
}

func (p *L1Processor) Start() {
Expand All @@ -68,6 +87,13 @@ func (p *L1Processor) Start() {
}
p.wg.Add(1)
go p.worker()
p.finalitySub = eth.PollBlockChanges(
p.log,
p.client,
p.handleFinalized,
eth.Finalized,
p.tickDuration,
p.tickDuration)
}

func (p *L1Processor) Stop() {
Expand Down Expand Up @@ -103,8 +129,8 @@ func (p *L1Processor) worker() {
// if a new block is found, it is recorded in the database and the target number is updated
// in the future it will also kick of derivation management for the sync nodes
func (p *L1Processor) work() error {
p.clientMu.Lock()
defer p.clientMu.Unlock()
p.clientMu.RLock()
defer p.clientMu.RUnlock()
nextNumber := p.currentNumber + 1
ref, err := p.client.L1BlockRefByNumber(p.ctx, nextNumber)
if err != nil {
Expand All @@ -125,3 +151,30 @@ func (p *L1Processor) work() error {
p.currentNumber = nextNumber
return nil
}

// handleFinalized is called when a new finalized block is received from the L1 chain subscription
// it updates the database with the new finalized block if it is newer than the current one
func (p *L1Processor) handleFinalized(ctx context.Context, sig eth.L1BlockRef) {
MaybeUpdateFinalizedL1(ctx, p.log, p.db, sig)
}

// MaybeUpdateFinalizedL1 updates the database with the new finalized block if it is newer than the current one
// it is defined outside of the L1Processor so tests can call it directly without having a processor
func MaybeUpdateFinalizedL1(ctx context.Context, logger log.Logger, db chainsDB, ref eth.L1BlockRef) {
// do something with the new block
logger.Debug("Received new Finalized L1 block", "block", ref)
currentFinalized := db.FinalizedL1()
if currentFinalized.Number > ref.Number {
logger.Warn("Finalized block in database is newer than subscribed finalized block", "current", currentFinalized, "new", ref)
return
}
if ref.Number > currentFinalized.Number || currentFinalized == (eth.BlockRef{}) {
// update the database with the new finalized block
if err := db.UpdateFinalizedL1(ref); err != nil {
logger.Warn("Failed to update finalized L1", "err", err)
return
}
logger.Debug("Updated finalized L1 block", "block", ref)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
)

type mockChainsDB struct {
recordNewL1Fn func(ref eth.BlockRef) error
lastCommonL1Fn func() (types.BlockSeal, error)
recordNewL1Fn func(ref eth.BlockRef) error
lastCommonL1Fn func() (types.BlockSeal, error)
finalizedL1Fn func() eth.BlockRef
updateFinalizedL1Fn func(finalized eth.BlockRef) error
}

func (m *mockChainsDB) RecordNewL1(ref eth.BlockRef) error {
Expand All @@ -32,10 +34,28 @@ func (m *mockChainsDB) LastCommonL1() (types.BlockSeal, error) {
return types.BlockSeal{}, nil
}

func (m *mockChainsDB) FinalizedL1() eth.BlockRef {
if m.finalizedL1Fn != nil {
return m.finalizedL1Fn()
}
return eth.BlockRef{}
}

func (m *mockChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
if m.updateFinalizedL1Fn != nil {
return m.updateFinalizedL1Fn(finalized)
}
return nil
}

type mockL1BlockRefByNumberFetcher struct {
l1BlockByNumberFn func() (eth.L1BlockRef, error)
}

func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByLabel(context.Context, eth.BlockLabel) (eth.L1BlockRef, error) {
return eth.L1BlockRef{}, nil
}

func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) {
if m.l1BlockByNumberFn != nil {
return m.l1BlockByNumberFn()
Expand Down Expand Up @@ -103,5 +123,37 @@ func TestL1Processor(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)

})

t.Run("Updates L1 Finalized", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef {
return eth.BlockRef{Number: 0}
}
proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error {
require.Equal(t, uint64(10), finalized.Number)
return nil
}
proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10})
})
t.Run("No L1 Finalized Update for Same Number", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef {
return eth.BlockRef{Number: 10}
}
proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error {
require.Fail(t, "should not be called")
return nil
}
proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10})
})
t.Run("No L1 Finalized Update When Behind", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef {
return eth.BlockRef{Number: 20}
}
proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error {
require.Fail(t, "should not be called")
return nil
}
proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10})
})
}
Loading