Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func (cfg SystemConfig) start() (*System, error) {
L1NodeAddr: l1Node.WSEndpoint(),
L1TrustRPC: false,
}
rollupCfg.L2s = &rollupNode.L2EndpointsConfig{
L2EngineAddrs: []string{sys.nodes[name].WSAuthEndpoint()},
L2EngineJWTSecrets: [][32]byte{cfg.JWTSecret},
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: sys.nodes[name].WSAuthEndpoint(),
L2EngineJWTSecret: cfg.JWTSecret,
}
}

Expand Down
20 changes: 10 additions & 10 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ var (
Value: "http://127.0.0.1:8545",
EnvVar: prefixEnvVar("L1_ETH_RPC"),
}
L2EngineAddrs = cli.StringSliceFlag{
L2EngineAddr = cli.StringFlag{
Name: "l2",
Usage: "Addresses of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)",
Usage: "Address of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)",
Required: true,
EnvVar: prefixEnvVar("L2_ENGINE_RPC"),
}
Expand Down Expand Up @@ -50,13 +50,13 @@ var (
Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data",
EnvVar: prefixEnvVar("L1_TRUST_RPC"),
}
L2EngineJWTSecret = cli.StringSliceFlag{
Name: "l2.jwt-secret",
Usage: "Paths to JWT secret keys, one per L2 endpoint, in the same order as the provided l2 addresses. " +
"Keys are 32 bytes, hex encoded in a file. A new key per endpoint will be generated if left empty.",
Required: false,
Value: &cli.StringSlice{},
EnvVar: prefixEnvVar("L2_ENGINE_AUTH"),
L2EngineJWTSecret = cli.StringFlag{
Name: "l2.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.",
EnvVar: prefixEnvVar("L2_ENGINE_AUTH"),
Required: false,
Value: "",
Destination: new(string),
}
SequencingEnabledFlag = cli.BoolFlag{
Name: "sequencing.enabled",
Expand Down Expand Up @@ -91,7 +91,7 @@ var (

var requiredFlags = []cli.Flag{
L1NodeAddr,
L2EngineAddrs,
L2EngineAddr,
RollupConfig,
RPCListenAddr,
RPCListenPort,
Expand Down
57 changes: 24 additions & 33 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

type L2EndpointsSetup interface {
type L2EndpointSetup interface {
// Setup a RPC client to a L2 execution engine to process rollup blocks with.
Setup(ctx context.Context, log log.Logger) (cl []*rpc.Client, err error)
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error)
Check() error
}

Expand All @@ -21,62 +21,53 @@ type L1EndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error)
}

type L2EndpointsConfig struct {
L2EngineAddrs []string // Addresses of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)
type L2EndpointConfig struct {
L2EngineAddr string // Address of L2 Engine JSON-RPC endpoint to use (engine and eth namespace required)

// JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication, one per L2 engine.
// JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication.
// Any value for an IPC connection.
L2EngineJWTSecrets [][32]byte
L2EngineJWTSecret [32]byte
}

var _ L2EndpointsSetup = (*L2EndpointsConfig)(nil)
var _ L2EndpointSetup = (*L2EndpointConfig)(nil)

func (cfg *L2EndpointsConfig) Check() error {
if len(cfg.L2EngineAddrs) == 0 {
return errors.New("need at least one L2 engine to connect to")
}
if len(cfg.L2EngineAddrs) != len(cfg.L2EngineJWTSecrets) {
return fmt.Errorf("have %d L2 engines, but %d authentication secrets", len(cfg.L2EngineAddrs), len(cfg.L2EngineJWTSecrets))
func (cfg *L2EndpointConfig) Check() error {
if cfg.L2EngineAddr == "" {
return errors.New("empty L2 Engine Address")
}

return nil
}

func (cfg *L2EndpointsConfig) Setup(ctx context.Context, log log.Logger) ([]*rpc.Client, error) {
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
if err := cfg.Check(); err != nil {
return nil, err
}
var out []*rpc.Client
for i, addr := range cfg.L2EngineAddrs {
auth := rpc.NewJWTAuthProvider(cfg.L2EngineJWTSecrets[i])
l2Node, err := dialRPCClientWithBackoff(ctx, log, addr, auth)
if err != nil {
// close clients again if we cannot complete the full setup
for _, cl := range out {
cl.Close()
}
return out, err
}
out = append(out, l2Node)
auth := rpc.NewJWTAuthProvider(cfg.L2EngineJWTSecret)
l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth)
if err != nil {
return nil, err
}
return out, nil

return l2Node, nil
}

// PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines
type PreparedL2Endpoints struct {
Clients []*rpc.Client
Client *rpc.Client
}

func (p *PreparedL2Endpoints) Check() error {
if len(p.Clients) == 0 {
return errors.New("need at least one L2 engine to connect to")
if p.Client == nil {
return errors.New("client cannot be nil")
}
return nil
}

var _ L2EndpointsSetup = (*PreparedL2Endpoints)(nil)
var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil)

func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) ([]*rpc.Client, error) {
return p.Clients, nil
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
return p.Client, nil
}

type L1EndpointConfig struct {
Expand Down
6 changes: 3 additions & 3 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

type Config struct {
L1 L1EndpointSetup
L2s L2EndpointsSetup
L1 L1EndpointSetup
L2 L2EndpointSetup

Rollup rollup.Config

Expand All @@ -36,7 +36,7 @@ type RPCConfig struct {

// Check verifies that the given configuration makes sense
func (cfg *Config) Check() error {
if err := cfg.L2s.Check(); err != nil {
if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %v", err)
}
if err := cfg.Rollup.Check(); err != nil {
Expand Down
127 changes: 45 additions & 82 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package node

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -29,9 +27,8 @@ type OpNode struct {
appVersion string
l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1Source *l1.Source // Source to fetch data from (also implements the Downloader interface)
l2Lock sync.Mutex // Mutex to safely add and use different L2 resources in parallel
l2Engines []*driver.Driver // engines to keep synced
l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown
l2Engine *driver.Driver // L2 Engine to Sync
l2Node *rpc.Client // L2 Execution Engine RPC connections to close at shutdown
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
Expand Down Expand Up @@ -76,7 +73,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1(ctx, cfg); err != nil {
return err
}
if err := n.initL2s(ctx, cfg, snapshotLog); err != nil {
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return err
}
if err := n.initP2PSigner(ctx, cfg); err != nil {
Expand Down Expand Up @@ -129,48 +126,26 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
return nil
}

// AttachEngine attaches an engine to the rollup node.
func (n *OpNode) AttachEngine(ctx context.Context, cfg *Config, tag string, cl *rpc.Client, snapshotLog log.Logger) error {
n.l2Lock.Lock()
defer n.l2Lock.Unlock()

engLog := n.log.New("engine", tag)

client, err := l2.NewSource(cl, &cfg.Rollup.Genesis, engLog)
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, err := cfg.L2.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err)
}
n.l2Node = rpcClient
client, err := l2.NewSource(rpcClient, &cfg.Rollup.Genesis, n.log)
if err != nil {
cl.Close()
return err
}

snap := snapshotLog.New("engine_addr", tag)
engine := driver.NewDriver(cfg.Rollup, client, n.l1Source, n, engLog, snap, cfg.Sequencer)

n.l2Nodes = append(n.l2Nodes, cl)
n.l2Engines = append(n.l2Engines, engine)
return nil
}
snap := snapshotLog.New()
n.l2Engine = driver.NewDriver(cfg.Rollup, client, n.l1Source, n, n.log, snap, cfg.Sequencer)

func (n *OpNode) initL2s(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
clients, err := cfg.L2s.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client(s): %v", err)
}
for i, cl := range clients {
if err := n.AttachEngine(ctx, cfg, fmt.Sprintf("eng_%d", i), cl, snapshotLog); err != nil {
return fmt.Errorf("failed to attach configured engine %d: %v", i, err)
}
}
return nil
}

func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
if len(n.l2Nodes) == 0 {
return errors.New("need at least one L2 node to serve rollup RPC")
}
l2Node := n.l2Nodes[0]

// TODO: attach the p2p node ID to the snapshot logger
client, err := l2.NewReadOnlySource(l2Node, &cfg.Rollup.Genesis, n.log)
client, err := l2.NewReadOnlySource(n.l2Node, &cfg.Rollup.Genesis, n.log)
if err != nil {
return err
}
Expand Down Expand Up @@ -214,38 +189,30 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
}

func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Starting execution engine driver(s)")
for _, eng := range n.l2Engines {
// Request initial head update, default to genesis otherwise
reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10)
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := eng.Start(reqCtx)
reqCancel()
if err != nil {
n.log.Error("Could not start a rollup node", "err", err)
return err
}
n.log.Info("Starting execution engine driver")
// Request initial head update, default to genesis otherwise
reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10)
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := n.l2Engine.Start(reqCtx)
reqCancel()
if err != nil {
n.log.Error("Could not start a rollup node", "err", err)
return err
}

return nil
}

func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
n.l2Lock.Lock()
defer n.l2Lock.Unlock()

n.tracer.OnNewL1Head(ctx, sig)

// fan-out to all engine drivers
for _, eng := range n.l2Engines {
go func(eng *driver.Driver) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := eng.OnL1Head(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
}
}(eng)
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := n.l2Engine.OnL1Head(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
}

}

func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayload) error {
Expand All @@ -264,9 +231,6 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayl
}

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l2.ExecutionPayload) error {
n.l2Lock.Lock()
defer n.l2Lock.Unlock()

// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
return nil
Expand All @@ -276,16 +240,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l

n.log.Info("Received signed execution payload from p2p", "id", payload.ID(), "peer", from)

// fan-out to all engine drivers
for _, eng := range n.l2Engines {
go func(eng *driver.Driver) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if err := eng.OnUnsafeL2Payload(ctx, payload); err != nil {
n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID())
}
}(eng)
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if err := n.l2Engine.OnUnsafeL2Payload(ctx, payload); err != nil {
n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID())
}

return nil
}

Expand All @@ -302,12 +263,12 @@ func (n *OpNode) Close() error {
}
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %v", err))
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
}
}
if n.p2pSigner != nil {
if err := n.p2pSigner.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %v", err))
result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %w", err))
}
}

Expand All @@ -320,16 +281,18 @@ func (n *OpNode) Close() error {
n.l1HeadsSub.Unsubscribe()
}

// close L2 engines
for _, eng := range n.l2Engines {
if err := eng.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %v", err))
// close L2 engine
if n.l2Engine != nil {
if err := n.l2Engine.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
}
}
// close L2 nodes
for _, n := range n.l2Nodes {
n.Close()

// close L2 node
if n.l2Node != nil {
n.l2Node.Close()
}

// close L1 data source
if n.l1Source != nil {
n.l1Source.Close()
Expand Down
Loading