diff --git a/.changeset/swift-ways-glow.md b/.changeset/swift-ways-glow.md new file mode 100644 index 0000000000000..b1cccc8af9f3d --- /dev/null +++ b/.changeset/swift-ways-glow.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/proxyd': minor +--- + +Updated metrics, support local rate limiter diff --git a/go/proxyd/backend.go b/go/proxyd/backend.go index 8831a2b9395c6..81134123293cb 100644 --- a/go/proxyd/backend.go +++ b/go/proxyd/backend.go @@ -14,6 +14,7 @@ import ( "math" "math/rand" "net/http" + "strconv" "time" ) @@ -24,36 +25,44 @@ const ( var ( ErrInvalidRequest = &RPCErr{ - Code: -32601, - Message: "invalid request", + Code: -32601, + Message: "invalid request", + HTTPErrorCode: 400, } ErrParseErr = &RPCErr{ - Code: -32700, - Message: "parse error", + Code: -32700, + Message: "parse error", + HTTPErrorCode: 400, } ErrInternal = &RPCErr{ - Code: JSONRPCErrorInternal, - Message: "internal error", + Code: JSONRPCErrorInternal, + Message: "internal error", + HTTPErrorCode: 500, } ErrMethodNotWhitelisted = &RPCErr{ - Code: JSONRPCErrorInternal - 1, - Message: "rpc method is not whitelisted", + Code: JSONRPCErrorInternal - 1, + Message: "rpc method is not whitelisted", + HTTPErrorCode: 403, } ErrBackendOffline = &RPCErr{ - Code: JSONRPCErrorInternal - 10, - Message: "backend offline", + Code: JSONRPCErrorInternal - 10, + Message: "backend offline", + HTTPErrorCode: 503, } ErrNoBackends = &RPCErr{ - Code: JSONRPCErrorInternal - 11, - Message: "no backends available for method", + Code: JSONRPCErrorInternal - 11, + Message: "no backends available for method", + HTTPErrorCode: 503, } ErrBackendOverCapacity = &RPCErr{ - Code: JSONRPCErrorInternal - 12, - Message: "backend is over capacity", + Code: JSONRPCErrorInternal - 12, + Message: "backend is over capacity", + HTTPErrorCode: 429, } ErrBackendBadResponse = &RPCErr{ - Code: JSONRPCErrorInternal - 13, - Message: "backend returned an invalid response", + Code: JSONRPCErrorInternal - 13, + Message: "backend returned an invalid response", + HTTPErrorCode: 500, } ) @@ -63,7 +72,7 @@ type Backend struct { wsURL string authUsername string authPassword string - redis Redis + rateLimiter RateLimiter client *http.Client dialer *websocket.Dialer maxRetries int @@ -122,14 +131,14 @@ func NewBackend( name string, rpcURL string, wsURL string, - redis Redis, + rateLimiter RateLimiter, opts ...BackendOpt, ) *Backend { backend := &Backend{ Name: name, rpcURL: rpcURL, wsURL: wsURL, - redis: redis, + rateLimiter: rateLimiter, maxResponseSize: math.MaxInt64, client: &http.Client{ Timeout: 5 * time.Second, @@ -160,7 +169,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { for i := 0; i <= b.maxRetries; i++ { RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP) respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method)) - res, err := b.doForward(req) + res, err := b.doForward(ctx, req) if err != nil { lastError = err log.Warn( @@ -210,7 +219,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet backendConn, _, err := b.dialer.Dial(b.wsURL, nil) if err != nil { b.setOffline() - if err := b.redis.DecBackendWSConns(b.Name); err != nil { + if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil { log.Error("error decrementing backend ws conns", "name", b.Name, "err", err) } return nil, wrapErr(err, "error dialing backend") @@ -221,7 +230,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet } func (b *Backend) Online() bool { - online, err := b.redis.IsBackendOnline(b.Name) + online, err := b.rateLimiter.IsBackendOnline(b.Name) if err != nil { log.Warn( "error getting backend availability, assuming it is offline", @@ -238,7 +247,7 @@ func (b *Backend) IsRateLimited() bool { return false } - usedLimit, err := b.redis.IncBackendRPS(b.Name) + usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name) if err != nil { log.Error( "error getting backend used rate limit, assuming limit is exhausted", @@ -256,7 +265,7 @@ func (b *Backend) IsWSSaturated() bool { return false } - incremented, err := b.redis.IncBackendWSConns(b.Name, b.maxWSConns) + incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns) if err != nil { log.Error( "error getting backend used ws conns, assuming limit is exhausted", @@ -270,7 +279,7 @@ func (b *Backend) IsWSSaturated() bool { } func (b *Backend) setOffline() { - err := b.redis.SetBackendOffline(b.Name, b.outOfServiceInterval) + err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval) if err != nil { log.Warn( "error setting backend offline", @@ -280,7 +289,7 @@ func (b *Backend) setOffline() { } } -func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) { +func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) { body := mustMarshalJSON(rpcReq) httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body)) @@ -299,6 +308,13 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) { return nil, wrapErr(err, "error in backend request") } + rpcBackendHTTPResponseCodesTotal.WithLabelValues( + GetAuthCtx(ctx), + b.Name, + rpcReq.Method, + strconv.Itoa(httpRes.StatusCode), + ).Inc() + // Alchemy returns a 400 on bad JSONs, so handle that case if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { return nil, fmt.Errorf("response code %d", httpRes.StatusCode) @@ -315,6 +331,12 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) { return nil, ErrBackendBadResponse } + // capture the HTTP status code in the response. this will only + // ever be 400 given the status check on line 318 above. + if httpRes.StatusCode != 200 { + res.Error.HTTPErrorCode = httpRes.StatusCode + } + return res, nil } @@ -556,7 +578,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) { func (w *WSProxier) close() { w.clientConn.Close() w.backendConn.Close() - if err := w.backend.redis.DecBackendWSConns(w.backend.Name); err != nil { + if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil { log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err) } activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec() diff --git a/go/proxyd/config.go b/go/proxyd/config.go index 18a638d044460..8bf8cec802352 100644 --- a/go/proxyd/config.go +++ b/go/proxyd/config.go @@ -37,7 +37,7 @@ type BackendConfig struct { type BackendsConfig map[string]*BackendConfig type BackendGroupConfig struct { - Backends []string `toml:"backends"` + Backends []string `toml:"backends"` } type BackendGroupsConfig map[string]*BackendGroupConfig diff --git a/go/proxyd/metrics.go b/go/proxyd/metrics.go index 4296676e38f05..18592d8ec4a15 100644 --- a/go/proxyd/metrics.go +++ b/go/proxyd/metrics.go @@ -38,6 +38,17 @@ var ( "source", }) + rpcBackendHTTPResponseCodesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "rpc_backend_http_response_codes_total", + Help: "Count of total backend responses by HTTP status code.", + }, []string{ + "auth", + "backend_name", + "method_name", + "status_code", + }) + rpcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: MetricsNamespace, Name: "rpc_errors_total", @@ -101,6 +112,14 @@ var ( Help: "Count of total HTTP requests.", }) + httpResponseCodesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "http_response_codes_total", + Help: "Count of total HTTP response codes.", + }, []string{ + "status_code", + }) + httpRequestDurationSumm = promauto.NewSummary(prometheus.SummaryOpts{ Namespace: MetricsNamespace, Name: "http_request_duration_seconds", diff --git a/go/proxyd/proxyd.go b/go/proxyd/proxyd.go index 950baf0748107..ac32359f76007 100644 --- a/go/proxyd/proxyd.go +++ b/go/proxyd/proxyd.go @@ -29,9 +29,16 @@ func Start(config *Config) error { } } - redis, err := NewRedis(config.Redis.URL) - if err != nil { - return err + var lim RateLimiter + var err error + if config.Redis == nil { + log.Warn("redis is not configured, using local rate limiter") + lim = NewLocalRateLimiter() + } else { + lim, err = NewRedisRateLimiter(config.Redis.URL) + if err != nil { + return err + } } backendNames := make([]string, 0) @@ -68,7 +75,7 @@ func Start(config *Config) error { if cfg.Password != "" { opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password)) } - back := NewBackend(name, cfg.RPCURL, cfg.WSURL, redis, opts...) + back := NewBackend(name, cfg.RPCURL, cfg.WSURL, lim, opts...) backendNames = append(backendNames, name) backendsByName[name] = back log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL) @@ -90,17 +97,17 @@ func Start(config *Config) error { backendGroups[bgName] = group } - var wsBackendGroup *BackendGroup - if config.WSBackendGroup != "" { - wsBackendGroup = backendGroups[config.WSBackendGroup] - if wsBackendGroup == nil { - return fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup) - } - } + var wsBackendGroup *BackendGroup + if config.WSBackendGroup != "" { + wsBackendGroup = backendGroups[config.WSBackendGroup] + if wsBackendGroup == nil { + return fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup) + } + } - if wsBackendGroup == nil && config.Server.WSPort != 0 { - return fmt.Errorf("a ws port was defined, but no ws group was defined") - } + if wsBackendGroup == nil && config.Server.WSPort != 0 { + return fmt.Errorf("a ws port was defined, but no ws group was defined") + } for _, bg := range config.RPCMethodMappings { if backendGroups[bg] == nil { @@ -152,7 +159,7 @@ func Start(config *Config) error { recvSig := <-sig log.Info("caught signal, shutting down", "signal", recvSig) srv.Shutdown() - if err := redis.FlushBackendWSConns(backendNames); err != nil { + if err := lim.FlushBackendWSConns(backendNames); err != nil { log.Error("error flushing backend ws conns", "err", err) } return nil diff --git a/go/proxyd/redis.go b/go/proxyd/rate_limiter.go similarity index 64% rename from go/proxyd/redis.go rename to go/proxyd/rate_limiter.go index b88514d1a5a05..6954949c35e77 100644 --- a/go/proxyd/redis.go +++ b/go/proxyd/rate_limiter.go @@ -40,7 +40,7 @@ end return false ` -type Redis interface { +type RateLimiter interface { IsBackendOnline(name string) (bool, error) SetBackendOffline(name string, duration time.Duration) error IncBackendRPS(name string) (int, error) @@ -49,14 +49,14 @@ type Redis interface { FlushBackendWSConns(names []string) error } -type RedisImpl struct { +type RedisRateLimiter struct { rdb *redis.Client randID string touchKeys map[string]time.Duration tkMtx sync.Mutex } -func NewRedis(url string) (Redis, error) { +func NewRedisRateLimiter(url string) (RateLimiter, error) { opts, err := redis.ParseURL(url) if err != nil { return nil, err @@ -65,7 +65,7 @@ func NewRedis(url string) (Redis, error) { if err := rdb.Ping(context.Background()).Err(); err != nil { return nil, wrapErr(err, "error connecting to redis") } - out := &RedisImpl{ + out := &RedisRateLimiter{ rdb: rdb, randID: randStr(20), touchKeys: make(map[string]time.Duration), @@ -74,7 +74,7 @@ func NewRedis(url string) (Redis, error) { return out, nil } -func (r *RedisImpl) IsBackendOnline(name string) (bool, error) { +func (r *RedisRateLimiter) IsBackendOnline(name string) (bool, error) { exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result() if err != nil { RecordRedisError("IsBackendOnline") @@ -84,7 +84,7 @@ func (r *RedisImpl) IsBackendOnline(name string) (bool, error) { return exists == 0, nil } -func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error { +func (r *RedisRateLimiter) SetBackendOffline(name string, duration time.Duration) error { err := r.rdb.SetEX( context.Background(), fmt.Sprintf("backend:%s:offline", name), @@ -98,7 +98,7 @@ func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error return nil } -func (r *RedisImpl) IncBackendRPS(name string) (int, error) { +func (r *RedisRateLimiter) IncBackendRPS(name string) (int, error) { cmd := r.rdb.Eval( context.Background(), MaxRPSScript, @@ -112,7 +112,7 @@ func (r *RedisImpl) IncBackendRPS(name string) (int, error) { return rps, nil } -func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) { +func (r *RedisRateLimiter) IncBackendWSConns(name string, max int) (bool, error) { connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) r.tkMtx.Lock() r.touchKeys[connsKey] = 5 * time.Minute @@ -138,7 +138,7 @@ func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) { return incremented, nil } -func (r *RedisImpl) DecBackendWSConns(name string) error { +func (r *RedisRateLimiter) DecBackendWSConns(name string) error { connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) err := r.rdb.Decr(context.Background(), connsKey).Err() if err != nil { @@ -148,7 +148,7 @@ func (r *RedisImpl) DecBackendWSConns(name string) error { return nil } -func (r *RedisImpl) FlushBackendWSConns(names []string) error { +func (r *RedisRateLimiter) FlushBackendWSConns(names []string) error { ctx := context.Background() for _, name := range names { connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) @@ -168,7 +168,7 @@ func (r *RedisImpl) FlushBackendWSConns(names []string) error { return nil } -func (r *RedisImpl) touch() { +func (r *RedisRateLimiter) touch() { for { r.tkMtx.Lock() for key, dur := range r.touchKeys { @@ -182,6 +182,76 @@ func (r *RedisImpl) touch() { } } +type LocalRateLimiter struct { + deadBackends map[string]time.Time + backendRPS map[string]int + backendWSConns map[string]int + mtx sync.RWMutex +} + +func NewLocalRateLimiter() *LocalRateLimiter { + out := &LocalRateLimiter{ + deadBackends: make(map[string]time.Time), + backendRPS: make(map[string]int), + backendWSConns: make(map[string]int), + } + go out.clear() + return out +} + +func (l *LocalRateLimiter) IsBackendOnline(name string) (bool, error) { + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.deadBackends[name].Before(time.Now()), nil +} + +func (l *LocalRateLimiter) SetBackendOffline(name string, duration time.Duration) error { + l.mtx.Lock() + defer l.mtx.Unlock() + l.deadBackends[name] = time.Now().Add(duration) + return nil +} + +func (l *LocalRateLimiter) IncBackendRPS(name string) (int, error) { + l.mtx.Lock() + defer l.mtx.Unlock() + l.backendRPS[name] += 1 + return l.backendRPS[name], nil +} + +func (l *LocalRateLimiter) IncBackendWSConns(name string, max int) (bool, error) { + l.mtx.Lock() + defer l.mtx.Unlock() + if l.backendWSConns[name] == max { + return false, nil + } + l.backendWSConns[name] += 1 + return true, nil +} + +func (l *LocalRateLimiter) DecBackendWSConns(name string) error { + l.mtx.Lock() + defer l.mtx.Unlock() + if l.backendWSConns[name] == 0 { + return nil + } + l.backendWSConns[name] -= 1 + return nil +} + +func (l *LocalRateLimiter) FlushBackendWSConns(names []string) error { + return nil +} + +func (l *LocalRateLimiter) clear() { + for { + time.Sleep(time.Second) + l.mtx.Lock() + l.backendRPS = make(map[string]int) + l.mtx.Unlock() + } +} + func randStr(l int) string { b := make([]byte, l) if _, err := rand.Read(b); err != nil { diff --git a/go/proxyd/rpc.go b/go/proxyd/rpc.go index a70b525d6abc8..6b79671b0708b 100644 --- a/go/proxyd/rpc.go +++ b/go/proxyd/rpc.go @@ -25,8 +25,9 @@ func (r *RPCRes) IsError() bool { } type RPCErr struct { - Code int `json:"code"` - Message string `json:"message"` + Code int `json:"code"` + Message string `json:"message"` + HTTPErrorCode int `json:"-"` } func (r *RPCErr) Error() string { diff --git a/go/proxyd/server.go b/go/proxyd/server.go index 6e9e70f190b9d..392add32640a5 100644 --- a/go/proxyd/server.go +++ b/go/proxyd/server.go @@ -12,6 +12,7 @@ import ( "github.com/rs/cors" "io" "net/http" + "strconv" "time" ) @@ -105,7 +106,12 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { return } - log.Info("received RPC request", "req_id", GetReqID(ctx), "auth", GetAuthCtx(ctx)) + log.Info( + "received RPC request", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "user_agent", r.Header.Get("user-agent"), + ) req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize)) if err != nil { @@ -200,6 +206,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context // but someone sends in an auth key anyway if authorization != "" { log.Info("blocked authenticated request against unauthenticated proxy") + httpResponseCodesTotal.WithLabelValues("404").Inc() w.WriteHeader(404) return nil } @@ -212,6 +219,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context if authorization == "" || s.authenticatedPaths[authorization] == "" { log.Info("blocked unauthorized request", "authorization", authorization) + httpResponseCodesTotal.WithLabelValues("401").Inc() w.WriteHeader(401) return nil } @@ -225,21 +233,29 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context } func writeRPCError(w http.ResponseWriter, id *int, err error) { - enc := json.NewEncoder(w) - w.WriteHeader(200) - - var body *RPCRes + var res *RPCRes if r, ok := err.(*RPCErr); ok { - body = NewRPCErrorRes(id, r) + res = NewRPCErrorRes(id, r) } else { - body = NewRPCErrorRes(id, &RPCErr{ + res = NewRPCErrorRes(id, &RPCErr{ Code: JSONRPCErrorInternal, Message: "internal error", }) } - if err := enc.Encode(body); err != nil { - log.Error("error writing rpc error", "err", err) + writeRPCRes(w, res) +} + +func writeRPCRes(w http.ResponseWriter, res *RPCRes) { + statusCode := 200 + if res.IsError() && res.Error.HTTPErrorCode != 0 { + statusCode = res.Error.HTTPErrorCode + } + w.WriteHeader(statusCode) + enc := json.NewEncoder(w) + if err := enc.Encode(res); err != nil { + log.Error("error writing rpc response", "err", err) } + httpResponseCodesTotal.WithLabelValues(strconv.Itoa(statusCode)).Inc() } func instrumentedHdlr(h http.Handler) http.HandlerFunc {