From b6f85bf332ca7ea4bcb0df43f6ee94687657571f Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Tue, 23 Jul 2024 12:57:44 -0700 Subject: [PATCH 1/2] arbitrum nitro support: implement the jsonrpc client logic to obtain required jsonrpc messages Arbitrum Nitro doesn't have eth_getBlockReceipts, so we need to make a variable number of jsonRPC requests, we call eth_getTransactionReceipt for each transaction in the block. For this we parse the eth_getBlockByNumber response and extract the tx hashes --- client/jsonrpc/arbitrum_nitro.go | 111 ++++++++++++++++++++++++++++--- client/jsonrpc/client.go | 17 +++++ client/jsonrpc/opstack.go | 15 +---- 3 files changed, 123 insertions(+), 20 deletions(-) diff --git a/client/jsonrpc/arbitrum_nitro.go b/client/jsonrpc/arbitrum_nitro.go index ad5d257..a9a9039 100644 --- a/client/jsonrpc/arbitrum_nitro.go +++ b/client/jsonrpc/arbitrum_nitro.go @@ -1,12 +1,15 @@ package jsonrpc import ( + "bytes" "context" - "errors" + "encoding/json" + "fmt" "log/slog" "time" "github.com/duneanalytics/blockchain-ingester/models" + "golang.org/x/sync/errgroup" ) type ArbitrumNitroClient struct { @@ -30,15 +33,107 @@ func NewArbitrumNitroClient(log *slog.Logger, cfg Config) (*ArbitrumNitroClient, // TODO: this method should be optional // 2. call to eth_getTransactionReceipt for each Tx present in the Block // -// We encode the payload in NDJSON, and use a header line to indicate how many Tx are present in the block -func (c *ArbitrumNitroClient) BlockByNumber(_ context.Context, blockNumber int64) (models.RPCBlock, error) { +// We encode the payload in NDJSON +func (c *ArbitrumNitroClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) { tStart := time.Now() defer func() { c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "duration", time.Since(tStart)) }() - // TODO: lets not implement this yet - return models.RPCBlock{ - BlockNumber: blockNumber, - Error: errors.New("not implemented"), - }, errors.New("not implemented") + + blockNumberHex := fmt.Sprintf("0x%x", blockNumber) + + methods := []string{ + "eth_getBlockByNumber", + "debug_traceBlockByNumber", + } + methodArgs := map[string][]any{ + "eth_getBlockByNumber": {blockNumberHex, true}, + "debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}}, + } + group, groupCtx := errgroup.WithContext(ctx) + results := make([]*bytes.Buffer, len(methods)) + for i, method := range methods { + results[i] = c.bufPool.Get().(*bytes.Buffer) + defer c.putBuffer(results[i]) + + group.Go(func() error { + errCh := make(chan error, 1) + c.wrkPool.Submit(func() { + defer close(errCh) + err := c.getResponseBody(groupCtx, method, methodArgs[method], results[i]) + if err != nil { + c.log.Error("Failed to get response for jsonRPC", + "blockNumber", blockNumber, + "method", method, + "error", err, + ) + errCh <- err + } else { + errCh <- nil + } + }) + return <-errCh + }) + } + + if err := group.Wait(); err != nil { + return models.RPCBlock{}, err + } + + txHashes, err := getTransactionHashes(results[0].Bytes()) + if err != nil { + return models.RPCBlock{}, err + } + + c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "txCount", len(txHashes)) + group, groupCtx = errgroup.WithContext(ctx) + for _, tx := range txHashes { + result := c.bufPool.Get().(*bytes.Buffer) + defer c.putBuffer(result) + + results = append(results, result) + group.Go(func() error { + errCh := make(chan error, 1) + c.wrkPool.Submit(func() { + defer close(errCh) + err := c.getResponseBody(groupCtx, "eth_getTransactionReceipt", []any{tx.Hash}, result) + if err != nil { + c.log.Error("Failed to get response for jsonRPC", + "blockNumber", blockNumber, + "method", "eth_getTransactionReceipt", + "txHash", tx.Hash, + "error", err, + ) + errCh <- err + } else { + errCh <- nil + } + }) + return <-errCh + }) + } + if err := group.Wait(); err != nil { + return models.RPCBlock{}, err + } + + return c.buildRPCBlockResponse(blockNumber, results) +} + +type transactionHash struct { + Hash string `json:"hash"` +} + +func getTransactionHashes(blockResp []byte) ([]transactionHash, error) { + // minimal parse the block response to extract the transaction hashes + type blockResponse struct { + Result struct { + Transactions []transactionHash `json:"transactions"` + } `json:"result"` + } + var resp blockResponse + err := json.Unmarshal(blockResp, &resp) + if err != nil { + return nil, fmt.Errorf("failed to parse eth_getBlockByNumber response: %w", err) + } + return resp.Result.Transactions, nil } diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index c934d6c..3b27983 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -167,3 +167,20 @@ func (c *rpcClient) Close() error { c.wrkPool.Release() return nil } + +func (c *rpcClient) buildRPCBlockResponse(number int64, results []*bytes.Buffer) (models.RPCBlock, error) { + var buffer bytes.Buffer + for _, res := range results { + buffer.Grow(res.Len()) + buffer.ReadFrom(res) + } + return models.RPCBlock{ + BlockNumber: number, + Payload: buffer.Bytes(), + }, nil +} + +func (c *rpcClient) putBuffer(buf *bytes.Buffer) { + buf.Reset() + c.bufPool.Put(buf) +} diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go index 399c395..f31bb51 100644 --- a/client/jsonrpc/opstack.go +++ b/client/jsonrpc/opstack.go @@ -56,8 +56,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m results := make([]*bytes.Buffer, len(methods)) for i, method := range methods { results[i] = c.bufPool.Get().(*bytes.Buffer) - defer c.bufPool.Put(results[i]) - results[i].Reset() + defer c.putBuffer(results[i]) group.Go(func() error { errCh := make(chan error, 1) @@ -66,6 +65,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) if err != nil { c.log.Error("Failed to get response for jsonRPC", + "blockNumber", blockNumber, "method", method, "error", err, ) @@ -82,14 +82,5 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m return models.RPCBlock{}, err } - // copy the responses in order - var buffer bytes.Buffer - for _, res := range results { - buffer.Grow(res.Len()) - buffer.ReadFrom(res) - } - return models.RPCBlock{ - BlockNumber: blockNumber, - Payload: buffer.Bytes(), - }, nil + return c.buildRPCBlockResponse(blockNumber, results) } From 5167d040a7807931bd824e278fe873cd29659a0e Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Tue, 23 Jul 2024 17:32:02 -0700 Subject: [PATCH 2/2] log configs for jsonRPC on startup --- client/jsonrpc/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index 3b27983..054241b 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -92,7 +92,7 @@ func newClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis if err != nil { return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err) } - log.Info("Connected to jsonrpc", "url", cfg.URL) + log.Info("Initialized and Connected to node jsonRPC", "config", fmt.Sprintf("%+v", cfg)) return rpc, nil }