Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mean-islands-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@eth-optimism/proxyd': patch
---

Batch metrics and max batch size
6 changes: 6 additions & 0 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type BackendGroupsConfig map[string]*BackendGroupConfig

type MethodMappingsConfig map[string]string

// BatchConfig controls how the server handles incoming JSON-RPC batch
// requests. It is populated from the [batch] table of the TOML config.
type BatchConfig struct {
	// MaxSize is the maximum number of RPC calls accepted in a single
	// batch request. A value of 0 (or anything above the hard limit)
	// falls back to MaxBatchRPCCallsHardLimit in NewServer.
	MaxSize int `toml:"max_size"`
	// ErrorMessage, when non-empty, overrides the message carried by
	// ErrTooManyBatchRequests for oversized batches.
	ErrorMessage string `toml:"error_message"`
}

type Config struct {
WSBackendGroup string `toml:"ws_backend_group"`
Server ServerConfig `toml:"server"`
Expand All @@ -104,6 +109,7 @@ type Config struct {
RateLimit RateLimitConfig `toml:"rate_limit"`
BackendOptions BackendOptions `toml:"backend"`
Backends BackendsConfig `toml:"backends"`
BatchConfig BatchConfig `toml:"batch"`
Authentication map[string]string `toml:"authentication"`
BackendGroups BackendGroupsConfig `toml:"backend_groups"`
RPCMethodMappings map[string]string `toml:"rpc_method_mappings"`
Expand Down
57 changes: 36 additions & 21 deletions proxyd/integration_tests/batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func TestBatching(t *testing.T) {
callMock1 := mockResult{"eth_call", "1", "ekans1"}

tests := []struct {
name string
handler http.Handler
mocks []mockResult
reqs []*proxyd.RPCReq
expectedRes string
maxBatchSize int
numExpectedForwards int
name string
handler http.Handler
mocks []mockResult
reqs []*proxyd.RPCReq
expectedRes string
maxUpstreamBatchSize int
numExpectedForwards int
}{
{
name: "backend returns batches out of order",
Expand All @@ -49,9 +49,9 @@ func TestBatching(t *testing.T) {
NewRPCReq("2", "eth_chainId", nil),
NewRPCReq("3", "eth_chainId", nil),
},
expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3),
maxBatchSize: 2,
numExpectedForwards: 2,
expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3),
maxUpstreamBatchSize: 2,
numExpectedForwards: 2,
},
{
// infura behavior
Expand All @@ -65,8 +65,8 @@ func TestBatching(t *testing.T) {
`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
),
maxBatchSize: 10,
numExpectedForwards: 1,
maxUpstreamBatchSize: 10,
numExpectedForwards: 1,
},
{
name: "backend returns single RPC response object for minibatches",
Expand All @@ -79,8 +79,8 @@ func TestBatching(t *testing.T) {
`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
),
maxBatchSize: 1,
numExpectedForwards: 2,
maxUpstreamBatchSize: 1,
numExpectedForwards: 2,
},
{
name: "duplicate request ids are on distinct batches",
Expand All @@ -96,9 +96,24 @@ func TestBatching(t *testing.T) {
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "eth_call", nil),
},
expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1),
maxBatchSize: 2,
numExpectedForwards: 3,
expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1),
maxUpstreamBatchSize: 2,
numExpectedForwards: 3,
},
{
name: "over max size",
mocks: []mockResult{},
reqs: []*proxyd.RPCReq{
NewRPCReq("1", "net_version", nil),
NewRPCReq("2", "eth_chainId", nil),
NewRPCReq("3", "eth_chainId", nil),
NewRPCReq("4", "eth_call", nil),
NewRPCReq("5", "eth_call", nil),
NewRPCReq("6", "eth_call", nil),
},
expectedRes: "{\"error\":{\"code\":-32014,\"message\":\"over batch size custom message\"},\"id\":null,\"jsonrpc\":\"2.0\"}",
maxUpstreamBatchSize: 2,
numExpectedForwards: 0,
},
{
name: "eth_accounts does not get forwarded",
Expand All @@ -109,15 +124,15 @@ func TestBatching(t *testing.T) {
NewRPCReq("1", "eth_call", nil),
NewRPCReq("2", "eth_accounts", nil),
},
expectedRes: asArray(callResponse1, ethAccountsResponse2),
maxBatchSize: 2,
numExpectedForwards: 1,
expectedRes: asArray(callResponse1, ethAccountsResponse2),
maxUpstreamBatchSize: 2,
numExpectedForwards: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config.Server.MaxUpstreamBatchSize = tt.maxBatchSize
config.Server.MaxUpstreamBatchSize = tt.maxUpstreamBatchSize

handler := tt.handler
if handler == nil {
Expand Down
4 changes: 4 additions & 0 deletions proxyd/integration_tests/testdata/batching.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ backends = ["good"]
eth_chainId = "main"
net_version = "main"
eth_call = "main"

[batch]
# Custom JSON-RPC error message returned when a batch exceeds max_size;
# the "over max size" integration test asserts on this exact string.
error_message = "over batch size custom message"
# Maximum number of RPC calls permitted in a single batch request.
max_size = 5
18 changes: 18 additions & 0 deletions proxyd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ var (
}, []string{
"backend_name",
})

batchSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "batch_size_summary",
Help: "Summary of batch sizes",
Buckets: []float64{
1,
5,
10,
25,
50,
100,
},
})
)

