Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/little-squids-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@eth-optimism/proxyd': patch
---

Unwrap single RPC batches
32 changes: 26 additions & 6 deletions proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,17 @@ func (b *Backend) setOffline() {
}

func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
body := mustMarshalJSON(rpcReqs)
isSingleElementBatch := len(rpcReqs) == 1

// Single element batches are unwrapped before being sent
// since Alchemy handles single requests better than batches.

var body []byte
if isSingleElementBatch {
body = mustMarshalJSON(rpcReqs[0])
} else {
body = mustMarshalJSON(rpcReqs)
}

httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil {
Expand Down Expand Up @@ -402,12 +412,22 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
}

var res []*RPCRes
if err := json.Unmarshal(resB, &res); err != nil {
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) {
return nil, ErrBackendUnexpectedJSONRPC
if isSingleElementBatch {
var singleRes RPCRes
if err := json.Unmarshal(resB, &singleRes); err != nil {
return nil, ErrBackendBadResponse
}
res = []*RPCRes{
&singleRes,
}
} else {
if err := json.Unmarshal(resB, &res); err != nil {
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) {
return nil, ErrBackendUnexpectedJSONRPC
}
return nil, ErrBackendBadResponse
}
return nil, ErrBackendBadResponse
}

if len(rpcReqs) != len(res) {
Expand Down
6 changes: 5 additions & 1 deletion proxyd/integration_tests/mock_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ func SingleResponseHandler(code int, response string) http.HandlerFunc {
}

func BatchedResponseHandler(code int, responses ...string) http.HandlerFunc {
// all proxyd upstream requests are batched
return func(w http.ResponseWriter, r *http.Request) {
if len(responses) == 1 {
SingleResponseHandler(code, responses[0])(w, r)
return
}

var body string
body += "["
for i, response := range responses {
Expand Down
5 changes: 5 additions & 0 deletions proxyd/integration_tests/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ func TestConcurrentWSPanic(t *testing.T) {

<-readyCh

var wg sync.WaitGroup
wg.Add(2)
// spam messages
go func() {
for {
select {
case <-quitC:
wg.Done()
return
default:
_ = backendToProxyConn.WriteMessage(websocket.TextMessage, []byte("garbage"))
Expand All @@ -61,6 +64,7 @@ func TestConcurrentWSPanic(t *testing.T) {
for {
select {
case <-quitC:
wg.Done()
return
default:
_ = client.WriteMessage(websocket.TextMessage, []byte("{\"id\": 1, \"method\": \"eth_foo\", \"params\": [\"newHeads\"]}"))
Expand All @@ -72,6 +76,7 @@ func TestConcurrentWSPanic(t *testing.T) {
// concurrent write to websocket connection
time.Sleep(time.Second)
close(quitC)
wg.Wait()
}

type backendHandler struct {
Expand Down