Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 107 additions & 36 deletions go/proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math"
"math/rand"
"net/http"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -199,23 +200,34 @@ func NewBackend(
return backend
}

func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
if !b.Online() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOffline)
RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOffline)
return nil, ErrBackendOffline
}
if b.IsRateLimited() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOverCapacity)
RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOverCapacity)
return nil, ErrBackendOverCapacity
}

var lastError error
// <= to account for the first attempt not technically being
// a retry
for i := 0; i <= b.maxRetries; i++ {
RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP)
respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method))
res, err := b.doForward(ctx, req)
RecordBatchRPCForward(ctx, b.Name, reqs, RPCRequestSourceHTTP)
metricLabelMethod := reqs[0].Method
if isBatch {
metricLabelMethod = "<batch>"
}
timer := prometheus.NewTimer(
rpcBackendRequestDurationSumm.WithLabelValues(
b.Name,
metricLabelMethod,
strconv.FormatBool(isBatch),
),
)

res, err := b.doForward(ctx, reqs, isBatch)
if err != nil {
lastError = err
log.Warn(
Expand All @@ -224,31 +236,14 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
"req_id", GetReqID(ctx),
"err", err,
)
respTimer.ObserveDuration()
RecordRPCError(ctx, b.Name, req.Method, err)
timer.ObserveDuration()
RecordBatchRPCError(ctx, b.Name, reqs, err)
sleepContext(ctx, calcBackoff(i))
continue
}
respTimer.ObserveDuration()
if res.IsError() {
RecordRPCError(ctx, b.Name, req.Method, res.Error)
log.Info(
"backend responded with RPC error",
"backend", b.Name,
"code", res.Error.Code,
"msg", res.Error.Message,
"req_id", GetReqID(ctx),
"source", "rpc",
"auth", GetAuthCtx(ctx),
)
} else {
log.Info("forwarded RPC request",
"backend", b.Name,
"method", req.Method,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
}
timer.ObserveDuration()

MaybeRecordErrorsInRPCRes(ctx, b.Name, reqs, res)
return res, nil
}

Expand Down Expand Up @@ -337,8 +332,8 @@ func (b *Backend) setOffline() {
}
}

func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
body := mustMarshalJSON(rpcReq)
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
body := mustMarshalJSON(rpcReqs)

httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil {
Expand Down Expand Up @@ -367,11 +362,16 @@ func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error
return nil, wrapErr(err, "error in backend request")
}

metricLabelMethod := rpcReqs[0].Method
if isBatch {
metricLabelMethod = "<batch>"
}
rpcBackendHTTPResponseCodesTotal.WithLabelValues(
GetAuthCtx(ctx),
b.Name,
rpcReq.Method,
metricLabelMethod,
strconv.Itoa(httpRes.StatusCode),
strconv.FormatBool(isBatch),
).Inc()

// Alchemy returns a 400 on bad JSONs, so handle that case
Expand All @@ -385,30 +385,60 @@ func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error
return nil, wrapErr(err, "error reading response body")
}

res := new(RPCRes)
if err := json.Unmarshal(resB, res); err != nil {
var res []*RPCRes
if err := json.Unmarshal(resB, &res); err != nil {
return nil, ErrBackendBadResponse
}

// Alas! Certain node providers (Infura) always return a single JSON object for some types of errors
if len(rpcReqs) != len(res) {
return nil, ErrBackendBadResponse
}

// capture the HTTP status code in the response. this will only
// ever be 400 given the status check on line 318 above.
if httpRes.StatusCode != 200 {
res.Error.HTTPErrorCode = httpRes.StatusCode
for _, res := range res {
res.Error.HTTPErrorCode = httpRes.StatusCode
}
}

sortBatchRPCResponse(rpcReqs, res)
return res, nil
}

// sortBatchRPCResponse sorts the RPCRes slice according to the position of its corresponding ID in the RPCReq slice
func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
pos := make(map[string]int, len(req))
for i, r := range req {
key := string(r.ID)
if _, ok := pos[key]; ok {
panic("bug! detected requests with duplicate IDs")
}
pos[key] = i
}

sort.Slice(res, func(i, j int) bool {
l := res[i].ID
r := res[j].ID
return pos[string(l)] < pos[string(r)]
})
}

type BackendGroup struct {
Name string
Backends []*Backend
}

func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
if len(rpcReqs) == 0 {
return nil, nil
}

rpcRequestsTotal.Inc()

for _, back := range b.Backends {
res, err := back.Forward(ctx, rpcReq)
res, err := back.Forward(ctx, rpcReqs, isBatch)
if errors.Is(err, ErrMethodNotWhitelisted) {
return nil, err
}
Expand Down Expand Up @@ -712,3 +742,44 @@ func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error)
defer c.sem.Release(1)
return c.Do(req)
}

func RecordBatchRPCError(ctx context.Context, backendName string, reqs []*RPCReq, err error) {
for _, req := range reqs {
RecordRPCError(ctx, backendName, req.Method, err)
}
}

