Skip to content

Commit

Permalink
Fix a few issues to make the main loop work (#10)
Browse files Browse the repository at this point in the history
- Removes the retryable http client for now. It's getting stuck retrying
forever if there's an error (at least an internal error) and seems to
ignore context cancel
- Remove extra newlines in the payload
- Fix trace request (we were adding an extra layer of string escaping)
  • Loading branch information
vegarsti committed Jun 7, 2024
1 parent 7174033 commit 64c84fa
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 25 deletions.
38 changes: 23 additions & 15 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package duneapi

import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
"github.com/hashicorp/go-retryablehttp"
"github.com/klauspost/compress/zstd"
)

Expand All @@ -19,16 +20,18 @@ const (

type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(payload models.RPCBlock) error
SendBlock(ctx context.Context, payload models.RPCBlock) error

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
}

type client struct {
log *slog.Logger
httpClient *retryablehttp.Client
log *slog.Logger
// TODO: Use retryable client
// httpClient *retryablehttp.Client
httpClient *http.Client
cfg Config
compressor *zstd.Encoder
bufPool *sync.Pool
Expand All @@ -41,14 +44,16 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
if err != nil {
return nil, err
}
httpClient := retryablehttp.NewClient()
httpClient.RetryMax = MaxRetries
httpClient.Logger = log
httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
httpClient.Backoff = retryablehttp.LinearJitterBackoff
// TODO: Use retryable client
// httpClient := retryablehttp.NewClient()
// httpClient.RetryMax = MaxRetries
// httpClient.Logger = log
// httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
// httpClient.Backoff = retryablehttp.LinearJitterBackoff
return &client{
log: log,
httpClient: httpClient,
httpClient: &http.Client{},
// httpClient: httpClient,
cfg: cfg,
compressor: comp,
bufPool: &sync.Pool{
Expand All @@ -61,7 +66,7 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line

// SendBlock sends a block to DuneAPI
// TODO: support batching multiple blocks in a single request
func (c *client) SendBlock(payload models.RPCBlock) error {
func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
start := time.Now()
buffer := c.bufPool.Get().(*bytes.Buffer)
defer func() {
Expand All @@ -73,7 +78,7 @@ func (c *client) SendBlock(payload models.RPCBlock) error {
if err != nil {
return err
}
return c.sendRequest(request)
return c.sendRequest(ctx, request)
}

func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) {
Expand All @@ -92,12 +97,13 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl
request.ContentType = "application/zstd"
request.Payload = buffer.Bytes()
}
request.BlockNumber = payload.BlockNumber
request.IdempotencyKey = c.idempotencyKey(payload)
request.EVMStack = c.cfg.Stack.String()
return request, nil
}

func (c *client) sendRequest(request BlockchainIngestRequest) error {
func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest) error {
// TODO: implement timeouts (context with deadline)
start := time.Now()
var err error
Expand All @@ -122,15 +128,17 @@ func (c *client) sendRequest(request BlockchainIngestRequest) error {

url := fmt.Sprintf("%s/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequest("POST", url, bytes.NewReader(request.Payload))
// TODO: Use retryable client
// req, err := retryablehttp.NewRequest("POST", url, bytes.NewReader(request.Payload))
req, err := http.NewRequest("POST", url, bytes.NewReader(request.Payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", request.ContentType)
req.Header.Set("x-idempotency-key", request.IdempotencyKey)
req.Header.Set("x-dune-evm-stack", request.EVMStack)
req.Header.Set("x-dune-api-key", c.cfg.APIKey)

req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (b *BlockchainIngestResponse) String() string {
}

type BlockchainIngestRequest struct {
BlockNumber string
BlockNumber int64
ContentType string
EVMStack string
IdempotencyKey string
Expand Down
1 change: 1 addition & 0 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
if err != nil {
return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err)
}
log.Info("Connected to jsonrpc", "url", cfg.URL)
return rpc, nil
}

Expand Down
3 changes: 1 addition & 2 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
methodArgs := map[string][]any{
"eth_getBlockByNumber": {blockNumberHex, true},
"eth_getBlockReceipts": {blockNumberHex},
"debug_traceBlockByNumber": {blockNumberHex, `{"tracer":"callTracer"}`},
"debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}},
}
group, ctx := errgroup.WithContext(ctx)
results := make([]*bytes.Buffer, len(methods))
Expand Down Expand Up @@ -83,7 +83,6 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
for _, res := range results {
buffer.Grow(res.Len() + 1)
buffer.ReadFrom(res)
buffer.WriteString("\n")
}
return models.RPCBlock{
BlockNumber: blockNumber,
Expand Down
2 changes: 1 addition & 1 deletion ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
if !ok {
return nil // channel closed
}
if err := i.dune.SendBlock(payload); err != nil {
if err := i.dune.SendBlock(ctx, payload); err != nil {
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
Expand Down
4 changes: 2 additions & 2 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestRunLoopBaseCase(t *testing.T) {
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(block models.RPCBlock) error {
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
return nil
},
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestRunLoopUntilCancel(t *testing.T) {
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(block models.RPCBlock) error {
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
Expand Down
15 changes: 11 additions & 4 deletions mocks/duneapi/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 64c84fa

Please sign in to comment.