From b1242b1d972c15324a02ade748a48d02f3f11ecb Mon Sep 17 00:00:00 2001 From: protolambda Date: Thu, 25 Aug 2022 03:40:02 +0200 Subject: [PATCH] op-node: poll for L1 safe and finalized data changes, deduplicate L2 head tracking --- op-batcher/batch_submitter.go | 6 +- op-e2e/geth.go | 59 +++++++- op-e2e/system_test.go | 2 + op-node/eth/heads.go | 39 ++++++ op-node/flags/flags.go | 9 ++ op-node/node/config.go | 4 + op-node/node/node.go | 44 ++++-- op-node/rollup/derive/engine_queue.go | 31 ++++- op-node/rollup/derive/pipeline.go | 14 +- op-node/rollup/driver/conf_depth.go | 2 +- op-node/rollup/driver/driver.go | 10 +- op-node/rollup/driver/state.go | 191 ++++++++++++++++---------- op-node/service.go | 5 +- 13 files changed, 319 insertions(+), 97 deletions(-) diff --git a/op-batcher/batch_submitter.go b/op-batcher/batch_submitter.go index 4fed5942f0b04..82a7b74b2a009 100644 --- a/op-batcher/batch_submitter.go +++ b/op-batcher/batch_submitter.go @@ -284,7 +284,11 @@ mainLoop: l.log.Warn("issue fetching L2 head", "err", err) continue } - l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock) + if syncStatus.HeadL1 == (eth.L1BlockRef{}) { + l.log.Info("Rollup node has no L1 head info yet") + continue + } + l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1) if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number { l.log.Trace("No unsubmitted blocks from sequencer") continue diff --git a/op-e2e/geth.go b/op-e2e/geth.go index b849e8f3a6811..ec301fd6bccaf 100644 --- a/op-e2e/geth.go +++ b/op-e2e/geth.go @@ -8,9 +8,8 @@ import ( "math/big" "time" - "github.com/ethereum/go-ethereum" - rollupEth "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -20,6 +19,7 @@ import ( "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/node" hdwallet "github.com/miguelmota/go-ethereum-hdwallet" @@ -102,7 +102,60 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"}, } - return createGethNode(false, nodeConfig, ethConfig, []*ecdsa.PrivateKey{pk}) + l1Node, l1Eth, err := createGethNode(false, nodeConfig, ethConfig, []*ecdsa.PrivateKey{pk}) + if err != nil { + return nil, nil, err + } + + // Clique does not have safe/finalized block info. But we do want to test the usage of that, + // since post-merge L1 has it (incl. Goerli testnet which is already upgraded). So we mock it on top of clique. + l1Node.RegisterLifecycle(&fakeSafeFinalizedL1{ + eth: l1Eth, + // for testing purposes we make it really fast, otherwise we don't see it finalize in short tests + finalizedDistance: 8, + safeDistance: 4, + }) + + return l1Node, l1Eth, nil +} + +type fakeSafeFinalizedL1 struct { + eth *eth.Ethereum + finalizedDistance uint64 + safeDistance uint64 + sub ethereum.Subscription +} + +var _ node.Lifecycle = (*fakeSafeFinalizedL1)(nil) + +func (f *fakeSafeFinalizedL1) Start() error { + headChanges := make(chan core.ChainHeadEvent, 10) + headsSub := f.eth.BlockChain().SubscribeChainHeadEvent(headChanges) + f.sub = event.NewSubscription(func(quit <-chan struct{}) error { + defer headsSub.Unsubscribe() + for { + select { + case head := <-headChanges: + num := head.Block.NumberU64() + if num > f.finalizedDistance { + toFinalize := f.eth.BlockChain().GetBlockByNumber(num - f.finalizedDistance) + f.eth.BlockChain().SetFinalized(toFinalize) + } + if num > f.safeDistance { + toSafe := f.eth.BlockChain().GetBlockByNumber(num - f.safeDistance) + f.eth.BlockChain().SetSafe(toSafe) + } + case <-quit: + return nil + } + } + }) + return nil +} + +func (f *fakeSafeFinalizedL1) Stop() error { + f.sub.Unsubscribe() + return nil } // init a geth node. diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 20343584d4a64..9024a20ece2cd 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -110,6 +110,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig { SequencerConfDepth: 0, SequencerEnabled: false, }, + L1EpochPollInterval: time.Second * 4, }, "sequencer": { Driver: driver.Config{ @@ -123,6 +124,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig { ListenPort: 9093, EnableAdmin: true, }, + L1EpochPollInterval: time.Second * 4, }, }, Loggers: map[string]log.Logger{ diff --git a/op-node/eth/heads.go b/op-node/eth/heads.go index 125cb8759b316..af92990f0f1d6 100644 --- a/op-node/eth/heads.go +++ b/op-node/eth/heads.go @@ -2,10 +2,12 @@ package eth import ( "context" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) // HeadSignalFn is used as callback function to accept head-signals @@ -43,3 +45,40 @@ func WatchHeadChanges(ctx context.Context, src NewHeadSource, fn HeadSignalFn) ( } }), nil } + +type L1BlockRefsSource interface { + L1BlockRefByLabel(ctx context.Context, label BlockLabel) (L1BlockRef, error) +} + +// PollBlockChanges opens a polling loop to fetch the L1 block reference with the given label, +// on provided interval and with request timeout. Results are returned with provided callback fn, +// which may block to pause/back-pressure polling. +func PollBlockChanges(ctx context.Context, log log.Logger, src L1BlockRefsSource, fn HeadSignalFn, + label BlockLabel, interval time.Duration, timeout time.Duration) ethereum.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + if interval <= 0 { + log.Warn("polling of block is disabled", "interval", interval, "label", label) + <-quit + return nil + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + reqCtx, reqCancel := context.WithTimeout(ctx, timeout) + ref, err := src.L1BlockRefByLabel(reqCtx, label) + reqCancel() + if err != nil { + log.Warn("failed to poll L1 block", "label", label, "err", err) + } else { + fn(ctx, ref) + } + case <-ctx.Done(): + return ctx.Err() + case <-quit: + return nil + } + } + }) +} diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 2677a16dac0ef..ac0879779df95 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -2,6 +2,7 @@ package flags import ( "fmt" + "time" "github.com/urfave/cli" ) @@ -81,6 +82,13 @@ var ( Required: false, Value: 4, } + L1EpochPollIntervalFlag = cli.DurationFlag{ + Name: "l1.epoch-poll-interval", + Usage: "Poll interval for retrieving new L1 epoch updates such as safe and finalized block changes. Disabled if 0 or negative.", + EnvVar: prefixEnvVar("L1_EPOCH_POLL_INTERVAL"), + Required: false, + Value: time.Second * 12 * 32, + } LogLevelFlag = cli.StringFlag{ Name: "log.level", Usage: "The lowest log level that will be output", @@ -154,6 +162,7 @@ var optionalFlags = append([]cli.Flag{ VerifierL1Confs, SequencerEnabledFlag, SequencerL1Confs, + L1EpochPollIntervalFlag, LogLevelFlag, LogFormatFlag, LogColorFlag, diff --git a/op-node/node/config.go b/op-node/node/config.go index 11df416c4a931..5c10ff5656166 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "time" "github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/rollup" @@ -30,6 +31,9 @@ type Config struct { Pprof PprofConfig + // Used to poll the L1 for new finalized or safe blocks + L1EpochPollInterval time.Duration + // Optional Tracer Tracer } diff --git a/op-node/node/node.go b/op-node/node/node.go index 38dbd9e38bc0e..89b73a5cdb7c4 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -25,14 +25,18 @@ type OpNode struct { log log.Logger appVersion string metrics *metrics.Metrics - l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error) - l1Source *sources.L1Client // L1 Client to fetch data from - l2Driver *driver.Driver // L2 Engine to Sync - l2Source *sources.EngineClient // L2 Execution Engine RPC bindings - server *rpcServer // RPC server hosting the rollup-node API - p2pNode *p2p.NodeP2P // P2P node functionality - p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer - tracer Tracer // tracer to get events for testing/debugging + + l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error) + l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling) + l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling) + + l1Source *sources.L1Client // L1 Client to fetch data from + l2Driver *driver.Driver // L2 Engine to Sync + l2Source *sources.EngineClient // L2 Execution Engine RPC bindings + server *rpcServer // RPC server hosting the rollup-node API + p2pNode *p2p.NodeP2P // P2P node functionality + p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer + tracer Tracer // tracer to get events for testing/debugging // some resources cannot be stopped directly, like the p2p gossipsub router (not our design), // and depend on this ctx to be closed. @@ -129,6 +133,13 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { } n.log.Error("l1 heads subscription error", "err", err) }() + + // Poll for the safe L1 block and finalized block, + // which only change once per epoch at most and may be delayed. + n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Safe, eth.Safe, + cfg.L1EpochPollInterval, time.Second*10) + n.l1FinalizedSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized, + cfg.L1EpochPollInterval, time.Second*10) return nil } @@ -233,7 +244,24 @@ func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) { if err := n.l2Driver.OnL1Head(ctx, sig); err != nil { n.log.Warn("failed to notify engine driver of L1 head change", "err", err) } +} + +func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) { + // Pass on the event to the L2 Engine + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + if err := n.l2Driver.OnL1Safe(ctx, sig); err != nil { + n.log.Warn("failed to notify engine driver of L1 safe block change", "err", err) + } +} +func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) { + // Pass on the event to the L2 Engine + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + if err := n.l2Driver.OnL1Finalized(ctx, sig); err != nil { + n.log.Warn("failed to notify engine driver of L1 finalized block change", "err", err) + } } func (n *OpNode) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error { diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 1b42e10a5413b..bf77cd2be9cdb 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -46,13 +46,15 @@ type EngineQueue struct { unsafePayloads []*eth.ExecutionPayload engine Engine + + metrics Metrics } var _ AttributesQueueOutput = (*EngineQueue)(nil) // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine) *EngineQueue { - return &EngineQueue{log: log, cfg: cfg, engine: engine} +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { + return &EngineQueue{log: log, cfg: cfg, engine: engine, metrics: metrics} } func (eq *EngineQueue) Progress() Progress { @@ -61,6 +63,7 @@ func (eq *EngineQueue) Progress() Progress { func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) { eq.unsafeHead = head + eq.metrics.RecordL2Ref("l2_unsafe", head) } func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) { @@ -129,6 +132,17 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { // return nil //} +func (eq *EngineQueue) logSyncProgress(reason string) { + eq.log.Info("Sync progress", + "reason", reason, + "l2_finalized", eq.finalized, + "l2_safe", eq.safeHead, + "l2_unsafe", eq.unsafeHead, + "l2_time", eq.unsafeHead.Time, + "l1_derived", eq.progress.Origin, + ) +} + func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { first := eq.unsafePayloads[0] @@ -181,7 +195,9 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { } eq.unsafeHead = ref eq.unsafePayloads = eq.unsafePayloads[1:] + eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) + eq.logSyncProgress("unsafe payload from sequencer") return nil } @@ -195,6 +211,7 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { // For some reason the unsafe head is behind the safe head. Log it, and correct it. eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead) eq.unsafeHead = eq.safeHead + eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead) return nil } } @@ -222,7 +239,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error eq.safeHead = ref // unsafe head stays the same, we did not reorg the chain. eq.safeAttributes = eq.safeAttributes[1:] - eq.log.Trace("Reconciled safe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) + eq.logSyncProgress("reconciled with L1") return nil } @@ -266,8 +283,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { } eq.safeHead = ref eq.unsafeHead = ref + eq.metrics.RecordL2Ref("l2_safe", ref) + eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.safeAttributes = eq.safeAttributes[1:] - eq.log.Trace("Inserted safe block", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) + eq.logSyncProgress("processed safe block derived from L1") return nil } @@ -299,5 +318,9 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error Origin: l1Origin, Closed: false, } + eq.metrics.RecordL2Ref("l2_finalized", eq.finalized) // todo(proto): finalized L2 block updates + eq.metrics.RecordL2Ref("l2_safe", safe) + eq.metrics.RecordL2Ref("l2_unsafe", unsafe) + eq.logSyncProgress("reset derivation work") return io.EOF } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 98629253d4e75..a54dc26c3d57a 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -10,6 +10,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type Metrics interface { + RecordL1Ref(name string, ref eth.L1BlockRef) + RecordL2Ref(name string, ref eth.L2BlockRef) +} + type L1Fetcher interface { L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) L1BlockRefByNumberFetcher @@ -71,11 +76,13 @@ type DerivationPipeline struct { stages []Stage eng EngineQueueStage + + metrics Metrics } // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. -func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine) *DerivationPipeline { - eng := NewEngineQueue(log, cfg, engine) +func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { + eng := NewEngineQueue(log, cfg, engine, metrics) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) batchQueue := NewBatchQueue(log, cfg, attributesQueue) chInReader := NewChannelInReader(log, batchQueue) @@ -93,6 +100,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch active: 0, stages: stages, eng: eng, + metrics: metrics, } } @@ -137,6 +145,8 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { // An error is expected when the underlying source closes. // When Step returns nil, it should be called again, to continue the derivation process. func (dp *DerivationPipeline) Step(ctx context.Context) error { + defer dp.metrics.RecordL1Ref("l1_derived", dp.Progress().Origin) + // if any stages need to be reset, do that first. if dp.resetting < len(dp.stages) { if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF { diff --git a/op-node/rollup/driver/conf_depth.go b/op-node/rollup/driver/conf_depth.go index 365ef861b46aa..e308dfb93eb12 100644 --- a/op-node/rollup/driver/conf_depth.go +++ b/op-node/rollup/driver/conf_depth.go @@ -27,7 +27,7 @@ func NewConfDepth(depth uint64, l1Head func() eth.L1BlockRef, fetcher derive.L1F // Any block numbers that are within confirmation depth of the L1 head are mocked to be "not found", // effectively hiding the uncertain part of the L1 chain. func (c *confDepth) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) { - // TODO: performance optimization: buffer the l1Head, invalidate any reorged previous buffer content, + // TODO: performance optimization: buffer the l1Unsafe, invalidate any reorged previous buffer content, // and instantly return the origin by number from the buffer if we can. if num == 0 || c.depth == 0 || num+c.depth <= c.l1Head().Number { diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index a6a7453cc9fb9..3edc970b217d7 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -79,7 +79,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne var state *state verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, func() eth.L1BlockRef { return state.l1Head }, l1) - derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2) + derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics) state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network, metrics) return &Driver{s: state} } @@ -88,6 +88,14 @@ func (d *Driver) OnL1Head(ctx context.Context, head eth.L1BlockRef) error { return d.s.OnL1Head(ctx, head) } +func (d *Driver) OnL1Safe(ctx context.Context, safe eth.L1BlockRef) error { + return d.s.OnL1Safe(ctx, safe) +} + +func (d *Driver) OnL1Finalized(ctx context.Context, finalized eth.L1BlockRef) error { + return d.s.OnL1Finalized(ctx, finalized) +} + func (d *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error { return d.s.OnUnsafeL2Payload(ctx, payload) } diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index b723fde384d8f..ffeff19ff3b33 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -16,7 +16,8 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// SyncStatus is a snapshot of the driver +// SyncStatus is a snapshot of the driver. +// Values may be zeroed if not yet initialized. type SyncStatus struct { // CurrentL1 is the block that the derivation process is currently at, // this may not be fully derived into L2 data yet. @@ -25,7 +26,9 @@ type SyncStatus struct { // HeadL1 is the perceived head of the L1 chain, no confirmation distance. // The head is not guaranteed to build on the other L1 sync status fields, // as the node may be in progress of resetting to adapt to a L1 reorg. - HeadL1 eth.L1BlockRef `json:"head_l1"` + HeadL1 eth.L1BlockRef `json:"head_l1"` + SafeL1 eth.L1BlockRef `json:"safe_l1"` + FinalizedL1 eth.L1BlockRef `json:"finalized_l1"` // UnsafeL2 is the absolute tip of the L2 chain, // pointing to block data that has not been submitted to L1 yet. // The sequencer is building this, and verifiers may also be ahead of the @@ -40,14 +43,13 @@ type SyncStatus struct { } type state struct { - // Chain State - l1Head eth.L1BlockRef // Latest recorded head of the L1 Chain, independent of derivation work - l2Head eth.L2BlockRef // L2 Unsafe Head - l2SafeHead eth.L2BlockRef // L2 Safe Head - this is the head of the L2 chain as derived from L1 - l2Finalized eth.L2BlockRef // L2 Block that will never be reversed + // Latest recorded head, safe block and finalized block of the L1 Chain, independent of derivation work + l1Head eth.L1BlockRef + l1Safe eth.L1BlockRef + l1Finalized eth.L1BlockRef // The derivation pipeline is reset whenever we reorg. - // The derivation pipeline determines the new l2SafeHead. + // The derivation pipeline determines the new l2Safe. derivation DerivationPipeline // When the derivation pipeline is waiting for new data to do anything @@ -66,13 +68,23 @@ type state struct { // Driver config: verifier and sequencer settings DriverConfig *Config - // Connections (in/out) - l1Heads chan eth.L1BlockRef + // L1 Signals: + // + // Not all L1 blocks, or all changes, have to be signalled: + // the derivation process traverses the chain and handles reorgs as necessary, + // the driver just needs to be aware of the *latest* signals enough so to not + // lag behind actionable data. + l1HeadSig chan eth.L1BlockRef + l1SafeSig chan eth.L1BlockRef + l1FinalizedSig chan eth.L1BlockRef + + // L2 Signals: unsafeL2Payloads chan *eth.ExecutionPayload - l1 L1Chain - l2 L2Chain - output outputInterface - network Network // may be nil, network for is optional + + l1 L1Chain + l2 L2Chain + output outputInterface + network Network // may be nil, network for is optional metrics Metrics log log.Logger @@ -101,23 +113,16 @@ func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config output: output, network: network, metrics: metrics, - l1Heads: make(chan eth.L1BlockRef, 10), + l1HeadSig: make(chan eth.L1BlockRef, 10), + l1SafeSig: make(chan eth.L1BlockRef, 10), + l1FinalizedSig: make(chan eth.L1BlockRef, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), } } -// Start starts up the state loop. The context is only for initialization. +// Start starts up the state loop. // The loop will have been started iff err is not nil. -func (s *state) Start(ctx context.Context) error { - l1Head, err := s.l1.L1BlockRefByLabel(ctx, eth.Unsafe) - if err != nil { - return err - } - s.l1Head = l1Head - s.l2Head, _ = s.l2.L2BlockRefByLabel(ctx, eth.Unsafe) - s.metrics.RecordL1Ref("l1_head", s.l1Head) - s.metrics.RecordL2Ref("l2_unsafe", s.l2Head) - +func (s *state) Start(_ context.Context) error { s.derivation.Reset() s.wg.Add(1) @@ -132,11 +137,33 @@ func (s *state) Close() error { return nil } -func (s *state) OnL1Head(ctx context.Context, head eth.L1BlockRef) error { +// OnL1Head signals the driver that the L1 chain changed the "unsafe" block, +// also known as head of the chain, or "latest". +func (s *state) OnL1Head(ctx context.Context, unsafe eth.L1BlockRef) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s.l1HeadSig <- unsafe: + return nil + } +} + +// OnL1Safe signals the driver that the L1 chain changed the "safe", +// also known as the justified checkpoint (as seen on L1 beacon-chain). +func (s *state) OnL1Safe(ctx context.Context, safe eth.L1BlockRef) error { select { case <-ctx.Done(): return ctx.Err() - case s.l1Heads <- head: + case s.l1SafeSig <- safe: + return nil + } +} + +func (s *state) OnL1Finalized(ctx context.Context, finalized eth.L1BlockRef) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s.l1FinalizedSig <- finalized: return nil } } @@ -150,37 +177,54 @@ func (s *state) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPay } } -func (s *state) handleNewL1Block(newL1Head eth.L1BlockRef) { +func (s *state) handleNewL1HeadBlock(head eth.L1BlockRef) { // We don't need to do anything if the head hasn't changed. - if s.l1Head.Hash == newL1Head.Hash { - s.log.Trace("Received L1 head signal that is the same as the current head", "l1Head", newL1Head) - } else if s.l1Head.Hash == newL1Head.ParentHash { + if s.l1Head == (eth.L1BlockRef{}) { + s.log.Info("Received first L1 head signal", "l1_head", head) + } else if s.l1Head.Hash == head.Hash { + s.log.Trace("Received L1 head signal that is the same as the current head", "l1_head", head) + } else if s.l1Head.Hash == head.ParentHash { // We got a new L1 block whose parent hash is the same as the current L1 head. Means we're // dealing with a linear extension (new block is the immediate child of the old one). - s.log.Debug("L1 head moved forward", "l1Head", newL1Head) + s.log.Debug("L1 head moved forward", "l1_head", head) } else { - if s.l1Head.Number >= newL1Head.Number { - s.metrics.RecordL1ReorgDepth(s.l1Head.Number - newL1Head.Number) + if s.l1Head.Number >= head.Number { + s.metrics.RecordL1ReorgDepth(s.l1Head.Number - head.Number) } // New L1 block is not the same as the current head or a single step linear extension. - // This could either be a long L1 extension, or a reorg. Both can be handled the same way. - s.log.Warn("L1 Head signal indicates an L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", newL1Head.ParentHash, "new_l1_head", newL1Head) + // This could either be a long L1 extension, or a reorg, or we simply missed a head update. + s.log.Warn("L1 head signal indicates a possible L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", head.ParentHash, "new_l1_head", head) } - s.metrics.RecordL1Ref("l1_head", newL1Head) - s.l1Head = newL1Head + s.snapshot("New L1 Head") + s.metrics.RecordL1Ref("l1_head", head) + s.l1Head = head +} + +func (s *state) handleNewL1SafeBlock(safe eth.L1BlockRef) { + s.log.Info("New L1 safe block", "l1_safe", safe) + s.metrics.RecordL1Ref("l1_safe", safe) + s.l1Safe = safe +} + +func (s *state) handleNewL1FinalizedBlock(finalized eth.L1BlockRef) { + s.log.Info("New L1 finalized block", "l1_finalized", finalized) + s.metrics.RecordL1Ref("l1_finalized", finalized) + s.l1Finalized = finalized + // TODO(proto): forward signal to derivation to finalize L2 chain as well } // findL1Origin determines what the next L1 Origin should be. // The L1 Origin is either the L2 Head's Origin, or the following L1 block // if the next L2 block's time is greater than or equal to the L2 Head's Origin. func (s *state) findL1Origin(ctx context.Context) (eth.L1BlockRef, error) { + l2Head := s.derivation.UnsafeL2Head() // If we are at the head block, don't do a lookup. - if s.l2Head.L1Origin.Hash == s.l1Head.Hash { + if l2Head.L1Origin.Hash == s.l1Head.Hash { return s.l1Head, nil } // Grab a reference to the current L1 origin block. - currentOrigin, err := s.l1.L1BlockRefByHash(ctx, s.l2Head.L1Origin.Hash) + currentOrigin, err := s.l1.L1BlockRefByHash(ctx, l2Head.L1Origin.Hash) if err != nil { return eth.L1BlockRef{}, err } @@ -191,7 +235,7 @@ func (s *state) findL1Origin(ctx context.Context) (eth.L1BlockRef, error) { s.log.Info("sequencing with old origin to preserve conf depth", "current", currentOrigin, "current_time", currentOrigin.Time, "l1_head", s.l1Head, "l1_head_time", s.l1Head.Time, - "l2_head", s.l2Head, "l2_head_time", s.l2Head.Time, + "l2_head", l2Head, "l2_head_time", l2Head.Time, "depth", s.DriverConfig.SequencerConfDepth) return currentOrigin, nil } @@ -209,7 +253,7 @@ func (s *state) findL1Origin(ctx context.Context) (eth.L1BlockRef, error) { // could decide to continue to build on top of the previous origin until the Sequencer runs out // of slack. For simplicity, we implement our Sequencer to always start building on the latest // L1 block when we can. - if s.l2Head.Time+s.Config.BlockTime >= nextOrigin.Time { + if l2Head.Time+s.Config.BlockTime >= nextOrigin.Time { return nextOrigin, nil } @@ -233,27 +277,30 @@ func (s *state) createNewL2Block(ctx context.Context) error { return nil } + l2Head := s.derivation.UnsafeL2Head() + l2Safe := s.derivation.SafeL2Head() + l2Finalized := s.derivation.Finalized() + // Should never happen. Sequencer will halt if we get into this situation somehow. - nextL2Time := s.l2Head.Time + s.Config.BlockTime + nextL2Time := l2Head.Time + s.Config.BlockTime if nextL2Time < l1Origin.Time { s.log.Error("Cannot build L2 block for time before L1 origin", - "l2Head", s.l2Head, "nextL2Time", nextL2Time, "l1Origin", l1Origin, "l1OriginTime", l1Origin.Time) + "l2Unsafe", l2Head, "nextL2Time", nextL2Time, "l1Origin", l1Origin, "l1OriginTime", l1Origin.Time) return fmt.Errorf("cannot build L2 block on top %s for time %d before L1 origin %s at time %d", - s.l2Head, nextL2Time, l1Origin, l1Origin.Time) + l2Head, nextL2Time, l1Origin, l1Origin.Time) } // Actually create the new block. - newUnsafeL2Head, payload, err := s.output.createNewBlock(ctx, s.l2Head, s.l2SafeHead.ID(), s.l2Finalized.ID(), l1Origin) + newUnsafeL2Head, payload, err := s.output.createNewBlock(ctx, l2Head, l2Safe.ID(), l2Finalized.ID(), l1Origin) if err != nil { - s.log.Error("Could not extend chain as sequencer", "err", err, "l2UnsafeHead", s.l2Head, "l1Origin", l1Origin) + s.log.Error("Could not extend chain as sequencer", "err", err, "l2_parent", l2Head, "l1_origin", l1Origin) return err } // Update our L2 head block based on the new unsafe block we just generated. s.derivation.SetUnsafeHead(newUnsafeL2Head) - s.l2Head = newUnsafeL2Head - s.log.Info("Sequenced new l2 block", "l2Head", s.l2Head, "l1Origin", s.l2Head.L1Origin, "txs", len(payload.Transactions), "time", s.l2Head.Time) + s.log.Info("Sequenced new l2 block", "l2_unsafe", newUnsafeL2Head, "l1_origin", newUnsafeL2Head.L1Origin, "txs", len(payload.Transactions), "time", newUnsafeL2Head.Time) s.metrics.CountSequencedTxs(len(payload.Transactions)) if s.network != nil { @@ -363,8 +410,9 @@ func (s *state) eventLoop() { // We need to catch up to the next origin as quickly as possible. We can do this by // requesting a new block ASAP instead of waiting for the next tick. // We don't request a block if the confirmation depth is not met. - if s.l1Head.Number > s.l2Head.L1Origin.Number+s.DriverConfig.SequencerConfDepth { - s.log.Trace("Asking for a second L2 block asap", "l2Head", s.l2Head) + l2Head := s.derivation.UnsafeL2Head() + if s.l1Head.Number > l2Head.L1Origin.Number+s.DriverConfig.SequencerConfDepth { + s.log.Trace("Building another L2 block asap to catch up with L1 head", "l2_unsafe", l2Head, "l2_unsafe_l1_origin", l2Head.L1Origin, "l1_head", s.l1Head) // But not too quickly to minimize busy-waiting for new blocks time.AfterFunc(time.Millisecond*10, reqL2BlockCreation) } @@ -376,11 +424,15 @@ func (s *state) eventLoop() { s.metrics.RecordReceivedUnsafePayload(payload) reqStep() - case newL1Head := <-s.l1Heads: - s.log.Info("new l1 Head") - s.snapshot("New L1 Head") - s.handleNewL1Block(newL1Head) + case newL1Head := <-s.l1HeadSig: + s.handleNewL1HeadBlock(newL1Head) reqStep() // a new L1 head may mean we have the data to not get an EOF again. + case newL1Safe := <-s.l1SafeSig: + s.handleNewL1SafeBlock(newL1Safe) + // no step, justified L1 information does not do anything for L2 derivation or status + case newL1Finalized := <-s.l1FinalizedSig: + s.handleNewL1FinalizedBlock(newL1Finalized) + reqStep() // we may be able to mark more L2 data as finalized now case <-delayedStepReq: delayedStepReq = nil step() @@ -417,28 +469,17 @@ func (s *state) eventLoop() { continue } else { stepAttempts = 0 - finalized, safe, unsafe := s.derivation.Finalized(), s.derivation.SafeL2Head(), s.derivation.UnsafeL2Head() - // log sync progress when it changes - if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe { - s.log.Info("Sync progress", "finalized", finalized, "safe", safe, "unsafe", unsafe) - s.metrics.RecordL2Ref("l2_finalized", finalized) - s.metrics.RecordL2Ref("l2_safe", safe) - s.metrics.RecordL2Ref("l2_unsafe", unsafe) - } - s.metrics.RecordL1Ref("l1_derived", s.derivation.Progress().Origin) - // update the heads - s.l2Finalized = finalized - s.l2SafeHead = safe - s.l2Head = unsafe reqStep() // continue with the next step if we can } case respCh := <-s.syncStatusReq: respCh <- SyncStatus{ CurrentL1: s.derivation.Progress().Origin, HeadL1: s.l1Head, - UnsafeL2: s.l2Head, - SafeL2: s.l2SafeHead, - FinalizedL2: s.l2Finalized, + SafeL1: s.l1Safe, + FinalizedL1: s.l1Finalized, + UnsafeL2: s.derivation.UnsafeL2Head(), + SafeL2: s.derivation.SafeL2Head(), + FinalizedL2: s.derivation.Finalized(), } case respCh := <-s.forceReset: s.log.Warn("Derivation pipeline is manually reset") @@ -499,7 +540,7 @@ func (s *state) snapshot(event string) { "event", event, "l1Head", deferJSONString{s.l1Head}, "l1Current", deferJSONString{s.derivation.Progress().Origin}, - "l2Head", deferJSONString{s.l2Head}, - "l2SafeHead", deferJSONString{s.l2SafeHead}, - "l2FinalizedHead", deferJSONString{s.l2Finalized}) + "l2Head", deferJSONString{s.derivation.UnsafeL2Head()}, + "l2Safe", deferJSONString{s.derivation.SafeL2Head()}, + "l2FinalizedHead", deferJSONString{s.derivation.Finalized()}) } diff --git a/op-node/service.go b/op-node/service.go index 223f8214d0985..0b936030e5235 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -78,8 +78,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ListenAddr: ctx.GlobalString(flags.PprofAddrFlag.Name), ListenPort: ctx.GlobalString(flags.PprofPortFlag.Name), }, - P2P: p2pConfig, - P2PSigner: p2pSignerSetup, + P2P: p2pConfig, + P2PSigner: p2pSignerSetup, + L1EpochPollInterval: ctx.GlobalDuration(flags.L1EpochPollIntervalFlag.Name), } if err := cfg.Check(); err != nil { return nil, err