From a4482317bbe1a2860c050285de02e12b8040074f Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Thu, 27 Jun 2024 07:46:32 +0200 Subject: [PATCH] Dune client: SendBlock -> SendBlocks (#36) This PR changes the Dune client to send a batch of blocks. We still only send one block from the main loop, though. See also previous draft PR https://github.com/duneanalytics/node-indexer/pull/36. --- client/duneapi/client.go | 97 +++++++++++++++++++++++++-------------- client/duneapi/models.go | 12 ++--- ingester/mainloop.go | 2 +- ingester/mainloop_test.go | 23 ++++++++-- mocks/duneapi/client.go | 62 ++++++++++++------------- 5 files changed, 118 insertions(+), 78 deletions(-) diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 671bdcb..035e633 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "net/http" + "strings" "sync" "time" @@ -23,8 +24,8 @@ const ( ) type BlockchainIngester interface { - // SendBlock sends a block to DuneAPI - SendBlock(ctx context.Context, payload models.RPCBlock) error + // SendBlock sends a batch of blocks to DuneAPI + SendBlocks(ctx context.Context, payloads []models.RPCBlock) error // GetProgressReport gets a progress report from DuneAPI GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) @@ -81,42 +82,59 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line } // SendBlock sends a block to DuneAPI -func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error { +func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) error { buffer := c.bufPool.Get().(*bytes.Buffer) defer c.bufPool.Put(buffer) - request, err := c.buildRequest(payload, buffer) + request, err := c.buildRequest(payloads, buffer) if err != nil { return err } - return c.sendRequest(ctx, request) + return c.sendRequest(ctx, *request) } -func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) { - var request BlockchainIngestRequest +func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) { + request := &BlockchainIngestRequest{} + // not thread safe, multiple calls to the compressor here if c.cfg.DisableCompression { - request.Payload = payload.Payload + buffer.Reset() + for _, block := range payloads { + _, err := buffer.Write(block.Payload) + if err != nil { + return nil, err + } + } + request.Payload = buffer.Bytes() } else { - // not thread safe, multiple calls to the compressor here buffer.Reset() c.compressor.Reset(buffer) - _, err := c.compressor.Write(payload.Payload) + for _, block := range payloads { + _, err := c.compressor.Write(block.Payload) + if err != nil { + return nil, err + } + } + err := c.compressor.Close() if err != nil { - return request, err + return nil, err } - c.compressor.Close() request.ContentEncoding = "application/zstd" request.Payload = buffer.Bytes() } - request.BlockNumber = payload.BlockNumber - request.IdempotencyKey = c.idempotencyKey(payload) + + numbers := make([]string, len(payloads)) + for i, payload := range payloads { + numbers[i] = fmt.Sprintf("%d", payload.BlockNumber) + } + blockNumbers := strings.Join(numbers, ",") + request.BlockNumbers = blockNumbers + request.IdempotencyKey = c.idempotencyKey(*request) request.EVMStack = c.cfg.Stack.String() return request, nil } func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest) error { - // TODO: implement timeouts (context with deadline) start := time.Now() var err error var response BlockchainIngestResponse @@ -124,15 +142,15 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques defer func() { if err != nil { c.log.Error("INGEST FAILED", - "blockNumber", request.BlockNumber, + "blockNumbers", request.BlockNumbers, "error", err, "statusCode", responseStatus, "payloadSize", len(request.Payload), "duration", time.Since(start), ) } else { - c.log.Info("BLOCK SENT", - "blockNumber", request.BlockNumber, + c.log.Info("INGEST SUCCESS", + "blockNumbers", request.BlockNumbers, "response", response.String(), "payloadSize", len(request.Payload), "duration", time.Since(start), @@ -153,25 +171,38 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques req.Header.Set("x-idempotency-key", request.IdempotencyKey) req.Header.Set("x-dune-evm-stack", request.EVMStack) req.Header.Set("x-dune-api-key", c.cfg.APIKey) + req.Header.Set("x-dune-batch-size", fmt.Sprintf("%d", request.BatchSize)) req = req.WithContext(ctx) resp, err := c.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() + responseStatus = resp.Status + if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status) + bs, err := io.ReadAll(resp.Body) + responseBody := string(bs) + if err != nil { + return err + } + // We mutate the global err here because we have deferred a log message where we check for non-nil err + err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody) + return err } + err = json.NewDecoder(resp.Body).Decode(&response) if err != nil { return err } + return nil } -func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string { - // for idempotency we use the block number (should we use also the date?, or a startup timestamp?) - return fmt.Sprintf("%v", rpcBlock.BlockNumber) +func (c *client) idempotencyKey(r BlockchainIngestRequest) string { + // for idempotency we use the block numbers in the request + // (should we use also the date?, or a startup timestamp?) + return r.BlockNumbers } func (c *client) Close() error { @@ -229,12 +260,10 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch responseStatus = resp.Status if resp.StatusCode != http.StatusOK { - bs, err := io.ReadAll(resp.Body) - responseBody = string(bs) - if err != nil { - return err - } - err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody) + bs, _ := io.ReadAll(resp.Body) + responseBody := string(bs) + // We mutate the global err here because we have deferred a log message where we check for non-nil err + err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody) return err } @@ -265,7 +294,7 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) c.log.Debug("Sending request", "url", url) - req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body + req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body if err != nil { return nil, err } @@ -292,12 +321,10 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex }, nil } if resp.StatusCode != http.StatusOK { - var errorResp errorResponse - err = json.Unmarshal(responseBody, &errorResp) - if err != nil { - return nil, err - } - err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error) + bs, _ := io.ReadAll(resp.Body) + responseBody := string(bs) + // We mutate the global err here because we have deferred a log message where we check for non-nil err + err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody) return nil, err } diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 1afd273..2782bc5 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -23,6 +23,7 @@ type Config struct { DisableCompression bool } +// The response from the DuneAPI ingest endpoint. type BlockchainIngestResponse struct { Tables []IngestedTableInfo `json:"tables"` } @@ -49,7 +50,8 @@ func (b *BlockchainIngestResponse) String() string { } type BlockchainIngestRequest struct { - BlockNumber int64 + BlockNumbers string + BatchSize int // number of blocks in the batch ContentEncoding string EVMStack string IdempotencyKey string @@ -57,14 +59,10 @@ type BlockchainIngestRequest struct { } type BlockchainProgress struct { - LastIngestedBlockNumber int64 `json:"last_ingested_block_number"` - LatestBlockNumber int64 `json:"latest_block_number"` + LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"` + LatestBlockNumber int64 `json:"latest_block_number,omitempty"` } func (p *BlockchainProgress) String() string { return fmt.Sprintf("%+v", *p) } - -type errorResponse struct { - Error string `json:"error"` -} diff --git a/ingester/mainloop.go b/ingester/mainloop.go index ee7a2bd..4f892dd 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -196,7 +196,7 @@ func (i *ingester) trySendCompletedBlocks( ) int64 { // Send this block only if we have sent all previous blocks for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] { - if err := i.dune.SendBlock(ctx, block); err != nil { + if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil { if errors.Is(err, context.Canceled) { i.log.Info("SendBlocks: Context canceled, stopping") return nextNumberToSend diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 366ae90..1bedc47 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -25,7 +25,12 @@ func TestRunLoopUntilCancel(t *testing.T) { sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { + if len(blocks) != 1 { + panic("expected 1 block") + } + block := blocks[0] + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) if block.BlockNumber == maxBlockNumber { // cancel execution when we send the last block @@ -68,7 +73,7 @@ func TestRunLoopUntilCancel(t *testing.T) { func TestProduceBlockNumbers(t *testing.T) { duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, _ models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error { return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -107,7 +112,12 @@ func TestProduceBlockNumbers(t *testing.T) { func TestSendBlocks(t *testing.T) { sentBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { + if len(blocks) != 1 { + panic("expected 1 block") + } + block := blocks[0] + // DuneAPI must fail if it receives blocks out of order if block.BlockNumber != sentBlockNumber+1 { return errors.Errorf("blocks out of order") @@ -157,7 +167,12 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { + if len(blocks) != 1 { + panic("expected 1 block") + } + block := blocks[0] + // Test must fail if DuneAPI receives blocks out of order require.Equal(t, block.BlockNumber, sentBlockNumber+1) diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go index f918ed3..64d979a 100644 --- a/mocks/duneapi/client.go +++ b/mocks/duneapi/client.go @@ -26,8 +26,8 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // PostProgressReportFunc: func(ctx context.Context, progress models.BlockchainIndexProgress) error { // panic("mock out the PostProgressReport method") // }, -// SendBlockFunc: func(ctx context.Context, payload models.RPCBlock) error { -// panic("mock out the SendBlock method") +// SendBlocksFunc: func(ctx context.Context, payloads []models.RPCBlock) error { +// panic("mock out the SendBlocks method") // }, // } // @@ -42,8 +42,8 @@ type BlockchainIngesterMock struct { // PostProgressReportFunc mocks the PostProgressReport method. PostProgressReportFunc func(ctx context.Context, progress models.BlockchainIndexProgress) error - // SendBlockFunc mocks the SendBlock method. - SendBlockFunc func(ctx context.Context, payload models.RPCBlock) error + // SendBlocksFunc mocks the SendBlocks method. + SendBlocksFunc func(ctx context.Context, payloads []models.RPCBlock) error // calls tracks calls to the methods. calls struct { @@ -59,17 +59,17 @@ type BlockchainIngesterMock struct { // Progress is the progress argument value. Progress models.BlockchainIndexProgress } - // SendBlock holds details about calls to the SendBlock method. - SendBlock []struct { + // SendBlocks holds details about calls to the SendBlocks method. + SendBlocks []struct { // Ctx is the ctx argument value. Ctx context.Context - // Payload is the payload argument value. - Payload models.RPCBlock + // Payloads is the payloads argument value. + Payloads []models.RPCBlock } } lockGetProgressReport sync.RWMutex lockPostProgressReport sync.RWMutex - lockSendBlock sync.RWMutex + lockSendBlocks sync.RWMutex } // GetProgressReport calls GetProgressReportFunc. @@ -140,38 +140,38 @@ func (mock *BlockchainIngesterMock) PostProgressReportCalls() []struct { return calls } -// SendBlock calls SendBlockFunc. -func (mock *BlockchainIngesterMock) SendBlock(ctx context.Context, payload models.RPCBlock) error { - if mock.SendBlockFunc == nil { - panic("BlockchainIngesterMock.SendBlockFunc: method is nil but BlockchainIngester.SendBlock was just called") +// SendBlocks calls SendBlocksFunc. +func (mock *BlockchainIngesterMock) SendBlocks(ctx context.Context, payloads []models.RPCBlock) error { + if mock.SendBlocksFunc == nil { + panic("BlockchainIngesterMock.SendBlocksFunc: method is nil but BlockchainIngester.SendBlocks was just called") } callInfo := struct { - Ctx context.Context - Payload models.RPCBlock + Ctx context.Context + Payloads []models.RPCBlock }{ - Ctx: ctx, - Payload: payload, + Ctx: ctx, + Payloads: payloads, } - mock.lockSendBlock.Lock() - mock.calls.SendBlock = append(mock.calls.SendBlock, callInfo) - mock.lockSendBlock.Unlock() - return mock.SendBlockFunc(ctx, payload) + mock.lockSendBlocks.Lock() + mock.calls.SendBlocks = append(mock.calls.SendBlocks, callInfo) + mock.lockSendBlocks.Unlock() + return mock.SendBlocksFunc(ctx, payloads) } -// SendBlockCalls gets all the calls that were made to SendBlock. +// SendBlocksCalls gets all the calls that were made to SendBlocks. // Check the length with: // -// len(mockedBlockchainIngester.SendBlockCalls()) -func (mock *BlockchainIngesterMock) SendBlockCalls() []struct { - Ctx context.Context - Payload models.RPCBlock +// len(mockedBlockchainIngester.SendBlocksCalls()) +func (mock *BlockchainIngesterMock) SendBlocksCalls() []struct { + Ctx context.Context + Payloads []models.RPCBlock } { var calls []struct { - Ctx context.Context - Payload models.RPCBlock + Ctx context.Context + Payloads []models.RPCBlock } - mock.lockSendBlock.RLock() - calls = mock.calls.SendBlock - mock.lockSendBlock.RUnlock() + mock.lockSendBlocks.RLock() + calls = mock.calls.SendBlocks + mock.lockSendBlocks.RUnlock() return calls }