Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions op-conductor/conductor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
ExecutionP2pMinPeerCount: ctx.Uint64(flags.HealthcheckExecutionP2pMinPeerCount.Name),
ExecutionP2pRPCUrl: executionP2pRpcUrl,
ExecutionP2pCheckApi: executionP2pCheckApi,
RollupBoostPartialHealthinessToleranceLimit: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceLimit.Name),
RollupBoostPartialHealthinessToleranceIntervalSeconds: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds.Name),
},
RollupCfg: *rollupCfg,
RPCEnableProxy: ctx.Bool(flags.RPCEnableProxy.Name),
Expand Down Expand Up @@ -225,6 +227,12 @@ type HealthCheckConfig struct {

// ExecutionP2pMinPeerCount is the minimum number of EL P2P peers required for the sequencer to be healthy.
ExecutionP2pMinPeerCount uint64

// RollupBoostPartialHealthinessToleranceLimit is the number of rollup-boost partial unhealthiness failures to tolerate within a configurable time frame
RollupBoostPartialHealthinessToleranceLimit uint64

// RollupBoostPartialHealthinessToleranceIntervalSeconds is the time frame within which `RollupBoostPartialHealthinessToleranceLimit` is evaluated
RollupBoostPartialHealthinessToleranceIntervalSeconds uint64
}

func (c *HealthCheckConfig) Check() error {
Expand All @@ -251,5 +259,8 @@ func (c *HealthCheckConfig) Check() error {
return fmt.Errorf("invalid el p2p check api")
}
}
if (c.RollupBoostPartialHealthinessToleranceLimit != 0 && c.RollupBoostPartialHealthinessToleranceIntervalSeconds == 0) || (c.RollupBoostPartialHealthinessToleranceLimit == 0 && c.RollupBoostPartialHealthinessToleranceIntervalSeconds != 0) {
return fmt.Errorf("only one of RollupBoostPartialHealthinessToleranceLimit or RollupBoostPartialHealthinessToleranceIntervalSeconds found to be defined. Either define both of them or none.")
}
return nil
}
2 changes: 2 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
rb,
elP2p,
c.cfg.HealthCheck.ExecutionP2pMinPeerCount,
c.cfg.HealthCheck.RollupBoostPartialHealthinessToleranceLimit,
c.cfg.HealthCheck.RollupBoostPartialHealthinessToleranceIntervalSeconds,
)
c.healthUpdateCh = c.hmon.Subscribe()

