Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion op-batcher/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ mainLoop:
l.log.Warn("issue fetching L2 head", "err", err)
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock)
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
continue
Expand Down
59 changes: 56 additions & 3 deletions op-e2e/geth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum"

rollupEth "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"

hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
Expand Down Expand Up @@ -102,7 +102,60 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
}

return createGethNode(false, nodeConfig, ethConfig, []*ecdsa.PrivateKey{pk})
l1Node, l1Eth, err := createGethNode(false, nodeConfig, ethConfig, []*ecdsa.PrivateKey{pk})
if err != nil {
return nil, nil, err
}

// Clique does not have safe/finalized block info. But we do want to test the usage of that,
// since post-merge L1 has it (incl. Goerli testnet which is already upgraded). So we mock it on top of clique.
l1Node.RegisterLifecycle(&fakeSafeFinalizedL1{
eth: l1Eth,
// for testing purposes we make it really fast, otherwise we don't see it finalize in short tests
finalizedDistance: 8,
safeDistance: 4,
})

return l1Node, l1Eth, nil
}

type fakeSafeFinalizedL1 struct {
eth *eth.Ethereum
finalizedDistance uint64
safeDistance uint64
sub ethereum.Subscription
}

var _ node.Lifecycle = (*fakeSafeFinalizedL1)(nil)

func (f *fakeSafeFinalizedL1) Start() error {
headChanges := make(chan core.ChainHeadEvent, 10)
headsSub := f.eth.BlockChain().SubscribeChainHeadEvent(headChanges)
f.sub = event.NewSubscription(func(quit <-chan struct{}) error {
defer headsSub.Unsubscribe()
for {
select {
case head := <-headChanges:
num := head.Block.NumberU64()
if num > f.finalizedDistance {
toFinalize := f.eth.BlockChain().GetBlockByNumber(num - f.finalizedDistance)
f.eth.BlockChain().SetFinalized(toFinalize)
}
if num > f.safeDistance {
toSafe := f.eth.BlockChain().GetBlockByNumber(num - f.safeDistance)
f.eth.BlockChain().SetSafe(toSafe)
}
case <-quit:
return nil
}
}
})
return nil
}

func (f *fakeSafeFinalizedL1) Stop() error {
f.sub.Unsubscribe()
return nil
}

// init a geth node.
Expand Down
2 changes: 2 additions & 0 deletions op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
"sequencer": {
Driver: driver.Config{
Expand All @@ -123,6 +124,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
ListenPort: 9093,
EnableAdmin: true,
},
L1EpochPollInterval: time.Second * 4,
},
},
Loggers: map[string]log.Logger{
Expand Down
39 changes: 39 additions & 0 deletions op-node/eth/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package eth

import (
"context"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)

// HeadSignalFn is used as callback function to accept head-signals
Expand Down Expand Up @@ -43,3 +45,40 @@ func WatchHeadChanges(ctx context.Context, src NewHeadSource, fn HeadSignalFn) (
}
}), nil
}

type L1BlockRefsSource interface {
L1BlockRefByLabel(ctx context.Context, label BlockLabel) (L1BlockRef, error)
}

// PollBlockChanges opens a polling loop to fetch the L1 block reference with the given label,
// on provided interval and with request timeout. Results are returned with provided callback fn,
// which may block to pause/back-pressure polling.
func PollBlockChanges(ctx context.Context, log log.Logger, src L1BlockRefsSource, fn HeadSignalFn,
label BlockLabel, interval time.Duration, timeout time.Duration) ethereum.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
if interval <= 0 {
log.Warn("polling of block is disabled", "interval", interval, "label", label)
<-quit
return nil
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
reqCtx, reqCancel := context.WithTimeout(ctx, timeout)
ref, err := src.L1BlockRefByLabel(reqCtx, label)
reqCancel()
if err != nil {
log.Warn("failed to poll L1 block", "label", label, "err", err)
} else {
fn(ctx, ref)
}
case <-ctx.Done():
return ctx.Err()
case <-quit:
return nil
}
}
})
}
9 changes: 9 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flags

