diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 3c360f05a3736..a33b9563c4853 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -7,6 +7,7 @@ import ( "math/big" "os" "path" + "sort" "strings" "testing" "time" @@ -119,14 +120,6 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { JWTFilePath: writeDefaultJWT(t), JWTSecret: testingJWTSecret, Nodes: map[string]*rollupNode.Config{ - "verifier": { - Driver: driver.Config{ - VerifierConfDepth: 0, - SequencerConfDepth: 0, - SequencerEnabled: false, - }, - L1EpochPollInterval: time.Second * 4, - }, "sequencer": { Driver: driver.Config{ VerifierConfDepth: 0, @@ -141,6 +134,14 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { }, L1EpochPollInterval: time.Second * 4, }, + "verifier": { + Driver: driver.Config{ + VerifierConfDepth: 0, + SequencerConfDepth: 0, + SequencerEnabled: false, + }, + L1EpochPollInterval: time.Second * 4, + }, }, Loggers: map[string]log.Logger{ "verifier": testlog.Logger(t, log.LvlInfo).New("role", "verifier"), @@ -225,7 +226,43 @@ func (sys *System) Close() { sys.Mocknet.Close() } -func (cfg SystemConfig) Start() (*System, error) { +type systemConfigHook func(sCfg *SystemConfig, s *System) + +type SystemConfigOption struct { + key string + role string + action systemConfigHook +} + +type SystemConfigOptions struct { + opts map[string]systemConfigHook +} + +func NewSystemConfigOptions(_opts []SystemConfigOption) (SystemConfigOptions, error) { + opts := make(map[string]systemConfigHook) + for _, opt := range _opts { + if _, ok := opts[opt.key+":"+opt.role]; ok { + return SystemConfigOptions{}, fmt.Errorf("duplicate option for key %s and role %s", opt.key, opt.role) + } + opts[opt.key+":"+opt.role] = opt.action + } + + return SystemConfigOptions{ + opts: opts, + }, nil +} + +func (s *SystemConfigOptions) Get(key, role string) (systemConfigHook, bool) { + v, ok := s.opts[key+":"+role] + return v, ok +} + +func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { + 
// TestSystemMockAltSync sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
//
// Test steps:
// 1. Spin up the nodes (P2P is disabled on the verifier)
// 2. Send a transaction to the sequencer.
// 3. Wait for the TX to be mined on the sequencer chain.
// 4. Wait for the verifier to detect a gap in the payload queue vs. the unsafe head
// 5. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 6. Wait for the verifier to sync the unsafe chain into the safe chain.
// 7. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) {
	parallel(t)
	if !verboseGethNodes {
		log.Root().SetHandler(log.DiscardHandler())
	}

	cfg := DefaultSystemConfig(t)
	// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do.
	// Keep the seq window small so the L2 chain is started quick
	cfg.DeployConfig.L1BlockTime = 10

	// Track which payloads the sequencer published and which the verifier received,
	// via tracers, so RPC-sync delivery can be asserted at the end.
	var published, received []common.Hash
	seqTracer, verifTracer := new(FnTracer), new(FnTracer)
	seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
		published = append(published, payload.BlockHash)
	}
	verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
		received = append(received, payload.BlockHash)
	}
	cfg.Nodes["sequencer"].Tracer = seqTracer
	cfg.Nodes["verifier"].Tracer = verifTracer

	// The verifier's backup-sync RPC can only be wired up once the sequencer's
	// in-process node exists, hence the afterRollupNodeStart hook.
	sys, err := cfg.Start(SystemConfigOption{
		key:  "afterRollupNodeStart",
		role: "sequencer",
		action: func(sCfg *SystemConfig, system *System) {
			rpc, _ := system.Nodes["sequencer"].Attach() // never errors
			cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{
				Rpc: client.NewBaseRPCClient(rpc),
			}
		},
	})
	require.Nil(t, err, "Error starting up system")
	defer sys.Close()

	l2Seq := sys.Clients["sequencer"]
	l2Verif := sys.Clients["verifier"]

	// Transactor Account
	ethPrivKey := cfg.Secrets.Alice

	// Submit a TX to L2 sequencer node
	toAddr := common.Address{0xff, 0xff}
	tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
		ChainID:   cfg.L2ChainIDBig(),
		Nonce:     0,
		To:        &toAddr,
		Value:     big.NewInt(1_000_000_000),
		GasTipCap: big.NewInt(10),
		GasFeeCap: big.NewInt(200),
		Gas:       21000,
	})
	err = l2Seq.SendTransaction(context.Background(), tx)
	require.Nil(t, err, "Sending L2 tx to sequencer")

	// Wait for tx to be mined on the L2 sequencer chain
	receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
	require.Nil(t, err, "Waiting for L2 tx on sequencer")

	// Wait for alt RPC sync to pick up the blocks on the sequencer chain
	// (a longer timeout than above: sync is driven by a periodic gap check).
	receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 12*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
	require.Nil(t, err, "Waiting for L2 tx on verifier")

	require.Equal(t, receiptSeq, receiptVerif)

	// Verify that the tx was received via RPC sync (P2P is disabled)
	require.Contains(t, received, receiptVerif.BlockHash)

	// Verify that everything that was received was published
	require.GreaterOrEqual(t, len(published), len(received))
	require.ElementsMatch(t, received, published[:len(received)])
}
diff --git a/op-node/node/client.go b/op-node/node/client.go index 7d4f7e128b1b1..d83a258fb1d29 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -19,6 +19,11 @@ type L2EndpointSetup interface { Check() error } +type L2SyncEndpointSetup interface { + Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error) + Check() error +} + type L1EndpointSetup interface { // Setup a RPC client to a L1 node to pull rollup input-data from. // The results of the RPC client may be trusted for faster processing, or strictly validated. @@ -75,6 +80,50 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client return p.Client, nil } +// L2SyncEndpointConfig contains configuration for the fallback sync endpoint +type L2SyncEndpointConfig struct { + // Address of the L2 RPC to use for backup sync + L2NodeAddr string +} + +var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil) + +func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { + l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr) + if err != nil { + return nil, err + } + + return l2Node, nil +} + +func (cfg *L2SyncEndpointConfig) Check() error { + if cfg.L2NodeAddr == "" { + return errors.New("empty L2 Node Address") + } + + return nil +} + +type L2SyncRPCConfig struct { + // RPC endpoint to use for syncing + Rpc client.RPC +} + +var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil) + +func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { + return cfg.Rpc, nil +} + +func (cfg *L2SyncRPCConfig) Check() error { + if cfg.Rpc == nil { + return errors.New("rpc cannot be nil") + } + + return nil +} + type L1EndpointConfig struct { L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required) diff --git a/op-node/node/config.go b/op-node/node/config.go index 41f2e458f1c9b..8d95f79f360af 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -13,8 +13,9 @@ 
import ( ) type Config struct { - L1 L1EndpointSetup - L2 L2EndpointSetup + L1 L1EndpointSetup + L2 L2EndpointSetup + L2Sync L2SyncEndpointSetup Driver driver.Config diff --git a/op-node/node/node.go b/op-node/node/node.go index ed194818d2f8f..2c2e43521f692 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -197,7 +197,28 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger return err } - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n.log, snapshotLog, n.metrics) + var syncClient *sources.SyncClient + // If the L2 sync config is present, use it to create a sync client + if cfg.L2Sync != nil { + if err := cfg.L2Sync.Check(); err != nil { + log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err) + } else { + rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log) + if err != nil { + return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err) + } + + // The sync client's RPC is always trusted + config := sources.SyncClientDefaultConfig(&cfg.Rollup, true) + + syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) + if err != nil { + return fmt.Errorf("failed to create sync client: %w", err) + } + } + } + + n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics) return nil } @@ -263,13 +284,21 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error { func (n *OpNode) Start(ctx context.Context) error { n.log.Info("Starting execution engine driver") + // start driving engine: sync blocks by deriving them from L1 and driving them into the engine - err := n.l2Driver.Start() - if err != nil { + if err := n.l2Driver.Start(); err != nil { n.log.Error("Could not start a rollup node", "err", err) return err } + // If the backup unsafe sync client is enabled, start its event loop + if 
n.l2Driver.L2SyncCl != nil { + if err := n.l2Driver.L2SyncCl.Start(); err != nil { + n.log.Error("Could not start the backup sync client", "err", err) + return err + } + } + return nil } @@ -382,6 +411,13 @@ func (n *OpNode) Close() error { if err := n.l2Driver.Close(); err != nil { result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err)) } + + // If the L2 sync client is present & running, close it. + if n.l2Driver.L2SyncCl != nil { + if err := n.l2Driver.L2SyncCl.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err)) + } + } } // close L2 engine RPC client diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 4f69d2621bad3..b3207063fefc3 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -131,8 +131,9 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M metrics: metrics, finalityData: make([]FinalityData, 0, finalityLookback), unsafePayloads: PayloadsQueue{ - MaxSize: maxUnsafePayloadsMemory, - SizeFn: payloadMemSize, + MaxSize: maxUnsafePayloadsMemory, + SizeFn: payloadMemSize, + blockNos: make(map[uint64]bool), }, prev: prev, l1Fetcher: l1Fetcher, @@ -662,3 +663,20 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System eq.logSyncProgress("reset derivation work") return io.EOF } + +// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. +// If there is no gap, the difference between end and start will be 0. 
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the difference between end and start will be 0.
// NOTE(review): when the queue's lowest payload is at or below the unsafe head, end may be <= start;
// callers doing unsigned arithmetic on (end - start) should guard for that — confirm against call sites.
func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
	// The start of the gap is always the unsafe head + 1
	start = eq.unsafeHead.Number + 1

	// If the priority queue is non-empty, the end is the block number of the payload at its head
	// (the lowest queued block number); otherwise fall back to the expected block number.
	if first := eq.unsafePayloads.Peek(); first != nil {
		end = first.ID().Number
	} else {
		end = expectedNumber
	}

	return start, end
}
-75,8 +75,9 @@ func TestPayloadMemSize(t *testing.T) { func TestPayloadsQueue(t *testing.T) { pq := PayloadsQueue{ - MaxSize: payloadMemFixedCost * 3, - SizeFn: payloadMemSize, + MaxSize: payloadMemFixedCost * 3, + SizeFn: payloadMemSize, + blockNos: make(map[uint64]bool), } require.Equal(t, 0, pq.Len()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek()) @@ -85,6 +86,7 @@ func TestPayloadsQueue(t *testing.T) { a := ð.ExecutionPayload{BlockNumber: 3} b := ð.ExecutionPayload{BlockNumber: 4} c := ð.ExecutionPayload{BlockNumber: 5} + d := ð.ExecutionPayload{BlockNumber: 6} bAlt := ð.ExecutionPayload{BlockNumber: 4} require.NoError(t, pq.Push(b)) require.Equal(t, pq.Len(), 1) @@ -105,28 +107,33 @@ func TestPayloadsQueue(t *testing.T) { require.Equal(t, pq.Pop(), a) require.Equal(t, pq.Len(), 2, "expecting to pop the lowest") - require.NoError(t, pq.Push(bAlt)) - require.Equal(t, pq.Len(), 3) - require.Equal(t, pq.Peek(), b, "expecting b to be lowest, compared to bAlt and c") + require.Equal(t, pq.Peek(), b, "expecting b to be lowest, compared to c") require.Equal(t, pq.Pop(), b) - require.Equal(t, pq.Len(), 2) - require.Equal(t, pq.MemSize(), 2*payloadMemFixedCost) - - require.Equal(t, pq.Pop(), bAlt) require.Equal(t, pq.Len(), 1) - require.Equal(t, pq.Peek(), c, "expecting c to only remain") + require.Equal(t, pq.MemSize(), payloadMemFixedCost) + + require.Equal(t, pq.Pop(), c) + require.Equal(t, pq.Len(), 0, "expecting no items to remain") - d := ð.ExecutionPayload{BlockNumber: 5, Transactions: []eth.Data{make([]byte, payloadMemFixedCost*3+1)}} - require.Error(t, pq.Push(d), "cannot add payloads that are too large") + e := ð.ExecutionPayload{BlockNumber: 5, Transactions: []eth.Data{make([]byte, payloadMemFixedCost*3+1)}} + require.Error(t, pq.Push(e), "cannot add payloads that are too large") require.NoError(t, pq.Push(b)) + require.Equal(t, pq.Len(), 1, "expecting b") + require.Equal(t, pq.Peek(), b) + require.NoError(t, pq.Push(c)) require.Equal(t, pq.Len(), 
2, "expecting b, c") require.Equal(t, pq.Peek(), b) require.NoError(t, pq.Push(a)) require.Equal(t, pq.Len(), 3, "expecting a, b, c") require.Equal(t, pq.Peek(), a) - require.NoError(t, pq.Push(bAlt)) - require.Equal(t, pq.Len(), 3, "expecting b, bAlt, c") + + // No duplicates allowed + require.Error(t, pq.Push(bAlt)) + + require.NoError(t, pq.Push(d)) + require.Equal(t, pq.Len(), 3) + require.Equal(t, pq.Peek(), b, "expecting b, c, d") require.NotContainsf(t, pq.pq[:], a, "a should be dropped after 3 items already exist under max size constraint") } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index da7b8ca0a8a62..33c86b52583e5 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -51,6 +51,7 @@ type EngineQueueStage interface { Finalize(l1Origin eth.L1BlockRef) AddUnsafePayload(payload *eth.ExecutionPayload) + GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) Step(context.Context) error } @@ -160,6 +161,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { dp.eng.AddUnsafePayload(payload) } +// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. +// If there is no gap, the start and end will be 0. +func (dp *DerivationPipeline) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) { + return dp.eng.GetUnsafeQueueGap(expectedNumber) +} + // Step tries to progress the buffer. // An EOF is returned if there pipeline is blocked by waiting for new L1 data. // If ctx errors no error is returned, but the step may exit early in a state that can still be continued. 
diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index fc4a4ef925fc0..508035013da95 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/sources" ) type Metrics interface { @@ -48,6 +49,7 @@ type DerivationPipeline interface { Reset() Step(ctx context.Context) error AddUnsafePayload(payload *eth.ExecutionPayload) + GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) Finalize(ref eth.L1BlockRef) FinalizedL1() eth.L1BlockRef Finalized() eth.L2BlockRef @@ -80,7 +82,7 @@ type Network interface { } // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. -func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { +func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { l1State := NewL1State(log, metrics) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) @@ -112,5 +114,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne l1SafeSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), + L2SyncCl: syncClient, } } diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 72e939703190e..f099cb4a7bd0a 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -16,6 +16,7 @@ import ( 
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-service/backoff" ) @@ -63,7 +64,11 @@ type Driver struct { l1SafeSig chan eth.L1BlockRef l1FinalizedSig chan eth.L1BlockRef + // Backup unsafe sync client + L2SyncCl *sources.SyncClient + // L2 Signals: + unsafeL2Payloads chan *eth.ExecutionPayload l1 L1Chain @@ -195,6 +200,12 @@ func (s *Driver) eventLoop() { sequencerTimer.Reset(delay) } + // Create a ticker to check if there is a gap in the engine queue every 15 seconds + // If there is, we send requests to the backup RPC to retrieve the missing payloads + // and add them to the unsafe queue. + altSyncTicker := time.NewTicker(15 * time.Second) + defer altSyncTicker.Stop() + for { // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // This may adjust at any time based on fork-choice changes or previous errors. @@ -223,6 +234,12 @@ func (s *Driver) eventLoop() { } } planSequencerAction() // schedule the next sequencer action to keep the sequencing looping + case <-altSyncTicker.C: + // Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch + // missing payloads from the backup RPC (if it is configured). + if s.L2SyncCl != nil { + s.checkForGapInUnsafeQueue(ctx) + } case payload := <-s.unsafeL2Payloads: s.snapshot("New unsafe payload") s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) @@ -442,3 +459,36 @@ type hashAndErrorChannel struct { hash common.Hash err chan error } + +// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC. 
+// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides +// emitting warning logs) if the requests fail. +func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { + // subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that + // difference by the block time to get the expected L2 block number at the current time. If the + // unsafe head does not have this block number, then there is a gap in the queue. + wallClock := uint64(time.Now().Unix()) + genesisTimestamp := s.config.Genesis.L2Time + wallClockGenesisDiff := wallClock - genesisTimestamp + expectedL2Block := wallClockGenesisDiff / s.config.BlockTime + + start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block) + size := end - start + + // Check if there is a gap between the unsafe head and the expected L2 block number at the current time. + if size > 0 { + s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size) + s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size) + + // Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently. + // Concurrent requests are safe here due to the engine queue being a priority queue. 
+ for blockNumber := start; blockNumber <= end; blockNumber++ { + select { + case s.L2SyncCl.FetchUnsafeBlock <- blockNumber: + // Do nothing- the block number was successfully sent into the channel + default: + return // If the channel is full, return and wait for the next iteration of the event loop + } + } + } +} diff --git a/op-node/service.go b/op-node/service.go index 39c3a17ab7d90..48848babc76b5 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -36,10 +36,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { return nil, err } - driverConfig, err := NewDriverConfig(ctx) - if err != nil { - return nil, err - } + driverConfig := NewDriverConfig(ctx) p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx) if err != nil { @@ -51,19 +48,19 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { return nil, fmt.Errorf("failed to load p2p config: %w", err) } - l1Endpoint, err := NewL1EndpointConfig(ctx) - if err != nil { - return nil, fmt.Errorf("failed to load l1 endpoint info: %w", err) - } + l1Endpoint := NewL1EndpointConfig(ctx) l2Endpoint, err := NewL2EndpointConfig(ctx, log) if err != nil { return nil, fmt.Errorf("failed to load l2 endpoints info: %w", err) } + l2SyncEndpoint := NewL2SyncEndpointConfig(ctx) + cfg := &node.Config{ L1: l1Endpoint, L2: l2Endpoint, + L2Sync: l2SyncEndpoint, Rollup: *rollupConfig, Driver: *driverConfig, RPC: node.RPCConfig{ @@ -96,12 +93,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { return cfg, nil } -func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) { +func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig { return &node.L1EndpointConfig{ L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name), L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name), L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))), - }, nil + } } func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) 
(*node.L2EndpointConfig, error) { @@ -134,13 +131,21 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf }, nil } -func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) { +// NewL2SyncEndpointConfig returns a pointer to a L2SyncEndpointConfig if the +// flag is set, otherwise nil. +func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig { + return &node.L2SyncEndpointConfig{ + L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name), + } +} + +func NewDriverConfig(ctx *cli.Context) *driver.Config { return &driver.Config{ VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name), SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name), SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name), SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name), - }, nil + } } func NewRollupConfig(ctx *cli.Context) (*rollup.Config, error) { diff --git a/op-node/sources/sync_client.go b/op-node/sources/sync_client.go new file mode 100644 index 0000000000000..6c5aa6dccf9db --- /dev/null +++ b/op-node/sources/sync_client.go @@ -0,0 +1,122 @@ +package sources + +import ( + "context" + "errors" + "sync" + + "github.com/ethereum-optimism/optimism/op-node/client" + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/sources/caching" + "github.com/ethereum/go-ethereum/log" + "github.com/libp2p/go-libp2p/core/peer" +) + +var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil") + +// RpcSyncPeer is a mock PeerID for the RPC sync client. 
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"

// receivePayload is the callback through which fetched payloads are handed
// back to the driver (same shape as the P2P unsafe-payload receiver).
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error

// SyncClientInterface describes the backup sync client's lifecycle and fetch entry points.
type SyncClientInterface interface {
	Start() error
	Close() error
	fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64)
}

// SyncClient fetches unsafe L2 blocks by number from a trusted backup RPC and
// forwards them to the driver via the receivePayload callback.
type SyncClient struct {
	*L2Client
	// FetchUnsafeBlock receives block numbers to fetch; buffered (128) so
	// producers can enqueue without blocking until the buffer fills.
	FetchUnsafeBlock chan uint64
	done             chan struct{}
	receivePayload   receivePayload
	wg               sync.WaitGroup
}

var _ SyncClientInterface = (*SyncClient)(nil)

// SyncClientConfig embeds the standard L2 client configuration.
type SyncClientConfig struct {
	L2ClientConfig
}

// SyncClientDefaultConfig builds a SyncClientConfig from the rollup config.
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
	return &SyncClientConfig{
		*L2ClientDefaultConfig(config, trustRPC),
	}
}

// NewSyncClient wraps the given RPC in an L2Client and returns a SyncClient
// that delivers fetched payloads to receiver. It does not start the event loop.
func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
	l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
	if err != nil {
		return nil, err
	}

	return &SyncClient{
		L2Client:         l2Client,
		FetchUnsafeBlock: make(chan uint64, 128),
		done:             make(chan struct{}),
		receivePayload:   receiver,
	}, nil
}

// Start starts up the state loop.
// It always returns nil; the error return exists to satisfy SyncClientInterface.
func (s *SyncClient) Start() error {
	s.wg.Add(1)
	go s.eventLoop()
	return nil
}

// Close signals the event loop to stop and blocks until it has exited.
// NOTE(review): the send on done is unbuffered-style, so Close blocks forever
// if Start was never called — confirm callers always pair Start/Close.
func (s *SyncClient) Close() error {
	s.done <- struct{}{}
	s.wg.Wait()
	return nil
}

// eventLoop is the main event loop for the sync client.
// It serializes block fetches: one RPC request at a time, until Close is called.
func (s *SyncClient) eventLoop() {
	defer s.wg.Done()
	s.log.Info("Starting sync client event loop")

	for {
		select {
		case <-s.done:
			return
		case blockNumber := <-s.FetchUnsafeBlock:
			s.fetchUnsafeBlockFromRpc(context.Background(), blockNumber)
		}
	}
}
+// +// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now, +// the `eth_getBlockByNumber` method is more widely available. +func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) { + s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber) + + payload, err := s.PayloadByNumber(ctx, blockNumber) + if err != nil { + s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err) + return + } + + // Signature validation is not necessary here since the backup RPC is trusted. + if _, ok := payload.CheckBlockHash(); !ok { + s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID()) + return + } + + s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID()) + + // Send the retrieved payload to the `unsafeL2Payloads` channel. + if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil { + s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err) + return + } else { + s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID()) + } +}