Skip to content

Commit

Permalink
jsonrpc: simplify the logic on the BlockByNumber()
Browse files Browse the repository at this point in the history
extract a method for doing the grouped JSON-RPC calls
  • Loading branch information
msf committed Aug 22, 2024
1 parent f9f3195 commit 6cbb646
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 75 deletions.
87 changes: 30 additions & 57 deletions client/jsonrpc/arbitrum_nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,75 +33,48 @@ func (c *ArbitrumNitroClient) BlockByNumber(ctx context.Context, blockNumber int

blockNumberHex := fmt.Sprintf("0x%x", blockNumber)

methods := []string{
"eth_getBlockByNumber",
"debug_traceBlockByNumber",
}
methodArgs := map[string][]any{
"eth_getBlockByNumber": {blockNumberHex, true},
"debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}},
}
group, groupCtx := errgroup.WithContext(ctx)
results := make([]*bytes.Buffer, len(methods))
for i, method := range methods {
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(results[i])
results := make([]*bytes.Buffer, 0, 8)

group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(groupCtx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", method,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}

if err := group.Wait(); err != nil {
// eth_getBlockByNumber and extract the transaction hashes
getBlockNumberResponse := c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(getBlockNumberResponse)
results = append(results, getBlockNumberResponse)
err := c.getResponseBody(ctx, "eth_getBlockByNumber", []any{blockNumberHex, true}, getBlockNumberResponse)
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", "eth_getBlockByNumber",
"error", err,
)
return models.RPCBlock{}, err
}

txHashes, err := getTransactionHashes(results[0].Bytes())
txHashes, err := getTransactionHashes(getBlockNumberResponse.Bytes())
if err != nil {
return models.RPCBlock{}, err
}

c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "txCount", len(txHashes))
group, groupCtx = errgroup.WithContext(ctx)
group, groupCtx := errgroup.WithContext(ctx)

// debug_traceBlockByNumber
result := c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(result)
results = append(results, result)

c.GroupedJSONrpc(
groupCtx,
group,
"debug_traceBlockByNumber",
[]any{blockNumberHex, map[string]string{"tracer": "callTracer"}},
result,
blockNumber,
)
for _, tx := range txHashes {
result := c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(result)

results = append(results, result)
group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(groupCtx, "eth_getTransactionReceipt", []any{tx.Hash}, result)
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", "eth_getTransactionReceipt",
"txHash", tx.Hash,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})

c.GroupedJSONrpc(groupCtx, group, "eth_getTransactionReceipt", []any{tx.Hash}, result, blockNumber)
}
if err := group.Wait(); err != nil {
return models.RPCBlock{}, err
Expand Down
32 changes: 32 additions & 0 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/hashicorp/go-retryablehttp"
"github.com/panjf2000/ants/v2"
"golang.org/x/sync/errgroup"
)

type BlockchainClient interface {
Expand Down Expand Up @@ -110,6 +111,37 @@ func (c *rpcClient) LatestBlockNumber() (int64, error) {
return hexutils.IntFromHex(resp.Result)
}

// GroupedJSONrpc is a helper function to spawn multiple calls belonging to the same group.
// errors are propagated to the errgroup.
// concurrency is managed by the worker pool.
func (c *rpcClient) GroupedJSONrpc(
ctx context.Context,
group *errgroup.Group,
method string,
args []any,
output *bytes.Buffer,
debugBlockNumber int64,
) {
group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(ctx, method, args, output)
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", debugBlockNumber,
"method", method,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}

// getResponseBody sends a request to the server and returns the response body
func (c *rpcClient) getResponseBody(
ctx context.Context, method string, params []interface{}, output *bytes.Buffer,
Expand Down
19 changes: 1 addition & 18 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(results[i])

group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(ctx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", method,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
c.GroupedJSONrpc(ctx, group, method, methodArgs[method], results[i], blockNumber)
}

if err := group.Wait(); err != nil {
Expand Down

0 comments on commit 6cbb646

Please sign in to comment.