diff --git a/op-batcher/batcher/batch_submitter.go b/op-batcher/batcher/batch_submitter.go
index a35b9ad1a18..91225c810aa 100644
--- a/op-batcher/batcher/batch_submitter.go
+++ b/op-batcher/batcher/batch_submitter.go
@@ -29,6 +29,6 @@ func Main(version string) cliapp.LifecycleAction {
 		opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
 
 		l.Info("Initializing Batch Submitter")
-		return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
+		return BatcherServiceFromCLIConfig(cliCtx.Context, closeApp, version, cfg, l)
 	}
 }
diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 7cce08003ef..3fac88ac25f 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -83,6 +83,7 @@ type RollupClient interface {
 
 // DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
 type DriverSetup struct {
+	closeApp     context.CancelCauseFunc
 	Log          log.Logger
 	Metr         metrics.Metricer
 	RollupConfig *rollup.Config
@@ -99,7 +100,6 @@ type DriverSetup struct {
 // batches to L1 for availability.
 type BatchSubmitter struct {
 	DriverSetup
-
 	wg *sync.WaitGroup
 	shutdownCtx, killCtx             context.Context
 	cancelShutdownCtx, cancelKillCtx context.CancelFunc
@@ -585,6 +585,10 @@ func (l *BatchSubmitter) receiptsLoop(wg *sync.WaitGroup, receiptsCh chan txmgr.
 	l.Log.Info("receiptsLoop returning")
 }
 
+func ErrSetMaxDASizeRPCMethodUnavailable(endpoint string, err error) error {
+	return fmt.Errorf("%s unavailable at %s, either enable it or disable throttling: %w", SetMaxDASizeMethod, endpoint, err)
+}
+
 // singleEndpointThrottler handles throttling for a specific endpoint
 func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSignal chan struct{}, endpoint string) {
 	defer wg.Done()
@@ -623,20 +627,13 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig
 			return
 		}
 
-		var rpcErr rpc.Error
-		if errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError() {
-			l.Log.Error("SetMaxDASize RPC method unavailable on endpoint, shutting down. Either enable it or disable throttling.",
-				"endpoint", endpoint, "err", err)
-
-			// We have a strict requirement that all endpoints must have the SetMaxDASize endpoint, and shut down if this RPC method is not available
-			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-			defer cancel()
-			// Call StopBatchSubmitting in another goroutine to avoid deadlock.
-			go func() {
-				_ = l.StopBatchSubmitting(ctx)
-			}()
+		if isCriticalThrottlingRPCError(err) {
+			// We have a strict requirement that all endpoints must have the SetMaxDASize endpoint,
+			// and shut down if this RPC method is not available or returns another application-level error.
+			l.shutdownOnCriticalError(ErrSetMaxDASizeRPCMethodUnavailable(endpoint, err))
 			return
 		} else if err != nil {
+			// Transport-level errors are retried.
l.Log.Warn("SetMaxDASize RPC failed for endpoint, retrying.", "endpoint", endpoint, "err", err) retryTimer.Reset(retryInterval) return @@ -671,6 +668,21 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig } } +func isCriticalThrottlingRPCError(err error) bool { + var rpcErr rpc.Error + return errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError() +} + +func (l *BatchSubmitter) shutdownOnCriticalError(err error) { + l.Log.Error("Critical error detected, attempting batcher shut down", "err", err) + if l.closeApp != nil { + // Call closeApp to trigger process to exit (gracefully) if l.closeApp is set. + l.closeApp(err) + } else { + l.Log.Warn("No closeApp function set, cannot shut down batcher on critical error", "err", err) + } +} + // throttlingLoop acts as a distributor that spawns individual throttling loops for each endpoint // and fans out the unsafe bytes updates to each endpoint func (l *BatchSubmitter) throttlingLoop(wg *sync.WaitGroup, unsafeBytesUpdated chan int64) { diff --git a/op-batcher/batcher/driver_test.go b/op-batcher/batcher/driver_test.go index 454ade01ee7..a58521b377a 100644 --- a/op-batcher/batcher/driver_test.go +++ b/op-batcher/batcher/driver_test.go @@ -4,9 +4,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "net" "net/http" "net/http/httptest" + "slices" "sync" "testing" "time" @@ -20,6 +22,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -49,13 +52,20 @@ func (p *mockL2EndpointProvider) Close() {} const genesisL1Origin = uint64(123) -func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) { +func setup(t *testing.T, closeAppFn context.CancelCauseFunc) (*BatchSubmitter, *mockL2EndpointProvider) { ep := newEndpointProvider() cfg := defaultTestRollupConfig cfg.Genesis.L1.Number = genesisL1Origin + if closeAppFn == nil { + closeAppFn = func(cause error) { + t.Fatalf("closeAppFn called, batcher hit a critical error: %v", cause) + } + } + return NewBatchSubmitter(DriverSetup{ + closeApp: closeAppFn, Log: testlog.Logger(t, log.LevelDebug), Metr: metrics.NoopMetrics, RollupConfig: cfg, @@ -70,7 +80,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) { } func TestBatchSubmitter_SafeL1Origin(t *testing.T) { - bs, ep := setup(t) + bs, ep := setup(t, nil) tests := []struct { name string @@ -123,7 +133,7 @@ func TestBatchSubmitter_SafeL1Origin(t *testing.T) { } func TestBatchSubmitter_SafeL1Origin_FailsToResolveRollupClient(t *testing.T) { - bs, ep := setup(t) + bs, ep := setup(t, nil) ep.rollupClientErr = errors.New("failed to resolve rollup client") @@ -145,7 +155,7 @@ func (q *MockTxQueue) Load(id string) txmgr.TxCandidate { } func TestBatchSubmitter_sendTx_FloorDataGas(t *testing.T) { - bs, _ := setup(t) + bs, _ := setup(t, nil) q := new(MockTxQueue) @@ -173,9 +183,17 @@ func TestBatchSubmitter_sendTx_FloorDataGas(t *testing.T) { require.GreaterOrEqual(t, candidateOut.GasLimit, expectedFloorDataGas) } +type handlerFailureMode string + +const ( + noFailure handlerFailureMode = "none" + internalError handlerFailureMode = "internal_error" + methodNotFound handlerFailureMode = "method_not_found" +) + // createHTTPHandler creates a mock HTTP handler for testing, it accepts a callback which // is invoked when the expected request is received. 
-func createHTTPHandler(t *testing.T, cb func(), alwaysFails bool) http.HandlerFunc {
+func createHTTPHandler(t *testing.T, cb func(), failureMode handlerFailureMode) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		if r.Method == "POST" {
 			var req struct {
@@ -185,16 +203,22 @@ func createHTTPHandler(t *testing.T, cb func(), alwaysFails bool) http.HandlerFu
 				ID     interface{}   `json:"id"`
 			}
 			if err := json.NewDecoder(r.Body).Decode(&req); err == nil {
+				cb()
-				if alwaysFails {
+				switch failureMode {
+				case noFailure:
+					w.Header().Set("Content-Type", "application/json")
+					_, err := w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":true}`))
+					if err != nil {
+						t.Logf("Error writing response: %v", err)
+					}
+					return
+				case internalError:
 					http.Error(w, "Simulated failure", http.StatusInternalServerError)
-					cb()
 					return
-				}
-				if req.Method == "miner_setMaxDASize" && len(req.Params) == 2 {
-					cb()
+				case methodNotFound:
 					w.Header().Set("Content-Type", "application/json")
-					_, err := w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":true}`))
+					_, err := w.Write([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"error":{"code":%d,"message":"method not found"}}`, eth.MethodNotFound)))
 					if err != nil {
 						t.Logf("Error writing response: %v", err)
 					}
@@ -207,7 +231,8 @@ func createHTTPHandler(t *testing.T, cb func(), alwaysFails bool) http.HandlerFu
 }
 
 func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
-
+	// Set a very long timeout to avoid flakiness
+	timeout := time.Second * 120
 	testThrottlingEndpoints := func(numHealthyServers, numUnhealthyServers int) func(t *testing.T) {
 		return func(t *testing.T) {
@@ -220,12 +245,12 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
 			urls := make([]string, 0, numHealthyServers+numUnhealthyServers)
 			for i := range healthyCalls {
-				healthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { healthyCalls[i]++ }, false))
+				healthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { healthyCalls[i]++ }, noFailure))
 				urls = append(urls, healthyServers[i].URL)
 				defer healthyServers[i].Close()
 			}
 			for i := range unHealthyCalls {
-				unhealthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { unHealthyCalls[i]++ }, true))
+				unhealthyServers[i] = httptest.NewServer(createHTTPHandler(t, func() { unHealthyCalls[i]++ }, internalError))
 				urls = append(urls, unhealthyServers[i].URL)
 				defer unhealthyServers[i].Close()
 			}
@@ -238,8 +263,12 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
 
 			t.Log("Throttling endpoints:", urls)
 
+			var batcherShutdownError error
+
 			// Create test BatchSubmitter using the setup function
-			bs, _ := setup(t)
+			bs, _ := setup(t, func(cause error) {
+				batcherShutdownError = cause
+			})
 			bs.shutdownCtx = ctx
 			bs.Config.NetworkTimeout = time.Second
 			bs.Config.ThrottleParams.Endpoints = urls
@@ -290,15 +319,8 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
 
 			require.Eventually(t, func() bool {
 				// Check that all endpoints were called
-				for i := range healthyCalls {
-					if healthyCalls[i] == 0 {
-						return false
-					}
-				}
-				for i := range unHealthyCalls {
-					if unHealthyCalls[i] == 0 {
-						return false
-					}
+				if slices.Contains(healthyCalls, 0) || slices.Contains(unHealthyCalls, 0) {
+					return false
 				}
 				return true
 			}, time.Second*10, time.Millisecond*10, "All endpoints should have been called within 10s")
@@ -322,18 +344,66 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
 				addr := healthyServers[0].Listener.Addr().String()
 				healthyServers[0].Close()
 				time.Sleep(time.Second * 2)
-				startTestServerAtAddr(addr, createHTTPHandler(t, func() { restartedServerCalled = true }, false))
+				startTestServerAtAddr(addr, createHTTPHandler(t, func() { restartedServerCalled = true }, noFailure))
 				defer healthyServers[0].Close()
 				t.Log("restarted server at", addr)
 
 				require.Eventually(t, func() bool {
 					return restartedServerCalled
-				}, time.Second*2, time.Millisecond*10, "Restarted server should have been called within 2s")
+				}, timeout, time.Millisecond*10, "Restarted server should have been called before timeout")
 			}
+
+			// Take an unhealthy server down, wait 2s and bring it back up misconfigured. Check that the batcher exits.
+			if len(unhealthyServers) > 0 {
+				restartedServerCalled := false
+
+				addr := unhealthyServers[0].Listener.Addr().String()
+				unhealthyServers[0].Close()
+				time.Sleep(time.Second * 2)
+				startTestServerAtAddr(addr, createHTTPHandler(t, func() { restartedServerCalled = true }, methodNotFound))
+				defer unhealthyServers[0].Close()
+				t.Log("restarted server at", addr)
+
+				require.Eventually(t, func() bool {
+					return restartedServerCalled
+				}, timeout, time.Millisecond*10, "Restarted server should have been called before timeout")
+
+				require.Eventually(t, func() bool {
+					return batcherShutdownError != nil
+				}, timeout, time.Millisecond*10, "Batcher should have triggered self shutdown before timeout")
+
+				require.Equal(t, ErrSetMaxDASizeRPCMethodUnavailable("http://"+addr, errors.New("method not found")).Error(), batcherShutdownError.Error(), "Batcher shutdown error should match the expected error")
+			}
 		}
 	}
 
 	t.Run("two normal endpoints", testThrottlingEndpoints(2, 0))
 	t.Run("two failing endpoints", testThrottlingEndpoints(0, 2))
 	t.Run("one normal endpoint, one failing endpoint", testThrottlingEndpoints(1, 1))
 }
+
+func TestBatchSubmitter_CriticalError(t *testing.T) {
+	criticalErrors := []error{
+		eth.InputError{
+			Code: eth.MethodNotFound,
+		},
+		eth.InputError{
+			Code: eth.InvalidParams,
+		},
+	}
+
+	for _, e := range criticalErrors {
+		assert.True(t, isCriticalThrottlingRPCError(e), "false positive: %s", e)
+	}
+
+	nonCriticalErrors := []error{
+		eth.InputError{
+			Code: eth.UnsupportedFork,
+		},
+		errors.New("timeout"),
+	}
+
+	for _, e := range nonCriticalErrors {
+		assert.False(t, isCriticalThrottlingRPCError(e), "false negative: %s", e)
+	}
+}
diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go
index d12285e2a18..e253420b1ac 100644
--- a/op-batcher/batcher/service.go
+++ b/op-batcher/batcher/service.go
@@ -55,6 +55,7 @@ type BatcherConfig struct {
 // BatcherService represents a full batch-submitter instance and its resources,
 // and conforms to the op-service CLI Lifecycle interface.
 type BatcherService struct {
+	closeApp context.CancelCauseFunc
 	Log      log.Logger
 	Metrics  metrics.Metricer
 	L1Client *ethclient.Client
@@ -86,15 +87,16 @@ type DriverSetupOption func(setup *DriverSetup)
 // BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
 // The service components are fully started, except for the driver,
 // which will not be submitting batches (if it was configured to) until the Start part of the lifecycle.
-func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
+func BatcherServiceFromCLIConfig(ctx context.Context, closeApp context.CancelCauseFunc, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
 	var bs BatcherService
-	if err := bs.initFromCLIConfig(ctx, version, cfg, log, opts...); err != nil {
+	if err := bs.initFromCLIConfig(ctx, closeApp, version, cfg, log, opts...); err != nil {
 		return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
 	}
 	return &bs, nil
 }
 
-func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
+func (bs *BatcherService) initFromCLIConfig(ctx context.Context, closeApp context.CancelCauseFunc, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
+	bs.closeApp = closeApp
 	bs.Version = version
 	bs.Log = log
 	bs.NotSubmittingOnStart = cfg.Stopped
@@ -385,6 +387,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
 
 func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
 	ds := DriverSetup{
+		closeApp:     bs.closeApp,
 		Log:          bs.Log,
 		Metr:         bs.Metrics,
 		RollupConfig: bs.RollupConfig,
diff --git a/op-devstack/sysgo/l2_batcher.go b/op-devstack/sysgo/l2_batcher.go
index 46e0db27a4f..4001ec30582 100644
--- a/op-devstack/sysgo/l2_batcher.go
+++ b/op-devstack/sysgo/l2_batcher.go
@@ -115,8 +115,13 @@ func WithBatcher(batcherID stack.L2BatcherID, l1ELID stack.L1ELNodeID, l2CLID st
 		opt(batcherID, batcherCLIConfig)
 	}
 
+	batcherContext, cancelBatcherCtx := context.WithCancel(p.Ctx())
+	var closeAppFn context.CancelCauseFunc = func(cause error) {
+		p.Errorf("closeAppFn called, batcher hit a critical error: %v", cause)
+		cancelBatcherCtx()
+	}
 	batcher, err := bss.BatcherServiceFromCLIConfig(
-		p.Ctx(), "0.0.1", batcherCLIConfig,
+		batcherContext, closeAppFn, "0.0.1", batcherCLIConfig,
 		logger)
 	require.NoError(err)
 	require.NoError(batcher.Start(p.Ctx()))
diff --git a/op-e2e/interop/supersystem_l2.go b/op-e2e/interop/supersystem_l2.go
index 3d7b0fa7c04..f62f3924078 100644
--- a/op-e2e/interop/supersystem_l2.go
+++ b/op-e2e/interop/supersystem_l2.go
@@ -300,8 +300,15 @@ func (s *interopE2ESystem) newBatcherForL2(
 		DataAvailabilityType: daType,
 		CompressionAlgo:      derive.Brotli,
 	}
+
+	batcherContext, batcherCancel := context.WithCancel(context.Background())
+	var closeAppFn context.CancelCauseFunc = func(cause error) {
+		batcherCancel() // cancel first: t.Fatalf does not return
+		s.t.Fatalf("closeAppFn called, batcher hit a critical error: %v", cause)
+	}
+
 	batcher, err := bss.BatcherServiceFromCLIConfig(
-		context.Background(), "0.0.1", batcherCLIConfig,
+		batcherContext, closeAppFn, "0.0.1", batcherCLIConfig,
 		logger.New("service", "batcher"))
 	require.NoError(s.t, err)
 	require.NoError(s.t, batcher.Start(context.Background()))
diff --git a/op-e2e/system/conductor/sequencer_failover_setup.go b/op-e2e/system/conductor/sequencer_failover_setup.go
index 8a0832f7be9..38a67f51c0f 100644
--- a/op-e2e/system/conductor/sequencer_failover_setup.go
+++ b/op-e2e/system/conductor/sequencer_failover_setup.go
@@ -308,7 +308,13 @@ func setupBatcher(t *testing.T, sys *e2esys.System, conductors map[string]*condu
 		CompressionAlgo: derive.Zlib,
 	}
 
-	batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
+	batcherContext, batcherCancel := context.WithCancel(context.Background())
+	var closeAppFn context.CancelCauseFunc = func(cause error) {
+		batcherCancel() // cancel first: t.Fatalf does not return
+		t.Fatalf("closeAppFn called, batcher hit a critical error: %v", cause)
+	}
+
+	batcher, err := bss.BatcherServiceFromCLIConfig(batcherContext, closeAppFn, "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
 	require.NoError(t, err)
 	err = batcher.Start(context.Background())
 	require.NoError(t, err)
diff --git a/op-e2e/system/e2esys/setup.go b/op-e2e/system/e2esys/setup.go
index baf0b52854b..265e14c6ed0 100644
--- a/op-e2e/system/e2esys/setup.go
+++ b/op-e2e/system/e2esys/setup.go
@@ -1014,7 +1014,12 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
 	}
 
 	// Batch Submitter
-	batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
+	batcherContext, batcherCancel := context.WithCancel(context.Background())
+	var closeAppFn context.CancelCauseFunc = func(cause error) {
+		batcherCancel() // cancel first: t.Fatalf does not return
+		t.Fatalf("closeAppFn called, batcher hit a critical error: %v", cause)
+	}
+	batcher, err := bss.BatcherServiceFromCLIConfig(batcherContext, closeAppFn, "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
 	if err != nil {
 		return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
 	}
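The core of the change is threading a `context.CancelCauseFunc` (`closeApp`) from the CLI entrypoint through `BatcherService` and `DriverSetup`, so a goroutine deep inside the driver can request a graceful whole-process shutdown instead of calling `StopBatchSubmitting` itself. Below is a minimal, self-contained sketch of that mechanism — not the op-service/cliapp implementation; `service` and the wiring in `main` are illustrative stand-ins:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// service stands in for BatcherService: it keeps the CancelCauseFunc it was
// given and calls it when it hits an unrecoverable condition.
type service struct {
	closeApp context.CancelCauseFunc
}

func (s *service) run() {
	// Deep inside the service, a critical error requests app shutdown,
	// carrying the error as the cancellation cause.
	s.closeApp(errors.New("critical RPC method unavailable"))
}

func main() {
	// The app framework owns the context/cancel pair...
	appCtx, closeApp := context.WithCancelCause(context.Background())

	// ...and hands closeApp down to the service it starts.
	svc := &service{closeApp: closeApp}
	go svc.run()

	// When any component cancels, context.Cause recovers the original error,
	// so the top level can log the real reason before exiting.
	<-appCtx.Done()
	fmt.Println("shutting down:", context.Cause(appCtx))
}
```

This is why the test harnesses above each construct their own cancellable context and a `closeAppFn` that fails the test: in production the cause propagates to the app framework, while in tests an unexpected shutdown should be loud.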
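`isCriticalThrottlingRPCError` separates application-level JSON-RPC failures (shut down) from transport-level ones (retry) by unwrapping to go-ethereum's `rpc.Error` interface. A sketch of the idea, under the assumption that `eth.ErrorCode(...).IsGenericRPCError()` matches the reserved JSON-RPC 2.0 code range; `rpcError`, `jsonError`, and `isGenericRPCError` here are stand-ins, not the real op-service types:

```go
package main

import (
	"errors"
	"fmt"
)

// rpcError mirrors the shape of go-ethereum's rpc.Error interface.
type rpcError interface {
	error
	ErrorCode() int
}

// jsonError is a stand-in for a decoded JSON-RPC error object.
type jsonError struct {
	code    int
	message string
}

func (e jsonError) Error() string  { return e.message }
func (e jsonError) ErrorCode() int { return e.code }

// isGenericRPCError approximates the IsGenericRPCError check: codes in the
// reserved JSON-RPC 2.0 range (-32768..-32000) are protocol-level failures,
// e.g. -32601 "method not found".
func isGenericRPCError(code int) bool {
	return code >= -32768 && code <= -32000
}

func isCritical(err error) bool {
	// errors.As can target an interface type, so errors wrapped with %w
	// by intermediate layers still match.
	var rpcErr rpcError
	return errors.As(err, &rpcErr) && isGenericRPCError(rpcErr.ErrorCode())
}

func main() {
	wrapped := fmt.Errorf("SetMaxDASize failed: %w", jsonError{code: -32601, message: "method not found"})
	fmt.Println(isCritical(wrapped))               // true: shut the batcher down
	fmt.Println(isCritical(errors.New("timeout"))) // false: retry later
}
```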
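The final assertion in the throttling test compares `.Error()` strings rather than using `errors.Is`, because the batcher's wrapped cause (the RPC client's error) and the test's freshly constructed `errors.New("method not found")` are distinct values even when their text matches. A small demonstration of why identity comparison would fail — the message format here only approximates what `ErrSetMaxDASizeRPCMethodUnavailable` produces:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// Two independently constructed errors with identical text.
	got := fmt.Errorf("miner_setMaxDASize unavailable at http://x: %w", errors.New("method not found"))
	want := fmt.Errorf("miner_setMaxDASize unavailable at http://x: %w", errors.New("method not found"))

	fmt.Println(errors.Is(got, want))        // false: errors.Is compares values, not text
	fmt.Println(got.Error() == want.Error()) // true: the rendered messages match
}
```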
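The unhealthy-endpoint scenario relies on the pre-existing `startTestServerAtAddr` helper (already defined in driver_test.go; its signature is not shown in this diff) to rebind a just-closed address, so the "same" endpoint can come back with different behavior. A hypothetical implementation — names and signature assumed, not taken from the repo — built from `httptest`'s exported fields might look like:

```go
package main

import (
	"net"
	"net/http"
	"net/http/httptest"
)

// startTestServerAtAddr is a hypothetical sketch: it binds a specific
// address (instead of letting httptest pick a fresh port) so a test can
// simulate an endpoint going down and coming back up at the same URL.
func startTestServerAtAddr(addr string, handler http.Handler) (*httptest.Server, error) {
	l, err := net.Listen("tcp", addr) // reuse the address of the server that was shut down
	if err != nil {
		return nil, err
	}
	srv := &httptest.Server{
		Listener: l,
		Config:   &http.Server{Handler: handler},
	}
	srv.Start()
	return srv, nil
}
```

Binding the listener manually is what allows address reuse; `httptest.NewServer` alone always chooses a new ephemeral port, which would break the test's assumption that the throttler keeps retrying the original endpoint URL.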