Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 61 additions & 10 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"os"
"path"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -119,14 +120,6 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
JWTFilePath: writeDefaultJWT(t),
JWTSecret: testingJWTSecret,
Nodes: map[string]*rollupNode.Config{
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
"sequencer": {
Driver: driver.Config{
VerifierConfDepth: 0,
Expand All @@ -141,6 +134,14 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
},
L1EpochPollInterval: time.Second * 4,
},
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
},
Loggers: map[string]log.Logger{
"verifier": testlog.Logger(t, log.LvlInfo).New("role", "verifier"),
Expand Down Expand Up @@ -225,7 +226,43 @@ func (sys *System) Close() {
sys.Mocknet.Close()
}

func (cfg SystemConfig) Start() (*System, error) {
type systemConfigHook func(sCfg *SystemConfig, s *System)

type SystemConfigOption struct {
key string
role string
action systemConfigHook
}

type SystemConfigOptions struct {
opts map[string]systemConfigHook
}

func NewSystemConfigOptions(_opts []SystemConfigOption) (SystemConfigOptions, error) {
opts := make(map[string]systemConfigHook)
for _, opt := range _opts {
if _, ok := opts[opt.key+":"+opt.role]; ok {
return SystemConfigOptions{}, fmt.Errorf("duplicate option for key %s and role %s", opt.key, opt.role)
}
opts[opt.key+":"+opt.role] = opt.action
}

return SystemConfigOptions{
opts: opts,
}, nil
}

func (s *SystemConfigOptions) Get(key, role string) (systemConfigHook, bool) {
v, ok := s.opts[key+":"+role]
return v, ok
}

func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
opts, err := NewSystemConfigOptions(_opts)
if err != nil {
return nil, err
}

sys := &System{
cfg: cfg,
Nodes: make(map[string]*node.Node),
Expand Down Expand Up @@ -457,7 +494,17 @@ func (cfg SystemConfig) Start() (*System, error) {
snapLog.SetHandler(log.DiscardHandler())

// Rollup nodes
for name, nodeConfig := range cfg.Nodes {

// Ensure we are looping through the nodes in alphabetical order
ks := make([]string, 0, len(cfg.Nodes))
for k := range cfg.Nodes {
ks = append(ks, k)
}
// Sort strings in ascending alphabetical order
sort.Strings(ks)

for _, name := range ks {
nodeConfig := cfg.Nodes[name]
c := *nodeConfig // copy
c.Rollup = makeRollupConfig()

Expand All @@ -482,6 +529,10 @@ func (cfg SystemConfig) Start() (*System, error) {
return nil, err
}
sys.RollupNodes[name] = node

if action, ok := opts.Get("afterRollupNodeStart", name); ok {
action(&cfg, sys)
}
}

if cfg.P2PTopology != nil {
Expand Down
84 changes: 84 additions & 0 deletions op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,90 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash)
}

// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
//
// Test steps:
// 1. Spin up the nodes (P2P is disabled on the verifier)
// 2. Send a transaction to the sequencer.
// 3. Wait for the TX to be mined on the sequencer chain.
// 5. Wait for the verifier to detect a gap in the payload queue vs. the unsafe head
// 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 7. Wait for the verifier to sync the unsafe chain into the safe chain.
// 8. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}

cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do.
// Keep the seq window small so the L2 chain is started quick
cfg.DeployConfig.L1BlockTime = 10

var published, received []common.Hash
seqTracer, verifTracer := new(FnTracer), new(FnTracer)
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash)
}
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received = append(received, payload.BlockHash)
}
cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["verifier"].Tracer = verifTracer

sys, err := cfg.Start(SystemConfigOption{
key: "afterRollupNodeStart",
role: "sequencer",
action: func(sCfg *SystemConfig, system *System) {
rpc, _ := system.Nodes["sequencer"].Attach() // never errors
cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{
Rpc: client.NewBaseRPCClient(rpc),
}
},
})
require.Nil(t, err, "Error starting up system")
defer sys.Close()

l2Seq := sys.Clients["sequencer"]
l2Verif := sys.Clients["verifier"]

// Transactor Account
ethPrivKey := cfg.Secrets.Alice

// Submit a TX to L2 sequencer node
toAddr := common.Address{0xff, 0xff}
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Nonce: 0,
To: &toAddr,
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21000,
})
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")