import (
"fmt"
"time"

"github.com/urfave/cli"
)
Expand Down Expand Up @@ -81,6 +82,13 @@ var (
Required: false,
Value: 4,
}
L1EpochPollIntervalFlag = cli.DurationFlag{
Name: "l1.epoch-poll-interval",
Usage: "Poll interval for retrieving new L1 epoch updates such as safe and finalized block changes. Disabled if 0 or negative.",
EnvVar: prefixEnvVar("L1_EPOCH_POLL_INTERVAL"),
Required: false,
Value: time.Second * 12 * 32,
}
LogLevelFlag = cli.StringFlag{
Name: "log.level",
Usage: "The lowest log level that will be output",
Expand Down Expand Up @@ -154,6 +162,7 @@ var optionalFlags = append([]cli.Flag{
VerifierL1Confs,
SequencerEnabledFlag,
SequencerL1Confs,
L1EpochPollIntervalFlag,
LogLevelFlag,
LogFormatFlag,
LogColorFlag,
Expand Down
4 changes: 4 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math"
"time"

"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
Expand All @@ -30,6 +31,9 @@ type Config struct {

Pprof PprofConfig

// Used to poll the L1 for new finalized or safe blocks
L1EpochPollInterval time.Duration

// Optional
Tracer Tracer
}
Expand Down
44 changes: 36 additions & 8 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ type OpNode struct {
log log.Logger
appVersion string
metrics *metrics.Metrics
l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging

l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)

l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging

// some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
// and depend on this ctx to be closed.
Expand Down Expand Up @@ -129,6 +133,13 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
}
n.log.Error("l1 heads subscription error", "err", err)
}()

// Poll for the safe L1 block and finalized block,
// which only change once per epoch at most and may be delayed.
n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Safe, eth.Safe,
cfg.L1EpochPollInterval, time.Second*10)
n.l1FinalizedSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized,
cfg.L1EpochPollInterval, time.Second*10)
return nil
}

Expand Down Expand Up @@ -233,7 +244,24 @@ func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
if err := n.l2Driver.OnL1Head(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
}
}

func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := n.l2Driver.OnL1Safe(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 safe block change", "err", err)
}
}

func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) {
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := n.l2Driver.OnL1Finalized(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 finalized block change", "err", err)
}
}

func (n *OpNode) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
Expand Down
31 changes: 27 additions & 4 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ type EngineQueue struct {
unsafePayloads []*eth.ExecutionPayload

engine Engine

metrics Metrics
}

var _ AttributesQueueOutput = (*EngineQueue)(nil)

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine) *EngineQueue {
return &EngineQueue{log: log, cfg: cfg, engine: engine}
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
return &EngineQueue{log: log, cfg: cfg, engine: engine, metrics: metrics}
}

func (eq *EngineQueue) Progress() Progress {
Expand All @@ -61,6 +63,7 @@ func (eq *EngineQueue) Progress() Progress {

func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
eq.unsafeHead = head
eq.metrics.RecordL2Ref("l2_unsafe", head)
}

func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
Expand Down Expand Up @@ -129,6 +132,17 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
// return nil
//}

func (eq *EngineQueue) logSyncProgress(reason string) {
eq.log.Info("Sync progress",
"reason", reason,
"l2_finalized", eq.finalized,
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin,
)
}

func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads[0]

Expand Down Expand Up @@ -181,7 +195,9 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
}
eq.unsafeHead = ref
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("unsafe payload from sequencer")

return nil
}
Expand All @@ -195,6 +211,7 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
// For some reason the unsafe head is behind the safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
eq.unsafeHead = eq.safeHead
eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
return nil
}
}
Expand Down Expand Up @@ -222,7 +239,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
eq.safeHead = ref
// unsafe head stays the same, we did not reorg the chain.
eq.safeAttributes = eq.safeAttributes[1:]
eq.log.Trace("Reconciled safe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("reconciled with L1")

return nil
}
Expand Down Expand Up @@ -266,8 +283,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
}
eq.safeHead = ref
eq.unsafeHead = ref
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.safeAttributes = eq.safeAttributes[1:]
eq.log.Trace("Inserted safe block", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("processed safe block derived from L1")

return nil
}
Expand Down Expand Up @@ -299,5 +318,9 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
Origin: l1Origin,
Closed: false,
}
eq.metrics.RecordL2Ref("l2_finalized", eq.finalized) // todo(proto): finalized L2 block updates
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.logSyncProgress("reset derivation work")
return io.EOF
}
Loading