Expand Down
12 changes: 12 additions & 0 deletions op-conductor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ var (
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_EXECUTION_P2P_CHECK_API"),
Value: "net",
}
HealthCheckRollupBoostPartialHealthinessToleranceLimit = &cli.Uint64Flag{
Name: "healthcheck.rollup-boost-partial-healthiness-tolerance-limit",
Usage: "Sets the count of rollup-boost partial healthiness failures to occur before marking op-conducto as unhealthy. Default is 0 with which a single occurrence of rollup-boost partial healthiness is enough to set op-conductor as unhealthy",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_ROLLUP_BOOST_PARTIAL_HEALTHINESS_TOLERANCE_LIMIT"),
}
HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds = &cli.Uint64Flag{
Name: "healthcheck.rollup-boost-partial-healthiness-tolerance-interval-seconds",
Usage: "The time frame within which rollup-boost partial healthiness tolerance is evaluated",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_ROLLUP_BOOST_PARTIAL_HEALTHINESS_TOLERANCE_INTERVAL_SECONDS"),
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -213,6 +223,8 @@ var optionalFlags = []cli.Flag{
HealthcheckExecutionP2pMinPeerCount,
HealthcheckExecutionP2pRPCUrl,
HealthcheckExecutionP2pCheckApi,
HealthCheckRollupBoostPartialHealthinessToleranceLimit,
HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds,
}

func init() {
Expand Down
32 changes: 24 additions & 8 deletions op-conductor/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type HealthMonitor interface {
// interval is the interval between health checks measured in seconds.
// safeInterval is the interval between safe head progress measured in seconds.
// minPeerCount is the minimum number of peers required for the sequencer to be healthy.
func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p apis.P2PClient, supervisor SupervisorHealthAPI, rb client.RollupBoostClient, elP2pClient client.ElP2PClient, minElP2pPeers uint64) HealthMonitor {
func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p apis.P2PClient, supervisor SupervisorHealthAPI, rb client.RollupBoostClient, elP2pClient client.ElP2PClient, minElP2pPeers uint64, rollupBoostToleratePartialHealthinessToleranceLimit uint64, rollupBoostToleratePartialHealthinessToleranceIntervalSeconds uint64) HealthMonitor {
hm := &SequencerHealthMonitor{
log: log,
metrics: metrics,
Expand All @@ -50,7 +50,7 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
safeEnabled: safeEnabled,
safeInterval: safeInterval,
minPeerCount: minPeerCount,
timeProviderFn: currentTimeProvicer,
timeProviderFn: currentTimeProvider,
node: node,
p2p: p2p,
supervisor: supervisor,
Expand All @@ -64,6 +64,14 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
elP2pClient: elP2pClient,
}
}
if rollupBoostToleratePartialHealthinessToleranceLimit != 0 {
hm.rollupBoostPartialHealthinessToleranceLimit = rollupBoostToleratePartialHealthinessToleranceLimit
var err error
hm.rollupBoostPartialHealthinessToleranceCounter, err = NewTimeBoundedRotatingCounter(rollupBoostToleratePartialHealthinessToleranceIntervalSeconds)
if err != nil {
panic(fmt.Errorf("failed to setup health monitor: %w", err))
}
}

return hm
}
Expand Down Expand Up @@ -93,11 +101,13 @@ type SequencerHealthMonitor struct {

timeProviderFn func() uint64

node dial.RollupClientInterface
p2p apis.P2PClient
supervisor SupervisorHealthAPI
rb client.RollupBoostClient
elP2p *ElP2pHealthMonitor
node dial.RollupClientInterface
p2p apis.P2PClient
supervisor SupervisorHealthAPI
rb client.RollupBoostClient
elP2p *ElP2pHealthMonitor
rollupBoostPartialHealthinessToleranceLimit uint64
rollupBoostPartialHealthinessToleranceCounter *timeBoundedRotatingCounter
}

var _ HealthMonitor = (*SequencerHealthMonitor)(nil)
Expand Down Expand Up @@ -288,8 +298,14 @@ func (hm *SequencerHealthMonitor) checkRollupBoost(ctx context.Context) error {
case client.HealthStatusHealthy:
return nil
case client.HealthStatusPartial:
if hm.rollupBoostPartialHealthinessToleranceCounter != nil && hm.rollupBoostPartialHealthinessToleranceCounter.CurrentValue() < hm.rollupBoostPartialHealthinessToleranceLimit {
latestValue := hm.rollupBoostPartialHealthinessToleranceCounter.Increment()
hm.log.Debug("Rollup boost partial unhealthiness failure tolerated", "currentValue", latestValue, "limit", hm.rollupBoostPartialHealthinessToleranceLimit)
return nil
}
hm.log.Error("Rollup boost is partial failure, builder is down but fallback execution client is up", "err", ErrRollupBoostPartiallyHealthy)
return ErrRollupBoostPartiallyHealthy

case client.HealthStatusUnhealthy:
hm.log.Error("Rollup boost total failure, both builder and fallback execution client are down", "err", ErrRollupBoostNotHealthy)
return ErrRollupBoostNotHealthy
Expand All @@ -306,6 +322,6 @@ func calculateTimeDiff(now, then uint64) uint64 {
return now - then
}

func currentTimeProvicer() uint64 {
// currentTimeProvider reports the current wall-clock time as unix seconds.
func currentTimeProvider() uint64 {
	now := time.Now()
	return uint64(now.Unix())
}
81 changes: 81 additions & 0 deletions op-conductor/health/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,16 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
return monitor
}

type monitorOpts func(*SequencerHealthMonitor)

// SetupMonitorWithRollupBoost creates a HealthMonitor that includes a RollupBoostClient
func (s *HealthMonitorTestSuite) SetupMonitorWithRollupBoost(
now, unsafeInterval, safeInterval uint64,
mockRollupClient *testutils.MockRollupClient,
mockP2P *p2pMocks.API,
mockRollupBoost *clientmocks.RollupBoostClient,
elP2pClient client.ElP2PClient,
opts ...monitorOpts,
) *SequencerHealthMonitor {
tp := &timeProvider{now: now}
if mockP2P == nil {
Expand Down Expand Up @@ -137,6 +140,9 @@ func (s *HealthMonitorTestSuite) SetupMonitorWithRollupBoost(
elP2pClient: elP2pClient,
}
}
for _, opt := range opts {
opt(monitor)
}
err := monitor.Start(context.Background())
s.NoError(err)
return monitor
Expand Down Expand Up @@ -442,6 +448,81 @@ func (s *HealthMonitorTestSuite) TestRollupBoostPartialStatus() {
s.NoError(monitor.Stop())
}

// TestRollupBoostPartialStatusWithTolerance verifies that rollup-boost partial
// unhealthiness is tolerated while the rotating counter is below the configured
// limit, surfaces ErrRollupBoostPartiallyHealthy once the limit is reached, and
// is tolerated again after the tolerance window rolls over and the counter
// resets. It also exercises the counter's lazy cache cleanup by pre-filling the
// temporal cache past the cleanup threshold.
func (s *HealthMonitorTestSuite) TestRollupBoostPartialStatusWithTolerance() {
	s.T().Parallel()
	now := uint64(time.Now().Unix())

	// Setup healthy node conditions
	rc := &testutils.MockRollupClient{}
	ss1 := mockSyncStatus(now-1, 1, now-3, 0)

	// six health checks are expected below, and each health check performs one sync-status call
	for i := 0; i < 6; i++ {
		rc.ExpectSyncStatus(ss1, nil)
	}

	// Setup healthy peer count
	pc := &p2pMocks.API{}
	ps1 := &p2p.PeerStats{
		Connected: healthyPeerCount,
	}
	pc.EXPECT().PeerStats(mock.Anything).Return(ps1, nil)

	// Setup partial rollup boost status (treated as unhealthy)
	rb := &clientmocks.RollupBoostClient{}
	rb.EXPECT().Healthcheck(mock.Anything).Return(client.HealthStatusPartial, nil)

	toleranceLimit := uint64(2)
	toleranceIntervalSeconds := uint64(6)

	timeBoundedRotatingCounter, err := NewTimeBoundedRotatingCounter(toleranceIntervalSeconds)
	s.Nil(err)

	// deterministic clock for the counter, so window rollover can be controlled from the test
	tp := &timeProvider{now: 1758792282}

	// Start monitor with all dependencies as well as a tolerance of 2 rollup-boost partial unhealthiness failures per 6s period
	monitor := s.SetupMonitorWithRollupBoost(now, 60, 60, rc, pc, rb, nil, func(shm *SequencerHealthMonitor) {
		timeBoundedRotatingCounter.timeProviderFn = tp.Now

		// pollute the cache of timeBoundedRotatingCounter with 999 elements so as to later test the lazy cleanup
		// note: the 1000th element (the live window's bucket) is added by the first healthcheck run
		for i := 0; i < 999; i++ {
			timeBoundedRotatingCounter.temporalCache[int64(i)] = uint64(1)
		}

		shm.rollupBoostPartialHealthinessToleranceCounter = timeBoundedRotatingCounter
		shm.rollupBoostPartialHealthinessToleranceLimit = toleranceLimit
	})

	healthUpdateCh := monitor.Subscribe()

	// wait for the first health check to run: its Increment adds the live bucket, bringing the cache to 1000 entries
	s.Eventually(func() bool {
		return len(timeBoundedRotatingCounter.temporalCache) == 1000
	}, time.Second*3, time.Second*1)

	firstHealthStatus := <-healthUpdateCh
	secondHealthStatus := <-healthUpdateCh
	thirdHealthStatus := <-healthUpdateCh

	// the first two partial results are tolerated (limit is 2); the third exceeds the limit and reports the error
	s.Nil(firstHealthStatus)
	s.Nil(secondHealthStatus)
	s.Equal(ErrRollupBoostPartiallyHealthy, thirdHealthStatus)

	tp.Now() // simulate another second passing
	// by now, because of three healthchecks, six seconds (CurrentValue + Increment + CurrentValue + Increment + CurrentValue + tp.Now()) have been simulated to pass (by the timeProviderFn)
	// this should reset the time bound counter, thereby allowing partial unhealthiness failures to be tolerated again

	fourthHealthStatus := <-healthUpdateCh
	fifthHealthStatus := <-healthUpdateCh
	sixthHealthStatus := <-healthUpdateCh

	// the same tolerate/tolerate/error pattern repeats in the fresh window
	s.Nil(fourthHealthStatus)
	s.Nil(fifthHealthStatus)
	s.Equal(ErrRollupBoostPartiallyHealthy, sixthHealthStatus)

	s.NoError(monitor.Stop())
}

func (s *HealthMonitorTestSuite) TestRollupBoostHealthy() {
s.T().Parallel()
now := uint64(time.Now().Unix())
Expand Down
58 changes: 58 additions & 0 deletions op-conductor/health/timeboundcounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package health

import (
"fmt"
"sync"
)

// timeBoundedRotatingCounter is a counter that keeps incrementing until its
// reset interval elapses, after which it effectively resets to 0.
// It can be used to track time-based rate limits, error counts, etc.
//
// Internally, every increment is bucketed by the current unix timestamp
// truncated to resetIntervalSeconds; moving into a new bucket is what
// "resets" the counter, and stale buckets are pruned lazily on Increment.
type timeBoundedRotatingCounter struct {
	// resetIntervalSeconds is the width of each counting window in seconds (must be > 0).
	resetIntervalSeconds uint64
	// timeProviderFn returns the current unix time in seconds; swappable in tests for a deterministic clock.
	timeProviderFn func() uint64

	// mut guards temporalCache against concurrent Increment/CurrentValue calls.
	mut *sync.RWMutex
	// temporalCache maps a truncated timestamp (window id) to the count accumulated in that window.
	temporalCache map[int64]uint64
}

// NewTimeBoundedRotatingCounter constructs a counter whose value automatically
// resets every resetIntervalSeconds. A zero interval is rejected, since the
// counter would then have no window to rotate on.
func NewTimeBoundedRotatingCounter(resetIntervalSeconds uint64) (*timeBoundedRotatingCounter, error) {
	if resetIntervalSeconds == 0 {
		return nil, fmt.Errorf("reset interval seconds must be more than 0")
	}
	counter := &timeBoundedRotatingCounter{
		resetIntervalSeconds: resetIntervalSeconds,
		timeProviderFn:       currentTimeProvider,
		mut:                  &sync.RWMutex{},
		temporalCache:        make(map[int64]uint64),
	}
	return counter, nil
}

// Increment adds 1 to the counter for the current time window and returns the
// new value.
//
// The window id ("bucket") is the current unix time divided by
// resetIntervalSeconds — e.g. with a 60s interval, all calls within the same
// 60s span share one bucket, and a fresh bucket (starting from 0) takes over
// once the interval rolls past, which is what makes the counter "rotate".
func (t *timeBoundedRotatingCounter) Increment() uint64 {
	bucket := int64(t.timeProviderFn() / t.resetIntervalSeconds)

	t.mut.Lock()
	defer t.mut.Unlock()

	t.temporalCache[bucket]++
	value := t.temporalCache[bucket]

	// Lazy cleanup: once enough stale buckets have accumulated, discard
	// everything except the live bucket so the map cannot grow without bound.
	if len(t.temporalCache) > 1000 {
		t.temporalCache = map[int64]uint64{bucket: value}
	}

	return value
}

// CurrentValue reports the counter value accumulated in the current time
// window without modifying it; it is 0 when nothing has been counted in the
// window yet (including right after a window rollover).
func (t *timeBoundedRotatingCounter) CurrentValue() uint64 {
	bucket := int64(t.timeProviderFn() / t.resetIntervalSeconds)

	t.mut.RLock()
	defer t.mut.RUnlock()

	return t.temporalCache[bucket]
}
Loading