Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/swift-ways-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@eth-optimism/proxyd': minor
---

Update metrics and add support for a local rate limiter
76 changes: 49 additions & 27 deletions go/proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"math"
"math/rand"
"net/http"
"strconv"
"time"
)

Expand All @@ -24,36 +25,44 @@ const (

// Predefined RPC errors. Each entry pairs a JSON-RPC error code and
// message with HTTPErrorCode, an HTTP status code associated with the error.
var (
// ErrInvalidRequest is associated with HTTP 400.
// NOTE(review): JSON-RPC 2.0 reserves -32600 for "Invalid Request";
// -32601 is "Method not found" — confirm this code is intentional.
ErrInvalidRequest = &RPCErr{
Code: -32601,
Message: "invalid request",
Code: -32601,
Message: "invalid request",
HTTPErrorCode: 400,
}
// ErrParseErr uses the JSON-RPC 2.0 "Parse error" code and is
// associated with HTTP 400.
ErrParseErr = &RPCErr{
Code: -32700,
Message: "parse error",
Code: -32700,
Message: "parse error",
HTTPErrorCode: 400,
}
// ErrInternal is the generic internal error, associated with HTTP 500.
ErrInternal = &RPCErr{
Code: JSONRPCErrorInternal,
Message: "internal error",
Code: JSONRPCErrorInternal,
Message: "internal error",
HTTPErrorCode: 500,
}
// The remaining errors use codes at fixed offsets below
// JSONRPCErrorInternal.

// ErrMethodNotWhitelisted is associated with HTTP 403.
ErrMethodNotWhitelisted = &RPCErr{
Code: JSONRPCErrorInternal - 1,
Message: "rpc method is not whitelisted",
Code: JSONRPCErrorInternal - 1,
Message: "rpc method is not whitelisted",
HTTPErrorCode: 403,
}
// ErrBackendOffline is associated with HTTP 503.
ErrBackendOffline = &RPCErr{
Code: JSONRPCErrorInternal - 10,
Message: "backend offline",
Code: JSONRPCErrorInternal - 10,
Message: "backend offline",
HTTPErrorCode: 503,
}
// ErrNoBackends is associated with HTTP 503.
ErrNoBackends = &RPCErr{
Code: JSONRPCErrorInternal - 11,
Message: "no backends available for method",
Code: JSONRPCErrorInternal - 11,
Message: "no backends available for method",
HTTPErrorCode: 503,
}
// ErrBackendOverCapacity is associated with HTTP 429.
ErrBackendOverCapacity = &RPCErr{
Code: JSONRPCErrorInternal - 12,
Message: "backend is over capacity",
Code: JSONRPCErrorInternal - 12,
Message: "backend is over capacity",
HTTPErrorCode: 429,
}
// ErrBackendBadResponse is associated with HTTP 500.
ErrBackendBadResponse = &RPCErr{
Code: JSONRPCErrorInternal - 13,
Message: "backend returned an invalid response",
Code: JSONRPCErrorInternal - 13,
Message: "backend returned an invalid response",
HTTPErrorCode: 500,
}
)

Expand All @@ -63,7 +72,7 @@ type Backend struct {
wsURL string
authUsername string
authPassword string
redis Redis
rateLimiter RateLimiter
client *http.Client
dialer *websocket.Dialer
maxRetries int
Expand Down Expand Up @@ -122,14 +131,14 @@ func NewBackend(
name string,
rpcURL string,
wsURL string,
redis Redis,
rateLimiter RateLimiter,
opts ...BackendOpt,
) *Backend {
backend := &Backend{
Name: name,
rpcURL: rpcURL,
wsURL: wsURL,
redis: redis,
rateLimiter: rateLimiter,
maxResponseSize: math.MaxInt64,
client: &http.Client{
Timeout: 5 * time.Second,
Expand Down Expand Up @@ -160,7 +169,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
for i := 0; i <= b.maxRetries; i++ {
RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP)
respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method))
res, err := b.doForward(req)
res, err := b.doForward(ctx, req)
if err != nil {
lastError = err
log.Warn(
Expand Down Expand Up @@ -210,7 +219,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
backendConn, _, err := b.dialer.Dial(b.wsURL, nil)
if err != nil {
b.setOffline()
if err := b.redis.DecBackendWSConns(b.Name); err != nil {
if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil {
log.Error("error decrementing backend ws conns", "name", b.Name, "err", err)
}
return nil, wrapErr(err, "error dialing backend")
Expand All @@ -221,7 +230,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
}

func (b *Backend) Online() bool {
online, err := b.redis.IsBackendOnline(b.Name)
online, err := b.rateLimiter.IsBackendOnline(b.Name)
if err != nil {
log.Warn(
"error getting backend availability, assuming it is offline",
Expand All @@ -238,7 +247,7 @@ func (b *Backend) IsRateLimited() bool {
return false
}

usedLimit, err := b.redis.IncBackendRPS(b.Name)
usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name)
if err != nil {
log.Error(
"error getting backend used rate limit, assuming limit is exhausted",
Expand All @@ -256,7 +265,7 @@ func (b *Backend) IsWSSaturated() bool {
return false
}

incremented, err := b.redis.IncBackendWSConns(b.Name, b.maxWSConns)
incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns)
if err != nil {
log.Error(
"error getting backend used ws conns, assuming limit is exhausted",
Expand All @@ -270,7 +279,7 @@ func (b *Backend) IsWSSaturated() bool {
}

func (b *Backend) setOffline() {
err := b.redis.SetBackendOffline(b.Name, b.outOfServiceInterval)
err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval)
if err != nil {
log.Warn(
"error setting backend offline",
Expand All @@ -280,7 +289,7 @@ func (b *Backend) setOffline() {
}
}

func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
body := mustMarshalJSON(rpcReq)

httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body))
Expand All @@ -299,6 +308,13 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
return nil, wrapErr(err, "error in backend request")
}

rpcBackendHTTPResponseCodesTotal.WithLabelValues(
GetAuthCtx(ctx),
b.Name,
rpcReq.Method,
strconv.Itoa(httpRes.StatusCode),
).Inc()

// Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
Expand All @@ -315,6 +331,12 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
return nil, ErrBackendBadResponse
}

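// capture the HTTP status code in the response. this will only
// ever be 400, because the status check above rejects every
// code other than 200 and 400.
// NOTE(review): assumes res.Error is set whenever the backend
// answers 400 — confirm, or guard against a nil Error here.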
if httpRes.StatusCode != 200 {
res.Error.HTTPErrorCode = httpRes.StatusCode
}

return res, nil
}

Expand Down Expand Up @@ -556,7 +578,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
func (w *WSProxier) close() {
w.clientConn.Close()
w.backendConn.Close()
if err := w.backend.redis.DecBackendWSConns(w.backend.Name); err != nil {
if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil {
log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err)
}
activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec()
Expand Down
2 changes: 1 addition & 1 deletion go/proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type BackendConfig struct {
type BackendsConfig map[string]*BackendConfig

// BackendGroupConfig is the configuration of a single backend group.
type BackendGroupConfig struct {
// Backends is the list of strings read from the "backends" TOML key.
// presumably these are backend names defined in BackendsConfig — verify against the loader.
Backends []string `toml:"backends"`
Backends []string `toml:"backends"`
}

type BackendGroupsConfig map[string]*BackendGroupConfig
Expand Down
19 changes: 19 additions & 0 deletions go/proxyd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ var (
"source",
})

// rpcBackendHTTPResponseCodesTotal counts HTTP responses received from
// backends. It is incremented in Backend.doForward once a backend
// response has been obtained, with the label values, in order: the auth
// value from the request context, the backend name, the RPC method name,
// and the HTTP status code formatted as a decimal string.
rpcBackendHTTPResponseCodesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "rpc_backend_http_response_codes_total",
Help: "Count of total backend responses by HTTP status code.",
}, []string{
"auth",
"backend_name",
"method_name",
"status_code",
})

rpcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "rpc_errors_total",
Expand Down Expand Up @@ -101,6 +112,14 @@ var (
Help: "Count of total HTTP requests.",
})

// httpResponseCodesTotal counts HTTP response codes, partitioned by the
// single "status_code" label.
// NOTE(review): presumably these are the responses proxyd itself sends to
// clients; the increment site is not visible here — confirm.
httpResponseCodesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "http_response_codes_total",
Help: "Count of total HTTP response codes.",
}, []string{
"status_code",
})

httpRequestDurationSumm = promauto.NewSummary(prometheus.SummaryOpts{
Namespace: MetricsNamespace,
Name: "http_request_duration_seconds",
Expand Down
37 changes: 22 additions & 15 deletions go/proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,16 @@ func Start(config *Config) error {
}
}

redis, err := NewRedis(config.Redis.URL)
if err != nil {
return err
var lim RateLimiter
var err error
if config.Redis == nil {
log.Warn("redis is not configured, using local rate limiter")
lim = NewLocalRateLimiter()
} else {
lim, err = NewRedisRateLimiter(config.Redis.URL)
if err != nil {
return err
}
}

backendNames := make([]string, 0)
Expand Down Expand Up @@ -68,7 +75,7 @@ func Start(config *Config) error {
if cfg.Password != "" {
opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password))
}
back := NewBackend(name, cfg.RPCURL, cfg.WSURL, redis, opts...)
back := NewBackend(name, cfg.RPCURL, cfg.WSURL, lim, opts...)
backendNames = append(backendNames, name)
backendsByName[name] = back
log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL)
Expand All @@ -90,17 +97,17 @@ func Start(config *Config) error {
backendGroups[bgName] = group
}

var wsBackendGroup *BackendGroup
if config.WSBackendGroup != "" {
wsBackendGroup = backendGroups[config.WSBackendGroup]
if wsBackendGroup == nil {
return fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
}
}
var wsBackendGroup *BackendGroup
if config.WSBackendGroup != "" {
wsBackendGroup = backendGroups[config.WSBackendGroup]
if wsBackendGroup == nil {
return fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
}
}

if wsBackendGroup == nil && config.Server.WSPort != 0 {
return fmt.Errorf("a ws port was defined, but no ws group was defined")
}
if wsBackendGroup == nil && config.Server.WSPort != 0 {
return fmt.Errorf("a ws port was defined, but no ws group was defined")
}

for _, bg := range config.RPCMethodMappings {
if backendGroups[bg] == nil {
Expand Down Expand Up @@ -152,7 +159,7 @@ func Start(config *Config) error {
recvSig := <-sig
log.Info("caught signal, shutting down", "signal", recvSig)
srv.Shutdown()
if err := redis.FlushBackendWSConns(backendNames); err != nil {
if err := lim.FlushBackendWSConns(backendNames); err != nil {
log.Error("error flushing backend ws conns", "err", err)
}
return nil
Expand Down
Loading