Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions op-conductor/client/mocks/RollupBoostClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions op-conductor/client/rollup_boost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package client

import (
"context"
"fmt"
"io"
"net/http"
)

const (
// HealthzEndpoint is the fixed path for health checks
HealthzEndpoint = "/healthz"
)

type HealthStatus string

const (
HealthStatusHealthy HealthStatus = "healthy"
HealthStatusPartial HealthStatus = "partial"
HealthStatusUnhealthy HealthStatus = "unhealthy"
)

type RollupBoostClient interface {
Healthcheck(ctx context.Context) (HealthStatus, error)
}

type rollupBoostClient struct {
baseURL string
httpClient *http.Client
}

func NewRollupBoostClient(baseURL string, httpClient *http.Client) RollupBoostClient {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &rollupBoostClient{
baseURL: baseURL,
httpClient: httpClient,
}
}

func (c *rollupBoostClient) Healthcheck(ctx context.Context) (HealthStatus, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+HealthzEndpoint, nil)
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return "", fmt.Errorf("failed to make request: %w", err)
}
defer resp.Body.Close()

// Read and discard body to ensure connection reuse
_, _ = io.Copy(io.Discard, resp.Body)

switch resp.StatusCode {
case http.StatusOK: // 200
return HealthStatusHealthy, nil
case http.StatusPartialContent: // 206
return HealthStatusPartial, nil
case http.StatusServiceUnavailable: // 503
return HealthStatusUnhealthy, nil
default:
return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
}
70 changes: 70 additions & 0 deletions op-conductor/client/rollup_boost_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package client

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRollupBoostHealthcheck(t *testing.T) {
tests := []struct {
name string
statusCode int
want HealthStatus
wantErr bool
}{
{
name: "healthy response",
statusCode: http.StatusOK,
want: HealthStatusHealthy,
wantErr: false,
},
{
name: "partial health response",
statusCode: http.StatusPartialContent,
want: HealthStatusPartial,
wantErr: false,
},
{
name: "unhealthy response",
statusCode: http.StatusServiceUnavailable,
want: HealthStatusUnhealthy,
wantErr: false,
},
{
name: "unexpected status code",
statusCode: http.StatusBadRequest,
want: "",
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a test server that returns the desired status code
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, HealthzEndpoint, r.URL.Path)
assert.Equal(t, http.MethodGet, r.Method)
w.WriteHeader(tt.statusCode)
}))
defer server.Close()

// Create client that points to our test server
client := NewRollupBoostClient(server.URL, server.Client())

// Test the healthcheck
got, err := client.Healthcheck(context.Background())

if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.want, got)
}
})
}
}
32 changes: 20 additions & 12 deletions op-conductor/conductor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ type Config struct {
// SupervisorRPC is the HTTP provider URL for supervisor.
SupervisorRPC string

// RollupBoostEnabled is true if the rollup boost is enabled.
RollupBoostEnabled bool

// RollupBoostHealthcheckTimeout is the timeout for rollup boost healthcheck.
RollupBoostHealthcheckTimeout time.Duration

// Paused is true if the conductor should start in a paused state.
Paused bool

Expand Down Expand Up @@ -139,18 +145,20 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
// The consensus server will advertise the address it binds to if this is empty/unspecified.
ConsensusAdvertisedAddr: ctx.String(flags.AdvertisedFullAddr.Name),

RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name),
RaftServerID: ctx.String(flags.RaftServerID.Name),
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
RaftSnapshotInterval: ctx.Duration(flags.RaftSnapshotInterval.Name),
RaftSnapshotThreshold: ctx.Uint64(flags.RaftSnapshotThreshold.Name),
RaftTrailingLogs: ctx.Uint64(flags.RaftTrailingLogs.Name),
RaftHeartbeatTimeout: ctx.Duration(flags.RaftHeartbeatTimeout.Name),
RaftLeaderLeaseTimeout: ctx.Duration(flags.RaftLeaderLeaseTimeout.Name),
NodeRPC: ctx.String(flags.NodeRPC.Name),
ExecutionRPC: ctx.String(flags.ExecutionRPC.Name),
SupervisorRPC: ctx.String(flags.SupervisorRPC.Name),
Paused: ctx.Bool(flags.Paused.Name),
RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name),
RaftServerID: ctx.String(flags.RaftServerID.Name),
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
RaftSnapshotInterval: ctx.Duration(flags.RaftSnapshotInterval.Name),
RaftSnapshotThreshold: ctx.Uint64(flags.RaftSnapshotThreshold.Name),
RaftTrailingLogs: ctx.Uint64(flags.RaftTrailingLogs.Name),
RaftHeartbeatTimeout: ctx.Duration(flags.RaftHeartbeatTimeout.Name),
RaftLeaderLeaseTimeout: ctx.Duration(flags.RaftLeaderLeaseTimeout.Name),
NodeRPC: ctx.String(flags.NodeRPC.Name),
ExecutionRPC: ctx.String(flags.ExecutionRPC.Name),
SupervisorRPC: ctx.String(flags.SupervisorRPC.Name),
RollupBoostEnabled: ctx.Bool(flags.RollupBoostEnabled.Name),
RollupBoostHealthcheckTimeout: ctx.Duration(flags.RollupBoostHealthcheckTimeout.Name),
Paused: ctx.Bool(flags.Paused.Name),
HealthCheck: HealthCheckConfig{
Interval: ctx.Uint64(flags.HealthCheckInterval.Name),
UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name),
Expand Down
10 changes: 10 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"net/http"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -207,6 +208,14 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
return errors.Wrap(err, "failed to create node rpc client")
}
node := sources.NewRollupClient(nc)

