diff --git a/op-conductor/conductor/config.go b/op-conductor/conductor/config.go index b41aed34a729b..90700b5a91498 100644 --- a/op-conductor/conductor/config.go +++ b/op-conductor/conductor/config.go @@ -185,6 +185,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { ExecutionP2pMinPeerCount: ctx.Uint64(flags.HealthcheckExecutionP2pMinPeerCount.Name), ExecutionP2pRPCUrl: executionP2pRpcUrl, ExecutionP2pCheckApi: executionP2pCheckApi, + RollupBoostPartialHealthinessToleranceLimit: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceLimit.Name), + RollupBoostPartialHealthinessToleranceIntervalSeconds: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds.Name), }, RollupCfg: *rollupCfg, RPCEnableProxy: ctx.Bool(flags.RPCEnableProxy.Name), @@ -225,6 +227,12 @@ type HealthCheckConfig struct { // ExecutionP2pMinPeerCount is the minimum number of EL P2P peers required for the sequencer to be healthy. ExecutionP2pMinPeerCount uint64 + + // RollupBoostPartialHealthinessToleranceLimit is the amount of rollup-boost partial unhealthiness failures to tolerate within a configurable time frame + RollupBoostPartialHealthinessToleranceLimit uint64 + + // RollupBoostPartialHealthinessToleranceIntervalSeconds is the time frame within which `RollupBoostPartialHealthinessToleranceLimit` is evaluated + RollupBoostPartialHealthinessToleranceIntervalSeconds uint64 } func (c *HealthCheckConfig) Check() error { @@ -251,5 +259,8 @@ func (c *HealthCheckConfig) Check() error { return fmt.Errorf("invalid el p2p check api") } } + if (c.RollupBoostPartialHealthinessToleranceLimit != 0 && c.RollupBoostPartialHealthinessToleranceIntervalSeconds == 0) || (c.RollupBoostPartialHealthinessToleranceLimit == 0 && c.RollupBoostPartialHealthinessToleranceIntervalSeconds != 0) { + return fmt.Errorf("only one of RollupBoostPartialHealthinessToleranceLimit or RollupBoostPartialHealthinessToleranceIntervalSeconds found to be 
defined. Either define both of them or none.") + } return nil } diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index e8e2ef3d35bcc..b9f25710c6bf3 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -263,6 +263,8 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error { rb, elP2p, c.cfg.HealthCheck.ExecutionP2pMinPeerCount, + c.cfg.HealthCheck.RollupBoostPartialHealthinessToleranceLimit, + c.cfg.HealthCheck.RollupBoostPartialHealthinessToleranceIntervalSeconds, ) c.healthUpdateCh = c.hmon.Subscribe() diff --git a/op-conductor/flags/flags.go b/op-conductor/flags/flags.go index c4a6bc32d8c45..96e6eeafd51ab 100644 --- a/op-conductor/flags/flags.go +++ b/op-conductor/flags/flags.go @@ -180,6 +180,16 @@ var ( EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_EXECUTION_P2P_CHECK_API"), Value: "net", } + HealthCheckRollupBoostPartialHealthinessToleranceLimit = &cli.Uint64Flag{ + Name: "healthcheck.rollup-boost-partial-healthiness-tolerance-limit", + Usage: "Sets the count of rollup-boost partial healthiness failures to occur before marking op-conductor as unhealthy. 
Default is 0 with which a single occurrence of rollup-boost partial healthiness is enough to set op-conductor as unhealthy", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_ROLLUP_BOOST_PARTIAL_HEALTHINESS_TOLERANCE_LIMIT"), + } + HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds = &cli.Uint64Flag{ + Name: "healthcheck.rollup-boost-partial-healthiness-tolerance-interval-seconds", + Usage: "The time frame within which rollup-boost partial healthiness tolerance is evaluated", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_ROLLUP_BOOST_PARTIAL_HEALTHINESS_TOLERANCE_INTERVAL_SECONDS"), + } ) var requiredFlags = []cli.Flag{ @@ -213,6 +223,8 @@ var optionalFlags = []cli.Flag{ HealthcheckExecutionP2pMinPeerCount, HealthcheckExecutionP2pRPCUrl, HealthcheckExecutionP2pCheckApi, + HealthCheckRollupBoostPartialHealthinessToleranceLimit, + HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds, } func init() { diff --git a/op-conductor/health/monitor.go b/op-conductor/health/monitor.go index 56091139989e0..dc5bd88fbd4b2 100644 --- a/op-conductor/health/monitor.go +++ b/op-conductor/health/monitor.go @@ -39,7 +39,7 @@ type HealthMonitor interface { // interval is the interval between health checks measured in seconds. // safeInterval is the interval between safe head progress measured in seconds. // minPeerCount is the minimum number of peers required for the sequencer to be healthy. 
-func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p apis.P2PClient, supervisor SupervisorHealthAPI, rb client.RollupBoostClient, elP2pClient client.ElP2PClient, minElP2pPeers uint64) HealthMonitor { +func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p apis.P2PClient, supervisor SupervisorHealthAPI, rb client.RollupBoostClient, elP2pClient client.ElP2PClient, minElP2pPeers uint64, rollupBoostToleratePartialHealthinessToleranceLimit uint64, rollupBoostToleratePartialHealthinessToleranceIntervalSeconds uint64) HealthMonitor { hm := &SequencerHealthMonitor{ log: log, metrics: metrics, @@ -50,7 +50,7 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva safeEnabled: safeEnabled, safeInterval: safeInterval, minPeerCount: minPeerCount, - timeProviderFn: currentTimeProvicer, + timeProviderFn: currentTimeProvider, node: node, p2p: p2p, supervisor: supervisor, @@ -64,6 +64,14 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva elP2pClient: elP2pClient, } } + if rollupBoostToleratePartialHealthinessToleranceLimit != 0 { + hm.rollupBoostPartialHealthinessToleranceLimit = rollupBoostToleratePartialHealthinessToleranceLimit + var err error + hm.rollupBoostPartialHealthinessToleranceCounter, err = NewTimeBoundedRotatingCounter(rollupBoostToleratePartialHealthinessToleranceIntervalSeconds) + if err != nil { + panic(fmt.Errorf("failed to setup health monitor: %w", err)) + } + } return hm } @@ -93,11 +101,13 @@ type SequencerHealthMonitor struct { timeProviderFn func() uint64 - node dial.RollupClientInterface - p2p apis.P2PClient - supervisor SupervisorHealthAPI - rb client.RollupBoostClient - elP2p 
*ElP2pHealthMonitor + node dial.RollupClientInterface + p2p apis.P2PClient + supervisor SupervisorHealthAPI + rb client.RollupBoostClient + elP2p *ElP2pHealthMonitor + rollupBoostPartialHealthinessToleranceLimit uint64 + rollupBoostPartialHealthinessToleranceCounter *timeBoundedRotatingCounter } var _ HealthMonitor = (*SequencerHealthMonitor)(nil) @@ -288,8 +298,14 @@ func (hm *SequencerHealthMonitor) checkRollupBoost(ctx context.Context) error { case client.HealthStatusHealthy: return nil case client.HealthStatusPartial: + if hm.rollupBoostPartialHealthinessToleranceCounter != nil && hm.rollupBoostPartialHealthinessToleranceCounter.CurrentValue() < hm.rollupBoostPartialHealthinessToleranceLimit { + latestValue := hm.rollupBoostPartialHealthinessToleranceCounter.Increment() + hm.log.Debug("Rollup boost partial unhealthiness failure tolerated", "currentValue", latestValue, "limit", hm.rollupBoostPartialHealthinessToleranceLimit) + return nil + } hm.log.Error("Rollup boost is partial failure, builder is down but fallback execution client is up", "err", ErrRollupBoostPartiallyHealthy) return ErrRollupBoostPartiallyHealthy + case client.HealthStatusUnhealthy: hm.log.Error("Rollup boost total failure, both builder and fallback execution client are down", "err", ErrRollupBoostNotHealthy) return ErrRollupBoostNotHealthy @@ -306,6 +322,6 @@ func calculateTimeDiff(now, then uint64) uint64 { return now - then } -func currentTimeProvicer() uint64 { +func currentTimeProvider() uint64 { return uint64(time.Now().Unix()) } diff --git a/op-conductor/health/monitor_test.go b/op-conductor/health/monitor_test.go index 9bbfd69d53f69..af85b8ff4270c 100644 --- a/op-conductor/health/monitor_test.go +++ b/op-conductor/health/monitor_test.go @@ -97,6 +97,8 @@ func (s *HealthMonitorTestSuite) SetupMonitor( return monitor } +type monitorOpts func(*SequencerHealthMonitor) + // SetupMonitorWithRollupBoost creates a HealthMonitor that includes a RollupBoostClient func (s 
*HealthMonitorTestSuite) SetupMonitorWithRollupBoost( now, unsafeInterval, safeInterval uint64, @@ -104,6 +106,7 @@ func (s *HealthMonitorTestSuite) SetupMonitorWithRollupBoost( mockP2P *p2pMocks.API, mockRollupBoost *clientmocks.RollupBoostClient, elP2pClient client.ElP2PClient, + opts ...monitorOpts, ) *SequencerHealthMonitor { tp := &timeProvider{now: now} if mockP2P == nil { @@ -137,6 +140,9 @@ func (s *HealthMonitorTestSuite) SetupMonitorWithRollupBoost( elP2pClient: elP2pClient, } } + for _, opt := range opts { + opt(monitor) + } err := monitor.Start(context.Background()) s.NoError(err) return monitor @@ -442,6 +448,81 @@ func (s *HealthMonitorTestSuite) TestRollupBoostPartialStatus() { s.NoError(monitor.Stop()) } +func (s *HealthMonitorTestSuite) TestRollupBoostPartialStatusWithTolerance() { + s.T().Parallel() + now := uint64(time.Now().Unix()) + + // Setup healthy node conditions + rc := &testutils.MockRollupClient{} + ss1 := mockSyncStatus(now-1, 1, now-3, 0) + + // because 6 healthchecks are going to be expected cause 6 calls of sync status + for i := 0; i < 6; i++ { + rc.ExpectSyncStatus(ss1, nil) + } + + // Setup healthy peer count + pc := &p2pMocks.API{} + ps1 := &p2p.PeerStats{ + Connected: healthyPeerCount, + } + pc.EXPECT().PeerStats(mock.Anything).Return(ps1, nil) + + // Setup partial rollup boost status (treated as unhealthy) + rb := &clientmocks.RollupBoostClient{} + rb.EXPECT().Healthcheck(mock.Anything).Return(client.HealthStatusPartial, nil) + + toleranceLimit := uint64(2) + toleranceIntervalSeconds := uint64(6) + + timeBoundedRotatingCounter, err := NewTimeBoundedRotatingCounter(toleranceIntervalSeconds) + s.Nil(err) + + tp := &timeProvider{now: 1758792282} + + // Start monitor with all dependencies as well as tolerance of 2 rollup-boost partial unhealthiness per 6s period + monitor := s.SetupMonitorWithRollupBoost(now, 60, 60, rc, pc, rb, nil, func(shm *SequencerHealthMonitor) { + timeBoundedRotatingCounter.timeProviderFn = tp.Now + + // 
pollute the cache of timeBoundedRotatingCounter with 999 elements so as to later test the lazy cleanup + // note: the 1000th element will be added by the first healthcheck run (its CurrentValue and Increment calls land in the same interval bucket) + for i := 0; i < 999; i++ { + timeBoundedRotatingCounter.temporalCache[int64(i)] = uint64(1) + } + + shm.rollupBoostPartialHealthinessToleranceCounter = timeBoundedRotatingCounter + shm.rollupBoostPartialHealthinessToleranceLimit = toleranceLimit + }) + + healthUpdateCh := monitor.Subscribe() + + s.Eventually(func() bool { + return len(timeBoundedRotatingCounter.temporalCache) == 1000 + }, time.Second*3, time.Second*1) + + firstHealthStatus := <-healthUpdateCh + secondHealthStatus := <-healthUpdateCh + thirdHealthStatus := <-healthUpdateCh + + s.Nil(firstHealthStatus) + s.Nil(secondHealthStatus) + s.Equal(ErrRollupBoostPartiallyHealthy, thirdHealthStatus) + + tp.Now() // simulate another second passing + // by now, because of three healthchecks, six seconds (CurrentValue + Increment + CurrentValue + Increment + CurrentValue + tp.Now()) have been simulated to pass (by the timeProviderFn) + // this should reset the time bound counter, thereby allowing partial unhealthiness failures to be tolerated again + + fourthHealthStatus := <-healthUpdateCh + fifthHealthStatus := <-healthUpdateCh + sixthHealthStatus := <-healthUpdateCh + + s.Nil(fourthHealthStatus) + s.Nil(fifthHealthStatus) + s.Equal(ErrRollupBoostPartiallyHealthy, sixthHealthStatus) + + s.NoError(monitor.Stop()) +} + func (s *HealthMonitorTestSuite) TestRollupBoostHealthy() { s.T().Parallel() now := uint64(time.Now().Unix()) diff --git a/op-conductor/health/timeboundcounter.go b/op-conductor/health/timeboundcounter.go new file mode 100644 index 0000000000000..58918ead9116d --- /dev/null +++ b/op-conductor/health/timeboundcounter.go @@ -0,0 +1,58 @@ +package health + +import ( + "fmt" + "sync" +) + +// this is a type of counter which keeps on incrementing until its reset interval is hit after which it resets to 0 +// this can be 
used to track time-based rate-limit, error counts, etc. +type timeBoundedRotatingCounter struct { + resetIntervalSeconds uint64 + timeProviderFn func() uint64 + + mut *sync.RWMutex + temporalCache map[int64]uint64 +} + +func NewTimeBoundedRotatingCounter(resetIntervalSeconds uint64) (*timeBoundedRotatingCounter, error) { + if resetIntervalSeconds == 0 { + return nil, fmt.Errorf("reset interval seconds must be more than 0") + } + return &timeBoundedRotatingCounter{ + resetIntervalSeconds: resetIntervalSeconds, + mut: &sync.RWMutex{}, + temporalCache: map[int64]uint64{}, + timeProviderFn: currentTimeProvider, + }, nil +} + +func (t *timeBoundedRotatingCounter) Increment() uint64 { + // let's take `resetIntervalSeconds` as 60s + // truncatedTimestamp is current timestamp rounded off by 60s (resetIntervalSeconds) + // thereby generating a value which stays same until the next 60s helping track and incrementing the counter corresponding to it for the next 60s + currentTsSeconds := t.timeProviderFn() + truncatedTimestamp := int64(currentTsSeconds / t.resetIntervalSeconds) + t.mut.Lock() + // a lazy cleanup subroutine to the clean the cache when it's grown enough, preventing memory leaks + defer func() { + defer t.mut.Unlock() + if len(t.temporalCache) > 1000 { + newCache := map[int64]uint64{ + truncatedTimestamp: t.temporalCache[truncatedTimestamp], + } + t.temporalCache = newCache // garbage collector should take care of the old cache + } + }() + + t.temporalCache[truncatedTimestamp]++ + return t.temporalCache[truncatedTimestamp] +} + +func (t *timeBoundedRotatingCounter) CurrentValue() uint64 { + currentTsSeconds := t.timeProviderFn() + truncatedTimestamp := int64(currentTsSeconds / t.resetIntervalSeconds) + t.mut.RLock() + defer t.mut.RUnlock() + return t.temporalCache[truncatedTimestamp] +} diff --git a/op-conductor/health/timeboundcounter_test.go b/op-conductor/health/timeboundcounter_test.go new file mode 100644 index 0000000000000..f27c128bf0d6b --- /dev/null +++ 
b/op-conductor/health/timeboundcounter_test.go @@ -0,0 +1,127 @@ +package health + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTimeBoundedRotatingCounterSetup(t *testing.T) { + t.Parallel() + t.Run("fail with 0 interval seconds value", func(t *testing.T) { + counter, err := NewTimeBoundedRotatingCounter(0) + require.Error(t, err) + require.Nil(t, counter) + }) + + t.Run("succeed with non-zero interval seconds value", func(t *testing.T) { + counter, err := NewTimeBoundedRotatingCounter(2) + require.NoError(t, err) + require.NotNil(t, counter) + }) +} + +func TestTimeBoundedRotatingCounterIncrement(t *testing.T) { + + mockTimeProvider := &timeProvider{now: 0} // every access to .Now() will increment its value simulating a one-second time passing + + resetInterval := uint64(6) + counter, err := NewTimeBoundedRotatingCounter(resetInterval) + require.NoError(t, err) + require.NotNil(t, counter) + counter.timeProviderFn = mockTimeProvider.Now + + require.Equal(t, int(mockTimeProvider.now), 0) + require.Equal(t, uint64(0), counter.CurrentValue()) + require.Equal(t, int(mockTimeProvider.now), 1) + + newValue := counter.Increment() + require.Equal(t, uint64(1), newValue) + require.Equal(t, int(mockTimeProvider.now), 2) + require.Equal(t, uint64(1), counter.CurrentValue()) + require.Equal(t, int(mockTimeProvider.now), 3) + + newValue = counter.Increment() + require.Equal(t, uint64(2), newValue) + require.Equal(t, int(mockTimeProvider.now), 4) + require.Equal(t, uint64(2), counter.CurrentValue()) + require.Equal(t, int(mockTimeProvider.now), 5) + + newValue = counter.Increment() + require.Equal(t, uint64(3), newValue) + require.Equal(t, int(mockTimeProvider.now), 6) + require.Equal(t, uint64(0), counter.CurrentValue()) // the next second counter rotates returning 0 as the current value + require.Equal(t, int(mockTimeProvider.now), 7) + + newValue = counter.Increment() + require.Equal(t, uint64(1), newValue) + require.Equal(t, 
int(mockTimeProvider.now), 8) + require.Equal(t, uint64(1), counter.CurrentValue()) + require.Equal(t, int(mockTimeProvider.now), 9) + + newValue = counter.Increment() + require.Equal(t, uint64(2), newValue) + require.Equal(t, int(mockTimeProvider.now), 10) + require.Equal(t, uint64(2), counter.CurrentValue()) + require.Equal(t, int(mockTimeProvider.now), 11) + + newValue = counter.Increment() + require.Equal(t, uint64(3), newValue) + require.Equal(t, int(mockTimeProvider.now), 12) + require.Equal(t, uint64(0), counter.CurrentValue()) // the next second counter rotates returning 0 as the current value + require.Equal(t, int(mockTimeProvider.now), 13) + +} + +// To test the bad path: comment out mut.RLock() and mut.RUnlock() in the CurrentValue() method, and run this test again +// you'll see a "fatal error: concurrent map read and map write" +func TestTimeBoundedRotatingCounterConcurrentAccess(t *testing.T) { + mockTimeProvider := &timeProvider{now: 0} + + counter, err := NewTimeBoundedRotatingCounter(1) + require.NoError(t, err) + require.NotNil(t, counter) + counter.timeProviderFn = mockTimeProvider.Now + + wg := &sync.WaitGroup{} + wg.Add(2000) + + write := func() { + defer wg.Done() + counter.Increment() + } + read := func() { + defer wg.Done() + counter.CurrentValue() + } + require.NotPanics(t, func() { + for i := 0; i < 1000; i++ { + go write() + go read() + } + wg.Wait() + }) +} + +func TestTimeBoundedRotatingCounterLazyCleanup(t *testing.T) { + mockTimeProvider := &timeProvider{now: 0} + + // a counter with a reset interval of 2 ensuring every two-seconds the counter's cache would track a new key:value + // we'll trigger the 2-second increment by calling .Increment() and .CurrentValue() because both under the hood, would call .Now() of the mockTimeProvider + counter, err := NewTimeBoundedRotatingCounter(2) + require.NoError(t, err) + require.NotNil(t, counter) + counter.timeProviderFn = mockTimeProvider.Now + + for i := 0; i < 1000; i++ { + 
counter.Increment() // trigger a 1-second time increase + counter.CurrentValue() // trigger another 1-second time increase, causing the counter interval to reset ensuring next Increment would write a new key in the cache + } + + require.Equal(t, 1000, len(counter.temporalCache)) + + // 1001th increment should trigger the lazy cleanup this time + counter.Increment() + require.Equal(t, 1, len(counter.temporalCache)) +}