Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conductor and sequencer p2p refactoring #11455

Merged
merged 10 commits into from
Aug 29, 2024
1 change: 0 additions & 1 deletion op-chain-ops/script/console2_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1511,4 +1511,3 @@ func (c *ConsolePrecompile) Log_59cfcbe3(p0 *big.Int, p1 *big.Int, p2 *big.Int,
// Log_193fb800 logs four *big.Int values through the console precompile's
// generic logger, tagging them with positional keys p0..p3. (Generated code;
// the hex suffix in the name is the 4-byte selector of the overload.)
func (c *ConsolePrecompile) Log_193fb800(p0 *big.Int, p1 *big.Int, p2 *big.Int, p3 *big.Int) {
c.log("p0", p0, "p1", p1, "p2", p2, "p3", p3)
}

36 changes: 27 additions & 9 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,12 @@ func (oc *OpConductor) action() {

oc.log.Debug("exiting action with status and error", "status", status, "err", err)
if err != nil {
oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
time.Sleep(oc.retryBackoff())
oc.queueAction()
select {
case <-oc.shutdownCtx.Done():
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
case <-time.After(oc.retryBackoff()):
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
oc.queueAction()
}
return
}

Expand Down Expand Up @@ -683,18 +686,33 @@ func (oc *OpConductor) transferLeader() error {
}

func (oc *OpConductor) stopSequencer() error {
oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())

_, err := oc.ctrl.StopSequencer(context.Background())
if err != nil {
oc.log.Info(
"stopping sequencer",
"server", oc.cons.ServerID(),
"leader", oc.leader.Load(),
"healthy", oc.healthy.Load(),
"active", oc.seqActive.Load())

// Quoting (@zhwrd): StopSequencer is called after conductor loses leadership. In the event that
// the StopSequencer call fails, it actually has little real consequences because the sequencer
// cant produce a block and gossip / commit it to the raft log (requires leadership). Once
// conductor comes back up it will check its leader and sequencer state and attempt to stop the
// sequencer again. So it is "okay" to fail to stop a sequencer, the state will eventually be
// rectified and we won't have two active sequencers that are actually producing blocks.
//
// To that end we allow to cancel the StopSequencer call if we're shutting down.
latestHead, err := oc.ctrl.StopSequencer(oc.shutdownCtx)
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
// None of the consensus state should have changed here so don't log it again.
oc.log.Info("stopped sequencer", "latestHead", latestHead)
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
} else {
if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) {
oc.log.Warn("sequencer already stopped.", "err", err)
oc.log.Warn("sequencer already stopped", "err", err)
} else {
return errors.Wrap(err, "failed to stop sequencer")
}
}
oc.metrics.RecordStopSequencer(err == nil)

oc.seqActive.Store(false)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (sys *System) Close() {
}

for name, node := range sys.RollupNodes {
if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) {
if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) && !errors.Is(err, postCtx.Err()) {
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
combinedErr = errors.Join(combinedErr, fmt.Errorf("stop rollup node %v: %w", name, err))
}
}
Expand Down
64 changes: 47 additions & 17 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -42,6 +44,8 @@ type closableSafeDB interface {
}

type OpNode struct {
// Retain the config to test for active features rather than test for runtime state.
cfg *Config
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
log log.Logger
appVersion string
metrics *metrics.Metrics
Expand Down Expand Up @@ -95,6 +99,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m
}

n := &OpNode{
cfg: cfg,
log: log,
appVersion: appVersion,
metrics: m,
Expand Down Expand Up @@ -136,7 +141,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
if err := n.initP2PSigner(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the P2P signer: %w", err)
}
if err := n.initP2P(ctx, cfg); err != nil {
if err := n.initP2P(cfg); err != nil {
return fmt.Errorf("failed to init the P2P stack: %w", err)
}
// Only expose the server at the end, ensuring all RPC backend components are initialized.
Expand Down Expand Up @@ -410,7 +415,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
if err != nil {
return err
}
if n.p2pNode != nil {
if n.p2pEnabled() {
server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
}
if cfg.RPC.EnableAdmin {
Expand Down Expand Up @@ -445,7 +450,7 @@ func (n *OpNode) initHeartbeat(cfg *Config) {
return
}
var peerID string
if cfg.P2P.Disabled() {
if !n.p2pEnabled() {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
Expand Down Expand Up @@ -483,30 +488,35 @@ func (n *OpNode) initPProf(cfg *Config) error {
return nil
}

func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
func (n *OpNode) p2pEnabled() bool {
return n.cfg.P2P != nil && !n.cfg.P2P.Disabled()
}
anacrolix marked this conversation as resolved.
Show resolved Hide resolved

func (n *OpNode) initP2P(cfg *Config) (err error) {
if n.p2pNode != nil {
panic("p2p node already initialized")
}
if n.p2pEnabled() {
// TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil || p2pNode == nil {
return err
n.p2pNode, err = p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil {
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
return
}
n.p2pNode = p2pNode
if n.p2pNode.Dv5Udp() != nil {
go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
}
}
return nil
}

func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) {
// the p2p signer setup is optional
if cfg.P2PSigner == nil {
return nil
return
}
// p2pSigner may still be nil, the signer setup may not create any signer, the signer is optional
var err error
n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
return err
return
}

func (n *OpNode) Start(ctx context.Context) error {
Expand Down Expand Up @@ -562,7 +572,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
n.tracer.OnPublishL2Payload(ctx, envelope)

// publish to p2p, if we are running p2p at all
if n.p2pNode != nil {
if n.p2pEnabled() {
payload := envelope.ExecutionPayload
if n.p2pSigner == nil {
return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
Expand All @@ -576,7 +586,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
if n.p2pEnabled() && from == n.p2pNode.Host().ID() {
return nil
}

Expand All @@ -597,9 +607,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *
}

func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
if n.p2pEnabled() && n.p2pNode.AltSyncEnabled() {
if unixTimeStale(start.Time, 12*time.Hour) {
n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
n.log.Debug(
"ignoring request to sync L2 range, timestamp is too old for p2p",
"start", start,
"end", end,
"start_time", start.Time)
return nil
}
return n.p2pNode.RequestL2Range(ctx, start, end)
Expand Down Expand Up @@ -635,10 +649,26 @@ func (n *OpNode) Stop(ctx context.Context) error {
result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err))
}
}

// Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init.
if n.l2Driver != nil {
latestHead, err := n.l2Driver.StopSequencer(ctx)
switch {
case errors.Is(err, sequencing.ErrSequencerNotEnabled):
case errors.Is(err, driver.ErrSequencerAlreadyStopped):
n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead)
case err == nil:
n.log.Info("stopped sequencer", "latestHead", latestHead)
default:
result = multierror.Append(result, fmt.Errorf("error stopping sequencer: %w", err))
}
}
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
}
// Prevent further use of p2p.
n.p2pNode = nil
}
if n.p2pSigner != nil {
if err := n.p2pSigner.Close(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions op-node/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type HostMetrics interface {
// SetupP2P provides a host and discovery service for usage in the rollup node.
type SetupP2P interface {
Check() error
// Disabled reports whether p2p is explicitly turned off in config,
// guarding callers against initializing a partial p2p stack.
Disabled() bool
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error)
Expand Down
Loading