diff --git a/op-e2e/setup.go b/op-e2e/setup.go index aa2ea05a851e2..525eae163be98 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -335,9 +335,9 @@ func (cfg SystemConfig) start() (*System, error) { L1NodeAddr: l1Node.WSEndpoint(), L1TrustRPC: false, } - rollupCfg.L2s = &rollupNode.L2EndpointsConfig{ - L2EngineAddrs: []string{sys.nodes[name].WSAuthEndpoint()}, - L2EngineJWTSecrets: [][32]byte{cfg.JWTSecret}, + rollupCfg.L2 = &rollupNode.L2EndpointConfig{ + L2EngineAddr: sys.nodes[name].WSAuthEndpoint(), + L2EngineJWTSecret: cfg.JWTSecret, } } diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 1c9c092a363e0..d319ecddf64a1 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -19,9 +19,9 @@ var ( Value: "http://127.0.0.1:8545", EnvVar: prefixEnvVar("L1_ETH_RPC"), } - L2EngineAddrs = cli.StringSliceFlag{ + L2EngineAddr = cli.StringFlag{ Name: "l2", - Usage: "Addresses of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)", + Usage: "Address of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)", Required: true, EnvVar: prefixEnvVar("L2_ENGINE_RPC"), } @@ -50,13 +50,13 @@ var ( Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data", EnvVar: prefixEnvVar("L1_TRUST_RPC"), } - L2EngineJWTSecret = cli.StringSliceFlag{ - Name: "l2.jwt-secret", - Usage: "Paths to JWT secret keys, one per L2 endpoint, in the same order as the provided l2 addresses. " + - "Keys are 32 bytes, hex encoded in a file. A new key per endpoint will be generated if left empty.", - Required: false, - Value: &cli.StringSlice{}, - EnvVar: prefixEnvVar("L2_ENGINE_AUTH"), + L2EngineJWTSecret = cli.StringFlag{ + Name: "l2.jwt-secret", + Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.", + EnvVar: prefixEnvVar("L2_ENGINE_AUTH"), + Required: false, + Value: "", + Destination: new(string), } SequencingEnabledFlag = cli.BoolFlag{ Name: "sequencing.enabled", @@ -91,7 +91,7 @@ var ( var requiredFlags = []cli.Flag{ L1NodeAddr, - L2EngineAddrs, + L2EngineAddr, RollupConfig, RPCListenAddr, RPCListenPort, diff --git a/op-node/node/client.go b/op-node/node/client.go index 106e0a726ab67..f5b7e3d1f8686 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -10,9 +10,9 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -type L2EndpointsSetup interface { +type L2EndpointSetup interface { // Setup a RPC client to a L2 execution engine to process rollup blocks with. - Setup(ctx context.Context, log log.Logger) (cl []*rpc.Client, err error) + Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error) Check() error } @@ -21,62 +21,53 @@ type L1EndpointSetup interface { Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) } -type L2EndpointsConfig struct { - L2EngineAddrs []string // Addresses of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required) +type L2EndpointConfig struct { + L2EngineAddr string // Address of L2 Engine JSON-RPC endpoint to use (engine and eth namespace required) - // JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication, one per L2 engine. + // JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication. // Any value for an IPC connection. - L2EngineJWTSecrets [][32]byte + L2EngineJWTSecret [32]byte } -var _ L2EndpointsSetup = (*L2EndpointsConfig)(nil) +var _ L2EndpointSetup = (*L2EndpointConfig)(nil) -func (cfg *L2EndpointsConfig) Check() error { - if len(cfg.L2EngineAddrs) == 0 { - return errors.New("need at least one L2 engine to connect to") - } - if len(cfg.L2EngineAddrs) != len(cfg.L2EngineJWTSecrets) { - return fmt.Errorf("have %d L2 engines, but %d authentication secrets", len(cfg.L2EngineAddrs), len(cfg.L2EngineJWTSecrets)) +func (cfg *L2EndpointConfig) Check() error { + if cfg.L2EngineAddr == "" { + return errors.New("empty L2 Engine Address") } + return nil } -func (cfg *L2EndpointsConfig) Setup(ctx context.Context, log log.Logger) ([]*rpc.Client, error) { +func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) { if err := cfg.Check(); err != nil { return nil, err } - var out []*rpc.Client - for i, addr := range cfg.L2EngineAddrs { - auth := rpc.NewJWTAuthProvider(cfg.L2EngineJWTSecrets[i]) - l2Node, err := dialRPCClientWithBackoff(ctx, log, addr, auth) - if err != nil { - // close clients again if we cannot complete the full setup - for _, cl := range out { - cl.Close() - } - return out, err - } - out = append(out, l2Node) + auth := rpc.NewJWTAuthProvider(cfg.L2EngineJWTSecret) + l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth) + if err != nil { + return nil, err } - return out, nil + + return l2Node, nil } // PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines type PreparedL2Endpoints struct { - Clients []*rpc.Client + Client *rpc.Client } func (p *PreparedL2Endpoints) Check() error { - if len(p.Clients) == 0 { - return errors.New("need at least one L2 engine to connect to") + if p.Client == nil { + return errors.New("client cannot be nil") } return nil } -var _ L2EndpointsSetup = (*PreparedL2Endpoints)(nil) +var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil) -func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) ([]*rpc.Client, error) { - return p.Clients, nil +func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) { + return p.Client, nil } type L1EndpointConfig struct { diff --git a/op-node/node/config.go b/op-node/node/config.go index 3cf4c0c9ea4e6..bf8f986b9458c 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -9,8 +9,8 @@ import ( ) type Config struct { - L1 L1EndpointSetup - L2s L2EndpointsSetup + L1 L1EndpointSetup + L2 L2EndpointSetup Rollup rollup.Config @@ -36,7 +36,7 @@ type RPCConfig struct { // Check verifies that the given configuration makes sense func (cfg *Config) Check() error { - if err := cfg.L2s.Check(); err != nil { + if err := cfg.L2.Check(); err != nil { return fmt.Errorf("l2 endpoint config error: %v", err) } if err := cfg.Rollup.Check(); err != nil { diff --git a/op-node/node/node.go b/op-node/node/node.go index 9eddb39da68a7..ebb9ccc23d672 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -2,9 +2,7 @@ package node import ( "context" - "errors" "fmt" - "sync" "time" "github.com/libp2p/go-libp2p-core/peer" @@ -29,9 +27,8 @@ type OpNode struct { appVersion string l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error) l1Source *l1.Source // Source to fetch data from (also implements the Downloader interface) - l2Lock sync.Mutex // Mutex to safely add and use different L2 resources in parallel - l2Engines []*driver.Driver // engines to keep synced - l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown + l2Engine *driver.Driver // L2 Engine to Sync + l2Node *rpc.Client // L2 Execution Engine RPC connections to close at shutdown server *rpcServer // RPC server hosting the rollup-node API p2pNode *p2p.NodeP2P // P2P node functionality p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer @@ -76,7 +73,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) if err := n.initL1(ctx, cfg); err != nil { return err } - if err := n.initL2s(ctx, cfg, snapshotLog); err != nil { + if err := n.initL2(ctx, cfg, snapshotLog); err != nil { return err } if err := n.initP2PSigner(ctx, cfg); err != nil { @@ -129,48 +126,26 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { return nil } -// AttachEngine attaches an engine to the rollup node. -func (n *OpNode) AttachEngine(ctx context.Context, cfg *Config, tag string, cl *rpc.Client, snapshotLog log.Logger) error { - n.l2Lock.Lock() - defer n.l2Lock.Unlock() - - engLog := n.log.New("engine", tag) - - client, err := l2.NewSource(cl, &cfg.Rollup.Genesis, engLog) +func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { + rpcClient, err := cfg.L2.Setup(ctx, n.log) + if err != nil { + return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err) + } + n.l2Node = rpcClient + client, err := l2.NewSource(rpcClient, &cfg.Rollup.Genesis, n.log) if err != nil { - cl.Close() return err } - snap := snapshotLog.New("engine_addr", tag) - engine := driver.NewDriver(cfg.Rollup, client, n.l1Source, n, engLog, snap, cfg.Sequencer) - - n.l2Nodes = append(n.l2Nodes, cl) - n.l2Engines = append(n.l2Engines, engine) - return nil -} + snap := snapshotLog.New() + n.l2Engine = driver.NewDriver(cfg.Rollup, client, n.l1Source, n, n.log, snap, cfg.Sequencer) -func (n *OpNode) initL2s(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { - clients, err := cfg.L2s.Setup(ctx, n.log) - if err != nil { - return fmt.Errorf("failed to setup L2 execution-engine RPC client(s): %v", err) - } - for i, cl := range clients { - if err := n.AttachEngine(ctx, cfg, fmt.Sprintf("eng_%d", i), cl, snapshotLog); err != nil { - return fmt.Errorf("failed to attach configured engine %d: %v", i, err) - } - } return nil } func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error { - if len(n.l2Nodes) == 0 { - return errors.New("need at least one L2 node to serve rollup RPC") - } - l2Node := n.l2Nodes[0] - // TODO: attach the p2p node ID to the snapshot logger - client, err := l2.NewReadOnlySource(l2Node, &cfg.Rollup.Genesis, n.log) + client, err := l2.NewReadOnlySource(n.l2Node, &cfg.Rollup.Genesis, n.log) if err != nil { return err } @@ -214,38 +189,30 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error { } func (n *OpNode) Start(ctx context.Context) error { - n.log.Info("Starting execution engine driver(s)") - for _, eng := range n.l2Engines { - // Request initial head update, default to genesis otherwise - reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10) - // start driving engine: sync blocks by deriving them from L1 and driving them into the engine - err := eng.Start(reqCtx) - reqCancel() - if err != nil { - n.log.Error("Could not start a rollup node", "err", err) - return err - } + n.log.Info("Starting execution engine driver") + // Request initial head update, default to genesis otherwise + reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10) + // start driving engine: sync blocks by deriving them from L1 and driving them into the engine + err := n.l2Engine.Start(reqCtx) + reqCancel() + if err != nil { + n.log.Error("Could not start a rollup node", "err", err) + return err } return nil } func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) { - n.l2Lock.Lock() - defer n.l2Lock.Unlock() - n.tracer.OnNewL1Head(ctx, sig) - // fan-out to all engine drivers - for _, eng := range n.l2Engines { - go func(eng *driver.Driver) { - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - if err := eng.OnL1Head(ctx, sig); err != nil { - n.log.Warn("failed to notify engine driver of L1 head change", "err", err) - } - }(eng) + // Pass on the event to the L2 Engine + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + if err := n.l2Engine.OnL1Head(ctx, sig); err != nil { + n.log.Warn("failed to notify engine driver of L1 head change", "err", err) } + } func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayload) error { @@ -264,9 +231,6 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayl } func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l2.ExecutionPayload) error { - n.l2Lock.Lock() - defer n.l2Lock.Unlock() - // ignore if it's from ourselves if n.p2pNode != nil && from == n.p2pNode.Host().ID() { return nil @@ -276,16 +240,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l n.log.Info("Received signed execution payload from p2p", "id", payload.ID(), "peer", from) - // fan-out to all engine drivers - for _, eng := range n.l2Engines { - go func(eng *driver.Driver) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - if err := eng.OnUnsafeL2Payload(ctx, payload); err != nil { - n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID()) - } - }(eng) + // Pass on the event to the L2 Engine + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + if err := n.l2Engine.OnUnsafeL2Payload(ctx, payload); err != nil { + n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID()) } + return nil } @@ -302,12 +263,12 @@ func (n *OpNode) Close() error { } if n.p2pNode != nil { if err := n.p2pNode.Close(); err != nil { - result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %v", err)) + result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err)) } } if n.p2pSigner != nil { if err := n.p2pSigner.Close(); err != nil { - result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %v", err)) + result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %w", err)) } } @@ -320,16 +281,18 @@ func (n *OpNode) Close() error { n.l1HeadsSub.Unsubscribe() } - // close L2 engines - for _, eng := range n.l2Engines { - if err := eng.Close(); err != nil { - result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %v", err)) + // close L2 engine + if n.l2Engine != nil { + if err := n.l2Engine.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err)) } } - // close L2 nodes - for _, n := range n.l2Nodes { - n.Close() + + // close L2 node + if n.l2Node != nil { + n.l2Node.Close() } + // close L1 data source if n.l1Source != nil { n.l1Source.Close() diff --git a/op-node/service.go b/op-node/service.go index 6aff510dab91f..1b668670d640b 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -44,14 +44,14 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { return nil, fmt.Errorf("failed to load l1 endpoint info: %v", err) } - l2Endpoints, err := NewL2EndpointsConfig(ctx, log) + l2Endpoint, err := NewL2EndpointConfig(ctx, log) if err != nil { return nil, fmt.Errorf("failed to load l2 endpoints info: %v", err) } cfg := &node.Config{ L1: l1Endpoint, - L2s: l2Endpoints, + L2: l2Endpoint, Rollup: *rollupConfig, Sequencer: enableSequencing, RPC: node.RPCConfig{ @@ -74,38 +74,33 @@ func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) { }, nil } -func NewL2EndpointsConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointsConfig, error) { - l2Addrs := ctx.GlobalStringSlice(flags.L2EngineAddrs.Name) - engineJWTSecrets := ctx.GlobalStringSlice(flags.L2EngineJWTSecret.Name) - var secrets [][32]byte - for i, fileName := range engineJWTSecrets { - fileName = strings.TrimSpace(fileName) - if fileName == "" { - return nil, fmt.Errorf("file-name of jwt secret %d is empty", i) +func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConfig, error) { + l2Addr := ctx.GlobalString(flags.L2EngineAddr.Name) + fileName := ctx.GlobalString(flags.L2EngineJWTSecret.Name) + var secret [32]byte + fileName = strings.TrimSpace(fileName) + if fileName == "" { + return nil, fmt.Errorf("file-name of jwt secret is empty") + } + if data, err := os.ReadFile(fileName); err == nil { + jwtSecret := common.FromHex(strings.TrimSpace(string(data))) + if len(jwtSecret) != 32 { + return nil, fmt.Errorf("invalid jwt secret in path %s, not 32 hex-formatted bytes", fileName) + } + copy(secret[:], jwtSecret) + } else { + log.Warn("Failed to read JWT secret from file, generating a new one now. Configure L2 geth with --authrpc.jwt-secret=" + fmt.Sprintf("%q", fileName)) + if _, err := io.ReadFull(rand.Reader, secret[:]); err != nil { + return nil, fmt.Errorf("failed to generate jwt secret: %v", err) } - if data, err := os.ReadFile(fileName); err == nil { - jwtSecret := common.FromHex(strings.TrimSpace(string(data))) - if len(jwtSecret) != 32 { - return nil, fmt.Errorf("invalid jwt secret in path %s, not 32 hex-formatted bytes", fileName) - } - var secret [32]byte - copy(secret[:], jwtSecret) - secrets = append(secrets, secret) - } else { - log.Warn("Failed to read JWT secret from file, generating a new one now. Configure L2 geth with --authrpc.jwt-secret=" + fmt.Sprintf("%q", fileName)) - var secret [32]byte - if _, err := io.ReadFull(rand.Reader, secret[:]); err != nil { - return nil, fmt.Errorf("failed to generate jwt secret: %v", err) - } - secrets = append(secrets, secret) - if err := os.WriteFile(fileName, []byte(hexutil.Encode(secret[:])), 0600); err != nil { - return nil, err - } + if err := os.WriteFile(fileName, []byte(hexutil.Encode(secret[:])), 0600); err != nil { + return nil, err } } - return &node.L2EndpointsConfig{ - L2EngineAddrs: l2Addrs, - L2EngineJWTSecrets: secrets, + + return &node.L2EndpointConfig{ + L2EngineAddr: l2Addr, + L2EngineJWTSecret: secret, }, nil }