diff --git a/.changeset/mean-islands-rush.md b/.changeset/mean-islands-rush.md new file mode 100644 index 0000000000000..f513773e178c1 --- /dev/null +++ b/.changeset/mean-islands-rush.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/proxyd': patch +--- + +Batch metrics and max batch size diff --git a/proxyd/config.go b/proxyd/config.go index 466deb9231b62..0647074ab8cb6 100644 --- a/proxyd/config.go +++ b/proxyd/config.go @@ -95,6 +95,11 @@ type BackendGroupsConfig map[string]*BackendGroupConfig type MethodMappingsConfig map[string]string +type BatchConfig struct { + MaxSize int `toml:"max_size"` + ErrorMessage string `toml:"error_message"` +} + type Config struct { WSBackendGroup string `toml:"ws_backend_group"` Server ServerConfig `toml:"server"` @@ -104,6 +109,7 @@ type Config struct { RateLimit RateLimitConfig `toml:"rate_limit"` BackendOptions BackendOptions `toml:"backend"` Backends BackendsConfig `toml:"backends"` + BatchConfig BatchConfig `toml:"batch"` Authentication map[string]string `toml:"authentication"` BackendGroups BackendGroupsConfig `toml:"backend_groups"` RPCMethodMappings map[string]string `toml:"rpc_method_mappings"` diff --git a/proxyd/integration_tests/batching_test.go b/proxyd/integration_tests/batching_test.go index 1bcbba933acda..b0f811ca4b043 100644 --- a/proxyd/integration_tests/batching_test.go +++ b/proxyd/integration_tests/batching_test.go @@ -33,13 +33,13 @@ func TestBatching(t *testing.T) { callMock1 := mockResult{"eth_call", "1", "ekans1"} tests := []struct { - name string - handler http.Handler - mocks []mockResult - reqs []*proxyd.RPCReq - expectedRes string - maxBatchSize int - numExpectedForwards int + name string + handler http.Handler + mocks []mockResult + reqs []*proxyd.RPCReq + expectedRes string + maxUpstreamBatchSize int + numExpectedForwards int }{ { name: "backend returns batches out of order", @@ -49,9 +49,9 @@ func TestBatching(t *testing.T) { NewRPCReq("2", "eth_chainId", nil), NewRPCReq("3", "eth_chainId", nil), }, - expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3), - maxBatchSize: 2, - numExpectedForwards: 2, + expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3), + maxUpstreamBatchSize: 2, + numExpectedForwards: 2, }, { // infura behavior @@ -65,8 +65,8 @@ func TestBatching(t *testing.T) { `{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`, `{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`, ), - maxBatchSize: 10, - numExpectedForwards: 1, + maxUpstreamBatchSize: 10, + numExpectedForwards: 1, }, { name: "backend returns single RPC response object for minibatches", @@ -79,8 +79,8 @@ func TestBatching(t *testing.T) { `{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`, `{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`, ), - maxBatchSize: 1, - numExpectedForwards: 2, + maxUpstreamBatchSize: 1, + numExpectedForwards: 2, }, { name: "duplicate request ids are on distinct batches", @@ -96,9 +96,24 @@ func TestBatching(t *testing.T) { NewRPCReq("1", "eth_chainId", nil), NewRPCReq("1", "eth_call", nil), }, - expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1), - maxBatchSize: 2, - numExpectedForwards: 3, + expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1), + maxUpstreamBatchSize: 2, + numExpectedForwards: 3, + }, + { + name: "over max size", + mocks: []mockResult{}, + reqs: []*proxyd.RPCReq{ + NewRPCReq("1", "net_version", nil), + NewRPCReq("2", "eth_chainId", nil), + NewRPCReq("3", "eth_chainId", nil), + NewRPCReq("4", "eth_call", nil), + NewRPCReq("5", "eth_call", nil), + NewRPCReq("6", "eth_call", nil), + }, + expectedRes: "{\"error\":{\"code\":-32014,\"message\":\"over batch size custom message\"},\"id\":null,\"jsonrpc\":\"2.0\"}", + maxUpstreamBatchSize: 2, + numExpectedForwards: 0, }, { name: "eth_accounts does not get forwarded", @@ -109,15 +124,15 @@ func TestBatching(t *testing.T) { NewRPCReq("1", "eth_call", nil), NewRPCReq("2", "eth_accounts", nil), }, - expectedRes: asArray(callResponse1, ethAccountsResponse2), - maxBatchSize: 2, - numExpectedForwards: 1, + expectedRes: asArray(callResponse1, ethAccountsResponse2), + maxUpstreamBatchSize: 2, + numExpectedForwards: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - config.Server.MaxUpstreamBatchSize = tt.maxBatchSize + config.Server.MaxUpstreamBatchSize = tt.maxUpstreamBatchSize handler := tt.handler if handler == nil { diff --git a/proxyd/integration_tests/testdata/batching.toml b/proxyd/integration_tests/testdata/batching.toml index 2aa591d1e37a3..476283597537c 100644 --- a/proxyd/integration_tests/testdata/batching.toml +++ b/proxyd/integration_tests/testdata/batching.toml @@ -17,3 +17,7 @@ backends = ["good"] eth_chainId = "main" net_version = "main" eth_call = "main" + +[batch] +error_message = "over batch size custom message" +max_size = 5 \ No newline at end of file diff --git a/proxyd/metrics.go b/proxyd/metrics.go index 5241dfab6af9c..a3cfe45035892 100644 --- a/proxyd/metrics.go +++ b/proxyd/metrics.go @@ -222,6 +222,20 @@ var ( }, []string{ "backend_name", }) + + batchSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Name: "batch_size_summary", + Help: "Summary of batch sizes", + Buckets: []float64{ + 1, + 5, + 10, + 25, + 50, + 100, + }, + }) ) func RecordRedisError(source string) { @@ -278,3 +292,7 @@ func RecordCacheHit(method string) { func RecordCacheMiss(method string) { cacheMissesTotal.WithLabelValues(method).Inc() } + +func RecordBatchSize(size int) { + batchSizeHistogram.Observe(float64(size)) +} diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go index af9f44eb73197..12a6a1a834b58 100644 --- a/proxyd/proxyd.go +++ b/proxyd/proxyd.go @@ -65,6 +65,9 @@ func Start(config *Config) (func(), error) { if config.WhitelistErrorMessage != "" { ErrMethodNotWhitelisted.Message = config.WhitelistErrorMessage } + if config.BatchConfig.ErrorMessage != "" { + ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage + } maxConcurrentRPCs := config.Server.MaxConcurrentRPCs if maxConcurrentRPCs == 0 { @@ -236,6 +239,7 @@ func Start(config *Config) (func(), error) { config.RateLimit, config.Server.EnableRequestLog, config.Server.MaxRequestBodyLogLen, + config.BatchConfig.MaxSize, ) if err != nil { return nil, fmt.Errorf("error creating server: %w", err) diff --git a/proxyd/server.go b/proxyd/server.go index 509dc6a338c5b..94787d1b1321c 100644 --- a/proxyd/server.go +++ b/proxyd/server.go @@ -28,7 +28,7 @@ const ( ContextKeyAuth = "authorization" ContextKeyReqID = "req_id" ContextKeyXForwardedFor = "x_forwarded_for" - MaxBatchRPCCalls = 100 + MaxBatchRPCCallsHardLimit = 100 cacheStatusHdr = "X-Proxyd-Cache-Status" defaultServerTimeout = time.Second * 10 maxRequestBodyLogLen = 2000 @@ -48,6 +48,7 @@ type Server struct { authenticatedPaths map[string]string timeout time.Duration maxUpstreamBatchSize int + maxBatchSize int upgrader *websocket.Upgrader mainLim limiter.Store overrideLims map[string]limiter.Store @@ -75,6 +76,7 @@ func NewServer( rateLimitConfig RateLimitConfig, enableRequestLog bool, maxRequestBodyLogLen int, + maxBatchSize int, ) (*Server, error) { if cache == nil { cache = &NoopRPCCache{} @@ -92,6 +94,10 @@ func NewServer( maxUpstreamBatchSize = defaultMaxUpstreamBatchSize } + if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit { + maxBatchSize = MaxBatchRPCCallsHardLimit + } + var mainLim limiter.Store limExemptOrigins := make(map[string]bool) limExemptUserAgents := make(map[string]bool) @@ -139,6 +145,7 @@ func NewServer( cache: cache, enableRequestLog: enableRequestLog, maxRequestBodyLogLen: maxRequestBodyLogLen, + maxBatchSize: maxBatchSize, upgrader: &websocket.Upgrader{ HandshakeTimeout: 5 * time.Second, }, @@ -291,7 +298,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { return } - if len(reqs) > MaxBatchRPCCalls { + RecordBatchSize(len(reqs)) + + if len(reqs) > s.maxBatchSize { RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrTooManyBatchRequests) writeRPCError(ctx, w, nil, ErrTooManyBatchRequests) return