diff --git a/op-chain-ops/script/console2_gen.go b/op-chain-ops/script/console2_gen.go index 9430a9441cce..4a7cc9a7aa7b 100644 --- a/op-chain-ops/script/console2_gen.go +++ b/op-chain-ops/script/console2_gen.go @@ -1511,4 +1511,3 @@ func (c *ConsolePrecompile) Log_59cfcbe3(p0 *big.Int, p1 *big.Int, p2 *big.Int, func (c *ConsolePrecompile) Log_193fb800(p0 *big.Int, p1 *big.Int, p2 *big.Int, p3 *big.Int) { c.log("p0", p0, "p1", p1, "p2", p2, "p3", p3) } - diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index a1ccef871538..2a39193c6bcc 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -647,9 +647,12 @@ func (oc *OpConductor) action() { oc.log.Debug("exiting action with status and error", "status", status, "err", err) if err != nil { - oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status) - time.Sleep(oc.retryBackoff()) - oc.queueAction() + select { + case <-oc.shutdownCtx.Done(): + case <-time.After(oc.retryBackoff()): + oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status) + oc.queueAction() + } return } @@ -683,18 +686,33 @@ func (oc *OpConductor) transferLeader() error { } func (oc *OpConductor) stopSequencer() error { - oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load()) - - _, err := oc.ctrl.StopSequencer(context.Background()) - if err != nil { + oc.log.Info( + "stopping sequencer", + "server", oc.cons.ServerID(), + "leader", oc.leader.Load(), + "healthy", oc.healthy.Load(), + "active", oc.seqActive.Load()) + + // Quoting (@zhwrd): StopSequencer is called after conductor loses leadership. In the event that + // the StopSequencer call fails, it actually has little real consequences because the sequencer + // cant produce a block and gossip / commit it to the raft log (requires leadership). Once + // conductor comes back up it will check its leader and sequencer state and attempt to stop the + // sequencer again. So it is "okay" to fail to stop a sequencer, the state will eventually be + // rectified and we won't have two active sequencers that are actually producing blocks. + // + // To that end we allow to cancel the StopSequencer call if we're shutting down. + latestHead, err := oc.ctrl.StopSequencer(oc.shutdownCtx) + if err == nil { + // None of the consensus state should have changed here so don't log it again. + oc.log.Info("stopped sequencer", "latestHead", latestHead) + } else { if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) { - oc.log.Warn("sequencer already stopped.", "err", err) + oc.log.Warn("sequencer already stopped", "err", err) } else { return errors.Wrap(err, "failed to stop sequencer") } } oc.metrics.RecordStopSequencer(err == nil) - oc.seqActive.Store(false) return nil } diff --git a/op-e2e/setup.go b/op-e2e/setup.go index c0168b7d207d..cd07e081d0dd 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -415,7 +415,7 @@ func (sys *System) Close() { } for name, node := range sys.RollupNodes { - if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) { + if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) && !errors.Is(err, postCtx.Err()) { combinedErr = errors.Join(combinedErr, fmt.Errorf("stop rollup node %v: %w", name, err)) } } diff --git a/op-node/node/config.go b/op-node/node/config.go index 5ca724d905c6..6b7be8f268fd 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -172,3 +172,7 @@ func (cfg *Config) Check() error { } return nil } + +func (cfg *Config) P2PEnabled() bool { + return cfg.P2P != nil && !cfg.P2P.Disabled() +} diff --git a/op-node/node/node.go b/op-node/node/node.go index a4fd3e8db08f..0f174c7a372c 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum-optimism/optimism/op-node/rollup/sequencing" + "github.com/hashicorp/go-multierror" "github.com/libp2p/go-libp2p/core/peer" @@ -40,6 +42,8 @@ type closableSafeDB interface { } type OpNode struct { + // Retain the config to test for active features rather than test for runtime state. + cfg *Config log log.Logger appVersion string metrics *metrics.Metrics @@ -93,6 +97,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m } n := &OpNode{ + cfg: cfg, log: log, appVersion: appVersion, metrics: m, @@ -134,7 +139,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error { if err := n.initP2PSigner(ctx, cfg); err != nil { return fmt.Errorf("failed to init the P2P signer: %w", err) } - if err := n.initP2P(ctx, cfg); err != nil { + if err := n.initP2P(cfg); err != nil { return fmt.Errorf("failed to init the P2P stack: %w", err) } // Only expose the server at the end, ensuring all RPC backend components are initialized. @@ -407,7 +412,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error { if err != nil { return err } - if n.p2pNode != nil { + if n.p2pEnabled() { server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics)) } if cfg.RPC.EnableAdmin { @@ -454,14 +459,20 @@ func (n *OpNode) initPProf(cfg *Config) error { return nil } -func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { - if cfg.P2P != nil { +func (n *OpNode) p2pEnabled() bool { + return n.cfg.P2PEnabled() +} + +func (n *OpNode) initP2P(cfg *Config) (err error) { + if n.p2pNode != nil { + panic("p2p node already initialized") + } + if n.p2pEnabled() { // TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue. - p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false) - if err != nil || p2pNode == nil { - return err + n.p2pNode, err = p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false) + if err != nil { + return } - n.p2pNode = p2pNode if n.p2pNode.Dv5Udp() != nil { go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers()) } @@ -469,15 +480,14 @@ func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { return nil } -func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error { +func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) { // the p2p signer setup is optional if cfg.P2PSigner == nil { - return nil + return } // p2pSigner may still be nil, the signer setup may not create any signer, the signer is optional - var err error n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx) - return err + return } func (n *OpNode) Start(ctx context.Context) error { @@ -533,7 +543,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa n.tracer.OnPublishL2Payload(ctx, envelope) // publish to p2p, if we are running p2p at all - if n.p2pNode != nil { + if n.p2pEnabled() { payload := envelope.ExecutionPayload if n.p2pSigner == nil { return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID()) @@ -547,7 +557,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error { // ignore if it's from ourselves - if n.p2pNode != nil && from == n.p2pNode.Host().ID() { + if n.p2pEnabled() && from == n.p2pNode.Host().ID() { return nil } @@ -568,9 +578,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope * } func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error { - if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() { + if n.p2pEnabled() && n.p2pNode.AltSyncEnabled() { if unixTimeStale(start.Time, 12*time.Hour) { - n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time) + n.log.Debug( + "ignoring request to sync L2 range, timestamp is too old for p2p", + "start", start, + "end", end, + "start_time", start.Time) return nil } return n.p2pNode.RequestL2Range(ctx, start, end) @@ -606,10 +620,26 @@ func (n *OpNode) Stop(ctx context.Context) error { result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err)) } } + + // Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init. + if n.l2Driver != nil { + latestHead, err := n.l2Driver.StopSequencer(ctx) + switch { + case errors.Is(err, sequencing.ErrSequencerNotEnabled): + case errors.Is(err, driver.ErrSequencerAlreadyStopped): + n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead) + case err == nil: + n.log.Info("stopped sequencer", "latestHead", latestHead) + default: + result = multierror.Append(result, fmt.Errorf("error stopping sequencer: %w", err)) + } + } if n.p2pNode != nil { if err := n.p2pNode.Close(); err != nil { result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err)) } + // Prevent further use of p2p. + n.p2pNode = nil } if n.p2pSigner != nil { if err := n.p2pSigner.Close(); err != nil { diff --git a/op-node/p2p/config.go b/op-node/p2p/config.go index 94b75a95de26..ee21ba20fc39 100644 --- a/op-node/p2p/config.go +++ b/op-node/p2p/config.go @@ -48,6 +48,7 @@ type HostMetrics interface { // SetupP2P provides a host and discovery service for usage in the rollup node. type SetupP2P interface { Check() error + // Looks like this was started to prevent partially inited p2p. Disabled() bool // Host creates a libp2p host service. Returns nil, nil if p2p is disabled. Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) diff --git a/op-node/p2p/node.go b/op-node/p2p/node.go index 4c88556ddd9c..70f7dbc67c0b 100644 --- a/op-node/p2p/node.go +++ b/op-node/p2p/node.go @@ -52,10 +52,23 @@ type NodeP2P struct { // NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil. // If metrics are configured, a bandwidth monitor will be spawned in a goroutine. -func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) (*NodeP2P, error) { +func NewNodeP2P( + resourcesCtx context.Context, + rollupCfg *rollup.Config, + log log.Logger, + setup SetupP2P, + gossipIn GossipIn, + l2Chain L2Chain, + runCfg GossipRuntimeConfig, + metrics metrics.Metricer, + elSyncEnabled bool, +) (*NodeP2P, error) { if setup == nil { return nil, errors.New("p2p node cannot be created without setup") } + if setup.Disabled() { + return nil, errors.New("SetupP2P.Disabled is true") + } var n NodeP2P if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics, elSyncEnabled); err != nil { closeErr := n.Close() @@ -65,12 +78,24 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log. return nil, err } if n.host == nil { - return nil, nil + // See prior comment about n.host optionality: + // TODO(CLI-4016): host is not optional, NodeP2P as a whole is. + panic("host is not optional if p2p is enabled") } return &n, nil } -func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) error { +func (n *NodeP2P) init( + resourcesCtx context.Context, + rollupCfg *rollup.Config, + log log.Logger, + setup SetupP2P, + gossipIn GossipIn, + l2Chain L2Chain, + runCfg GossipRuntimeConfig, + metrics metrics.Metricer, + elSyncEnabled bool, +) error { bwc := p2pmetrics.NewBandwidthCounter() n.log = log @@ -85,86 +110,83 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l return fmt.Errorf("failed to start p2p host: %w", err) } - // TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong - if n.host != nil { - // Enable extra features, if any. During testing we don't setup the most advanced host all the time. - if extra, ok := n.host.(ExtraHostFeatures); ok { - n.gater = extra.ConnectionGater() - n.connMgr = extra.ConnectionManager() - } - eps, ok := n.host.Peerstore().(store.ExtendedPeerstore) - if !ok { - return fmt.Errorf("cannot init without extended peerstore: %w", err) - } - n.store = eps - scoreParams := setup.PeerScoringParams() + // Enable extra features, if any. During testing we don't setup the most advanced host all the time. + if extra, ok := n.host.(ExtraHostFeatures); ok { + n.gater = extra.ConnectionGater() + n.connMgr = extra.ConnectionManager() + } + eps, ok := n.host.Peerstore().(store.ExtendedPeerstore) + if !ok { + return fmt.Errorf("cannot init without extended peerstore: %w", err) + } + n.store = eps + scoreParams := setup.PeerScoringParams() - if scoreParams != nil { - n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers) - } else { - n.appScorer = &NoopApplicationScorer{} - } - // Activate the P2P req-resp sync if enabled by feature-flag. - if setup.ReqRespSyncEnabled() && !elSyncEnabled { - n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer) - n.host.Network().Notify(&network.NotifyBundle{ - ConnectedF: func(nw network.Network, conn network.Conn) { - n.syncCl.AddPeer(conn.RemotePeer()) - }, - DisconnectedF: func(nw network.Network, conn network.Conn) { - // only when no connection is available, we can remove the peer - if nw.Connectedness(conn.RemotePeer()) == network.NotConnected { - n.syncCl.RemovePeer(conn.RemotePeer()) - } - }, - }) - n.syncCl.Start() - // the host may already be connected to peers, add them all to the sync client - for _, peerID := range n.host.Network().Peers() { - n.syncCl.AddPeer(peerID) - } - if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy - n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics) - // register the sync protocol with libp2p host - payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest) - n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) - } + if scoreParams != nil { + n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers) + } else { + n.appScorer = &NoopApplicationScorer{} + } + // Activate the P2P req-resp sync if enabled by feature-flag. + if setup.ReqRespSyncEnabled() && !elSyncEnabled { + n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer) + n.host.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(nw network.Network, conn network.Conn) { + n.syncCl.AddPeer(conn.RemotePeer()) + }, + DisconnectedF: func(nw network.Network, conn network.Conn) { + // only when no connection is available, we can remove the peer + if nw.Connectedness(conn.RemotePeer()) == network.NotConnected { + n.syncCl.RemovePeer(conn.RemotePeer()) + } + }, + }) + n.syncCl.Start() + // the host may already be connected to peers, add them all to the sync client + for _, peerID := range n.host.Network().Peers() { + n.syncCl.AddPeer(peerID) } - n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log) - // notify of any new connections/streams/etc. - n.host.Network().Notify(NewNetworkNotifier(log, metrics)) - // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. - n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log) - if err != nil { - return fmt.Errorf("failed to start gossipsub router: %w", err) + if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy + n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics) + // register the sync protocol with libp2p host + payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest) + n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) } - n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn) - if err != nil { - return fmt.Errorf("failed to join blocks gossip topic: %w", err) - } - log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String()) + } + n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log) + // notify of any new connections/streams/etc. + n.host.Network().Notify(NewNetworkNotifier(log, metrics)) + // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. + n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log) + if err != nil { + return fmt.Errorf("failed to start gossipsub router: %w", err) + } + n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn) + if err != nil { + return fmt.Errorf("failed to join blocks gossip topic: %w", err) + } + log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String()) - tcpPort, err := FindActiveTCPPort(n.host) - if err != nil { - log.Warn("failed to find what TCP port p2p is binded to", "err", err) - } + tcpPort, err := FindActiveTCPPort(n.host) + if err != nil { + log.Warn("failed to find what TCP port p2p is binded to", "err", err) + } - // All nil if disabled. - n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort) - if err != nil { - return fmt.Errorf("failed to start discv5: %w", err) - } + // All nil if disabled. + n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort) + if err != nil { + return fmt.Errorf("failed to start discv5: %w", err) + } - if metrics != nil { - go metrics.RecordBandwidth(resourcesCtx, bwc) - } + if metrics != nil { + go metrics.RecordBandwidth(resourcesCtx, bwc) + } - if setup.BanPeers() { - n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration()) - n.peerMonitor.Start() - } - n.appScorer.start() + if setup.BanPeers() { + n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration()) + n.peerMonitor.Start() } + n.appScorer.start() return nil }