diff --git a/client/jsonrpc/arbitrum_nitro.go b/client/jsonrpc/arbitrum_nitro.go index 7208666..aeb9b6a 100644 --- a/client/jsonrpc/arbitrum_nitro.go +++ b/client/jsonrpc/arbitrum_nitro.go @@ -33,75 +33,48 @@ func (c *ArbitrumNitroClient) BlockByNumber(ctx context.Context, blockNumber int blockNumberHex := fmt.Sprintf("0x%x", blockNumber) - methods := []string{ - "eth_getBlockByNumber", - "debug_traceBlockByNumber", - } - methodArgs := map[string][]any{ - "eth_getBlockByNumber": {blockNumberHex, true}, - "debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}}, - } - group, groupCtx := errgroup.WithContext(ctx) - results := make([]*bytes.Buffer, len(methods)) - for i, method := range methods { - results[i] = c.bufPool.Get().(*bytes.Buffer) - defer c.putBuffer(results[i]) + results := make([]*bytes.Buffer, 0, 8) - group.Go(func() error { - errCh := make(chan error, 1) - c.wrkPool.Submit(func() { - defer close(errCh) - err := c.getResponseBody(groupCtx, method, methodArgs[method], results[i]) - if err != nil { - c.log.Error("Failed to get response for jsonRPC", - "blockNumber", blockNumber, - "method", method, - "error", err, - ) - errCh <- err - } else { - errCh <- nil - } - }) - return <-errCh - }) - } - - if err := group.Wait(); err != nil { + // eth_getBlockByNumber and extract the transaction hashes + getBlockNumberResponse := c.bufPool.Get().(*bytes.Buffer) + defer c.putBuffer(getBlockNumberResponse) + results = append(results, getBlockNumberResponse) + err := c.getResponseBody(ctx, "eth_getBlockByNumber", []any{blockNumberHex, true}, getBlockNumberResponse) + if err != nil { + c.log.Error("Failed to get response for jsonRPC", + "blockNumber", blockNumber, + "method", "eth_getBlockByNumber", + "error", err, + ) return models.RPCBlock{}, err } - - txHashes, err := getTransactionHashes(results[0].Bytes()) + txHashes, err := getTransactionHashes(getBlockNumberResponse.Bytes()) if err != nil { return models.RPCBlock{}, err } c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "txCount", len(txHashes)) - group, groupCtx = errgroup.WithContext(ctx) + group, groupCtx := errgroup.WithContext(ctx) + + // debug_traceBlockByNumber + result := c.bufPool.Get().(*bytes.Buffer) + defer c.putBuffer(result) + results = append(results, result) + + c.GroupedJSONrpc( + groupCtx, + group, + "debug_traceBlockByNumber", + []any{blockNumberHex, map[string]string{"tracer": "callTracer"}}, + result, + blockNumber, + ) for _, tx := range txHashes { result := c.bufPool.Get().(*bytes.Buffer) defer c.putBuffer(result) - results = append(results, result) - group.Go(func() error { - errCh := make(chan error, 1) - c.wrkPool.Submit(func() { - defer close(errCh) - err := c.getResponseBody(groupCtx, "eth_getTransactionReceipt", []any{tx.Hash}, result) - if err != nil { - c.log.Error("Failed to get response for jsonRPC", - "blockNumber", blockNumber, - "method", "eth_getTransactionReceipt", - "txHash", tx.Hash, - "error", err, - ) - errCh <- err - } else { - errCh <- nil - } - }) - return <-errCh - }) + + c.GroupedJSONrpc(groupCtx, group, "eth_getTransactionReceipt", []any{tx.Hash}, result, blockNumber) } if err := group.Wait(); err != nil { return models.RPCBlock{}, err diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index 37da897..7aa637f 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -14,6 +14,7 @@ import ( "github.com/duneanalytics/blockchain-ingester/models" "github.com/hashicorp/go-retryablehttp" "github.com/panjf2000/ants/v2" + "golang.org/x/sync/errgroup" ) type BlockchainClient interface { @@ -110,6 +111,37 @@ func (c *rpcClient) LatestBlockNumber() (int64, error) { return hexutils.IntFromHex(resp.Result) } +// GroupedJSONrpc is a helper function to spawn multiple calls belonging to the same group. +// errors are propagated to the errgroup. +// concurrency is managed by the worker pool. +func (c *rpcClient) GroupedJSONrpc( + ctx context.Context, + group *errgroup.Group, + method string, + args []any, + output *bytes.Buffer, + debugBlockNumber int64, +) { + group.Go(func() error { + errCh := make(chan error, 1) + c.wrkPool.Submit(func() { + defer close(errCh) + err := c.getResponseBody(ctx, method, args, output) + if err != nil { + c.log.Error("Failed to get response for jsonRPC", + "blockNumber", debugBlockNumber, + "method", method, + "error", err, + ) + errCh <- err + } else { + errCh <- nil + } + }) + return <-errCh + }) +} + // getResponseBody sends a request to the server and returns the response body func (c *rpcClient) getResponseBody( ctx context.Context, method string, params []interface{}, output *bytes.Buffer, diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go index 2f9858e..31de45e 100644 --- a/client/jsonrpc/opstack.go +++ b/client/jsonrpc/opstack.go @@ -49,24 +49,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m results[i] = c.bufPool.Get().(*bytes.Buffer) defer c.putBuffer(results[i]) - group.Go(func() error { - errCh := make(chan error, 1) - c.wrkPool.Submit(func() { - defer close(errCh) - err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) - if err != nil { - c.log.Error("Failed to get response for jsonRPC", - "blockNumber", blockNumber, - "method", method, - "error", err, - ) - errCh <- err - } else { - errCh <- nil - } - }) - return <-errCh - }) + c.GroupedJSONrpc(ctx, group, method, methodArgs[method], results[i], blockNumber) } if err := group.Wait(); err != nil {