diff --git a/op-devstack/README.md b/op-devstack/README.md index 119e6b385b78c..811be91edc98e 100644 --- a/op-devstack/README.md +++ b/op-devstack/README.md @@ -151,4 +151,13 @@ The following environment variables can be used to configure devstack: - `DEVSTACK_ORCHESTRATOR`: Configures the preferred orchestrator kind (see Orchestrator interface section above). - `DEVSTACK_KEYS_SALT`: Seeds the keys generated with `NewHDWallet`. This is useful for "isolating" test runs, and might be needed to reproduce CI and/or acceptance test runs. It can be any string, including the empty one to use the "usual" devkeys. - `DEVNET_ENV_URL`: Used when `DEVSTACK_ORCHESTRATOR=sysext` to specify the network descriptor URL. -- `DEVNET_EXPECT_PRECONDITIONS_MET`: This can be set of force test failures when their pre-conditions are not met, which would otherwise result in them being skipped. This is helpful in particular for runs that do intend to run specific tests (as opposed to whatever is available). `op-acceptor` does set that variable, for example. \ No newline at end of file +- `DEVNET_EXPECT_PRECONDITIONS_MET`: This can be set to force test failures when their pre-conditions are not met, which would otherwise result in them being skipped. This is helpful in particular for runs that do intend to run specific tests (as opposed to whatever is available). `op-acceptor` does set that variable, for example. + +Rust stack env vars: +- `DEVSTACK_L2CL_KIND=kona` to select kona as default L2 CL node +- `DEVSTACK_L2EL_KIND=op-reth` to select op-reth as default L2 EL node +- `KONA_NODE_EXEC_PATH=/home/USERHERE/projects/kona/target/debug/kona-node` to select the kona-node executable to run +- `OP_RETH_EXEC_PATH=/home/USERHERE/projects/reth/target/release/op-reth` to select the op-reth executable to run + +Other useful env vars: +- `DISABLE_OP_E2E_LEGACY=true` to disable the op-e2e package from loading build-artifacts that are not used by devstack. 
diff --git a/op-devstack/presets/cl_config.go b/op-devstack/presets/cl_config.go index 62dc7a05b36d2..d56cbe4d48f6c 100644 --- a/op-devstack/presets/cl_config.go +++ b/op-devstack/presets/cl_config.go @@ -4,31 +4,30 @@ import ( "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/stack" "github.com/ethereum-optimism/optimism/op-devstack/sysgo" - "github.com/ethereum-optimism/optimism/op-node/config" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" ) func WithExecutionLayerSyncOnVerifiers() stack.CommonOption { return stack.MakeCommon( - sysgo.WithL2CLOption(func(_ devtest.P, id stack.L2CLNodeID, cfg *config.Config) { - // Can't enable ELSync on the sequencer or it will never start sequencing because - // ELSync needs to receive gossip from the sequencer to drive the sync - if !cfg.Driver.SequencerEnabled { - cfg.Sync.SyncMode = sync.ELSync - } - })) + sysgo.WithGlobalL2CLOption(sysgo.L2CLOptionFn( + func(_ devtest.P, id stack.L2CLNodeID, cfg *sysgo.L2CLConfig) { + cfg.VerifierSyncMode = sync.ELSync + }))) } func WithConsensusLayerSync() stack.CommonOption { return stack.MakeCommon( - sysgo.WithL2CLOption(func(_ devtest.P, id stack.L2CLNodeID, cfg *config.Config) { - cfg.Sync.SyncMode = sync.CLSync - })) + sysgo.WithGlobalL2CLOption(sysgo.L2CLOptionFn( + func(_ devtest.P, id stack.L2CLNodeID, cfg *sysgo.L2CLConfig) { + cfg.SequencerSyncMode = sync.CLSync + cfg.VerifierSyncMode = sync.CLSync + }))) } func WithSafeDBEnabled() stack.CommonOption { return stack.MakeCommon( - sysgo.WithL2CLOption(func(p devtest.P, _ stack.L2CLNodeID, cfg *config.Config) { - cfg.SafeDBPath = p.TempDir() - })) + sysgo.WithGlobalL2CLOption(sysgo.L2CLOptionFn( + func(p devtest.P, id stack.L2CLNodeID, cfg *sysgo.L2CLConfig) { + cfg.SafeDBPath = p.TempDir() + }))) } diff --git a/op-devstack/stack/match/labels.go b/op-devstack/stack/match/labels.go index ad2b217cfc04d..97ee2c644c375 100644 --- 
a/op-devstack/stack/match/labels.go +++ b/op-devstack/stack/match/labels.go @@ -17,19 +17,21 @@ const ( LabelVendor = "vendor" ) -type L2ELVendor string +type Vendor string const ( - OpReth L2ELVendor = "op-reth" - OpGeth L2ELVendor = "op-geth" - Proxyd L2ELVendor = "proxyd" - FlashblocksWebsocketProxy L2ELVendor = "flashblocks-websocket-proxy" + OpReth Vendor = "op-reth" + OpGeth Vendor = "op-geth" + Proxyd Vendor = "proxyd" + FlashblocksWebsocketProxy Vendor = "flashblocks-websocket-proxy" + OpNode Vendor = "op-node" + KonaNode Vendor = "kona-node" ) -func (v L2ELVendor) Match(elems []stack.L2ELNode) []stack.L2ELNode { +func (v Vendor) Match(elems []stack.L2ELNode) []stack.L2ELNode { return WithLabel[stack.L2ELNodeID, stack.L2ELNode](LabelVendor, string(v)).Match(elems) } -func (v L2ELVendor) String() string { +func (v Vendor) String() string { return string(v) } diff --git a/op-devstack/sysgo/control_plane_test.go b/op-devstack/sysgo/control_plane_test.go index 4a911ca411503..6c601d4ffe382 100644 --- a/op-devstack/sysgo/control_plane_test.go +++ b/op-devstack/sysgo/control_plane_test.go @@ -118,14 +118,16 @@ func testL2CLRestart(ids DefaultInteropSystemIDs, system stack.System, control s // stop L2CL control.L2CLNodeState(ids.L2ACL, stack.Stop) - // L2CL API will not work since L2CL stopped + // L2CL API will still kind of work, it is not functioning, + // but since L2CL is behind a proxy, the proxy is still online, and may create a different error. + // The dial will be accepted, and the connection then closed, once the connection behind the proxy fails. 
{ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) _, err := retry.Do[*eth.SyncStatus](ctx, 10, retry.Fixed(time.Millisecond*500), func() (*eth.SyncStatus, error) { return seqA.RollupAPI().SyncStatus(ctx) }) cancel() - require.Error(t, err) + require.Error(t, err, "should not be able to get sync-status when node behind proxy is offline") } // restart L2CL diff --git a/op-devstack/sysgo/faucet.go b/op-devstack/sysgo/faucet.go index c9b630c4d0180..38d11185121da 100644 --- a/op-devstack/sysgo/faucet.go +++ b/op-devstack/sysgo/faucet.go @@ -86,7 +86,7 @@ func WithFaucets(l1ELs []stack.L1ELNodeID, l2ELs []stack.L2ELNodeID) stack.Optio require.True(ok, "need L2 EL for faucet", elID) faucets[id] = &fconf.FaucetEntry{ - ELRPC: endpoint.MustRPC{Value: endpoint.URL(el.userRPC)}, + ELRPC: endpoint.MustRPC{Value: endpoint.URL(el.UserRPC())}, ChainID: elID.ChainID(), TxCfg: fconf.TxManagerConfig{ PrivateKey: funderKeyStr, diff --git a/op-devstack/sysgo/l2_batcher.go b/op-devstack/sysgo/l2_batcher.go index e5a11684d86ca..e4aaf5e35d514 100644 --- a/op-devstack/sysgo/l2_batcher.go +++ b/op-devstack/sysgo/l2_batcher.go @@ -87,8 +87,8 @@ func WithBatcher(batcherID stack.L2BatcherID, l1ELID stack.L1ELNodeID, l2CLID st batcherCLIConfig := &bss.CLIConfig{ L1EthRpc: l1EL.userRPC, - L2EthRpc: []string{l2EL.userRPC}, - RollupRpc: []string{l2CL.userRPC}, + L2EthRpc: []string{l2EL.UserRPC()}, + RollupRpc: []string{l2CL.UserRPC()}, MaxPendingTransactions: 1, MaxChannelDuration: 1, MaxL1TxSize: 120_000, @@ -133,8 +133,8 @@ func WithBatcher(batcherID stack.L2BatcherID, l1ELID stack.L1ELNodeID, l2CLID st service: batcher, rpc: batcher.HTTPEndpoint(), l1RPC: l1EL.userRPC, - l2CLRPC: l2CL.userRPC, - l2ELRPC: l2EL.userRPC, + l2CLRPC: l2CL.UserRPC(), + l2ELRPC: l2EL.UserRPC(), } orch.batchers.Set(batcherID, b) }) diff --git a/op-devstack/sysgo/l2_challenger.go b/op-devstack/sysgo/l2_challenger.go index db112b8b92f7e..815bd8115515c 100644 --- 
a/op-devstack/sysgo/l2_challenger.go +++ b/op-devstack/sysgo/l2_challenger.go @@ -113,7 +113,7 @@ func WithL2ChallengerPostDeploy(orch *Orchestrator, challengerID stack.L2Challen for i, l2ELID := range l2ELIDs { l2EL, ok := orch.l2ELs.Get(l2ELID) require.True(ok) - l2ELRPCs[i] = l2EL.userRPC + l2ELRPCs[i] = l2EL.UserRPC() } cluster, ok := orch.clusters.Get(*clusterID) require.True(ok) @@ -143,7 +143,7 @@ func WithL2ChallengerPostDeploy(orch *Orchestrator, challengerID stack.L2Challen l2EL, ok := orch.l2ELs.Get(l2ELID) require.True(ok) prestateVariant := shared.MTCannonVariant - cfg, err = shared.NewPreInteropChallengerConfig(dir, l1EL.userRPC, l1CL.beaconHTTPAddr, l2CL.userRPC, l2EL.userRPC, + cfg, err = shared.NewPreInteropChallengerConfig(dir, l1EL.userRPC, l1CL.beaconHTTPAddr, l2CL.UserRPC(), l2EL.UserRPC(), shared.WithFactoryAddress(disputeGameFactoryAddr), shared.WithPrivKey(challengerSecret), shared.WithCannonConfig(rollupCfgs, l2Geneses, prestateVariant), diff --git a/op-devstack/sysgo/l2_cl.go b/op-devstack/sysgo/l2_cl.go index 54b42a62ef402..a7f3a4366dcb9 100644 --- a/op-devstack/sysgo/l2_cl.go +++ b/op-devstack/sysgo/l2_cl.go @@ -1,395 +1,95 @@ package sysgo import ( - "context" - "encoding/hex" - "flag" - "fmt" - "sync" - "time" + "os" - "github.com/ethereum-optimism/optimism/op-service/testutils/tcpproxy" - - altda "github.com/ethereum-optimism/optimism/op-alt-da" - "github.com/ethereum-optimism/optimism/op-chain-ops/devkeys" "github.com/ethereum-optimism/optimism/op-devstack/devtest" - "github.com/ethereum-optimism/optimism/op-devstack/shim" "github.com/ethereum-optimism/optimism/op-devstack/stack" - "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/opnode" - "github.com/ethereum-optimism/optimism/op-node/config" - opNodeFlags "github.com/ethereum-optimism/optimism/op-node/flags" - "github.com/ethereum-optimism/optimism/op-node/p2p" - p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli" - 
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" - "github.com/ethereum-optimism/optimism/op-node/rollup/interop" nodeSync "github.com/ethereum-optimism/optimism/op-node/rollup/sync" - "github.com/ethereum-optimism/optimism/op-service/apis" - "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/eth" - opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" - "github.com/ethereum-optimism/optimism/op-service/oppprof" - "github.com/ethereum-optimism/optimism/op-service/retry" - oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" - "github.com/ethereum-optimism/optimism/op-service/sources" - "github.com/ethereum-optimism/optimism/op-service/testreq" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - "github.com/urfave/cli/v2" ) -type L2CLNode struct { - mu sync.Mutex - - id stack.L2CLNodeID - opNode *opnode.Opnode - userRPC string - interopEndpoint string - interopJwtSecret eth.Bytes32 - cfg *config.Config - p devtest.P - logger log.Logger - el stack.L2ELNodeID - userProxy *tcpproxy.Proxy - interopProxy *tcpproxy.Proxy +type L2CLNode interface { + hydrate(system stack.ExtensibleSystem) + stack.Lifecycle + UserRPC() string + InteropRPC() (endpoint string, jwtSecret eth.Bytes32) } -var _ stack.Lifecycle = (*L2CLNode)(nil) +type L2CLConfig struct { + // SyncMode to run, if this is a sequencer + SequencerSyncMode nodeSync.Mode + // SyncMode to run, if this is a verifier + VerifierSyncMode nodeSync.Mode -func (n *L2CLNode) hydrate(system stack.ExtensibleSystem) { - require := system.T().Require() - rpcCl, err := client.NewRPC(system.T().Ctx(), system.Logger(), n.userRPC, client.WithLazyDial()) - require.NoError(err) - system.T().Cleanup(rpcCl.Close) + // SafeDBPath is the path to the safe DB to use. Disabled if empty. 
+ SafeDBPath string - sysL2CL := shim.NewL2CLNode(shim.L2CLNodeConfig{ - CommonConfig: shim.NewCommonConfig(system.T()), - ID: n.id, - Client: rpcCl, - InteropEndpoint: n.interopEndpoint, - InteropJwtSecret: n.interopJwtSecret, - }) - l2Net := system.L2Network(stack.L2NetworkID(n.id.ChainID())) - l2Net.(stack.ExtensibleL2Network).AddL2CLNode(sysL2CL) - sysL2CL.(stack.LinkableL2CLNode).LinkEL(l2Net.L2ELNode(n.el)) + IsSequencer bool + IndexingMode bool } -func (n *L2CLNode) Start() { - n.mu.Lock() - defer n.mu.Unlock() - if n.opNode != nil { - n.logger.Warn("Op-node already started") - return - } - - if n.userProxy == nil { - n.userProxy = tcpproxy.New(n.logger.New("proxy", "l2cl-user")) - n.p.Require().NoError(n.userProxy.Start()) - n.p.Cleanup(func() { - n.userProxy.Close() - }) - n.userRPC = "http://" + n.userProxy.Addr() - } - - if n.interopProxy == nil { - n.interopProxy = tcpproxy.New(n.logger.New("proxy", "l2cl-interop")) - n.p.Require().NoError(n.interopProxy.Start()) - n.p.Cleanup(func() { - n.interopProxy.Close() - }) - n.interopEndpoint = "ws://" + n.interopProxy.Addr() - } +func L2CLSequencer() L2CLOption { + return L2CLOptionFn(func(p devtest.P, id stack.L2CLNodeID, cfg *L2CLConfig) { + cfg.IsSequencer = true + }) +} - n.logger.Info("Starting op-node") - opNode, err := opnode.NewOpnode(n.logger, n.cfg, func(err error) { - n.p.Require().NoError(err, "op-node critical error") +func L2CLIndexing() L2CLOption { + return L2CLOptionFn(func(p devtest.P, id stack.L2CLNodeID, cfg *L2CLConfig) { + cfg.IndexingMode = true }) - n.p.Require().NoError(err, "op-node failed to start") - n.logger.Info("Started op-node") - n.opNode = opNode - n.userProxy.SetUpstream(ProxyAddr(n.p.Require(), opNode.UserRPC().RPC())) - interopEndpoint, interopJwtSecret := opNode.InteropRPC() - n.interopProxy.SetUpstream(ProxyAddr(n.p.Require(), interopEndpoint)) - n.interopJwtSecret = interopJwtSecret } -func (n *L2CLNode) Stop() { - n.mu.Lock() - defer n.mu.Unlock() - if n.opNode == nil { 
- n.logger.Warn("Op-node already stopped") - return +func DefaultL2CLConfig() *L2CLConfig { + return &L2CLConfig{ + SequencerSyncMode: nodeSync.CLSync, + VerifierSyncMode: nodeSync.CLSync, + SafeDBPath: "", + IsSequencer: false, + IndexingMode: false, } - ctx, cancel := context.WithCancel(context.Background()) - cancel() // force-quit - n.logger.Info("Closing op-node") - closeErr := n.opNode.Stop(ctx) - n.logger.Info("Closed op-node", "err", closeErr) - - n.opNode = nil } -func (n *L2CLNode) InteropRPC() (string, eth.Bytes32) { - n.mu.Lock() - defer n.mu.Unlock() - return n.interopEndpoint, n.interopJwtSecret +type L2CLOption interface { + Apply(p devtest.P, id stack.L2CLNodeID, cfg *L2CLConfig) } -type L2CLOption func(p devtest.P, id stack.L2CLNodeID, cfg *config.Config) - -func WithL2CLOption(opt L2CLOption) stack.Option[*Orchestrator] { +// WithGlobalL2CLOption applies the L2CLOption to all L2CLNode instances in this orchestrator +func WithGlobalL2CLOption(opt L2CLOption) stack.Option[*Orchestrator] { return stack.BeforeDeploy(func(o *Orchestrator) { o.l2CLOptions = append(o.l2CLOptions, opt) }) } -func WithL2CLNode(l2CLID stack.L2CLNodeID, isSequencer bool, indexingMode bool, l1CLID stack.L1CLNodeID, l1ELID stack.L1ELNodeID, l2ELID stack.L2ELNodeID) stack.Option[*Orchestrator] { - return stack.AfterDeploy(func(orch *Orchestrator) { - p := orch.P().WithCtx(stack.ContextWithID(orch.P().Ctx(), l2CLID)) - - require := p.Require() - - l2Net, ok := orch.l2Nets.Get(l2CLID.ChainID()) - require.True(ok, "l2 network required") - - l1EL, ok := orch.l1ELs.Get(l1ELID) - require.True(ok, "l1 EL node required") - - l1CL, ok := orch.l1CLs.Get(l1CLID) - require.True(ok, "l1 CL node required") - - l2EL, ok := orch.l2ELs.Get(l2ELID) - require.True(ok, "l2 EL node required") - - var depSet depset.DependencySet - if cluster, ok := orch.ClusterForL2(l2ELID.ChainID()); ok { - depSet = cluster.DepSet() - } - - jwtPath, jwtSecret := orch.writeDefaultJWT() - - logger := p.Logger() - - 
var p2pSignerSetup p2p.SignerSetup - var p2pConfig *p2p.Config - // code block for P2P setup - { - // make a dummy flagset since p2p config initialization helpers only input cli context - fs := flag.NewFlagSet("", flag.ContinueOnError) - // use default flags - for _, f := range opNodeFlags.P2PFlags(opNodeFlags.EnvVarPrefix) { - require.NoError(f.Apply(fs)) - } - // mandatory P2P flags - require.NoError(fs.Set(opNodeFlags.AdvertiseIPName, "127.0.0.1")) - require.NoError(fs.Set(opNodeFlags.AdvertiseTCPPortName, "0")) - require.NoError(fs.Set(opNodeFlags.AdvertiseUDPPortName, "0")) - require.NoError(fs.Set(opNodeFlags.ListenIPName, "127.0.0.1")) - require.NoError(fs.Set(opNodeFlags.ListenTCPPortName, "0")) - require.NoError(fs.Set(opNodeFlags.ListenUDPPortName, "0")) - // avoid resource unavailable error by using memorydb - require.NoError(fs.Set(opNodeFlags.DiscoveryPathName, "memory")) - require.NoError(fs.Set(opNodeFlags.PeerstorePathName, "memory")) - // For peer ID - networkPrivKey, err := crypto.GenerateKey() - require.NoError(err) - networkPrivKeyHex := hex.EncodeToString(crypto.FromECDSA(networkPrivKey)) - require.NoError(fs.Set(opNodeFlags.P2PPrivRawName, networkPrivKeyHex)) - // Explicitly set to empty; do not default to resolving DNS of external bootnodes - require.NoError(fs.Set(opNodeFlags.BootnodesName, "")) - - cliCtx := cli.NewContext(&cli.App{}, fs, nil) - if isSequencer { - p2pKey, err := orch.keys.Secret(devkeys.SequencerP2PRole.Key(l2CLID.ChainID().ToBig())) - require.NoError(err, "need p2p key for sequencer") - p2pKeyHex := hex.EncodeToString(crypto.FromECDSA(p2pKey)) - require.NoError(fs.Set(opNodeFlags.SequencerP2PKeyName, p2pKeyHex)) - p2pSignerSetup, err = p2pcli.LoadSignerSetup(cliCtx, logger) - require.NoError(err, "failed to load p2p signer") - logger.Info("Sequencer key acquired") - } - p2pConfig, err = p2pcli.NewConfig(cliCtx, l2Net.rollupCfg.BlockTime) - require.NoError(err, "failed to load p2p config") - } - - // specify interop config, 
but do not configure anything, to disable indexing mode - interopCfg := &interop.Config{} +type L2CLOptionFn func(p devtest.P, id stack.L2CLNodeID, cfg *L2CLConfig) - if indexingMode { - interopCfg = &interop.Config{ - RPCAddr: "127.0.0.1", - // When L2CL starts, store its RPC port here - // given by the os, to reclaim when restart. - RPCPort: 0, - RPCJwtSecretPath: jwtPath, - } - } +var _ L2CLOption = L2CLOptionFn(nil) - nodeCfg := &config.Config{ - L1: &config.L1EndpointConfig{ - L1NodeAddr: l1EL.userRPC, - L1TrustRPC: false, - L1RPCKind: sources.RPCKindDebugGeth, - RateLimit: 0, - BatchSize: 20, - HttpPollInterval: time.Millisecond * 100, - MaxConcurrency: 10, - CacheSize: 0, // auto-adjust to sequence window - }, - L2: &config.L2EndpointConfig{ - L2EngineAddr: l2EL.authRPC, - L2EngineJWTSecret: jwtSecret, - }, - Beacon: &config.L1BeaconEndpointConfig{ - BeaconAddr: l1CL.beacon.BeaconAddr(), - }, - Driver: driver.Config{ - SequencerEnabled: isSequencer, - SequencerConfDepth: 2, - }, - Rollup: *l2Net.rollupCfg, - DependencySet: depSet, - P2PSigner: p2pSignerSetup, // nil when not sequencer - RPC: oprpc.CLIConfig{ - ListenAddr: "127.0.0.1", - // When L2CL starts, store its RPC port here - // given by the os, to reclaim when restart. 
- ListenPort: 0, - EnableAdmin: true, - }, - InteropConfig: interopCfg, - P2P: p2pConfig, - L1EpochPollInterval: time.Second * 2, - RuntimeConfigReloadInterval: 0, - Tracer: nil, - Sync: nodeSync.Config{ - SyncMode: nodeSync.CLSync, - SkipSyncStartCheck: false, - SupportsPostFinalizationELSync: false, - }, - ConfigPersistence: config.DisabledConfigPersistence{}, - Metrics: opmetrics.CLIConfig{}, - Pprof: oppprof.CLIConfig{}, - SafeDBPath: "", - RollupHalt: "", - Cancel: nil, - ConductorEnabled: false, - ConductorRpc: nil, - ConductorRpcTimeout: 0, - AltDA: altda.CLIConfig{}, - IgnoreMissingPectraBlobSchedule: false, - ExperimentalOPStackAPI: true, - } - for _, opt := range orch.l2CLOptions { - opt(orch.P(), l2CLID, nodeCfg) - } - l2CLNode := &L2CLNode{ - id: l2CLID, - cfg: nodeCfg, - logger: logger, - p: p, - el: l2ELID, - } - require.True(orch.l2CLs.SetIfMissing(l2CLID, l2CLNode), "must not already exist") - l2CLNode.Start() - p.Cleanup(l2CLNode.Stop) - }) +func (fn L2CLOptionFn) Apply(p devtest.P, id stack.L2CLNodeID, cfg *L2CLConfig) { + fn(p, id, cfg) } -func GetP2PClient(ctx context.Context, logger log.Logger, l2CLNode *L2CLNode) (*sources.P2PClient, error) { - rpcClient, err := client.NewRPC(ctx, logger, l2CLNode.userRPC, client.WithLazyDial()) - if err != nil { - return nil, fmt.Errorf("failed to initialize rpc client for p2p client: %w", err) - } - return sources.NewP2PClient(rpcClient), nil -} +// L2CLOptionBundle a list of multiple L2CLOption, to all be applied in order. 
+type L2CLOptionBundle []L2CLOption -func GetPeerInfo(ctx context.Context, p2pClient *sources.P2PClient) (*apis.PeerInfo, error) { - peerInfo, err := retry.Do(ctx, 3, retry.Exponential(), func() (*apis.PeerInfo, error) { - return p2pClient.Self(ctx) - }) - if err != nil { - return nil, fmt.Errorf("failed to get peer info: %w", err) - } - return peerInfo, nil -} +var _ L2CLOption = L2CLOptionBundle(nil) -func GetPeers(ctx context.Context, p2pClient *sources.P2PClient) (*apis.PeerDump, error) { - peerDump, err := retry.Do(ctx, 3, retry.Exponential(), func() (*apis.PeerDump, error) { - return p2pClient.Peers(ctx, true) - }) - if err != nil { - return nil, fmt.Errorf("failed to get peers: %w", err) +func (l L2CLOptionBundle) Apply(p devtest.P, id stack.L2CLNodeID, cfg *L2CLConfig) { + for _, opt := range l { + p.Require().NotNil(opt, "cannot Apply nil L2CLOption") + opt.Apply(p, id, cfg) } - return peerDump, nil -} - -type p2pClientsAndPeers struct { - client1 *sources.P2PClient - client2 *sources.P2PClient - peerInfo1 *apis.PeerInfo - peerInfo2 *apis.PeerInfo } -func getP2PClientsAndPeers(ctx context.Context, logger log.Logger, require *testreq.Assertions, l2CL1, l2CL2 *L2CLNode) *p2pClientsAndPeers { - p2pClient1, err := GetP2PClient(ctx, logger, l2CL1) - require.NoError(err) - p2pClient2, err := GetP2PClient(ctx, logger, l2CL2) - require.NoError(err) - - peerInfo1, err := GetPeerInfo(ctx, p2pClient1) - require.NoError(err) - peerInfo2, err := GetPeerInfo(ctx, p2pClient2) - require.NoError(err) - - require.True(len(peerInfo1.Addresses) > 0 && len(peerInfo2.Addresses) > 0, "malformed peer info") - - return &p2pClientsAndPeers{ - client1: p2pClient1, - client2: p2pClient2, - peerInfo1: peerInfo1, - peerInfo2: peerInfo2, +// WithL2CLNode adds the default type of L2 CL node. +// The default can be configured with DEVSTACK_L2CL_KIND. +// Tests that depend on specific types can use options like WithKonaNode and WithOpNode directly. 
+func WithL2CLNode(l2CLID stack.L2CLNodeID, l1CLID stack.L1CLNodeID, l1ELID stack.L1ELNodeID, l2ELID stack.L2ELNodeID, opts ...L2CLOption) stack.Option[*Orchestrator] { + switch os.Getenv("DEVSTACK_L2CL_KIND") { + case "kona": + return WithKonaNode(l2CLID, l1CLID, l1ELID, l2ELID, opts...) + default: + return WithOpNode(l2CLID, l1CLID, l1ELID, l2ELID, opts...) } } - -// WithL2CLP2PConnection connects P2P between two L2CLs -func WithL2CLP2PConnection(l2CL1ID, l2CL2ID stack.L2CLNodeID) stack.Option[*Orchestrator] { - return stack.AfterDeploy(func(orch *Orchestrator) { - require := orch.P().Require() - - l2CL1, ok := orch.l2CLs.Get(l2CL1ID) - require.True(ok, "looking for L2 CL node 1 to connect p2p") - l2CL2, ok := orch.l2CLs.Get(l2CL2ID) - require.True(ok, "looking for L2 CL node 2 to connect p2p") - require.Equal(l2CL1.cfg.Rollup.L2ChainID, l2CL2.cfg.Rollup.L2ChainID, "must be same l2 chain") - - ctx := orch.P().Ctx() - logger := orch.P().Logger() - - p := getP2PClientsAndPeers(ctx, logger, require, l2CL1, l2CL2) - - connectPeer := func(p2pClient *sources.P2PClient, multiAddress string) { - err := retry.Do0(ctx, 6, retry.Exponential(), func() error { - return p2pClient.ConnectPeer(ctx, multiAddress) - }) - require.NoError(err, "failed to connect peer") - } - - connectPeer(p.client1, p.peerInfo2.Addresses[0]) - connectPeer(p.client2, p.peerInfo1.Addresses[0]) - - check := func(peerDump *apis.PeerDump, peerInfo *apis.PeerInfo) { - multiAddress := peerInfo.PeerID.String() - _, ok := peerDump.Peers[multiAddress] - require.True(ok, "peer register invalid") - } - - peerDump1, err := GetPeers(ctx, p.client1) - require.NoError(err) - peerDump2, err := GetPeers(ctx, p.client2) - require.NoError(err) - - check(peerDump1, p.peerInfo2) - check(peerDump2, p.peerInfo1) - }) -} diff --git a/op-devstack/sysgo/l2_cl_kona.go b/op-devstack/sysgo/l2_cl_kona.go new file mode 100644 index 0000000000000..5028e1a51933a --- /dev/null +++ b/op-devstack/sysgo/l2_cl_kona.go @@ -0,0 +1,231 @@ 
+package sysgo + +import ( + "encoding/hex" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/crypto" + + "github.com/ethereum-optimism/optimism/op-chain-ops/devkeys" + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/shim" + "github.com/ethereum-optimism/optimism/op-devstack/stack" + "github.com/ethereum-optimism/optimism/op-devstack/stack/match" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/logpipe" + "github.com/ethereum-optimism/optimism/op-service/tasks" + "github.com/ethereum-optimism/optimism/op-service/testutils/tcpproxy" +) + +type KonaNode struct { + mu sync.Mutex + + id stack.L2CLNodeID + + userRPC string + interopEndpoint string // warning: currently not fully supported + interopJwtSecret eth.Bytes32 + el stack.L2ELNodeID + + userProxy *tcpproxy.Proxy + + execPath string + args []string + // Each entry is of the form "key=value". 
+ env []string + + p devtest.P + + sub *SubProcess +} + +func (k *KonaNode) hydrate(system stack.ExtensibleSystem) { + require := system.T().Require() + rpcCl, err := client.NewRPC(system.T().Ctx(), system.Logger(), k.userRPC, client.WithLazyDial()) + require.NoError(err) + system.T().Cleanup(rpcCl.Close) + + sysL2CL := shim.NewL2CLNode(shim.L2CLNodeConfig{ + CommonConfig: shim.NewCommonConfig(system.T()), + ID: k.id, + Client: rpcCl, + InteropEndpoint: k.interopEndpoint, + InteropJwtSecret: k.interopJwtSecret, + }) + sysL2CL.SetLabel(match.LabelVendor, string(match.KonaNode)) + l2Net := system.L2Network(stack.L2NetworkID(k.id.ChainID())) + l2Net.(stack.ExtensibleL2Network).AddL2CLNode(sysL2CL) + sysL2CL.(stack.LinkableL2CLNode).LinkEL(l2Net.L2ELNode(k.el)) +} + +func (k *KonaNode) Start() { + k.mu.Lock() + defer k.mu.Unlock() + if k.sub != nil { + k.p.Logger().Warn("Kona-node already started") + return + } + // Create a proxy for the user RPC, + // so other services can connect, and stay connected, across restarts. + if k.userProxy == nil { + k.userProxy = tcpproxy.New(k.p.Logger()) + k.p.Require().NoError(k.userProxy.Start()) + k.p.Cleanup(func() { + k.userProxy.Close() + }) + k.userRPC = "http://" + k.userProxy.Addr() + } + // Create the sub-process. + // We pipe sub-process logs to the test-logger. + // And inspect them along the way, to get the RPC server address. 
+ logOut := logpipe.ToLogger(k.p.Logger().New("src", "stdout")) + logErr := logpipe.ToLogger(k.p.Logger().New("src", "stderr")) + userRPC := make(chan string, 1) + onLogEntry := func(e logpipe.LogEntry) { + switch e.LogMessage() { + case "RPC server bound to address": + userRPC <- "http://" + e.FieldValue("addr").(string) + } + } + stdOutLogs := logpipe.LogProcessor(func(line []byte) { + e := logpipe.ParseRustStructuredLogs(line) + logOut(e) + onLogEntry(e) + }) + stdErrLogs := logpipe.LogProcessor(func(line []byte) { + e := logpipe.ParseRustStructuredLogs(line) + logErr(e) + }) + k.sub = NewSubProcess(k.p, stdOutLogs, stdErrLogs) + + err := k.sub.Start(k.execPath, k.args, k.env) + k.p.Require().NoError(err, "Must start") + + var userRPCAddr string + k.p.Require().NoError(tasks.Await(k.p.Ctx(), userRPC, &userRPCAddr), "need user RPC") + + k.userProxy.SetUpstream(ProxyAddr(k.p.Require(), userRPCAddr)) +} + +// Stop stops the kona node. +// warning: no restarts supported yet, since the RPC port is not remembered. 
+func (k *KonaNode) Stop() { + k.mu.Lock() + defer k.mu.Unlock() + if k.sub == nil { + k.p.Logger().Warn("kona-node already stopped") + return + } + err := k.sub.Stop() + k.p.Require().NoError(err, "Must stop") + k.sub = nil +} + +func (k *KonaNode) UserRPC() string { + return k.userRPC +} + +func (k *KonaNode) InteropRPC() (endpoint string, jwtSecret eth.Bytes32) { + return k.interopEndpoint, k.interopJwtSecret +} + +var _ L2CLNode = (*KonaNode)(nil) + +func WithKonaNode(l2CLID stack.L2CLNodeID, l1CLID stack.L1CLNodeID, l1ELID stack.L1ELNodeID, l2ELID stack.L2ELNodeID, opts ...L2CLOption) stack.Option[*Orchestrator] { + return stack.AfterDeploy(func(orch *Orchestrator) { + p := orch.P().WithCtx(stack.ContextWithID(orch.P().Ctx(), l2CLID)) + + require := p.Require() + + l2Net, ok := orch.l2Nets.Get(l2CLID.ChainID()) + require.True(ok, "l2 network required") + + l1EL, ok := orch.l1ELs.Get(l1ELID) + require.True(ok, "l1 EL node required") + + l1CL, ok := orch.l1CLs.Get(l1CLID) + require.True(ok, "l1 CL node required") + + l2EL, ok := orch.l2ELs.Get(l2ELID) + require.True(ok, "l2 EL node required") + + cfg := DefaultL2CLConfig() + orch.l2CLOptions.Apply(orch.P(), l2CLID, cfg) // apply global options + L2CLOptionBundle(opts).Apply(orch.P(), l2CLID, cfg) // apply specific options + + tempKonaDir := p.TempDir() + + tempP2PPath := filepath.Join(tempKonaDir, "p2pkey.txt") + + tempRollupCfgPath := filepath.Join(tempKonaDir, "rollup.json") + rollupCfgData, err := json.Marshal(l2Net.rollupCfg) + p.Require().NoError(err, "must write rollup config") + p.Require().NoError(os.WriteFile(tempRollupCfgPath, rollupCfgData, 0o644), "must write rollup config file") + + envVars := []string{ + "KONA_NODE_L1_ETH_RPC=" + l1EL.userRPC, + "KONA_NODE_L1_BEACON=" + l1CL.beaconHTTPAddr, + // TODO: WS RPC addresses do not work and will make the startup panic with a connection error in the + // JWT validation / engine-capabilities setup code-path. 
+ "KONA_NODE_L2_ENGINE_RPC=" + strings.ReplaceAll(l2EL.EngineRPC(), "ws://", "http://"), + "KONA_NODE_L2_ENGINE_AUTH=" + l2EL.JWTPath(), + "KONA_NODE_ROLLUP_CONFIG=" + tempRollupCfgPath, + "KONA_NODE_P2P_NO_DISCOVERY=true", + "KONA_NODE_P2P_PRIV_PATH=" + tempP2PPath, + "KONA_NODE_RPC_ADDR=127.0.0.1", + "KONA_NODE_RPC_PORT=0", + "KONA_NODE_RPC_WS_ENABLED=true", + "KONA_METRICS_ENABLED=false", + "KONA_NODE_LOG_LEVEL=3", // info level + "KONA_NODE_LOG_STDOUT_FORMAT=json", + // p2p ports + "KONA_NODE_P2P_LISTEN_IP=127.0.0.1", + "KONA_NODE_P2P_LISTEN_TCP_PORT=0", + "KONA_NODE_P2P_LISTEN_UDP_PORT=0", + } + if cfg.IsSequencer { + p2pKey, err := orch.keys.Secret(devkeys.SequencerP2PRole.Key(l2CLID.ChainID().ToBig())) + require.NoError(err, "need p2p key for sequencer") + p2pKeyHex := "0x" + hex.EncodeToString(crypto.FromECDSA(p2pKey)) + // TODO: Kona should support loading keys from a file + //tempSeqKeyPath := filepath.Join(tempKonaDir, "p2p-sequencer.txt") + //p.Require().NoError(err, os.WriteFile(tempSeqKeyPath, []byte(p2pKeyHex), 0o644)) + envVars = append(envVars, + "KONA_NODE_P2P_SEQUENCER_KEY="+p2pKeyHex, + "KONA_NODE_SEQUENCER_L1_CONFS=0", + "KONA_NODE_MODE=Sequencer", + ) + } else { + envVars = append(envVars, + "KONA_NODE_MODE=Validator", + ) + } + + execPath := os.Getenv("KONA_NODE_EXEC_PATH") + p.Require().NotEmpty(execPath, "KONA_NODE_EXEC_PATH environment variable must be set") + _, err = os.Stat(execPath) + p.Require().NotErrorIs(err, os.ErrNotExist, "executable must exist") + + k := &KonaNode{ + id: l2CLID, + userRPC: "", // retrieved from logs + interopEndpoint: "", // retrieved from logs + interopJwtSecret: eth.Bytes32{}, + el: l2ELID, + execPath: execPath, + args: []string{"node"}, + env: envVars, + p: p, + } + p.Logger().Info("Starting kona-node") + k.Start() + p.Cleanup(k.Stop) + p.Logger().Info("Kona-node is up", "rpc", k.UserRPC()) + require.True(orch.l2CLs.SetIfMissing(l2CLID, k), "must not already exist") + }) +} diff --git 
a/op-devstack/sysgo/l2_cl_opnode.go b/op-devstack/sysgo/l2_cl_opnode.go new file mode 100644 index 0000000000000..4cf5e7dcb0f8e --- /dev/null +++ b/op-devstack/sysgo/l2_cl_opnode.go @@ -0,0 +1,305 @@ +package sysgo + +import ( + "context" + "encoding/hex" + "flag" + "sync" + "time" + + "github.com/urfave/cli/v2" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + + altda "github.com/ethereum-optimism/optimism/op-alt-da" + "github.com/ethereum-optimism/optimism/op-chain-ops/devkeys" + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/shim" + "github.com/ethereum-optimism/optimism/op-devstack/stack" + "github.com/ethereum-optimism/optimism/op-devstack/stack/match" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/opnode" + "github.com/ethereum-optimism/optimism/op-node/config" + opNodeFlags "github.com/ethereum-optimism/optimism/op-node/flags" + "github.com/ethereum-optimism/optimism/op-node/p2p" + p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli" + "github.com/ethereum-optimism/optimism/op-node/rollup/driver" + "github.com/ethereum-optimism/optimism/op-node/rollup/interop" + nodeSync "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/eth" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-service/oppprof" + oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-service/testutils/tcpproxy" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" +) + +type OpNode struct { + mu sync.Mutex + + id stack.L2CLNodeID + opNode *opnode.Opnode + userRPC string + interopEndpoint string + interopJwtSecret eth.Bytes32 + cfg *config.Config + p 
devtest.P + logger log.Logger + el stack.L2ELNodeID + userProxy *tcpproxy.Proxy + interopProxy *tcpproxy.Proxy +} + +var _ L2CLNode = (*OpNode)(nil) + +func (n *OpNode) hydrate(system stack.ExtensibleSystem) { + require := system.T().Require() + rpcCl, err := client.NewRPC(system.T().Ctx(), system.Logger(), n.userRPC, client.WithLazyDial()) + require.NoError(err) + system.T().Cleanup(rpcCl.Close) + + sysL2CL := shim.NewL2CLNode(shim.L2CLNodeConfig{ + CommonConfig: shim.NewCommonConfig(system.T()), + ID: n.id, + Client: rpcCl, + InteropEndpoint: n.interopEndpoint, + InteropJwtSecret: n.interopJwtSecret, + }) + sysL2CL.SetLabel(match.LabelVendor, string(match.OpNode)) + l2Net := system.L2Network(stack.L2NetworkID(n.id.ChainID())) + l2Net.(stack.ExtensibleL2Network).AddL2CLNode(sysL2CL) + sysL2CL.(stack.LinkableL2CLNode).LinkEL(l2Net.L2ELNode(n.el)) +} + +func (n *OpNode) UserRPC() string { + return n.userRPC +} + +func (n *OpNode) InteropRPC() (endpoint string, jwtSecret eth.Bytes32) { + // Make sure to use the proxied interop endpoint + return n.interopEndpoint, n.interopJwtSecret +} + +func (n *OpNode) Start() { + n.mu.Lock() + defer n.mu.Unlock() + if n.opNode != nil { + n.logger.Warn("Op-node already started") + return + } + + if n.userProxy == nil { + n.userProxy = tcpproxy.New(n.logger.New("proxy", "l2cl-user")) + n.p.Require().NoError(n.userProxy.Start()) + n.p.Cleanup(func() { + n.userProxy.Close() + }) + n.userRPC = "http://" + n.userProxy.Addr() + } + if n.interopProxy == nil { + n.interopProxy = tcpproxy.New(n.logger.New("proxy", "l2cl-interop")) + n.p.Require().NoError(n.interopProxy.Start()) + n.p.Cleanup(func() { + n.interopProxy.Close() + }) + n.interopEndpoint = "ws://" + n.interopProxy.Addr() + } + n.logger.Info("Starting op-node") + opNode, err := opnode.NewOpnode(n.logger, n.cfg, func(err error) { + n.p.Require().NoError(err, "op-node critical error") + }) + n.p.Require().NoError(err, "op-node failed to start") + n.logger.Info("Started op-node") + 
n.opNode = opNode + + n.userProxy.SetUpstream(ProxyAddr(n.p.Require(), opNode.UserRPC().RPC())) + + interopEndpoint, interopJwtSecret := opNode.InteropRPC() + n.interopProxy.SetUpstream(ProxyAddr(n.p.Require(), interopEndpoint)) + n.interopJwtSecret = interopJwtSecret +} + +func (n *OpNode) Stop() { + n.mu.Lock() + defer n.mu.Unlock() + if n.opNode == nil { + n.logger.Warn("Op-node already stopped") + return + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // force-quit + n.logger.Info("Closing op-node") + closeErr := n.opNode.Stop(ctx) + n.logger.Info("Closed op-node", "err", closeErr) + + n.opNode = nil +} + +func WithOpNode(l2CLID stack.L2CLNodeID, l1CLID stack.L1CLNodeID, l1ELID stack.L1ELNodeID, l2ELID stack.L2ELNodeID, opts ...L2CLOption) stack.Option[*Orchestrator] { + return stack.AfterDeploy(func(orch *Orchestrator) { + p := orch.P().WithCtx(stack.ContextWithID(orch.P().Ctx(), l2CLID)) + + require := p.Require() + + l2Net, ok := orch.l2Nets.Get(l2CLID.ChainID()) + require.True(ok, "l2 network required") + + l1EL, ok := orch.l1ELs.Get(l1ELID) + require.True(ok, "l1 EL node required") + + l1CL, ok := orch.l1CLs.Get(l1CLID) + require.True(ok, "l1 CL node required") + + l2EL, ok := orch.l2ELs.Get(l2ELID) + require.True(ok, "l2 EL node required") + + cfg := DefaultL2CLConfig() + orch.l2CLOptions.Apply(p, l2CLID, cfg) // apply global options + L2CLOptionBundle(opts).Apply(p, l2CLID, cfg) // apply specific options + + syncMode := cfg.VerifierSyncMode + if cfg.IsSequencer { + syncMode = cfg.SequencerSyncMode + // Sanity check, to navigate legacy sync-mode test assumptions. 
+ // Can't enable ELSync on the sequencer or it will never start sequencing because + // ELSync needs to receive gossip from the sequencer to drive the sync + p.Require().NotEqual(nodeSync.ELSync, syncMode, "sequencer cannot use EL sync") + } + + var depSet depset.DependencySet + if cluster, ok := orch.ClusterForL2(l2ELID.ChainID()); ok { + depSet = cluster.DepSet() + } + + jwtPath, jwtSecret := orch.writeDefaultJWT() + + logger := p.Logger() + + var p2pSignerSetup p2p.SignerSetup + var p2pConfig *p2p.Config + // code block for P2P setup + { + // make a dummy flagset since p2p config initialization helpers only input cli context + fs := flag.NewFlagSet("", flag.ContinueOnError) + // use default flags + for _, f := range opNodeFlags.P2PFlags(opNodeFlags.EnvVarPrefix) { + require.NoError(f.Apply(fs)) + } + // mandatory P2P flags + require.NoError(fs.Set(opNodeFlags.AdvertiseIPName, "127.0.0.1")) + require.NoError(fs.Set(opNodeFlags.AdvertiseTCPPortName, "0")) + require.NoError(fs.Set(opNodeFlags.AdvertiseUDPPortName, "0")) + require.NoError(fs.Set(opNodeFlags.ListenIPName, "127.0.0.1")) + require.NoError(fs.Set(opNodeFlags.ListenTCPPortName, "0")) + require.NoError(fs.Set(opNodeFlags.ListenUDPPortName, "0")) + // avoid resource unavailable error by using memorydb + require.NoError(fs.Set(opNodeFlags.DiscoveryPathName, "memory")) + require.NoError(fs.Set(opNodeFlags.PeerstorePathName, "memory")) + // For peer ID + networkPrivKey, err := crypto.GenerateKey() + require.NoError(err) + networkPrivKeyHex := hex.EncodeToString(crypto.FromECDSA(networkPrivKey)) + require.NoError(fs.Set(opNodeFlags.P2PPrivRawName, networkPrivKeyHex)) + // Explicitly set to empty; do not default to resolving DNS of external bootnodes + require.NoError(fs.Set(opNodeFlags.BootnodesName, "")) + + cliCtx := cli.NewContext(&cli.App{}, fs, nil) + if cfg.IsSequencer { + p2pKey, err := orch.keys.Secret(devkeys.SequencerP2PRole.Key(l2CLID.ChainID().ToBig())) + require.NoError(err, "need p2p key for 
sequencer") + p2pKeyHex := hex.EncodeToString(crypto.FromECDSA(p2pKey)) + require.NoError(fs.Set(opNodeFlags.SequencerP2PKeyName, p2pKeyHex)) + p2pSignerSetup, err = p2pcli.LoadSignerSetup(cliCtx, logger) + require.NoError(err, "failed to load p2p signer") + logger.Info("Sequencer key acquired") + } + p2pConfig, err = p2pcli.NewConfig(cliCtx, l2Net.rollupCfg.BlockTime) + require.NoError(err, "failed to load p2p config") + } + + // specify interop config, but do not configure anything, to disable indexing mode + interopCfg := &interop.Config{} + + if cfg.IndexingMode { + interopCfg = &interop.Config{ + RPCAddr: "127.0.0.1", + // When L2CL starts, store its RPC port here + // given by the os, to reclaim when restart. + RPCPort: 0, + RPCJwtSecretPath: jwtPath, + } + } + + nodeCfg := &config.Config{ + L1: &config.L1EndpointConfig{ + L1NodeAddr: l1EL.userRPC, + L1TrustRPC: false, + L1RPCKind: sources.RPCKindDebugGeth, + RateLimit: 0, + BatchSize: 20, + HttpPollInterval: time.Millisecond * 100, + MaxConcurrency: 10, + CacheSize: 0, // auto-adjust to sequence window + }, + L2: &config.L2EndpointConfig{ + L2EngineAddr: l2EL.EngineRPC(), + L2EngineJWTSecret: jwtSecret, + }, + Beacon: &config.L1BeaconEndpointConfig{ + BeaconAddr: l1CL.beacon.BeaconAddr(), + }, + Driver: driver.Config{ + SequencerEnabled: cfg.IsSequencer, + SequencerConfDepth: 2, + }, + Rollup: *l2Net.rollupCfg, + DependencySet: depSet, + P2PSigner: p2pSignerSetup, // nil when not sequencer + RPC: oprpc.CLIConfig{ + ListenAddr: "127.0.0.1", + // When L2CL starts, store its RPC port here + // given by the os, to reclaim when restart. 
+ ListenPort: 0, + EnableAdmin: true, + }, + InteropConfig: interopCfg, + P2P: p2pConfig, + L1EpochPollInterval: time.Second * 2, + RuntimeConfigReloadInterval: 0, + Tracer: nil, + Sync: nodeSync.Config{ + SyncMode: syncMode, + SkipSyncStartCheck: false, + SupportsPostFinalizationELSync: false, + }, + ConfigPersistence: config.DisabledConfigPersistence{}, + Metrics: opmetrics.CLIConfig{}, + Pprof: oppprof.CLIConfig{}, + SafeDBPath: "", + RollupHalt: "", + Cancel: nil, + ConductorEnabled: false, + ConductorRpc: nil, + ConductorRpcTimeout: 0, + AltDA: altda.CLIConfig{}, + IgnoreMissingPectraBlobSchedule: false, + ExperimentalOPStackAPI: true, + } + if cfg.SafeDBPath != "" { + nodeCfg.SafeDBPath = cfg.SafeDBPath + } + + l2CLNode := &OpNode{ + id: l2CLID, + cfg: nodeCfg, + logger: logger, + p: p, + el: l2ELID, + } + require.True(orch.l2CLs.SetIfMissing(l2CLID, l2CLNode), "must not already exist") + l2CLNode.Start() + p.Cleanup(l2CLNode.Stop) + }) +} diff --git a/op-devstack/sysgo/l2_cl_p2p_util.go b/op-devstack/sysgo/l2_cl_p2p_util.go new file mode 100644 index 0000000000000..911434cd28e3c --- /dev/null +++ b/op-devstack/sysgo/l2_cl_p2p_util.go @@ -0,0 +1,114 @@ +package sysgo + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-devstack/stack" + "github.com/ethereum-optimism/optimism/op-service/apis" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/retry" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-service/testreq" +) + +func GetP2PClient(ctx context.Context, logger log.Logger, l2CLNode L2CLNode) (*sources.P2PClient, error) { + rpcClient, err := client.NewRPC(ctx, logger, l2CLNode.UserRPC(), client.WithLazyDial()) + if err != nil { + return nil, fmt.Errorf("failed to initialize rpc client for p2p client: %w", err) + } + return sources.NewP2PClient(rpcClient), nil +} + +func 
GetPeerInfo(ctx context.Context, p2pClient *sources.P2PClient) (*apis.PeerInfo, error) { + peerInfo, err := retry.Do(ctx, 3, retry.Exponential(), func() (*apis.PeerInfo, error) { + return p2pClient.Self(ctx) + }) + if err != nil { + return nil, fmt.Errorf("failed to get peer info: %w", err) + } + return peerInfo, nil +} + +func GetPeers(ctx context.Context, p2pClient *sources.P2PClient) (*apis.PeerDump, error) { + peerDump, err := retry.Do(ctx, 3, retry.Exponential(), func() (*apis.PeerDump, error) { + return p2pClient.Peers(ctx, true) + }) + if err != nil { + return nil, fmt.Errorf("failed to get peers: %w", err) + } + return peerDump, nil +} + +type p2pClientsAndPeers struct { + client1 *sources.P2PClient + client2 *sources.P2PClient + peerInfo1 *apis.PeerInfo + peerInfo2 *apis.PeerInfo +} + +func getP2PClientsAndPeers(ctx context.Context, logger log.Logger, + require *testreq.Assertions, l2CL1, l2CL2 L2CLNode) *p2pClientsAndPeers { + p2pClient1, err := GetP2PClient(ctx, logger, l2CL1) + require.NoError(err) + p2pClient2, err := GetP2PClient(ctx, logger, l2CL2) + require.NoError(err) + + peerInfo1, err := GetPeerInfo(ctx, p2pClient1) + require.NoError(err) + peerInfo2, err := GetPeerInfo(ctx, p2pClient2) + require.NoError(err) + + require.True(len(peerInfo1.Addresses) > 0 && len(peerInfo2.Addresses) > 0, "malformed peer info") + + return &p2pClientsAndPeers{ + client1: p2pClient1, + client2: p2pClient2, + peerInfo1: peerInfo1, + peerInfo2: peerInfo2, + } +} + +// WithL2CLP2PConnection connects P2P between two L2CLs +func WithL2CLP2PConnection(l2CL1ID, l2CL2ID stack.L2CLNodeID) stack.Option[*Orchestrator] { + return stack.AfterDeploy(func(orch *Orchestrator) { + require := orch.P().Require() + + l2CL1, ok := orch.l2CLs.Get(l2CL1ID) + require.True(ok, "looking for L2 CL node 1 to connect p2p") + l2CL2, ok := orch.l2CLs.Get(l2CL2ID) + require.True(ok, "looking for L2 CL node 2 to connect p2p") + require.Equal(l2CL1ID.ChainID(), l2CL2ID.ChainID(), "must be same l2 
chain") + + ctx := orch.P().Ctx() + logger := orch.P().Logger() + + p := getP2PClientsAndPeers(ctx, logger, require, l2CL1, l2CL2) + + connectPeer := func(p2pClient *sources.P2PClient, multiAddress string) { + err := retry.Do0(ctx, 6, retry.Exponential(), func() error { + return p2pClient.ConnectPeer(ctx, multiAddress) + }) + require.NoError(err, "failed to connect peer") + } + + connectPeer(p.client1, p.peerInfo2.Addresses[0]) + connectPeer(p.client2, p.peerInfo1.Addresses[0]) + + check := func(peerDump *apis.PeerDump, peerInfo *apis.PeerInfo) { + multiAddress := peerInfo.PeerID.String() + _, ok := peerDump.Peers[multiAddress] + require.True(ok, "peer register invalid") + } + + peerDump1, err := GetPeers(ctx, p.client1) + require.NoError(err) + peerDump2, err := GetPeers(ctx, p.client2) + require.NoError(err) + + check(peerDump1, p.peerInfo2) + check(peerDump2, p.peerInfo1) + }) +} diff --git a/op-devstack/sysgo/l2_el.go b/op-devstack/sysgo/l2_el.go index aedc0e3b43f11..898c1506aecd9 100644 --- a/op-devstack/sysgo/l2_el.go +++ b/op-devstack/sysgo/l2_el.go @@ -1,220 +1,75 @@ package sysgo import ( - "context" - "net" - "net/url" - "slices" - "sync" - "time" + "os" "github.com/ethereum-optimism/optimism/op-devstack/devtest" - "github.com/ethereum-optimism/optimism/op-devstack/shim" "github.com/ethereum-optimism/optimism/op-devstack/stack" - "github.com/ethereum-optimism/optimism/op-devstack/stack/match" - "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth" - "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" - "github.com/ethereum-optimism/optimism/op-service/client" - "github.com/ethereum-optimism/optimism/op-service/dial" - "github.com/ethereum-optimism/optimism/op-service/testreq" - "github.com/ethereum-optimism/optimism/op-service/testutils/tcpproxy" - "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/log" - gn "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" ) -type L2ELNode 
struct { - mu sync.Mutex - - p devtest.P - logger log.Logger - id stack.L2ELNodeID - l2Net *L2Network - jwtPath string - supervisorRPC string - l2Geth *geth.GethInstance - - authRPC string - userRPC string - - authProxy *tcpproxy.Proxy - userProxy *tcpproxy.Proxy +type L2ELNode interface { + hydrate(system stack.ExtensibleSystem) + stack.Lifecycle + UserRPC() string + EngineRPC() string + JWTPath() string } -func (n *L2ELNode) hydrate(system stack.ExtensibleSystem) { - require := system.T().Require() - rpcCl, err := client.NewRPC(system.T().Ctx(), system.Logger(), n.userRPC, client.WithLazyDial()) - require.NoError(err) - system.T().Cleanup(rpcCl.Close) - - l2Net := system.L2Network(stack.L2NetworkID(n.id.ChainID())) - sysL2EL := shim.NewL2ELNode(shim.L2ELNodeConfig{ - RollupCfg: l2Net.RollupConfig(), - ELNodeConfig: shim.ELNodeConfig{ - CommonConfig: shim.NewCommonConfig(system.T()), - Client: rpcCl, - ChainID: n.id.ChainID(), - }, - ID: n.id, - }) - sysL2EL.SetLabel(match.LabelVendor, string(match.OpGeth)) - l2Net.(stack.ExtensibleL2Network).AddL2ELNode(sysL2EL) +type L2ELConfig struct { + SupervisorID *stack.SupervisorID } -func (n *L2ELNode) Start() { - n.mu.Lock() - defer n.mu.Unlock() - if n.l2Geth != nil { - n.logger.Warn("op-geth already started") - return - } - - if n.authProxy == nil { - n.authProxy = tcpproxy.New(n.logger.New("proxy", "l2el-auth")) - n.p.Require().NoError(n.authProxy.Start()) - n.p.Cleanup(func() { - n.authProxy.Close() - }) - n.authRPC = "ws://" + n.authProxy.Addr() - } - if n.userProxy == nil { - n.userProxy = tcpproxy.New(n.logger.New("proxy", "l2el-user")) - n.p.Require().NoError(n.userProxy.Start()) - n.p.Cleanup(func() { - n.userProxy.Close() - }) - n.userRPC = "ws://" + n.userProxy.Addr() - } - - require := n.p.Require() - l2Geth, err := geth.InitL2(n.id.String(), n.l2Net.genesis, n.jwtPath, - func(ethCfg *ethconfig.Config, nodeCfg *gn.Config) error { - ethCfg.InteropMessageRPC = n.supervisorRPC - ethCfg.InteropMempoolFiltering = 
true - nodeCfg.P2P = p2p.Config{ - NoDiscovery: true, - ListenAddr: "127.0.0.1:0", - MaxPeers: 10, - } - return nil - }) - require.NoError(err) - require.NoError(l2Geth.Node.Start()) - n.l2Geth = l2Geth - n.authProxy.SetUpstream(ProxyAddr(require, l2Geth.AuthRPC().RPC())) - n.userProxy.SetUpstream(ProxyAddr(require, l2Geth.UserRPC().RPC())) +func L2ELWithSupervisor(supervisorID stack.SupervisorID) L2ELOption { + return L2ELOptionFn(func(p devtest.P, id stack.L2ELNodeID, cfg *L2ELConfig) { + cfg.SupervisorID = &supervisorID + }) } -func (n *L2ELNode) Stop() { - n.mu.Lock() - defer n.mu.Unlock() - if n.l2Geth == nil { - n.logger.Warn("op-geth already stopped") - return +func DefaultL2ELConfig() *L2ELConfig { + return &L2ELConfig{ + SupervisorID: nil, } - n.logger.Info("Closing op-geth", "id", n.id) - closeErr := n.l2Geth.Close() - n.logger.Info("Closed op-geth", "id", n.id, "err", closeErr) - n.l2Geth = nil } -func ProxyAddr(require *testreq.Assertions, urlStr string) string { - u, err := url.Parse(urlStr) - require.NoError(err) - return net.JoinHostPort(u.Hostname(), u.Port()) +type L2ELOption interface { + Apply(p devtest.P, id stack.L2ELNodeID, cfg *L2ELConfig) } -func WithL2ELNode(id stack.L2ELNodeID, supervisorID *stack.SupervisorID) stack.Option[*Orchestrator] { - return stack.AfterDeploy(func(orch *Orchestrator) { - p := orch.P().WithCtx(stack.ContextWithID(orch.P().Ctx(), id)) - - require := p.Require() - - l2Net, ok := orch.l2Nets.Get(id.ChainID()) - require.True(ok, "L2 network required") - - jwtPath, _ := orch.writeDefaultJWT() - - useInterop := l2Net.genesis.Config.InteropTime != nil - - supervisorRPC := "" - if useInterop { - require.NotNil(supervisorID, "supervisor is required for interop") - sup, ok := orch.supervisors.Get(*supervisorID) - require.True(ok, "supervisor is required for interop") - supervisorRPC = sup.userRPC - } - - logger := p.Logger() - - l2EL := &L2ELNode{ - id: id, - p: orch.P(), - logger: logger, - l2Net: l2Net, - jwtPath: jwtPath, 
- supervisorRPC: supervisorRPC, - } - l2EL.Start() - p.Cleanup(func() { - l2EL.Stop() - }) - require.True(orch.l2ELs.SetIfMissing(id, l2EL), "must be unique L2 EL node") +// WithGlobalL2ELOption applies the L2ELOption to all L2ELNode instances in this orchestrator +func WithGlobalL2ELOption(opt L2ELOption) stack.Option[*Orchestrator] { + return stack.BeforeDeploy(func(o *Orchestrator) { + o.l2ELOptions = append(o.l2ELOptions, opt) }) } -func WithL2ELP2PConnection(l2EL1ID, l2EL2ID stack.L2ELNodeID) stack.Option[*Orchestrator] { - return stack.AfterDeploy(func(orch *Orchestrator) { - require := orch.P().Require() - - l2EL1, ok := orch.l2ELs.Get(l2EL1ID) - require.True(ok, "looking for L2 EL node 1 to connect p2p") - l2EL2, ok := orch.l2ELs.Get(l2EL2ID) - require.True(ok, "looking for L2 EL node 2 to connect p2p") - require.Equal(l2EL1.l2Net.rollupCfg.L2ChainID, l2EL2.l2Net.rollupCfg.L2ChainID, "must be same l2 chain") - - ctx := orch.P().Ctx() - logger := orch.P().Logger() +type L2ELOptionFn func(p devtest.P, id stack.L2ELNodeID, cfg *L2ELConfig) - rpc1, err := dial.DialRPCClientWithTimeout(ctx, logger, l2EL1.userRPC) - require.NoError(err, "failed to connect to el1 rpc") - defer rpc1.Close() - rpc2, err := dial.DialRPCClientWithTimeout(ctx, logger, l2EL2.userRPC) - require.NoError(err, "failed to connect to el2 rpc") - defer rpc2.Close() +var _ L2ELOption = L2ELOptionFn(nil) - ConnectP2P(orch.P().Ctx(), require, rpc1, rpc2) - }) -} - -type RpcCaller interface { - CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error +func (fn L2ELOptionFn) Apply(p devtest.P, id stack.L2ELNodeID, cfg *L2ELConfig) { + fn(p, id, cfg) } -// ConnectP2P creates a p2p peer connection between node1 and node2. 
-func ConnectP2P(ctx context.Context, require *testreq.Assertions, initiator RpcCaller, acceptor RpcCaller) { - var targetInfo p2p.NodeInfo - require.NoError(acceptor.CallContext(ctx, &targetInfo, "admin_nodeInfo"), "get node info") +// L2ELOptionBundle a list of multiple L2ELOption, to all be applied in order. +type L2ELOptionBundle []L2ELOption - var peerAdded bool - require.NoError(initiator.CallContext(ctx, &peerAdded, "admin_addPeer", targetInfo.Enode), "add peer") - require.True(peerAdded, "should have added peer successfully") +var _ L2ELOption = L2ELOptionBundle(nil) - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - err := wait.For(ctx, time.Second, func() (bool, error) { - var peers []peer - if err := initiator.CallContext(ctx, &peers, "admin_peers"); err != nil { - return false, err - } - return slices.ContainsFunc(peers, func(p peer) bool { - return p.ID == targetInfo.ID - }), nil - }) - require.NoError(err, "The peer was not connected") +func (l L2ELOptionBundle) Apply(p devtest.P, id stack.L2ELNodeID, cfg *L2ELConfig) { + for _, opt := range l { + p.Require().NotNil(opt, "cannot Apply nil L2ELOption") + opt.Apply(p, id, cfg) + } } -type peer struct { - ID string `json:"id"` +// WithL2ELNode adds the default type of L2 EL node. +// The default can be configured with DEVSTACK_L2EL_KIND. +// Tests that depend on specific types can use options like WithOpReth and WithOpGeth directly. +func WithL2ELNode(id stack.L2ELNodeID, opts ...L2ELOption) stack.Option[*Orchestrator] { + switch os.Getenv("DEVSTACK_L2EL_KIND") { + case "op-reth": + return WithOpReth(id, opts...) + default: + return WithOpGeth(id, opts...) 
+ } } diff --git a/op-devstack/sysgo/l2_el_opgeth.go b/op-devstack/sysgo/l2_el_opgeth.go new file mode 100644 index 0000000000000..c2afb102e8666 --- /dev/null +++ b/op-devstack/sysgo/l2_el_opgeth.go @@ -0,0 +1,169 @@ +package sysgo + +import ( + "sync" + + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/log" + gn "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/shim" + "github.com/ethereum-optimism/optimism/op-devstack/stack" + "github.com/ethereum-optimism/optimism/op-devstack/stack/match" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/testutils/tcpproxy" +) + +type OpGeth struct { + mu sync.Mutex + + p devtest.P + logger log.Logger + id stack.L2ELNodeID + l2Net *L2Network + jwtPath string + supervisorRPC string + l2Geth *geth.GethInstance + + authRPC string + userRPC string + + authProxy *tcpproxy.Proxy + userProxy *tcpproxy.Proxy +} + +var _ L2ELNode = (*OpGeth)(nil) + +func (n *OpGeth) UserRPC() string { + return n.userRPC +} + +func (n *OpGeth) EngineRPC() string { + return n.authRPC +} + +func (n *OpGeth) JWTPath() string { + return n.jwtPath +} + +func (n *OpGeth) hydrate(system stack.ExtensibleSystem) { + require := system.T().Require() + rpcCl, err := client.NewRPC(system.T().Ctx(), system.Logger(), n.userRPC, client.WithLazyDial()) + require.NoError(err) + system.T().Cleanup(rpcCl.Close) + + l2Net := system.L2Network(stack.L2NetworkID(n.id.ChainID())) + sysL2EL := shim.NewL2ELNode(shim.L2ELNodeConfig{ + RollupCfg: l2Net.RollupConfig(), + ELNodeConfig: shim.ELNodeConfig{ + CommonConfig: shim.NewCommonConfig(system.T()), + Client: rpcCl, + ChainID: n.id.ChainID(), + }, + ID: n.id, + }) + sysL2EL.SetLabel(match.LabelVendor, string(match.OpGeth)) + 
l2Net.(stack.ExtensibleL2Network).AddL2ELNode(sysL2EL) +} + +func (n *OpGeth) Start() { + n.mu.Lock() + defer n.mu.Unlock() + if n.l2Geth != nil { + n.logger.Warn("op-geth already started") + return + } + + if n.authProxy == nil { + n.authProxy = tcpproxy.New(n.logger.New("proxy", "l2el-auth")) + n.p.Require().NoError(n.authProxy.Start()) + n.p.Cleanup(func() { + n.authProxy.Close() + }) + n.authRPC = "ws://" + n.authProxy.Addr() + } + if n.userProxy == nil { + n.userProxy = tcpproxy.New(n.logger.New("proxy", "l2el-user")) + n.p.Require().NoError(n.userProxy.Start()) + n.p.Cleanup(func() { + n.userProxy.Close() + }) + n.userRPC = "ws://" + n.userProxy.Addr() + } + + require := n.p.Require() + l2Geth, err := geth.InitL2(n.id.String(), n.l2Net.genesis, n.jwtPath, + func(ethCfg *ethconfig.Config, nodeCfg *gn.Config) error { + ethCfg.InteropMessageRPC = n.supervisorRPC + ethCfg.InteropMempoolFiltering = true + nodeCfg.P2P = p2p.Config{ + NoDiscovery: true, + ListenAddr: "127.0.0.1:0", + MaxPeers: 10, + } + return nil + }) + require.NoError(err) + require.NoError(l2Geth.Node.Start()) + n.l2Geth = l2Geth + n.authProxy.SetUpstream(ProxyAddr(require, l2Geth.AuthRPC().RPC())) + n.userProxy.SetUpstream(ProxyAddr(require, l2Geth.UserRPC().RPC())) +} + +func (n *OpGeth) Stop() { + n.mu.Lock() + defer n.mu.Unlock() + if n.l2Geth == nil { + n.logger.Warn("op-geth already stopped") + return + } + n.logger.Info("Closing op-geth", "id", n.id) + closeErr := n.l2Geth.Close() + n.logger.Info("Closed op-geth", "id", n.id, "err", closeErr) + n.l2Geth = nil +} + +func WithOpGeth(id stack.L2ELNodeID, opts ...L2ELOption) stack.Option[*Orchestrator] { + return stack.AfterDeploy(func(orch *Orchestrator) { + p := orch.P().WithCtx(stack.ContextWithID(orch.P().Ctx(), id)) + require := p.Require() + + l2Net, ok := orch.l2Nets.Get(id.ChainID()) + require.True(ok, "L2 network required") + + cfg := DefaultL2ELConfig() + orch.l2ELOptions.Apply(p, id, cfg) // apply global options + 
L2ELOptionBundle(opts).Apply(p, id, cfg) // apply specific options + + jwtPath, _ := orch.writeDefaultJWT() + + useInterop := l2Net.genesis.Config.InteropTime != nil + + supervisorRPC := "" + if useInterop { + require.NotNil(cfg.SupervisorID, "supervisor is required for interop") + sup, ok := orch.supervisors.Get(*cfg.SupervisorID) + require.True(ok, "supervisor is required for interop") + supervisorRPC = sup.userRPC + } + + logger := p.Logger() + + l2EL := &OpGeth{ + id: id, + p: orch.P(), + logger: logger, + l2Net: l2Net, + jwtPath: jwtPath, + supervisorRPC: supervisorRPC, + } + l2EL.Start() + p.Cleanup(func() { + l2EL.Stop() + }) + require.True(orch.l2ELs.SetIfMissing(id, l2EL), "must be unique L2 EL node") + }) +} diff --git a/op-devstack/sysgo/l2_el_opreth.go b/op-devstack/sysgo/l2_el_opreth.go new file mode 100644 index 0000000000000..35d7dd89c60a5 --- /dev/null +++ b/op-devstack/sysgo/l2_el_opreth.go @@ -0,0 +1,251 @@ +package sysgo + +import ( + "encoding/json" + "os" + "path/filepath" + "sync" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/shim" + "github.com/ethereum-optimism/optimism/op-devstack/stack" + "github.com/ethereum-optimism/optimism/op-devstack/stack/match" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/logpipe" + "github.com/ethereum-optimism/optimism/op-service/tasks" + "github.com/ethereum-optimism/optimism/op-service/testutils/tcpproxy" +) + +type OpReth struct { + mu sync.Mutex + + id stack.L2ELNodeID + l2Net *L2Network + jwtPath string + authRPC string + userRPC string + + authProxy *tcpproxy.Proxy + userProxy *tcpproxy.Proxy + + execPath string + args []string + // Each entry is of the form "key=value". 
+ env []string + + p devtest.P + + sub *SubProcess +} + +var _ L2ELNode = (*OpReth)(nil) + +func (n *OpReth) hydrate(system stack.ExtensibleSystem) { + require := system.T().Require() + rpcCl, err := client.NewRPC(system.T().Ctx(), system.Logger(), n.userRPC, client.WithLazyDial()) + require.NoError(err) + system.T().Cleanup(rpcCl.Close) + + l2Net := system.L2Network(stack.L2NetworkID(n.id.ChainID())) + sysL2EL := shim.NewL2ELNode(shim.L2ELNodeConfig{ + RollupCfg: l2Net.RollupConfig(), + ELNodeConfig: shim.ELNodeConfig{ + CommonConfig: shim.NewCommonConfig(system.T()), + Client: rpcCl, + ChainID: n.id.ChainID(), + }, + ID: n.id, + }) + sysL2EL.SetLabel(match.LabelVendor, string(match.OpReth)) + l2Net.(stack.ExtensibleL2Network).AddL2ELNode(sysL2EL) +} + +func (n *OpReth) Start() { + n.mu.Lock() + defer n.mu.Unlock() + if n.sub != nil { + n.p.Logger().Warn("op-reth already started") + return + } + if n.authProxy == nil { + n.authProxy = tcpproxy.New(n.p.Logger()) + n.p.Require().NoError(n.authProxy.Start()) + n.p.Cleanup(func() { + n.authProxy.Close() + }) + n.authRPC = "ws://" + n.authProxy.Addr() + } + if n.userProxy == nil { + n.userProxy = tcpproxy.New(n.p.Logger()) + n.p.Require().NoError(n.userProxy.Start()) + n.p.Cleanup(func() { + n.userProxy.Close() + }) + n.userRPC = "ws://" + n.userProxy.Addr() + } + logOut := logpipe.ToLogger(n.p.Logger().New("src", "stdout")) + logErr := logpipe.ToLogger(n.p.Logger().New("src", "stderr")) + userRPC := make(chan string, 1) + authRPC := make(chan string, 1) + onLogEntry := func(e logpipe.LogEntry) { + switch e.LogMessage() { + case "RPC WS server started": + select { + case userRPC <- "ws://" + e.FieldValue("url").(string): + default: + } + case "RPC auth server started": + select { + case authRPC <- "ws://" + e.FieldValue("url").(string): + default: + } + } + } + stdOutLogs := logpipe.LogProcessor(func(line []byte) { + e := logpipe.ParseRustStructuredLogs(line) + logOut(e) + onLogEntry(e) + }) + stdErrLogs := 
logpipe.LogProcessor(func(line []byte) { + e := logpipe.ParseRustStructuredLogs(line) + logErr(e) + }) + n.sub = NewSubProcess(n.p, stdOutLogs, stdErrLogs) + + err := n.sub.Start(n.execPath, n.args, n.env) + n.p.Require().NoError(err, "Must start") + + var userRPCAddr, authRPCAddr string + n.p.Require().NoError(tasks.Await(n.p.Ctx(), userRPC, &userRPCAddr), "need user RPC") + n.p.Require().NoError(tasks.Await(n.p.Ctx(), authRPC, &authRPCAddr), "need auth RPC") + + n.userProxy.SetUpstream(ProxyAddr(n.p.Require(), userRPCAddr)) + n.authProxy.SetUpstream(ProxyAddr(n.p.Require(), authRPCAddr)) +} + +// Stop stops the op-reth node. +// warning: no restarts supported yet, since the RPC port is not remembered. +func (n *OpReth) Stop() { + n.mu.Lock() + defer n.mu.Unlock() + err := n.sub.Stop() + n.p.Require().NoError(err, "Must stop") + n.sub = nil +} + +func (n *OpReth) UserRPC() string { + return n.userRPC +} + +func (n *OpReth) EngineRPC() string { + return n.authRPC +} + +func (n *OpReth) JWTPath() string { + return n.jwtPath +} + +func WithOpReth(id stack.L2ELNodeID, opts ...L2ELOption) stack.Option[*Orchestrator] { + return stack.AfterDeploy(func(orch *Orchestrator) { + p := orch.P().WithCtx(stack.ContextWithID(orch.P().Ctx(), id)) + require := p.Require() + + l2Net, ok := orch.l2Nets.Get(id.ChainID()) + require.True(ok, "L2 network required") + + cfg := DefaultL2ELConfig() + orch.l2ELOptions.Apply(p, id, cfg) // apply global options + L2ELOptionBundle(opts).Apply(p, id, cfg) // apply specific options + + jwtPath, _ := orch.writeDefaultJWT() + + useInterop := l2Net.genesis.Config.InteropTime != nil + + supervisorRPC := "" + if useInterop { + require.NotNil(cfg.SupervisorID, "supervisor is required for interop") + sup, ok := orch.supervisors.Get(*cfg.SupervisorID) + require.True(ok, "supervisor is required for interop") + supervisorRPC = sup.userRPC + } + + tempDir := p.TempDir() + data, err := json.Marshal(l2Net.genesis) + p.Require().NoError(err, "must json-encode 
genesis") + chainConfigPath := filepath.Join(tempDir, "genesis.json") + p.Require().NoError(os.WriteFile(chainConfigPath, data, 0o644), "must write genesis file") + + dataDirPath := filepath.Join(tempDir, "data") + p.Require().NoError(os.MkdirAll(dataDirPath, 0o755), "must create datadir") + + // reth writes logs not just to stdout, but also to file, + // and to global user-cache by default, rather than the datadir. + // So we customize this to temp-dir too, to not pollute the user-cache dir. + logDirPath := filepath.Join(tempDir, "logs") + p.Require().NoError(os.MkdirAll(logDirPath, 0o755), "must create logs dir") + + tempP2PPath := filepath.Join(tempDir, "p2pkey.txt") + + execPath := os.Getenv("OP_RETH_EXEC_PATH") + p.Require().NotEmpty(execPath, "OP_RETH_EXEC_PATH environment variable must be set") + _, err = os.Stat(execPath) + p.Require().NotErrorIs(err, os.ErrNotExist, "executable must exist") + + // reth does not support env-var configuration like the Go services, + // so we use the CLI flags instead. 
+ args := []string{ + "node", + "--chain=" + chainConfigPath, + "--with-unused-ports", + "--datadir=" + dataDirPath, + "--log.file.directory=" + logDirPath, + "--disable-nat", + "--disable-dns-discovery", + "--disable-discv4-discovery", + "--p2p-secret-key=" + tempP2PPath, + "--nat=none", + "--addr=127.0.0.1", + "--port=0", + "--http", + "--http.addr=127.0.0.1", + "--http.port=0", + "--http.api=admin,debug,eth,net,trace,txpool,web3,rpc,reth,miner", + "--ws", + "--ws.addr=127.0.0.1", + "--ws.port=0", + "--ws.api=admin,debug,eth,net,trace,txpool,web3,rpc,reth,miner", + "--ipcdisable", + "--authrpc.addr=127.0.0.1", + "--authrpc.port=0", + "--authrpc.jwtsecret=" + jwtPath, + "--txpool.minimum-priority-fee=1", + "--txpool.nolocals", + "--builder.interval=100ms", + "--builder.deadline=2", + "--log.stdout.format=json", + "--color=never", + "-vvvv", + } + if supervisorRPC != "" { + args = append(args, "--rollup.supervisor-http="+supervisorRPC) + } + + l2EL := &OpReth{ + id: id, + l2Net: l2Net, + jwtPath: jwtPath, + authRPC: "", + userRPC: "", + execPath: execPath, + args: args, + env: []string{}, + p: p, + } + + p.Logger().Info("Starting op-reth") + l2EL.Start() + p.Cleanup(l2EL.Stop) + p.Logger().Info("op-reth is ready", "userRPC", l2EL.userRPC, "authRPC", l2EL.authRPC) + require.True(orch.l2ELs.SetIfMissing(id, l2EL), "must be unique L2 EL node") + }) +} diff --git a/op-devstack/sysgo/l2_el_p2p_util.go b/op-devstack/sysgo/l2_el_p2p_util.go new file mode 100644 index 0000000000000..fc7adf04f36d7 --- /dev/null +++ b/op-devstack/sysgo/l2_el_p2p_util.go @@ -0,0 +1,69 @@ +package sysgo + +import ( + "context" + "slices" + "time" + + "github.com/ethereum/go-ethereum/p2p" + + "github.com/ethereum-optimism/optimism/op-devstack/stack" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" + "github.com/ethereum-optimism/optimism/op-service/dial" + "github.com/ethereum-optimism/optimism/op-service/testreq" +) + +func WithL2ELP2PConnection(l2EL1ID, l2EL2ID stack.L2ELNodeID) 
stack.Option[*Orchestrator] { + return stack.AfterDeploy(func(orch *Orchestrator) { + require := orch.P().Require() + + l2EL1, ok := orch.l2ELs.Get(l2EL1ID) + require.True(ok, "looking for L2 EL node 1 to connect p2p") + l2EL2, ok := orch.l2ELs.Get(l2EL2ID) + require.True(ok, "looking for L2 EL node 2 to connect p2p") + require.Equal(l2EL1ID.ChainID(), l2EL2ID.ChainID(), "must be same l2 chain") + + ctx := orch.P().Ctx() + logger := orch.P().Logger() + + rpc1, err := dial.DialRPCClientWithTimeout(ctx, logger, l2EL1.UserRPC()) + require.NoError(err, "failed to connect to el1 rpc") + defer rpc1.Close() + rpc2, err := dial.DialRPCClientWithTimeout(ctx, logger, l2EL2.UserRPC()) + require.NoError(err, "failed to connect to el2 rpc") + defer rpc2.Close() + + ConnectP2P(orch.P().Ctx(), require, rpc1, rpc2) + }) +} + +type RpcCaller interface { + CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error +} + +// ConnectP2P creates a p2p peer connection between node1 and node2. 
+func ConnectP2P(ctx context.Context, require *testreq.Assertions, initiator RpcCaller, acceptor RpcCaller) { + var targetInfo p2p.NodeInfo + require.NoError(acceptor.CallContext(ctx, &targetInfo, "admin_nodeInfo"), "get node info") + + var peerAdded bool + require.NoError(initiator.CallContext(ctx, &peerAdded, "admin_addPeer", targetInfo.Enode), "add peer") + require.True(peerAdded, "should have added peer successfully") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + err := wait.For(ctx, time.Second, func() (bool, error) { + var peers []peer + if err := initiator.CallContext(ctx, &peers, "admin_peers"); err != nil { + return false, err + } + return slices.ContainsFunc(peers, func(p peer) bool { + return p.ID == targetInfo.ID + }), nil + }) + require.NoError(err, "The peer was not connected") +} + +type peer struct { + ID string `json:"id"` +} diff --git a/op-devstack/sysgo/l2_proposer.go b/op-devstack/sysgo/l2_proposer.go index 37089f9b62637..6c2934d895db7 100644 --- a/op-devstack/sysgo/l2_proposer.go +++ b/op-devstack/sysgo/l2_proposer.go @@ -123,7 +123,7 @@ func WithProposerPostDeploy(orch *Orchestrator, proposerID stack.L2ProposerID, l require.NotNil(l2CLID, "need L2 CL to connect to pre-interop") l2CL, ok := orch.l2CLs.Get(*l2CLID) require.True(ok) - proposerCLIConfig.RollupRpc = l2CL.userRPC + proposerCLIConfig.RollupRpc = l2CL.UserRPC() } proposer, err := ps.ProposerServiceFromCLIConfig(ctx, "0.0.1", proposerCLIConfig, logger) diff --git a/op-devstack/sysgo/orchestrator.go b/op-devstack/sysgo/orchestrator.go index c6f3baa0e412c..25820e85e0af8 100644 --- a/op-devstack/sysgo/orchestrator.go +++ b/op-devstack/sysgo/orchestrator.go @@ -31,7 +31,8 @@ type Orchestrator struct { // options batcherOptions []BatcherOption proposerOptions []ProposerOption - l2CLOptions []L2CLOption + l2CLOptions L2CLOptionBundle + l2ELOptions L2ELOptionBundle deployerPipelineOptions []DeployerPipelineOption superchains 
locks.RWMap[stack.SuperchainID, *Superchain] @@ -40,8 +41,8 @@ type Orchestrator struct { l2Nets locks.RWMap[eth.ChainID, *L2Network] l1ELs locks.RWMap[stack.L1ELNodeID, *L1ELNode] l1CLs locks.RWMap[stack.L1CLNodeID, *L1CLNode] - l2ELs locks.RWMap[stack.L2ELNodeID, *L2ELNode] - l2CLs locks.RWMap[stack.L2CLNodeID, *L2CLNode] + l2ELs locks.RWMap[stack.L2ELNodeID, L2ELNode] + l2CLs locks.RWMap[stack.L2CLNodeID, L2CLNode] supervisors locks.RWMap[stack.SupervisorID, *Supervisor] testSequencers locks.RWMap[stack.TestSequencerID, *TestSequencer] batchers locks.RWMap[stack.L2BatcherID, *L2Batcher] @@ -122,8 +123,8 @@ func (o *Orchestrator) Hydrate(sys stack.ExtensibleSystem) { o.l2Nets.Range(rangeHydrateFn[eth.ChainID, *L2Network](sys)) o.l1ELs.Range(rangeHydrateFn[stack.L1ELNodeID, *L1ELNode](sys)) o.l1CLs.Range(rangeHydrateFn[stack.L1CLNodeID, *L1CLNode](sys)) - o.l2ELs.Range(rangeHydrateFn[stack.L2ELNodeID, *L2ELNode](sys)) - o.l2CLs.Range(rangeHydrateFn[stack.L2CLNodeID, *L2CLNode](sys)) + o.l2ELs.Range(rangeHydrateFn[stack.L2ELNodeID, L2ELNode](sys)) + o.l2CLs.Range(rangeHydrateFn[stack.L2CLNodeID, L2CLNode](sys)) o.supervisors.Range(rangeHydrateFn[stack.SupervisorID, *Supervisor](sys)) o.testSequencers.Range(rangeHydrateFn[stack.TestSequencerID, *TestSequencer](sys)) o.batchers.Range(rangeHydrateFn[stack.L2BatcherID, *L2Batcher](sys)) diff --git a/op-devstack/sysgo/proxy.go b/op-devstack/sysgo/proxy.go new file mode 100644 index 0000000000000..3e932f220a86e --- /dev/null +++ b/op-devstack/sysgo/proxy.go @@ -0,0 +1,14 @@ +package sysgo + +import ( + "net" + "net/url" + + "github.com/ethereum-optimism/optimism/op-service/testreq" +) + +func ProxyAddr(require *testreq.Assertions, urlStr string) string { + u, err := url.Parse(urlStr) + require.NoError(err) + return net.JoinHostPort(u.Hostname(), u.Port()) +} diff --git a/op-devstack/sysgo/subproc.go b/op-devstack/sysgo/subproc.go new file mode 100644 index 0000000000000..799357a049733 --- /dev/null +++ 
b/op-devstack/sysgo/subproc.go @@ -0,0 +1,163 @@ +package sysgo + +import ( + "context" + "fmt" + "os" + "os/exec" + "sync" + "time" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-service/logpipe" +) + +// SubProcess is a process that can be started, and stopped, and restarted. +// +// If at any point the process fails to start or exit successfully, +// the failure is reported to the devtest.P. +// +// If the sub-process exits by itself, the exit is detected, +// and if not successful (non-zero exit code on unix) it also reports failure to the devtest.P. +// +// Sub-process logs are assumed to be structured JSON logs, and are piped to the logger. +type SubProcess struct { + p devtest.P + cmd *exec.Cmd + + stdOutLogs logpipe.LogProcessor + stdErrLogs logpipe.LogProcessor + + waitCtx context.Context // closed when process-Wait completes + + mu sync.Mutex +} + +func NewSubProcess(p devtest.P, stdOutLogs, stdErrLogs logpipe.LogProcessor) *SubProcess { + return &SubProcess{ + p: p, + stdOutLogs: stdOutLogs, + stdErrLogs: stdErrLogs, + } +} + +func (sp *SubProcess) Start(cmdPath string, args []string, env []string) error { + sp.mu.Lock() + defer sp.mu.Unlock() + if sp.cmd != nil { + return fmt.Errorf("process is still running (PID: %d)", sp.cmd.Process.Pid) + } + cmd := exec.Command(cmdPath, args...) + cmd.Env = append(os.Environ(), env...) 
+ stdout, err := cmd.StdoutPipe() + sp.p.Require().NoError(err, "stdout err") + stderr, err := cmd.StderrPipe() + sp.p.Require().NoError(err, "stderr err") + go func() { + err := logpipe.PipeLogs(stdout, sp.stdOutLogs) + sp.p.Require().NoError(err, "stdout logging error") + }() + go func() { + err := logpipe.PipeLogs(stderr, sp.stdErrLogs) + sp.p.Require().NoError(err, "stderr logging error") + }() + if err := cmd.Start(); err != nil { + return err + } + sp.cmd = cmd + + subCtx, subCancel := context.WithCancelCause(context.Background()) + go func() { + state, err := cmd.Process.Wait() + subCancel(err) + sp.p.Require().NoError(err, "Sub-process failed to be closed") + sp.p.Logger().Info("Sub-process stopped", "exitCode", state.ExitCode(), "pid", state.Pid()) + // if it exited on its own, then we care about the error. If not, we (or the user) signaled it. + if state.Exited() { + sp.p.Require().True(state.Success(), "Sub-process closed with error status: %s", state.String()) + } + }() + sp.waitCtx = subCtx + + sp.p.Cleanup(func() { + err := sp.Stop() + if err != nil { + sp.p.Logger().Error("Shutdown error", "err", err) + } + }) + return nil +} + +// Kill stops the process, and does not wait for it to complete. +func (sp *SubProcess) Kill() error { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // don't wait, just force it to stop immediately + return sp.GracefulStop(ctx) +} + +// Stop implements the default control-panel interface, +// and gracefully stops with a 10-second timeout. +func (sp *SubProcess) Stop() error { + // by default, for control-panel, use an interrupt and a 10-second grace + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + return sp.GracefulStop(ctx) +} + +// GracefulStop sends an interrupt and waits for the process to stop. +// If the given ctx is closed, a forced shutdown (process kill) is pursued. 
+func (sp *SubProcess) GracefulStop(ctx context.Context) error { + sp.mu.Lock() + defer sp.mu.Unlock() + if sp.cmd == nil { + return nil // already stopped gracefully + } + + if ctx.Err() == nil && sp.waitCtx.Err() == nil { + // if not force-closing, and not already done, then try an interrupt first. + sp.p.Logger().Info("Sending interrupt") + if err := sp.cmd.Process.Signal(os.Interrupt); err != nil { + return err + } + } + select { + case <-ctx.Done(): + sp.p.Logger().Warn("Sub-process did not respond to interrupt, force-closing now") + err := sp.cmd.Process.Kill() + if err != nil { + return fmt.Errorf("failed to force-close sub-process: %w", err) + } + sp.p.Logger().Info("Successfully force-closed sub-process") + // resources of cmd.Process will be cleaned up by the Process.Wait + case <-sp.waitCtx.Done(): + if err := context.Cause(sp.waitCtx); err != nil && err != context.Canceled { + sp.p.Logger().Warn("Sub-process exited with error", "err", err) + } else { + sp.p.Logger().Info("Sub-process gracefully exited") + } + } + sp.cmd = nil + sp.waitCtx = nil + return nil +} + +// Wait waits for the process to complete. 
+func (sp *SubProcess) Wait(ctx context.Context) error { + sp.mu.Lock() + defer sp.mu.Unlock() + if sp.waitCtx == nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-sp.waitCtx.Done(): + if err := context.Cause(sp.waitCtx); err != nil && err != context.Canceled { + sp.p.Logger().Warn("Sub-process exited with error", "err", err) + return err + } else { + sp.p.Logger().Info("Sub-process gracefully exited") + return nil + } + } +} diff --git a/op-devstack/sysgo/subproc_test.go b/op-devstack/sysgo/subproc_test.go new file mode 100644 index 0000000000000..59ab24133aa0b --- /dev/null +++ b/op-devstack/sysgo/subproc_test.go @@ -0,0 +1,77 @@ +package sysgo + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-service/logpipe" + "github.com/ethereum-optimism/optimism/op-service/testlog" +) + +func TestSubProcess(gt *testing.T) { + tLog := testlog.Logger(gt, log.LevelInfo) + logger, capt := testlog.CaptureLogger(gt, log.LevelInfo) + + onFailNow := func(v bool) { + panic("fail") + } + onSkipNow := func() { + panic("skip") + } + p := devtest.NewP(context.Background(), logger, onFailNow, onSkipNow) + gt.Cleanup(p.Close) + + logProc := logpipe.LogProcessor(func(line []byte) { + logger.Info(string(line)) + tLog.Info("Sub-process logged message", "line", string(line)) + }) + sp := NewSubProcess(p, logProc, logProc) + + gt.Log("Running first sub-process") + testSleep(gt, capt, sp) + gt.Log("Restarting, second run") + capt.Clear() + testSleep(gt, capt, sp) + gt.Log("Trying a different command now") + capt.Clear() + testEcho(gt, capt, sp) + gt.Log("Second run of different command") + capt.Clear() + testEcho(gt, capt, sp) +} + +// testEcho tests that we can handle a sub-process that completes on its own +func testEcho(gt *testing.T, capt *testlog.CapturingHandler, sp *SubProcess) { + 
require.NoError(gt, sp.Start("/bin/echo", []string{"hello world"}, []string{})) + gt.Log("Started sub-process") + require.NoError(gt, sp.Wait(context.Background()), "echo must complete") + require.NoError(gt, sp.Stop()) + gt.Log("Stopped sub-process") + + require.NotNil(gt, capt.FindLog( + testlog.NewMessageFilter("hello world"))) + + require.NotNil(gt, capt.FindLog( + testlog.NewMessageFilter("Sub-process gracefully exited"))) +} + +// testSleep tests that we can force shut down a sub-process that is stuck +func testSleep(gt *testing.T, capt *testlog.CapturingHandler, sp *SubProcess) { + // Sleep for very, very, long + require.NoError(gt, sp.Start("/bin/sleep", []string{"10000000000"}, []string{})) + gt.Log("Started sub-process") + // Shut down the process before the sleep completes + require.NoError(gt, sp.Kill()) + gt.Log("Killed sub-process") + + require.NotNil(gt, capt.FindLog( + testlog.NewMessageFilter("Sub-process did not respond to interrupt, force-closing now"))) + + require.NotNil(gt, capt.FindLog( + testlog.NewMessageFilter("Successfully force-closed sub-process"))) +} diff --git a/op-devstack/sysgo/superroot.go b/op-devstack/sysgo/superroot.go index ab1cdc9951598..03b4a76471122 100644 --- a/op-devstack/sysgo/superroot.go +++ b/op-devstack/sysgo/superroot.go @@ -46,7 +46,7 @@ func WithSuperRoots(l1ChainID eth.ChainID, l1ELID stack.L1ELNodeID, l2CLID stack l2CL, ok := o.l2CLs.Get(l2CLID) require.True(ok, "must have L2 CL node") - rollupClientProvider, err := dial.NewStaticL2RollupProvider(t.Ctx(), t.Logger(), l2CL.opNode.UserRPC().RPC()) + rollupClientProvider, err := dial.NewStaticL2RollupProvider(t.Ctx(), t.Logger(), l2CL.UserRPC()) require.NoError(err) rollupClient, err := rollupClientProvider.RollupClient(t.Ctx()) require.NoError(err) diff --git a/op-devstack/sysgo/sync_tester.go b/op-devstack/sysgo/sync_tester.go index f4b58527886e9..6ce5bc3677a3f 100644 --- a/op-devstack/sysgo/sync_tester.go +++ b/op-devstack/sysgo/sync_tester.go @@ -59,7 +59,7 @@ 
func WithSyncTesters(l2ELs []stack.L2ELNodeID) stack.Option[*Orchestrator] { require.True(ok, "need L2 EL for sync tester", elID) syncTesters[id] = &stconf.SyncTesterEntry{ - ELRPC: endpoint.MustRPC{Value: endpoint.URL(el.userRPC)}, + ELRPC: endpoint.MustRPC{Value: endpoint.URL(el.UserRPC())}, // EngineRPC: endpoint.MustRPC{Value: endpoint.URL(el.authRPC)}, // JwtPath: el.jwtPath, ChainID: elID.ChainID(), diff --git a/op-devstack/sysgo/system.go b/op-devstack/sysgo/system.go index ba21757ac19cb..0d935b372fbe8 100644 --- a/op-devstack/sysgo/system.go +++ b/op-devstack/sysgo/system.go @@ -64,8 +64,8 @@ func DefaultMinimalSystem(dest *DefaultMinimalSystemIDs) stack.Option[*Orchestra opt.Add(WithL1Nodes(ids.L1EL, ids.L1CL)) - opt.Add(WithL2ELNode(ids.L2EL, nil)) - opt.Add(WithL2CLNode(ids.L2CL, true, false, ids.L1CL, ids.L1EL, ids.L2EL)) + opt.Add(WithL2ELNode(ids.L2EL)) + opt.Add(WithL2CLNode(ids.L2CL, ids.L1CL, ids.L1EL, ids.L2EL, L2CLSequencer())) opt.Add(WithBatcher(ids.L2Batcher, ids.L1EL, ids.L2CL, ids.L2EL)) opt.Add(WithProposer(ids.L2Proposer, ids.L1EL, &ids.L2CL, nil)) @@ -121,8 +121,8 @@ func DefaultMinimalSystemWithSyncTester(dest *DefaultMinimalSystemWithSyncTester opt.Add(WithL1Nodes(ids.L1EL, ids.L1CL)) - opt.Add(WithL2ELNode(ids.L2EL, nil)) - opt.Add(WithL2CLNode(ids.L2CL, true, false, ids.L1CL, ids.L1EL, ids.L2EL)) + opt.Add(WithL2ELNode(ids.L2EL)) + opt.Add(WithL2CLNode(ids.L2CL, ids.L1CL, ids.L1EL, ids.L2EL, L2CLSequencer())) opt.Add(WithBatcher(ids.L2Batcher, ids.L1EL, ids.L2CL, ids.L2EL)) opt.Add(WithProposer(ids.L2Proposer, ids.L1EL, &ids.L2CL, nil)) @@ -227,8 +227,8 @@ func baseInteropSystem(ids *DefaultSingleChainInteropSystemIDs) stack.Option[*Or opt.Add(WithSupervisor(ids.Supervisor, ids.Cluster, ids.L1EL)) - opt.Add(WithL2ELNode(ids.L2AEL, &ids.Supervisor)) - opt.Add(WithL2CLNode(ids.L2ACL, true, true, ids.L1CL, ids.L1EL, ids.L2AEL)) + opt.Add(WithL2ELNode(ids.L2AEL, L2ELWithSupervisor(ids.Supervisor))) + opt.Add(WithL2CLNode(ids.L2ACL, 
ids.L1CL, ids.L1EL, ids.L2AEL, L2CLSequencer(), L2CLIndexing())) opt.Add(WithTestSequencer(ids.TestSequencer, ids.L1CL, ids.L2ACL, ids.L1EL, ids.L2AEL)) opt.Add(WithBatcher(ids.L2ABatcher, ids.L1EL, ids.L2ACL, ids.L2AEL)) @@ -277,8 +277,8 @@ func DefaultInteropSystem(dest *DefaultInteropSystemIDs) stack.Option[*Orchestra WithPrefundedL2(ids.L1.ChainID(), ids.L2B.ChainID()), WithInteropAtGenesis(), // this can be overridden by later options )) - opt.Add(WithL2ELNode(ids.L2BEL, &ids.Supervisor)) - opt.Add(WithL2CLNode(ids.L2BCL, true, true, ids.L1CL, ids.L1EL, ids.L2BEL)) + opt.Add(WithL2ELNode(ids.L2BEL, L2ELWithSupervisor(ids.Supervisor))) + opt.Add(WithL2CLNode(ids.L2BCL, ids.L1CL, ids.L1EL, ids.L2BEL, L2CLSequencer(), L2CLIndexing())) opt.Add(WithBatcher(ids.L2BBatcher, ids.L1EL, ids.L2BCL, ids.L2BEL)) opt.Add(WithManagedBySupervisor(ids.L2BCL, ids.Supervisor)) @@ -337,10 +337,11 @@ func defaultSuperProofsSystem(dest *DefaultInteropSystemIDs, deployerOpts ...Dep opt.Add(WithSupervisor(ids.Supervisor, ids.Cluster, ids.L1EL)) - opt.Add(WithL2ELNode(ids.L2AEL, &ids.Supervisor)) - opt.Add(WithL2CLNode(ids.L2ACL, true, true, ids.L1CL, ids.L1EL, ids.L2AEL)) - opt.Add(WithL2ELNode(ids.L2BEL, &ids.Supervisor)) - opt.Add(WithL2CLNode(ids.L2BCL, true, true, ids.L1CL, ids.L1EL, ids.L2BEL)) + opt.Add(WithL2ELNode(ids.L2AEL, L2ELWithSupervisor(ids.Supervisor))) + opt.Add(WithL2CLNode(ids.L2ACL, ids.L1CL, ids.L1EL, ids.L2AEL, L2CLSequencer(), L2CLIndexing())) + + opt.Add(WithL2ELNode(ids.L2BEL, L2ELWithSupervisor(ids.Supervisor))) + opt.Add(WithL2CLNode(ids.L2BCL, ids.L1CL, ids.L1EL, ids.L2BEL, L2CLSequencer(), L2CLIndexing())) opt.Add(WithTestSequencer(ids.TestSequencer, ids.L1CL, ids.L2ACL, ids.L1EL, ids.L2AEL)) @@ -399,11 +400,11 @@ func MultiSupervisorInteropSystem(dest *MultiSupervisorInteropSystemIDs) stack.O // add backup supervisor opt.Add(WithSupervisor(ids.SupervisorSecondary, ids.Cluster, ids.L1EL)) - opt.Add(WithL2ELNode(ids.L2A2EL, &ids.SupervisorSecondary)) - 
opt.Add(WithL2CLNode(ids.L2A2CL, false, true, ids.L1CL, ids.L1EL, ids.L2A2EL)) + opt.Add(WithL2ELNode(ids.L2A2EL, L2ELWithSupervisor(ids.SupervisorSecondary))) + opt.Add(WithL2CLNode(ids.L2A2CL, ids.L1CL, ids.L1EL, ids.L2A2EL, L2CLIndexing())) - opt.Add(WithL2ELNode(ids.L2B2EL, &ids.SupervisorSecondary)) - opt.Add(WithL2CLNode(ids.L2B2CL, false, true, ids.L1CL, ids.L1EL, ids.L2B2EL)) + opt.Add(WithL2ELNode(ids.L2B2EL, L2ELWithSupervisor(ids.SupervisorSecondary))) + opt.Add(WithL2CLNode(ids.L2B2CL, ids.L1CL, ids.L1EL, ids.L2B2EL, L2CLIndexing())) // verifier must be also managed or it cannot advance // we attach verifier L2CL with backup supervisor diff --git a/op-devstack/sysgo/system_singlechain_multinode.go b/op-devstack/sysgo/system_singlechain_multinode.go index ef86414781689..a58b632fa7f93 100644 --- a/op-devstack/sysgo/system_singlechain_multinode.go +++ b/op-devstack/sysgo/system_singlechain_multinode.go @@ -27,8 +27,8 @@ func DefaultSingleChainMultiNodeSystem(dest *DefaultSingleChainMultiNodeSystemID opt := stack.Combine[*Orchestrator]() opt.Add(DefaultMinimalSystem(&dest.DefaultMinimalSystemIDs)) - opt.Add(WithL2ELNode(ids.L2ELB, nil)) - opt.Add(WithL2CLNode(ids.L2CLB, false, false, ids.L1CL, ids.L1EL, ids.L2ELB)) + opt.Add(WithL2ELNode(ids.L2ELB)) + opt.Add(WithL2CLNode(ids.L2CLB, ids.L1CL, ids.L1EL, ids.L2ELB)) // P2P connect L2CL nodes opt.Add(WithL2CLP2PConnection(ids.L2CL, ids.L2CLB)) diff --git a/op-devstack/sysgo/test_sequencer.go b/op-devstack/sysgo/test_sequencer.go index ffc0a52f6dfe1..fe46a79b279a8 100644 --- a/op-devstack/sysgo/test_sequencer.go +++ b/op-devstack/sysgo/test_sequencer.go @@ -117,10 +117,10 @@ func WithTestSequencer(testSequencerID stack.TestSequencerID, l1CLID stack.L1CLN Value: endpoint.HttpURL(l1EL.userRPC), }, L2EL: endpoint.MustRPC{ - Value: endpoint.HttpURL(l2EL.userRPC), + Value: endpoint.HttpURL(l2EL.UserRPC()), }, L2CL: endpoint.MustRPC{ - Value: endpoint.HttpURL(l2CL.userRPC), + Value: endpoint.HttpURL(l2CL.UserRPC()), 
}, }, }, @@ -149,7 +149,7 @@ func WithTestSequencer(testSequencerID stack.TestSequencerID, l1CLID stack.L1CLN cid_L2: { Standard: &standardcommitter.Config{ RPC: endpoint.MustRPC{ - Value: endpoint.HttpURL(l2CL.userRPC), + Value: endpoint.HttpURL(l2CL.UserRPC()), }, }, }, @@ -161,7 +161,7 @@ func WithTestSequencer(testSequencerID stack.TestSequencerID, l1CLID stack.L1CLN pid_L2: { Standard: &standardpublisher.Config{ RPC: endpoint.MustRPC{ - Value: endpoint.HttpURL(l2CL.userRPC), + Value: endpoint.HttpURL(l2CL.UserRPC()), }, }, }, diff --git a/op-service/README.md b/op-service/README.md index 8626cc355e30c..e5a9cad89b0e3 100644 --- a/op-service/README.md +++ b/op-service/README.md @@ -28,6 +28,7 @@ Pull requests: [monorepo](https://github.com/ethereum-optimism/optimism/pulls?q= ├── jsonutil - JSON encoding/decoding utils ├── locks - Lock utils, like read-write wrapped types ├── log - Logging CLI and middleware utils +├── logpipe - Logs streaming from io.Reader to logger ├── logfilter - Logging filters ├── logmods - Log handler wrapping/unwrapping utils ├── metrics - Metrics types, metering abstractions, server utils diff --git a/op-service/logpipe/pipe.go b/op-service/logpipe/pipe.go new file mode 100644 index 0000000000000..64e7d1b89d37e --- /dev/null +++ b/op-service/logpipe/pipe.go @@ -0,0 +1,132 @@ +package logpipe + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "io" + "log/slog" + + oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum/go-ethereum/log" +) + +type rawRustJSONLog struct { + //"timestamp" ignored + Level string `json:"level"` + Fields map[string]any `json:"fields"` + //"target" ignored" +} + +type StructuredRustLogEntry struct { + Message string + Level slog.Level + Fields map[string]any +} + +func ParseRustStructuredLogs(line []byte) LogEntry { + dec := json.NewDecoder(bytes.NewReader(line)) + dec.UseNumber() // to preserve number formatting + var e rawRustJSONLog + if err := dec.Decode(&e); err != 
nil { + return StructuredRustLogEntry{ + Message: "Invalid JSON", + Level: slog.LevelWarn, + Fields: map[string]any{"line": string(line)}, + } + } + lvl, err := oplog.LevelFromString(e.Level) + if err != nil { + lvl = log.LevelInfo + } + msg, _ := e.Fields["message"].(string) + delete(e.Fields, "message") + + return StructuredRustLogEntry{ + Message: msg, + Level: lvl, + Fields: e.Fields, + } +} + +func (e StructuredRustLogEntry) LogLevel() slog.Level { + return e.Level +} + +func (e StructuredRustLogEntry) LogMessage() string { + return e.Message +} + +func (e StructuredRustLogEntry) LogFields() []any { + attrs := make([]any, 0, len(e.Fields)) + for k, v := range e.Fields { + if x, ok := v.(json.Number); ok { + v = x.String() + } + attrs = append(attrs, slog.Any(k, v)) + } + return attrs +} + +func (e StructuredRustLogEntry) FieldValue(key string) any { + return e.Fields[key] +} + +type LogEntry interface { + LogLevel() slog.Level + LogMessage() string + LogFields() []any + FieldValue(key string) any +} + +type LogProcessor func(line []byte) + +type LogParser func(line []byte) LogEntry + +func ToLogger(logger log.Logger) func(e LogEntry) { + return func(e LogEntry) { + msg := e.LogMessage() + attrs := e.LogFields() + lvl := e.LogLevel() + + if lvl >= log.LevelCrit { + // If a sub-process has a critical error, this process can handle it + // Don't force an os.Exit, downgrade to error instead + lvl = log.LevelError + attrs = append(attrs, slog.String("innerLevel", "CRIT")) + } + logger.Log(lvl, msg, attrs...) + } +} + +// PipeLogs reads logs from the provided io.ReadCloser (e.g., subprocess stdout), +// and outputs them to the provided logger. +// +// This: +// 1. assumes each line is a JSON object +// 2. parses it +// 3. extracts the "level" and optional "msg" +// 4. treats remaining fields as structured attributes +// 5. logs the entries using the provided log.Logger +// +// Non-JSON lines are logged as warnings. 
+// Crit level is mapped to error-level, to prevent untrusted crit logs from stopping the process. +// This function processes until the stream ends, and closes the reader. +// This returns the first read error (If we run into EOF, nil is returned instead). +func PipeLogs(r io.ReadCloser, onLog LogProcessor) (outErr error) { + defer func() { + outErr = errors.Join(outErr, r.Close()) + }() + + scanner := bufio.NewScanner(r) + for scanner.Scan() { + lineBytes := scanner.Bytes() + if len(lineBytes) == 0 { + continue // Skip empty lines + } + onLog(lineBytes) + } + + return scanner.Err() +} diff --git a/op-service/logpipe/pipe_test.go b/op-service/logpipe/pipe_test.go new file mode 100644 index 0000000000000..19b4a1baee405 --- /dev/null +++ b/op-service/logpipe/pipe_test.go @@ -0,0 +1,63 @@ +package logpipe + +import ( + "bytes" + "io" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-service/testlog" +) + +func TestPipeLogs(t *testing.T) { + logger, capt := testlog.CaptureLogger(t, log.LevelTrace) + + wg := new(sync.WaitGroup) + wg.Add(2) + + r, w := io.Pipe() + // Write the log output to the pipe + go func() { + defer wg.Done() + _, err := io.Copy(w, bytes.NewReader([]byte(`{"level": "DEBUG", "fields": {"message": "hello", "foo": 1}}`+"\n"))) + require.NoError(t, err) + _, err = io.Copy(w, bytes.NewReader([]byte(`test invalid JSON`+"\n"))) + require.NoError(t, err) + _, err = io.Copy(w, bytes.NewReader([]byte(`{"fields": {"message": "world", "bar": "sunny"}, "level": "INFO"}`+"\n"))) + require.NoError(t, err) + require.NoError(t, w.Close()) + }() + // Read the log output from the pipe + go func() { + defer wg.Done() + toLogger := ToLogger(logger) + logProc := func(line []byte) { + toLogger(ParseRustStructuredLogs(line)) + } + err := PipeLogs(r, logProc) + require.NoError(t, err) + }() + wg.Wait() + + entry1 := capt.FindLog( + 
testlog.NewLevelFilter(log.LevelDebug), + testlog.NewAttributesContainsFilter("foo", "1")) + require.NotNil(t, entry1) + require.Equal(t, "hello", entry1.Message) + + entry2 := capt.FindLog( + testlog.NewLevelFilter(log.LevelWarn), + testlog.NewAttributesContainsFilter("line", "test invalid JSON")) + require.NotNil(t, entry2) + require.Equal(t, "Invalid JSON", entry2.Message) + + entry3 := capt.FindLog( + testlog.NewLevelFilter(log.LevelInfo), + testlog.NewAttributesContainsFilter("bar", "sunny")) + require.NotNil(t, entry3) + require.Equal(t, "world", entry3.Message) +} diff --git a/op-service/tasks/await.go b/op-service/tasks/await.go new file mode 100644 index 0000000000000..490fd2ca17437 --- /dev/null +++ b/op-service/tasks/await.go @@ -0,0 +1,15 @@ +package tasks + +import "context" + +// Await waits for a value, and sets it to the destination value. +// This returns an error if the context closes before a value is received from the channel. +func Await[E any](ctx context.Context, src chan E, dest *E) error { + select { + case <-ctx.Done(): + return ctx.Err() + case x := <-src: + *dest = x + return nil + } +} diff --git a/op-up/main.go b/op-up/main.go index 6bbce88b0dfe7..92ab70650f576 100644 --- a/op-up/main.go +++ b/op-up/main.go @@ -71,7 +71,7 @@ func run() error { } ids := sysgo.NewDefaultMinimalSystemIDs(sysgo.DefaultL1ID, sysgo.DefaultL2AID) - presets.DoMain(testingM{}, stack.MakeCommon(stack.Combine( + presets.DoMain(testingM{}, stack.MakeCommon[*sysgo.Orchestrator](stack.Combine( sysgo.WithMnemonicKeys(devkeys.TestMnemonic), sysgo.WithDeployer(), @@ -84,8 +84,8 @@ func run() error { sysgo.WithL1Nodes(ids.L1EL, ids.L1CL), - sysgo.WithL2ELNode(ids.L2EL, nil), - sysgo.WithL2CLNode(ids.L2CL, true, false, ids.L1CL, ids.L1EL, ids.L2EL), + sysgo.WithL2ELNode(ids.L2EL), + sysgo.WithL2CLNode(ids.L2CL, ids.L1CL, ids.L1EL, ids.L2EL, sysgo.L2CLSequencer()), sysgo.WithBatcher(ids.L2Batcher, ids.L1EL, ids.L2CL, ids.L2EL), sysgo.WithProposer(ids.L2Proposer, ids.L1EL, 
&ids.L2CL, nil),