Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conductor and sequencer p2p refactoring #11455

Merged
merged 10 commits on
Aug 29, 2024
33 changes: 23 additions & 10 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,13 @@ func (oc *OpConductor) action() {

oc.log.Debug("exiting action with status and error", "status", status, "err", err)
if err != nil {
oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
time.Sleep(oc.retryBackoff())
oc.queueAction()
select {
case <-oc.shutdownCtx.Done():
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
case <-time.After(oc.retryBackoff()):
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
time.Sleep(oc.retryBackoff())
oc.queueAction()
}
return
}

Expand Down Expand Up @@ -683,18 +687,27 @@ func (oc *OpConductor) transferLeader() error {
}

func (oc *OpConductor) stopSequencer() error {
oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())

_, err := oc.ctrl.StopSequencer(context.Background())
if err != nil {
if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) {
oc.log.Warn("sequencer already stopped.", "err", err)
oc.log.Info(
"stopping sequencer",
"server", oc.cons.ServerID(),
"leader", oc.leader.Load(),
"healthy", oc.healthy.Load(),
"active", oc.seqActive.Load())

// Getting stuck while stopping the sequencer would be bad, so the stop call is given the shutdown
// context. Failing to stop the sequencer during shutdown appears to be acceptable.
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
latestHead, err := oc.ctrl.StopSequencer(oc.shutdownCtx)
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
// None of the consensus state should have changed here so don't log it again.
oc.log.Info("stopped sequencer", "latestHead", latestHead)
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
} else {
if errors.Is(err, driver.ErrSequencerAlreadyStopped) {
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
oc.log.Warn("sequencer already stopped", "err", err)
} else {
return errors.Wrap(err, "failed to stop sequencer")
}
}
oc.metrics.RecordStopSequencer(err == nil)

oc.seqActive.Store(false)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (sys *System) Close() {
}

for name, node := range sys.RollupNodes {
if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) {
if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) && !errors.Is(err, postCtx.Err()) {
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
combinedErr = errors.Join(combinedErr, fmt.Errorf("stop rollup node %v: %w", name, err))
}
}
Expand Down
60 changes: 44 additions & 16 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type closableSafeDB interface {
}

type OpNode struct {
// Retain the config to test for active features rather than test for runtime state.
cfg *Config
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
log log.Logger
appVersion string
metrics *metrics.Metrics
Expand Down Expand Up @@ -95,6 +97,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m
}

n := &OpNode{
cfg: cfg,
log: log,
appVersion: appVersion,
metrics: m,
Expand Down Expand Up @@ -136,7 +139,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
if err := n.initP2PSigner(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the P2P signer: %w", err)
}
if err := n.initP2P(ctx, cfg); err != nil {
if err := n.initP2P(cfg); err != nil {
return fmt.Errorf("failed to init the P2P stack: %w", err)
}
// Only expose the server at the end, ensuring all RPC backend components are initialized.
Expand Down Expand Up @@ -410,7 +413,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
if err != nil {
return err
}
if n.p2pNode != nil {
if n.p2pEnabled() {
server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
}
if cfg.RPC.EnableAdmin {
Expand Down Expand Up @@ -448,6 +451,7 @@ func (n *OpNode) initHeartbeat(cfg *Config) {
if cfg.P2P.Disabled() {
peerID = "disabled"
} else {
// Open question: is a p2p-enabled check missing here, or does having a heartbeat already imply that p2p is enabled?
peerID = n.P2P().Host().ID().String()
}
anacrolix marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -483,30 +487,35 @@ func (n *OpNode) initPProf(cfg *Config) error {
return nil
}

func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
func (n *OpNode) p2pEnabled() bool {
return n.cfg.P2P != nil
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
}

func (n *OpNode) initP2P(cfg *Config) (err error) {
if n.p2pNode != nil {
panic("p2p node already initialized")
}
if n.p2pEnabled() {
// TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil || p2pNode == nil {
return err
n.p2pNode, err = p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil {
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
return
}
n.p2pNode = p2pNode
if n.p2pNode.Dv5Udp() != nil {
go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
}
}
return nil
}

func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) {
// the p2p signer setup is optional
if cfg.P2PSigner == nil {
return nil
return
}
// p2pSigner may still be nil, the signer setup may not create any signer, the signer is optional
var err error
n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
return err
return
}

func (n *OpNode) Start(ctx context.Context) error {
Expand Down Expand Up @@ -562,7 +571,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
n.tracer.OnPublishL2Payload(ctx, envelope)

// publish to p2p, if we are running p2p at all
if n.p2pNode != nil {
if n.p2pEnabled() {
payload := envelope.ExecutionPayload
if n.p2pSigner == nil {
return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
Expand All @@ -576,7 +585,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
if n.p2pEnabled() && from == n.p2pNode.Host().ID() {
return nil
}

Expand All @@ -597,9 +606,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *
}

func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
if n.p2pEnabled() && n.p2pNode.AltSyncEnabled() {
if unixTimeStale(start.Time, 12*time.Hour) {
n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
n.log.Debug(
"ignoring request to sync L2 range, timestamp is too old for p2p",
"start", start,
"end", end,
"start_time", start.Time)
return nil
}
return n.p2pNode.RequestL2Range(ctx, start, end)
Expand Down Expand Up @@ -635,10 +648,25 @@ func (n *OpNode) Stop(ctx context.Context) error {
result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err))
}
}

// Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init.
if n.l2Driver != nil && n.cfg.Driver.SequencerEnabled {
latestHead, err := n.l2Driver.StopSequencer(ctx)
switch {
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
case errors.Is(err, driver.ErrSequencerAlreadyStopped):
n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead)
case err == nil:
n.log.Info("stopped sequencer", "latestHead", latestHead)
default:
result = multierror.Append(result, fmt.Errorf("error stopping sequencer: %w", err))
}
}
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
}
// Prevent further use of p2p.
n.p2pNode = nil
}
if n.p2pSigner != nil {
if err := n.p2pSigner.Close(); err != nil {
Expand Down