Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/twenty-items-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@eth-optimism/proxyd': minor
---

Put special errors in a dedicated metric, pass along the content-type header
45 changes: 29 additions & 16 deletions go/proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,17 @@ func NewBackend(

func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if !b.allowedRPCMethods.Has(req.Method) {
// use unknown below to prevent DOS vector that fills up memory
// with arbitrary method names.
RecordRPCError(ctx, b.Name, MethodUnknown, ErrMethodNotWhitelisted)
return nil, ErrMethodNotWhitelisted
}
if !b.Online() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOffline)
return nil, ErrBackendOffline
}
if b.IsRateLimited() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOverCapacity)
return nil, ErrBackendOverCapacity
}

Expand All @@ -167,22 +172,19 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
for i := 0; i <= b.maxRetries; i++ {
RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP)
respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method))
resB, err := b.doForward(req)
res, err := b.doForward(req)
if err != nil {
lastError = err
log.Warn("backend request failed, trying again", "err", err, "name", b.Name)
respTimer.ObserveDuration()
RecordRPCError(ctx, b.Name, req.Method, err)
time.Sleep(calcBackoff(i))
continue
}
respTimer.ObserveDuration()

res := new(RPCRes)
// don't mark the backend down if they give us a bad response body
if err := json.Unmarshal(resB, res); err != nil {
return nil, ErrBackendBadResponse
if res.IsError() {
RecordRPCError(ctx, b.Name, req.Method, res.Error)
}

return res, nil
}

Expand Down Expand Up @@ -271,7 +273,7 @@ func (b *Backend) setOffline() {
}
}

func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) {
func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
body := mustMarshalJSON(rpcReq)

httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body))
Expand All @@ -283,22 +285,30 @@ func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) {
httpReq.SetBasicAuth(b.authUsername, b.authPassword)
}

res, err := b.client.Do(httpReq)
httpReq.Header.Set("content-type", "application/json")

httpRes, err := b.client.Do(httpReq)
if err != nil {
return nil, wrapErr(err, "error in backend request")
}

if res.StatusCode != 200 {
return nil, fmt.Errorf("response code %d", res.StatusCode)
// Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
}

defer res.Body.Close()
resB, err := ioutil.ReadAll(io.LimitReader(res.Body, b.maxResponseSize))
defer httpRes.Body.Close()
resB, err := ioutil.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
if err != nil {
return nil, wrapErr(err, "error reading response body")
}

return resB, nil
res := new(RPCRes)
if err := json.Unmarshal(resB, res); err != nil {
return nil, ErrBackendBadResponse
}

return res, nil
}

type BackendGroup struct {
Expand Down Expand Up @@ -329,6 +339,7 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, er
return res, nil
}

RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return nil, ErrNoBackends
}

Expand Down Expand Up @@ -413,12 +424,14 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
req, err := w.parseClientMsg(msg)
if err != nil {
var id *int
method := MethodUnknown
if req != nil {
id = req.ID
method = req.Method
}
outConn = w.clientConn
msg = mustMarshalJSON(NewRPCErrorRes(id, err))
RecordRPCError(ctx, SourceClient, err)
RecordRPCError(ctx, BackendProxyd, method, err)
} else {
RecordRPCForward(ctx, w.backend.Name, req.Method, RPCRequestSourceWS)
}
Expand Down Expand Up @@ -462,7 +475,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
msg = mustMarshalJSON(NewRPCErrorRes(id, err))
}
if res.IsError() {
RecordRPCError(ctx, SourceBackend, res.Error)
RecordRPCError(ctx, w.backend.Name, MethodUnknown, res.Error)
}

err = w.clientConn.WriteMessage(msgType, msg)
Expand Down
42 changes: 37 additions & 5 deletions go/proxyd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"strconv"
"strings"
)

const (
Expand All @@ -13,9 +14,10 @@ const (
RPCRequestSourceHTTP = "http"
RPCRequestSourceWS = "ws"

BackendProxyd = "proxyd"
SourceClient = "client"
SourceBackend = "backend"
SourceProxyd = "proxyd"
MethodUnknown = "unknown"
)

var (
Expand All @@ -42,10 +44,22 @@ var (
Help: "Count of total RPC errors.",
}, []string{
"auth",
"source",
"backend_name",
"method_name",
"error_code",
})

rpcSpecialErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "rpc_special_errors_total",
Help: "Count of total special RPC errors.",
}, []string{
"auth",
"backend_name",
"method_name",
"error_type",
})

rpcBackendRequestDurationSumm = promauto.NewSummaryVec(prometheus.SummaryOpts{
Namespace: MetricsNamespace,
Name: "rpc_backend_request_duration_seconds",
Expand Down Expand Up @@ -78,7 +92,7 @@ var (
Help: "Count of total requests that were rejected due to no backends being available.",
}, []string{
"auth",
"source",
"request_source",
})

httpRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -111,22 +125,30 @@ var (
}, []string{
"source",
})

rpcSpecialErrors = []string{
"nonce too low",
"gas price too high",
"gas price too low",
"invalid parameters",
}
)

func RecordRedisError(source string) {
redisErrorsTotal.WithLabelValues(source).Inc()
}

func RecordRPCError(ctx context.Context, source string, err error) {
func RecordRPCError(ctx context.Context, backendName, method string, err error) {
rpcErr, ok := err.(*RPCErr)
var code int
if ok {
MaybeRecordSpecialRPCError(ctx, backendName, method, rpcErr)
code = rpcErr.Code
} else {
code = -1
}

rpcErrorsTotal.WithLabelValues(GetAuthCtx(ctx), source, strconv.Itoa(code)).Inc()
rpcErrorsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, strconv.Itoa(code)).Inc()
}

func RecordWSMessage(ctx context.Context, backendName, source string) {
Expand All @@ -140,3 +162,13 @@ func RecordUnserviceableRequest(ctx context.Context, source string) {
func RecordRPCForward(ctx context.Context, backendName, method, source string) {
rpcForwardsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, source).Inc()
}

func MaybeRecordSpecialRPCError(ctx context.Context, backendName, method string, rpcErr *RPCErr) {
errMsg := strings.ToLower(rpcErr.Message)
for _, errStr := range rpcSpecialErrors {
if strings.Contains(errMsg, errStr) {
rpcSpecialErrorsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, errStr).Inc()
return
}
}
}
16 changes: 2 additions & 14 deletions go/proxyd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,33 +78,21 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize))
if err != nil {
log.Info("rejected request with bad rpc request", "source", "rpc", "err", err)
RecordRPCError(ctx, SourceClient, err)
RecordRPCError(ctx, BackendProxyd, MethodUnknown, err)
writeRPCError(w, nil, err)
return
}

backendRes, err := s.backends.Forward(ctx, req)
if err != nil {
if errors.Is(err, ErrNoBackends) {
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
RecordRPCError(ctx, SourceProxyd, err)
} else if errors.Is(err, ErrMethodNotWhitelisted) {
RecordRPCError(ctx, SourceClient, err)
} else {
RecordRPCError(ctx, SourceBackend, err)
}
log.Error("error forwarding RPC request", "method", req.Method, "err", err)
writeRPCError(w, req.ID, err)
return
}
if backendRes.IsError() {
RecordRPCError(ctx, SourceBackend, backendRes.Error)
}

enc := json.NewEncoder(w)
if err := enc.Encode(backendRes); err != nil {
log.Error("error encoding response", "err", err)
RecordRPCError(ctx, SourceProxyd, err)
RecordRPCError(ctx, BackendProxyd, req.Method, err)
writeRPCError(w, req.ID, err)
return
}
Expand Down