func RecordRedisError(source string) {
Expand Down Expand Up @@ -278,3 +292,7 @@ func RecordCacheHit(method string) {
func RecordCacheMiss(method string) {
cacheMissesTotal.WithLabelValues(method).Inc()
}

// RecordBatchSize observes the number of RPC calls contained in an
// incoming batch request on the batch size histogram.
func RecordBatchSize(size int) {
	observed := float64(size)
	batchSizeHistogram.Observe(observed)
}
4 changes: 4 additions & 0 deletions proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func Start(config *Config) (func(), error) {
if config.WhitelistErrorMessage != "" {
ErrMethodNotWhitelisted.Message = config.WhitelistErrorMessage
}
if config.BatchConfig.ErrorMessage != "" {
ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage
}

maxConcurrentRPCs := config.Server.MaxConcurrentRPCs
if maxConcurrentRPCs == 0 {
Expand Down Expand Up @@ -236,6 +239,7 @@ func Start(config *Config) (func(), error) {
config.RateLimit,
config.Server.EnableRequestLog,
config.Server.MaxRequestBodyLogLen,
config.BatchConfig.MaxSize,
)
if err != nil {
return nil, fmt.Errorf("error creating server: %w", err)
Expand Down
13 changes: 11 additions & 2 deletions proxyd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
ContextKeyAuth = "authorization"
ContextKeyReqID = "req_id"
ContextKeyXForwardedFor = "x_forwarded_for"
MaxBatchRPCCalls = 100
MaxBatchRPCCallsHardLimit = 100
cacheStatusHdr = "X-Proxyd-Cache-Status"
defaultServerTimeout = time.Second * 10
maxRequestBodyLogLen = 2000
Expand All @@ -48,6 +48,7 @@ type Server struct {
authenticatedPaths map[string]string
timeout time.Duration
maxUpstreamBatchSize int
maxBatchSize int
upgrader *websocket.Upgrader
mainLim limiter.Store
overrideLims map[string]limiter.Store
Expand Down Expand Up @@ -75,6 +76,7 @@ func NewServer(
rateLimitConfig RateLimitConfig,
enableRequestLog bool,
maxRequestBodyLogLen int,
maxBatchSize int,
) (*Server, error) {
if cache == nil {
cache = &NoopRPCCache{}
Expand All @@ -92,6 +94,10 @@ func NewServer(
maxUpstreamBatchSize = defaultMaxUpstreamBatchSize
}

if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit {
maxBatchSize = MaxBatchRPCCallsHardLimit
}

var mainLim limiter.Store
limExemptOrigins := make(map[string]bool)
limExemptUserAgents := make(map[string]bool)
Expand Down Expand Up @@ -139,6 +145,7 @@ func NewServer(
cache: cache,
enableRequestLog: enableRequestLog,
maxRequestBodyLogLen: maxRequestBodyLogLen,
maxBatchSize: maxBatchSize,
upgrader: &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second,
},
Expand Down Expand Up @@ -291,7 +298,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return
}

if len(reqs) > MaxBatchRPCCalls {
RecordBatchSize(len(reqs))

if len(reqs) > s.maxBatchSize {
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrTooManyBatchRequests)
writeRPCError(ctx, w, nil, ErrTooManyBatchRequests)
return
Expand Down