Skip to content

Commit

Permalink
arbitrum nitro support: implement the jsonrpc client logic to obtain …
Browse files Browse the repository at this point in the history
…required jsonrpc messages

Arbitrum Nitro doesn't have eth_getBlockReceipts, so we need to make a
variable number of jsonRPC requests, we call eth_getTransactionReceipt
for each transaction in the block.

For this we parse the eth_getBlockByNumber response and extract the tx
hashes
  • Loading branch information
msf committed Jul 23, 2024
1 parent 2d406c3 commit 6ea9768
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 20 deletions.
110 changes: 102 additions & 8 deletions client/jsonrpc/arbitrum_nitro.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package jsonrpc

import (
"bytes"
"context"
"errors"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
"golang.org/x/sync/errgroup"
)

type ArbitrumNitroClient struct {
Expand All @@ -30,15 +33,106 @@ func NewArbitrumNitroClient(log *slog.Logger, cfg Config) (*ArbitrumNitroClient,
// TODO: this method should be optional
// 2. call to eth_getTransactionReceipt for each Tx present in the Block
//
// We encode the payload in NDJSON, and use a header line to indicate how many Tx are present in the block
func (c *ArbitrumNitroClient) BlockByNumber(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
// We encode the payload in NDJSON
func (c *ArbitrumNitroClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
tStart := time.Now()
defer func() {
c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "duration", time.Since(tStart))
}()
// TODO: lets not implement this yet
return models.RPCBlock{
BlockNumber: blockNumber,
Error: errors.New("not implemented"),
}, errors.New("not implemented")

blockNumberHex := fmt.Sprintf("0x%x", blockNumber)

methods := []string{
"eth_getBlockByNumber",
"debug_traceBlockByNumber",
}
methodArgs := map[string][]any{
"eth_getBlockByNumber": {blockNumberHex, true},
"debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}},
}
group, ctx := errgroup.WithContext(ctx)
results := make([]*bytes.Buffer, len(methods))
for i, method := range methods {
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(results[i])

group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(ctx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", method,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}

if err := group.Wait(); err != nil {
return models.RPCBlock{}, err
}

txHashes, err := getTransactionHashes(results[0].Bytes())
if err != nil {
return models.RPCBlock{}, err
}

c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "txCount", len(txHashes))
for _, tx := range txHashes {
result := c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(result)

results = append(results, result)
group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(ctx, "eth_getTransactionReceipt", []any{tx.Hash}, result)
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", "eth_getTransactionReceipt",
"txHash", tx.Hash,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}
if err := group.Wait(); err != nil {
return models.RPCBlock{}, err
}

return c.buildRPCBlockResponse(blockNumber, results)
}

type transactionHash struct {
Hash string `json:"hash"`
}

func getTransactionHashes(blockResp []byte) ([]transactionHash, error) {
// minimal parse the block response to extract the transaction hashes
type blockResponse struct {
Result struct {
Transactions []transactionHash `json:"transactions"`
} `json:"result"`
}
var resp blockResponse
err := json.Unmarshal(blockResp, &resp)
if err != nil {
return nil, fmt.Errorf("failed to parse eth_getBlockByNumber response: %w", err)
}
return resp.Result.Transactions, nil
}
17 changes: 17 additions & 0 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,20 @@ func (c *rpcClient) Close() error {
c.wrkPool.Release()
return nil
}

func (c *rpcClient) buildRPCBlockResponse(number int64, results []*bytes.Buffer) (models.RPCBlock, error) {
var buffer bytes.Buffer
for _, res := range results {
buffer.Grow(res.Len())
buffer.ReadFrom(res)
}
return models.RPCBlock{
BlockNumber: number,
Payload: buffer.Bytes(),
}, nil
}

func (c *rpcClient) putBuffer(buf *bytes.Buffer) {
buf.Reset()
c.bufPool.Put(buf)
}
15 changes: 3 additions & 12 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
results := make([]*bytes.Buffer, len(methods))
for i, method := range methods {
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(results[i])
results[i].Reset()
defer c.putBuffer(results[i])

group.Go(func() error {
errCh := make(chan error, 1)
Expand All @@ -66,6 +65,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
err := c.getResponseBody(ctx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", method,
"error", err,
)
Expand All @@ -82,14 +82,5 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
return models.RPCBlock{}, err
}

// copy the responses in order
var buffer bytes.Buffer
for _, res := range results {
buffer.Grow(res.Len())
buffer.ReadFrom(res)
}
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: buffer.Bytes(),
}, nil
return c.buildRPCBlockResponse(blockNumber, results)
}

0 comments on commit 6ea9768

Please sign in to comment.