var rb client.RollupBoostClient
if c.cfg.RollupBoostEnabled {
rb = client.NewRollupBoostClient(c.cfg.ExecutionRPC, &http.Client{
Timeout: c.cfg.RollupBoostHealthcheckTimeout,
})
}

p2p := sources.NewP2PClient(nc)

var supervisor health.SupervisorHealthAPI
Expand All @@ -230,6 +239,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
node,
p2p,
supervisor,
rb,
)
c.healthUpdateCh = c.hmon.Subscribe()

Expand Down
64 changes: 64 additions & 0 deletions op-conductor/conductor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,70 @@ func (s *OpConductorTestSuite) TestHandleInitError() {
s.False(ok)
}

// TestRollupBoostHealthFailure tests that OpConductor correctly handles rollup boost health failures
func (s *OpConductorTestSuite) TestRollupBoostHealthFailure() {
s.enableSynchronization()

// set initial state as a leader that is healthy and sequencing
s.conductor.leader.Store(true)
s.conductor.healthy.Store(true)
s.conductor.seqActive.Store(true)
s.conductor.prevState = &state{
leader: true,
healthy: true,
active: true,
}

// Setup expectations - leader with unhealthy rollup boost should stop sequencing and transfer leadership
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
s.cons.EXPECT().TransferLeader().Return(nil).Times(1)

// Simulate a rollup boost health failure
s.updateHealthStatusAndExecuteAction(health.ErrRollupBoostNotHealthy)

// Verify the OpConductor transitions to follower state and stops sequencing
s.False(s.conductor.leader.Load(), "Should transition to follower")
s.False(s.conductor.healthy.Load(), "Should be marked as unhealthy")
s.False(s.conductor.seqActive.Load(), "Sequencer should be stopped")
s.Equal(health.ErrRollupBoostNotHealthy, s.conductor.hcerr, "Error should be stored")

// Verify method calls
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 1)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}

// TestRollupBoostConnectionDown tests that OpConductor correctly handles rollup boost connection failures
func (s *OpConductorTestSuite) TestRollupBoostConnectionDown() {
s.enableSynchronization()

// set initial state as a leader that is healthy and sequencing
s.conductor.leader.Store(true)
s.conductor.healthy.Store(true)
s.conductor.seqActive.Store(true)
s.conductor.prevState = &state{
leader: true,
healthy: true,
active: true,
}

// Setup expectations - leader with rollup boost connection down should stop sequencing and transfer leadership
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
s.cons.EXPECT().TransferLeader().Return(nil).Times(1)

// Simulate a rollup boost connection failure
s.updateHealthStatusAndExecuteAction(health.ErrRollupBoostConnectionDown)

// Verify the OpConductor transitions to follower state and stops sequencing
s.False(s.conductor.leader.Load(), "Should transition to follower")
s.False(s.conductor.healthy.Load(), "Should be marked as unhealthy")
s.False(s.conductor.seqActive.Load(), "Sequencer should be stopped")
s.Equal(health.ErrRollupBoostConnectionDown, s.conductor.hcerr, "Error should be stored")

// Verify method calls
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 1)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}

func TestControlLoop(t *testing.T) {
suite.Run(t, new(OpConductorTestSuite))
}
Expand Down
Loading