// Wait for tx to be mined on the L2 sequencer chain
receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")

// Wait for alt RPC sync to pick up the blocks on the sequencer chain
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 12*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")

require.Equal(t, receiptSeq, receiptVerif)

// Verify that the tx was received via RPC sync (P2P is disabled)
require.Contains(t, received, receiptVerif.BlockHash)

// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received))
require.ElementsMatch(t, received, published[:len(received)])
}

// TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node.
func TestSystemDenseTopology(t *testing.T) {
parallel(t)
Expand Down
7 changes: 7 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ var (
EnvVar: prefixEnvVar("HEARTBEAT_URL"),
Value: "https://heartbeat.optimism.io",
}
BackupL2UnsafeSyncRPC = cli.StringFlag{
Name: "l2.backup-unsafe-sync-rpc",
Usage: "Set the backup L2 unsafe sync RPC endpoint.",
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"),
Required: false,
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -219,6 +225,7 @@ var optionalFlags = append([]cli.Flag{
HeartbeatEnabledFlag,
HeartbeatMonikerFlag,
HeartbeatURLFlag,
BackupL2UnsafeSyncRPC,
}, p2pFlags...)

// Flags contains the list of configuration options available to the binary.
Expand Down
49 changes: 49 additions & 0 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type L2EndpointSetup interface {
Check() error
}

type L2SyncEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
Check() error
}

type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from.
// The results of the RPC client may be trusted for faster processing, or strictly validated.
Expand Down Expand Up @@ -75,6 +80,50 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
return p.Client, nil
}

// L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct {
// Address of the L2 RPC to use for backup sync
L2NodeAddr string
}

var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)

func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil {
return nil, err
}

return l2Node, nil
}

func (cfg *L2SyncEndpointConfig) Check() error {
if cfg.L2NodeAddr == "" {
return errors.New("empty L2 Node Address")
}

return nil
}

type L2SyncRPCConfig struct {
// RPC endpoint to use for syncing
Rpc client.RPC
}

var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil)

func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return cfg.Rpc, nil
}

func (cfg *L2SyncRPCConfig) Check() error {
if cfg.Rpc == nil {
return errors.New("rpc cannot be nil")
}

return nil
}

type L1EndpointConfig struct {
L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required)

Expand Down
5 changes: 3 additions & 2 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
)

type Config struct {
L1 L1EndpointSetup
L2 L2EndpointSetup
L1 L1EndpointSetup
L2 L2EndpointSetup
L2Sync L2SyncEndpointSetup

Driver driver.Config

Expand Down
42 changes: 39 additions & 3 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,28 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err
}

n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n.log, snapshotLog, n.metrics)
var syncClient *sources.SyncClient
// If the L2 sync config is present, use it to create a sync client
if cfg.L2Sync != nil {
if err := cfg.L2Sync.Check(); err != nil {
log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err)
} else {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
}

// The sync client's RPC is always trusted
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true)

syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil {
return fmt.Errorf("failed to create sync client: %w", err)
}
}
}

n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics)

return nil
}
Expand Down Expand Up @@ -263,13 +284,21 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {

func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Starting execution engine driver")

// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := n.l2Driver.Start()
if err != nil {
if err := n.l2Driver.Start(); err != nil {
n.log.Error("Could not start a rollup node", "err", err)
return err
}

// If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Start(); err != nil {
n.log.Error("Could not start the backup sync client", "err", err)
return err
}
}

return nil
}

Expand Down Expand Up @@ -382,6 +411,13 @@ func (n *OpNode) Close() error {
if err := n.l2Driver.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
}

// If the L2 sync client is present & running, close it.
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
}
}
}

// close L2 engine RPC client
Expand Down
Loading