Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-batcher/batcher/batch_submitter.go
Comment thread
sebastianst marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ func Main(version string) cliapp.LifecycleAction {
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)

l.Info("Initializing Batch Submitter")
return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
return BatcherServiceFromCLIConfig(cliCtx.Context, closeApp, version, cfg, l)
}
}
38 changes: 25 additions & 13 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type RollupClient interface {

// DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
type DriverSetup struct {
closeApp context.CancelCauseFunc
Log log.Logger
Metr metrics.Metricer
RollupConfig *rollup.Config
Expand All @@ -99,7 +100,6 @@ type DriverSetup struct {
// batches to L1 for availability.
type BatchSubmitter struct {
DriverSetup

wg *sync.WaitGroup
shutdownCtx, killCtx context.Context
cancelShutdownCtx, cancelKillCtx context.CancelFunc
Expand Down Expand Up @@ -585,6 +585,10 @@ func (l *BatchSubmitter) receiptsLoop(wg *sync.WaitGroup, receiptsCh chan txmgr.
l.Log.Info("receiptsLoop returning")
}

// ErrSetMaxDASizeRPCMethodUnavailable builds the fatal error reported when a
// throttling endpoint does not expose the SetMaxDASize RPC method. The
// underlying cause err is wrapped so callers can inspect it with errors.As/Is.
func ErrSetMaxDASizeRPCMethodUnavailable(endpoint string, err error) error {
	const format = "%s unavailable at %s, either enable it or disable throttling: %w"
	return fmt.Errorf(format, SetMaxDASizeMethod, endpoint, err)
}

// singleEndpointThrottler handles throttling for a specific endpoint
func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSignal chan struct{}, endpoint string) {
defer wg.Done()
Expand Down Expand Up @@ -623,20 +627,13 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig
return
}

var rpcErr rpc.Error
if errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError() {
l.Log.Error("SetMaxDASize RPC method unavailable on endpoint, shutting down. Either enable it or disable throttling.",
"endpoint", endpoint, "err", err)

// We have a strict requirement that all endpoints must have the SetMaxDASize endpoint, and shut down if this RPC method is not available
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Call StopBatchSubmitting in another goroutine to avoid deadlock.
go func() {
_ = l.StopBatchSubmitting(ctx)
}()
if isCriticalThrottlingRPCError(err) {
// We have a strict requirement that all endpoints must have the SetMaxDASize endpoint,
// and shut down if this RPC method is not available or returns another application-level error.
l.shutdownOnCriticalError(ErrSetMaxDASizeRPCMethodUnavailable(endpoint, err))
Comment thread
geoknee marked this conversation as resolved.
return
} else if err != nil {
// Transport-level errors are retried.
l.Log.Warn("SetMaxDASize RPC failed for endpoint, retrying.", "endpoint", endpoint, "err", err)
retryTimer.Reset(retryInterval)
return
Expand Down Expand Up @@ -671,6 +668,21 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig
}
}

func isCriticalThrottlingRPCError(err error) bool {
Comment thread
sebastianst marked this conversation as resolved.
var rpcErr rpc.Error
return errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError()
}

func (l *BatchSubmitter) shutdownOnCriticalError(err error) {
l.Log.Error("Critical error detected, attempting batcher shut down", "err", err)
if l.closeApp != nil {
// Call closeApp to trigger process to exit (gracefully) if l.closeApp is set.
l.closeApp(err)
} else {
l.Log.Warn("No closeApp function set, cannot shut down batcher on critical error", "err", err)
}
Comment thread
geoknee marked this conversation as resolved.
}
Comment thread
sebastianst marked this conversation as resolved.
Comment thread
sebastianst marked this conversation as resolved.

// throttlingLoop acts as a distributor that spawns individual throttling loops for each endpoint
// and fans out the unsafe bytes updates to each endpoint
func (l *BatchSubmitter) throttlingLoop(wg *sync.WaitGroup, unsafeBytesUpdated chan int64) {
Expand Down
122 changes: 96 additions & 26 deletions op-batcher/batcher/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/http/httptest"
"slices"
"sync"
"testing"
"time"
Expand All @@ -20,6 +22,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -49,13 +52,20 @@ func (p *mockL2EndpointProvider) Close() {}

const genesisL1Origin = uint64(123)

func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
func setup(t *testing.T, closeAppFn context.CancelCauseFunc) (*BatchSubmitter, *mockL2EndpointProvider) {
ep := newEndpointProvider()

cfg := defaultTestRollupConfig
cfg.Genesis.L1.Number = genesisL1Origin

if closeAppFn == nil {
closeAppFn = func(cause error) {
t.Fatalf("closeAppFn called, batcher hit a critical error: %v", cause)
}
}

return NewBatchSubmitter(DriverSetup{
closeApp: closeAppFn,
Log: testlog.Logger(t, log.LevelDebug),
Metr: metrics.NoopMetrics,
RollupConfig: cfg,
Expand All @@ -70,7 +80,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
}

func TestBatchSubmitter_SafeL1Origin(t *testing.T) {
bs, ep := setup(t)
bs, ep := setup(t, nil)

tests := []struct {
name string
Expand Down Expand Up @@ -123,7 +133,7 @@ func TestBatchSubmitter_SafeL1Origin(t *testing.T) {
}

func TestBatchSubmitter_SafeL1Origin_FailsToResolveRollupClient(t *testing.T) {
bs, ep := setup(t)
bs, ep := setup(t, nil)

ep.rollupClientErr = errors.New("failed to resolve rollup client")

Expand All @@ -145,7 +155,7 @@ func (q *MockTxQueue) Load(id string) txmgr.TxCandidate {
}

func TestBatchSubmitter_sendTx_FloorDataGas(t *testing.T) {
bs, _ := setup(t)
bs, _ := setup(t, nil)

q := new(MockTxQueue)

Expand Down Expand Up @@ -173,9 +183,17 @@ func TestBatchSubmitter_sendTx_FloorDataGas(t *testing.T) {
require.GreaterOrEqual(t, candidateOut.GasLimit, expectedFloorDataGas)
}

// handlerFailureMode selects how the mock JSON-RPC HTTP handler created by
// createHTTPHandler responds to a decoded request.
type handlerFailureMode string

const (
	// noFailure: respond with a successful JSON-RPC result (`"result":true`).
	noFailure handlerFailureMode = "none"
	// internalError: respond with an HTTP 500 "Simulated failure" error,
	// exercising the batcher's transport-level retry path.
	internalError handlerFailureMode = "internal_error"
	// methodNotFound: respond with a JSON-RPC error carrying eth.MethodNotFound,
	// exercising the batcher's critical-error shutdown path.
	methodNotFound handlerFailureMode = "method_not_found"
)

// createHTTPHandler creates a mock HTTP handler for testing, it accepts a callback which
// is invoked when the expected request is received.
func createHTTPHandler(t *testing.T, cb func(), alwaysFails bool) http.HandlerFunc {
func createHTTPHandler(t *testing.T, cb func(), failureMode handlerFailureMode) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
var req struct {
Expand All @@ -185,16 +203,22 @@ func createHTTPHandler(t *testing.T, cb func(), alwaysFails bool) http.HandlerFu
ID interface{} `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err == nil {
cb()

if alwaysFails {
switch failureMode {
case noFailure:
w.Header().Set("Content-Type", "application/json")
_, err := w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":true}`))
if err != nil {
t.Logf("Error writing response: %v", err)
}
return
case internalError:
http.Error(w, "Simulated failure", http.StatusInternalServerError)
cb()
return
}
if req.Method == "miner_setMaxDASize" && len(req.Params) == 2 {
cb()
case methodNotFound:
w.Header().Set("Content-Type", "application/json")
_, err := w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":true}`))
_, err := w.Write([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"error":{"code":%d,"message":"method not found"}}`, eth.MethodNotFound)))
if err != nil {
t.Logf("Error writing response: %v", err)
}
Expand All @@ -207,7 +231,8 @@ func createHTTPHandler(t *testing.T, cb func(), alwaysFails bool) http.HandlerFu
}

func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {

// Set a very long timeout to avoid flakiness
timeout := time.Second * 120
testThrottlingEndpoints := func(numHealthyServers, numUnhealthyServers int) func(t *testing.T) {

return func(t *testing.T) {
Expand All @@ -220,12 +245,12 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
urls := make([]string, 0, numHealthyServers+numUnhealthyServers)

for i := range healthyCalls {
healthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { healthyCalls[i]++ }, false))
healthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { healthyCalls[i]++ }, noFailure))
urls = append(urls, healthyServers[i].URL)
defer healthyServers[i].Close()
}
for i := range unHealthyCalls {
unhealthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { unHealthyCalls[i]++ }, true))
unhealthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { unHealthyCalls[i]++ }, internalError))
urls = append(urls, unhealthyServers[i].URL)
defer unhealthyServers[i].Close()
}
Expand All @@ -238,8 +263,12 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {

t.Log("Throttling endpoints:", urls)

var batcherShutdownError error

// Create test BatchSubmitter using the setup function
bs, _ := setup(t)
bs, _ := setup(t, func(cause error) {
batcherShutdownError = cause
})
bs.shutdownCtx = ctx
bs.Config.NetworkTimeout = time.Second
bs.Config.ThrottleParams.Endpoints = urls
Expand Down Expand Up @@ -290,15 +319,8 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
require.Eventually(t,
func() bool {
// Check that all endpoints were called
for i := range healthyCalls {
if healthyCalls[i] == 0 {
return false
}
}
for i := range unHealthyCalls {
if unHealthyCalls[i] == 0 {
return false
}
if slices.Contains(healthyCalls, 0) || slices.Contains(unHealthyCalls, 0) {
return false
}
return true
}, time.Second*10, time.Millisecond*10, "All endpoints should have been called within 10s")
Expand All @@ -322,18 +344,66 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
addr := healthyServers[0].Listener.Addr().String()
healthyServers[0].Close()
time.Sleep(time.Second * 2)
startTestServerAtAddr(addr, createHTTPHandler(t, func() { restartedServerCalled = true }, false))
startTestServerAtAddr(addr, createHTTPHandler(t, func() { restartedServerCalled = true }, noFailure))
defer healthyServers[0].Close()
t.Log("restarted server at", addr)

require.Eventually(t, func() bool {
return restartedServerCalled
}, time.Second*2, time.Millisecond*10, "Restarted server should have been called within 2s")
}, timeout, time.Millisecond*10, "Restarted server should have been called within 2s")
}

// Take an unhealthy server down, wait 2s and bring it back up with misconfiguration. Check the batcher exits.
if len(unhealthyServers) > 0 {
restartedServerCalled := false

addr := unhealthyServers[0].Listener.Addr().String()
unhealthyServers[0].Close()
time.Sleep(time.Second * 2)
startTestServerAtAddr(addr, createHTTPHandler(t, func() { restartedServerCalled = true }, methodNotFound))
defer unhealthyServers[0].Close()
t.Log("restarted server at", addr)

require.Eventually(t, func() bool {
return restartedServerCalled
}, timeout, time.Millisecond*10, "Restarted server should have been called within 2s")

require.Eventually(t, func() bool {
return batcherShutdownError != nil
}, timeout, time.Millisecond*10, "Batcher should have triggered self shutdown within 2s")

require.Equal(t, batcherShutdownError.Error(), ErrSetMaxDASizeRPCMethodUnavailable("http://"+addr, errors.New("method not found")).Error(), "Batcher shutdown error should be the same as the expected error")
}
}
}
t.Run("two normal endpoints", testThrottlingEndpoints(2, 0))
t.Run("two failing endpoints", testThrottlingEndpoints(0, 2))
t.Run("one normal endpoint, one failing endpoint", testThrottlingEndpoints(1, 1))
}

// TestBatchSubmitter_CriticalError checks that isCriticalThrottlingRPCError
// classifies generic application-level RPC errors as critical while treating
// other error codes and plain transport errors as non-critical.
func TestBatchSubmitter_CriticalError(t *testing.T) {
	cases := []struct {
		err      error
		critical bool
	}{
		{eth.InputError{Code: eth.MethodNotFound}, true},
		{eth.InputError{Code: eth.InvalidParams}, true},
		{eth.InputError{Code: eth.UnsupportedFork}, false},
		{errors.New("timeout"), false},
	}

	for _, tc := range cases {
		if tc.critical {
			assert.True(t, isCriticalThrottlingRPCError(tc.err), "false positive: %s", tc.err)
		} else {
			assert.False(t, isCriticalThrottlingRPCError(tc.err), "false negative: %s", tc.err)
		}
	}
}
9 changes: 6 additions & 3 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BatcherConfig struct {
// BatcherService represents a full batch-submitter instance and its resources,
// and conforms to the op-service CLI Lifecycle interface.
type BatcherService struct {
closeApp context.CancelCauseFunc
Log log.Logger
Metrics metrics.Metricer
L1Client *ethclient.Client
Expand Down Expand Up @@ -86,15 +87,16 @@ type DriverSetupOption func(setup *DriverSetup)
// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
// The service components are fully started, except for the driver,
// which will not be submitting batches (if it was configured to) until the Start part of the lifecycle.
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
func BatcherServiceFromCLIConfig(ctx context.Context, closeApp context.CancelCauseFunc, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
var bs BatcherService
if err := bs.initFromCLIConfig(ctx, version, cfg, log, opts...); err != nil {
if err := bs.initFromCLIConfig(ctx, closeApp, version, cfg, log, opts...); err != nil {
return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
}
return &bs, nil
}

func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
func (bs *BatcherService) initFromCLIConfig(ctx context.Context, closeApp context.CancelCauseFunc, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
bs.closeApp = closeApp
bs.Version = version
bs.Log = log
bs.NotSubmittingOnStart = cfg.Stopped
Expand Down Expand Up @@ -385,6 +387,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {

func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
ds := DriverSetup{
closeApp: bs.closeApp,
Log: bs.Log,
Metr: bs.Metrics,
RollupConfig: bs.RollupConfig,
Expand Down
7 changes: 6 additions & 1 deletion op-devstack/sysgo/l2_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,13 @@ func WithBatcher(batcherID stack.L2BatcherID, l1ELID stack.L1ELNodeID, l2CLID st
opt(batcherID, batcherCLIConfig)
}

batcherContext, cancelBatcherCtx := context.WithCancel(p.Ctx())
var closeAppFn context.CancelCauseFunc = func(cause error) {
p.Errorf("closeAppFn called, batcher hit a critical error: %v", cause)
cancelBatcherCtx()
}
batcher, err := bss.BatcherServiceFromCLIConfig(
p.Ctx(), "0.0.1", batcherCLIConfig,
batcherContext, closeAppFn, "0.0.1", batcherCLIConfig,
logger)
require.NoError(err)
require.NoError(batcher.Start(p.Ctx()))
Expand Down
9 changes: 8 additions & 1 deletion op-e2e/interop/supersystem_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,15 @@ func (s *interopE2ESystem) newBatcherForL2(
DataAvailabilityType: daType,
CompressionAlgo: derive.Brotli,
}

batcherContext, batcherCancel := context.WithCancel(context.Background())
var closeAppFn context.CancelCauseFunc = func(cause error) {
s.t.Fatalf("closeAppFn called, batcher hit a critical error: %v", cause)
batcherCancel()
}

batcher, err := bss.BatcherServiceFromCLIConfig(
context.Background(), "0.0.1", batcherCLIConfig,
batcherContext, closeAppFn, "0.0.1", batcherCLIConfig,
logger.New("service", "batcher"))
require.NoError(s.t, err)
require.NoError(s.t, batcher.Start(context.Background()))
Expand Down
Loading