Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

arbitrum nitro support: implement the jsonrpc client logic to get required data #67

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 103 additions & 8 deletions client/jsonrpc/arbitrum_nitro.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package jsonrpc

import (
"bytes"
"context"
"errors"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
"golang.org/x/sync/errgroup"
)

type ArbitrumNitroClient struct {
Expand All @@ -30,15 +33,107 @@ func NewArbitrumNitroClient(log *slog.Logger, cfg Config) (*ArbitrumNitroClient,
// TODO: this method should be optional
// 2. call to eth_getTransactionReceipt for each Tx present in the Block
//
// We encode the payload in NDJSON, and use a header line to indicate how many Tx are present in the block
func (c *ArbitrumNitroClient) BlockByNumber(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
// We encode the payload in NDJSON
func (c *ArbitrumNitroClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
tStart := time.Now()
defer func() {
c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "duration", time.Since(tStart))
}()
// TODO: lets not implement this yet
return models.RPCBlock{
BlockNumber: blockNumber,
Error: errors.New("not implemented"),
}, errors.New("not implemented")

blockNumberHex := fmt.Sprintf("0x%x", blockNumber)

methods := []string{
"eth_getBlockByNumber",
"debug_traceBlockByNumber",
}
methodArgs := map[string][]any{
"eth_getBlockByNumber": {blockNumberHex, true},
"debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}},
}
group, groupCtx := errgroup.WithContext(ctx)
results := make([]*bytes.Buffer, len(methods))
for i, method := range methods {
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(results[i])

group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love us to have something like RxGo (but maintained) or go-streams or similar to make some of this channel + errgroup + ant stuff simpler. Even if we wrote a small utility function that covered the basic pattern we keep using.

defer close(errCh)
err := c.getResponseBody(groupCtx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", method,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}

if err := group.Wait(); err != nil {
return models.RPCBlock{}, err
}

txHashes, err := getTransactionHashes(results[0].Bytes())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we start working on getting the transactions once we complete all methods from above (eth_getBlockByNumber, debug_traceBlockByNumber). But in reality we only need eth_getBlockByNumber. Does it make any difference waiting all methods to complete, compared to start getting transactions once we get the eth_getBlockByNumber response ?

if err != nil {
return models.RPCBlock{}, err
}

c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "txCount", len(txHashes))
group, groupCtx = errgroup.WithContext(ctx)
for _, tx := range txHashes {
result := c.bufPool.Get().(*bytes.Buffer)
defer c.putBuffer(result)

results = append(results, result)
group.Go(func() error {
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(groupCtx, "eth_getTransactionReceipt", []any{tx.Hash}, result)
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", "eth_getTransactionReceipt",
"txHash", tx.Hash,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}
if err := group.Wait(); err != nil {
return models.RPCBlock{}, err
}

return c.buildRPCBlockResponse(blockNumber, results)
}

type transactionHash struct {
Hash string `json:"hash"`
}

func getTransactionHashes(blockResp []byte) ([]transactionHash, error) {
// minimal parse the block response to extract the transaction hashes
type blockResponse struct {
Result struct {
Transactions []transactionHash `json:"transactions"`
} `json:"result"`
}
var resp blockResponse
err := json.Unmarshal(blockResp, &resp)
if err != nil {
return nil, fmt.Errorf("failed to parse eth_getBlockByNumber response: %w", err)
}
return resp.Result.Transactions, nil
}
19 changes: 18 additions & 1 deletion client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
if err != nil {
return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err)
}
log.Info("Connected to jsonrpc", "url", cfg.URL)
log.Info("Initialized and Connected to node jsonRPC", "config", fmt.Sprintf("%+v", cfg))
return rpc, nil
}

Expand Down Expand Up @@ -167,3 +167,20 @@ func (c *rpcClient) Close() error {
c.wrkPool.Release()
return nil
}

func (c *rpcClient) buildRPCBlockResponse(number int64, results []*bytes.Buffer) (models.RPCBlock, error) {
var buffer bytes.Buffer
for _, res := range results {
buffer.Grow(res.Len())
buffer.ReadFrom(res)
}
return models.RPCBlock{
BlockNumber: number,
Payload: buffer.Bytes(),
}, nil
}

func (c *rpcClient) putBuffer(buf *bytes.Buffer) {
buf.Reset()
c.bufPool.Put(buf)
}
15 changes: 3 additions & 12 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
results := make([]*bytes.Buffer, len(methods))
for i, method := range methods {
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(results[i])
results[i].Reset()
defer c.putBuffer(results[i])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this makes any difference, but here you also defering the buf.Reset() compared to previous implementation. Just making sure this is intentional.


group.Go(func() error {
errCh := make(chan error, 1)
Expand All @@ -66,6 +65,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
err := c.getResponseBody(ctx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"blockNumber", blockNumber,
"method", method,
"error", err,
)
Expand All @@ -82,14 +82,5 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
return models.RPCBlock{}, err
}

// copy the responses in order
var buffer bytes.Buffer
for _, res := range results {
buffer.Grow(res.Len())
buffer.ReadFrom(res)
}
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: buffer.Bytes(),
}, nil
return c.buildRPCBlockResponse(blockNumber, results)
}
Loading