diff --git a/client/duneapi/client.go b/client/duneapi/client.go index b90b599..3179f20 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -2,14 +2,15 @@ package duneapi import ( "bytes" + "context" "encoding/json" "fmt" "log/slog" + "net/http" "sync" "time" "github.com/duneanalytics/blockchain-ingester/models" - "github.com/hashicorp/go-retryablehttp" "github.com/klauspost/compress/zstd" ) @@ -19,7 +20,7 @@ const ( type BlockchainIngester interface { // SendBlock sends a block to DuneAPI - SendBlock(payload models.RPCBlock) error + SendBlock(ctx context.Context, payload models.RPCBlock) error // - API to discover the latest block number ingested // this can also provide "next block ranges" to push to DuneAPI @@ -27,8 +28,10 @@ type BlockchainIngester interface { } type client struct { - log *slog.Logger - httpClient *retryablehttp.Client + log *slog.Logger + // TODO: Use retryable client + // httpClient *retryablehttp.Client + httpClient *http.Client cfg Config compressor *zstd.Encoder bufPool *sync.Pool @@ -41,14 +44,16 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line if err != nil { return nil, err } - httpClient := retryablehttp.NewClient() - httpClient.RetryMax = MaxRetries - httpClient.Logger = log - httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy - httpClient.Backoff = retryablehttp.LinearJitterBackoff + // TODO: Use retryable client + // httpClient := retryablehttp.NewClient() + // httpClient.RetryMax = MaxRetries + // httpClient.Logger = log + // httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy + // httpClient.Backoff = retryablehttp.LinearJitterBackoff return &client{ log: log, - httpClient: httpClient, + httpClient: &http.Client{}, + // httpClient: httpClient, cfg: cfg, compressor: comp, bufPool: &sync.Pool{ @@ -61,7 +66,7 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line // SendBlock sends a block to DuneAPI // TODO: support batching multiple blocks in a single request -func (c *client) SendBlock(payload models.RPCBlock) error { +func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error { start := time.Now() buffer := c.bufPool.Get().(*bytes.Buffer) defer func() { @@ -73,7 +78,7 @@ func (c *client) SendBlock(payload models.RPCBlock) error { if err != nil { return err } - return c.sendRequest(request) + return c.sendRequest(ctx, request) } func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) { @@ -92,12 +97,13 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl request.ContentType = "application/zstd" request.Payload = buffer.Bytes() } + request.BlockNumber = payload.BlockNumber request.IdempotencyKey = c.idempotencyKey(payload) request.EVMStack = c.cfg.Stack.String() return request, nil } -func (c *client) sendRequest(request BlockchainIngestRequest) error { +func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest) error { // TODO: implement timeouts (context with deadline) start := time.Now() var err error @@ -122,7 +128,9 @@ func (c *client) sendRequest(request BlockchainIngestRequest) error { url := fmt.Sprintf("%s/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName) c.log.Debug("Sending request", "url", url) - req, err := retryablehttp.NewRequest("POST", url, bytes.NewReader(request.Payload)) + // TODO: Use retryable client + // req, err := retryablehttp.NewRequest("POST", url, bytes.NewReader(request.Payload)) + req, err := http.NewRequest("POST", url, bytes.NewReader(request.Payload)) if err != nil { return err } @@ -130,7 +138,7 @@ func (c *client) sendRequest(request BlockchainIngestRequest) error { req.Header.Set("x-idempotency-key", request.IdempotencyKey) req.Header.Set("x-dune-evm-stack", request.EVMStack) req.Header.Set("x-dune-api-key", c.cfg.APIKey) - + req = req.WithContext(ctx) resp, err := c.httpClient.Do(req) if err != nil { return err diff --git a/client/duneapi/models.go b/client/duneapi/models.go index eb1f14a..774d1ad 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -36,7 +36,7 @@ func (b *BlockchainIngestResponse) String() string { } type BlockchainIngestRequest struct { - BlockNumber string + BlockNumber int64 ContentType string EVMStack string IdempotencyKey string diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index c3ecd56..1ae762b 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -52,6 +52,7 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis if err != nil { return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err) } + log.Info("Connected to jsonrpc", "url", cfg.URL) return rpc, nil } diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go index 1dfea39..e10149a 100644 --- a/client/jsonrpc/opstack.go +++ b/client/jsonrpc/opstack.go @@ -53,7 +53,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m methodArgs := map[string][]any{ "eth_getBlockByNumber": {blockNumberHex, true}, "eth_getBlockReceipts": {blockNumberHex}, - "debug_traceBlockByNumber": {blockNumberHex, `{"tracer":"callTracer"}`}, + "debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}}, } group, ctx := errgroup.WithContext(ctx) results := make([]*bytes.Buffer, len(methods)) @@ -83,7 +83,6 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m for _, res := range results { buffer.Grow(res.Len() + 1) buffer.ReadFrom(res) - buffer.WriteString("\n") } return models.RPCBlock{ BlockNumber: blockNumber, diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 04d788b..0be6dcf 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -132,7 +132,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo if !ok { return nil // channel closed } - if err := i.dune.SendBlock(payload); err != nil { + if err := i.dune.SendBlock(ctx, payload); err != nil { // TODO: implement DeadLetterQueue // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err) diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 27496ab..3b03d94 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -109,7 +109,7 @@ func TestRunLoopBaseCase(t *testing.T) { sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(block models.RPCBlock) error { + SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) return nil }, @@ -147,7 +147,7 @@ func TestRunLoopUntilCancel(t *testing.T) { sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(block models.RPCBlock) error { + SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) if block.BlockNumber == maxBlockNumber { // cancel execution when we send the last block diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go index 40d114c..ccea12d 100644 --- a/mocks/duneapi/client.go +++ b/mocks/duneapi/client.go @@ -4,6 +4,7 @@ package duneapi_mock import ( + "context" "github.com/duneanalytics/blockchain-ingester/client/duneapi" "github.com/duneanalytics/blockchain-ingester/models" "sync" @@ -19,7 +20,7 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // // // make and configure a mocked duneapi.BlockchainIngester // mockedBlockchainIngester := &BlockchainIngesterMock{ -// SendBlockFunc: func(payload models.RPCBlock) error { +// SendBlockFunc: func(ctx context.Context, payload models.RPCBlock) error { // panic("mock out the SendBlock method") // }, // } @@ -30,12 +31,14 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // } type BlockchainIngesterMock struct { // SendBlockFunc mocks the SendBlock method. - SendBlockFunc func(payload models.RPCBlock) error + SendBlockFunc func(ctx context.Context, payload models.RPCBlock) error // calls tracks calls to the methods. calls struct { // SendBlock holds details about calls to the SendBlock method. SendBlock []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Payload is the payload argument value. Payload models.RPCBlock } @@ -44,19 +47,21 @@ type BlockchainIngesterMock struct { } // SendBlock calls SendBlockFunc. -func (mock *BlockchainIngesterMock) SendBlock(payload models.RPCBlock) error { +func (mock *BlockchainIngesterMock) SendBlock(ctx context.Context, payload models.RPCBlock) error { if mock.SendBlockFunc == nil { panic("BlockchainIngesterMock.SendBlockFunc: method is nil but BlockchainIngester.SendBlock was just called") } callInfo := struct { + Ctx context.Context Payload models.RPCBlock }{ + Ctx: ctx, Payload: payload, } mock.lockSendBlock.Lock() mock.calls.SendBlock = append(mock.calls.SendBlock, callInfo) mock.lockSendBlock.Unlock() - return mock.SendBlockFunc(payload) + return mock.SendBlockFunc(ctx, payload) } // SendBlockCalls gets all the calls that were made to SendBlock. @@ -64,9 +69,11 @@ func (mock *BlockchainIngesterMock) SendBlock(payload models.RPCBlock) error { // // len(mockedBlockchainIngester.SendBlockCalls()) func (mock *BlockchainIngesterMock) SendBlockCalls() []struct { + Ctx context.Context Payload models.RPCBlock } { var calls []struct { + Ctx context.Context Payload models.RPCBlock } mock.lockSendBlock.RLock()