func MaybeRecordErrorsInRPCRes(ctx context.Context, backendName string, reqs []*RPCReq, resBatch []*RPCRes) {
log.Info("forwarded RPC request",
"backend", backendName,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
"batch_size", len(reqs),
)

var lastError *RPCErr
for i, res := range resBatch {
if res.IsError() {
lastError = res.Error
RecordRPCError(ctx, backendName, reqs[i].Method, res.Error)
}
}

if lastError != nil {
log.Info(
"backend responded with RPC error",
"backend", backendName,
"last_error_code", lastError.Code,
"last_error_msg", lastError.Message,
"req_id", GetReqID(ctx),
"source", "rpc",
"auth", GetAuthCtx(ctx),
)
}
}

func RecordBatchRPCForward(ctx context.Context, backendName string, reqs []*RPCReq, source string) {
for _, req := range reqs {
RecordRPCForward(ctx, backendName, req.Method, source)
}
}
2 changes: 2 additions & 0 deletions go/proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type ServerConfig struct {

// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
TimeoutSeconds int `toml:"timeout_seconds"`

MaxUpstreamBatchSize int `toml:"max_upstream_batch_size"`
}

type CacheConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion go/proxyd/integration_tests/batch_timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestBatchTimeout(t *testing.T) {
slowBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the config. The sleep duration should be at least double the server.timeout_seconds config to prevent flakes
time.Sleep(time.Second * 2)
SingleResponseHandler(200, goodResponse)(w, r)
BatchedResponseHandler(200, goodResponse)(w, r)
}))
res, statusCode, err := client.SendBatchRPC(
NewRPCReq("1", "eth_chainId", nil),
Expand Down
141 changes: 141 additions & 0 deletions go/proxyd/integration_tests/batching_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package integration_tests

import (
"net/http"
"os"
"testing"

"github.com/ethereum-optimism/optimism/go/proxyd"
"github.com/stretchr/testify/require"
)

func TestBatching(t *testing.T) {
config := ReadConfig("batching")

chainIDResponse1 := `{"jsonrpc": "2.0", "result": "hello1", "id": 1}`
chainIDResponse2 := `{"jsonrpc": "2.0", "result": "hello2", "id": 2}`
chainIDResponse3 := `{"jsonrpc": "2.0", "result": "hello3", "id": 3}`
netVersionResponse1 := `{"jsonrpc": "2.0", "result": "1.0", "id": 1}`
callResponse1 := `{"jsonrpc": "2.0", "result": "ekans1", "id": 1}`

type mockResult struct {
method string
id string
result interface{}
}

chainIDMock1 := mockResult{"eth_chainId", "1", "hello1"}
chainIDMock2 := mockResult{"eth_chainId", "2", "hello2"}
chainIDMock3 := mockResult{"eth_chainId", "3", "hello3"}
netVersionMock1 := mockResult{"net_version", "1", "1.0"}
callMock1 := mockResult{"eth_call", "1", "ekans1"}

tests := []struct {
name string
handler http.Handler
mocks []mockResult
reqs []*proxyd.RPCReq
expectedRes string
maxBatchSize int
numExpectedForwards int
}{
{
name: "backend returns batches out of order",
mocks: []mockResult{chainIDMock1, chainIDMock2, chainIDMock3},
reqs: []*proxyd.RPCReq{
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("2", "eth_chainId", nil),
NewRPCReq("3", "eth_chainId", nil),
},
expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3),
maxBatchSize: 2,
numExpectedForwards: 2,
},
{
// infura behavior
name: "backend returns single RPC response object as error",
handler: SingleResponseHandler(500, `{"jsonrpc":"2.0","error":{"code":-32001,"message":"internal server error"},"id":1}`),
reqs: []*proxyd.RPCReq{
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("2", "eth_chainId", nil),
},
expectedRes: asArray(
`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
),
maxBatchSize: 10,
numExpectedForwards: 1,
},
{
name: "backend returns single RPC response object for minibatches",
handler: SingleResponseHandler(500, `{"jsonrpc":"2.0","error":{"code":-32001,"message":"internal server error"},"id":1}`),
reqs: []*proxyd.RPCReq{
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("2", "eth_chainId", nil),
},
expectedRes: asArray(
`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
),
maxBatchSize: 1,
numExpectedForwards: 2,
},
{
name: "duplicate request ids are on distinct batches",
mocks: []mockResult{
netVersionMock1,
chainIDMock2,
chainIDMock1,
callMock1,
},
reqs: []*proxyd.RPCReq{
NewRPCReq("1", "net_version", nil),
NewRPCReq("2", "eth_chainId", nil),
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "eth_call", nil),
},
expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1),
maxBatchSize: 2,
numExpectedForwards: 3,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config.Server.MaxUpstreamBatchSize = tt.maxBatchSize

handler := tt.handler
if handler == nil {
router := NewBatchRPCResponseRouter()
for _, mock := range tt.mocks {
router.SetRoute(mock.method, mock.id, mock.result)
}
handler = router
}

goodBackend := NewMockBackend(handler)
defer goodBackend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))

client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

res, statusCode, err := client.SendBatchRPC(tt.reqs...)
require.NoError(t, err)
require.Equal(t, http.StatusOK, statusCode)
RequireEqualJSON(t, []byte(tt.expectedRes), res)

if tt.numExpectedForwards != 0 {
require.Equal(t, tt.numExpectedForwards, len(goodBackend.Requests()))
}

if handler, ok := handler.(*BatchRPCResponseRouter); ok {
for i, mock := range tt.mocks {
require.Equal(t, 1, handler.GetNumCalls(mock.method, mock.id), i)
}
}
})
}
}
Loading