Skip to content

Commit

Permalink
Dune client: SendBlock -> SendBlocks (#36)
Browse files Browse the repository at this point in the history
This PR changes the Dune client to send a batch of blocks. We still only
send one block from the main loop, though.

See also previous draft PR
#36.
  • Loading branch information
vegarsti committed Jun 27, 2024
1 parent d83f853 commit a448231
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 78 deletions.
97 changes: 62 additions & 35 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -23,8 +24,8 @@ const (
)

type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(ctx context.Context, payload models.RPCBlock) error
// SendBlock sends a batch of blocks to DuneAPI
SendBlocks(ctx context.Context, payloads []models.RPCBlock) error

// GetProgressReport gets a progress report from DuneAPI
GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error)
Expand Down Expand Up @@ -81,58 +82,75 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
}

// SendBlock sends a block to DuneAPI
func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) error {
buffer := c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(buffer)

request, err := c.buildRequest(payload, buffer)
request, err := c.buildRequest(payloads, buffer)
if err != nil {
return err
}
return c.sendRequest(ctx, request)
return c.sendRequest(ctx, *request)
}

func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) {
var request BlockchainIngestRequest
func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) {
request := &BlockchainIngestRequest{}

// not thread safe, multiple calls to the compressor here
if c.cfg.DisableCompression {
request.Payload = payload.Payload
buffer.Reset()
for _, block := range payloads {
_, err := buffer.Write(block.Payload)
if err != nil {
return nil, err
}
}
request.Payload = buffer.Bytes()
} else {
// not thread safe, multiple calls to the compressor here
buffer.Reset()
c.compressor.Reset(buffer)
_, err := c.compressor.Write(payload.Payload)
for _, block := range payloads {
_, err := c.compressor.Write(block.Payload)
if err != nil {
return nil, err
}
}
err := c.compressor.Close()
if err != nil {
return request, err
return nil, err
}
c.compressor.Close()
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.BlockNumber = payload.BlockNumber
request.IdempotencyKey = c.idempotencyKey(payload)

numbers := make([]string, len(payloads))
for i, payload := range payloads {
numbers[i] = fmt.Sprintf("%d", payload.BlockNumber)
}
blockNumbers := strings.Join(numbers, ",")
request.BlockNumbers = blockNumbers
request.IdempotencyKey = c.idempotencyKey(*request)
request.EVMStack = c.cfg.Stack.String()
return request, nil
}

func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest) error {
// TODO: implement timeouts (context with deadline)
start := time.Now()
var err error
var response BlockchainIngestResponse
var responseStatus string
defer func() {
if err != nil {
c.log.Error("INGEST FAILED",
"blockNumber", request.BlockNumber,
"blockNumbers", request.BlockNumbers,
"error", err,
"statusCode", responseStatus,
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
} else {
c.log.Info("BLOCK SENT",
"blockNumber", request.BlockNumber,
c.log.Info("INGEST SUCCESS",
"blockNumbers", request.BlockNumbers,
"response", response.String(),
"payloadSize", len(request.Payload),
"duration", time.Since(start),
Expand All @@ -153,25 +171,38 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
req.Header.Set("x-idempotency-key", request.IdempotencyKey)
req.Header.Set("x-dune-evm-stack", request.EVMStack)
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req.Header.Set("x-dune-batch-size", fmt.Sprintf("%d", request.BatchSize))
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
responseStatus = resp.Status

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status)
bs, err := io.ReadAll(resp.Body)
responseBody := string(bs)
if err != nil {
return err
}
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
return err
}

err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

return nil
}

func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
// for idempotency we use the block number (should we use also the date?, or a startup timestamp?)
return fmt.Sprintf("%v", rpcBlock.BlockNumber)
func (c *client) idempotencyKey(r BlockchainIngestRequest) string {
// for idempotency we use the block numbers in the request
// (should we use also the date?, or a startup timestamp?)
return r.BlockNumbers
}

func (c *client) Close() error {
Expand Down Expand Up @@ -229,12 +260,10 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
responseStatus = resp.Status

if resp.StatusCode != http.StatusOK {
bs, err := io.ReadAll(resp.Body)
responseBody = string(bs)
if err != nil {
return err
}
err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody)
bs, _ := io.ReadAll(resp.Body)
responseBody := string(bs)
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
return err
}

Expand Down Expand Up @@ -265,7 +294,7 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
if err != nil {
return nil, err
}
Expand All @@ -292,12 +321,10 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
}, nil
}
if resp.StatusCode != http.StatusOK {
var errorResp errorResponse
err = json.Unmarshal(responseBody, &errorResp)
if err != nil {
return nil, err
}
err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error)
bs, _ := io.ReadAll(resp.Body)
responseBody := string(bs)
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
return nil, err
}

Expand Down
12 changes: 5 additions & 7 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
DisableCompression bool
}

// The response from the DuneAPI ingest endpoint.
type BlockchainIngestResponse struct {
Tables []IngestedTableInfo `json:"tables"`
}
Expand All @@ -49,22 +50,19 @@ func (b *BlockchainIngestResponse) String() string {
}

type BlockchainIngestRequest struct {
BlockNumber int64
BlockNumbers string
BatchSize int // number of blocks in the batch
ContentEncoding string
EVMStack string
IdempotencyKey string
Payload []byte
}

type BlockchainProgress struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number"`
LatestBlockNumber int64 `json:"latest_block_number"`
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
}

func (p *BlockchainProgress) String() string {
return fmt.Sprintf("%+v", *p)
}

type errorResponse struct {
Error string `json:"error"`
}
2 changes: 1 addition & 1 deletion ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (i *ingester) trySendCompletedBlocks(
) int64 {
// Send this block only if we have sent all previous blocks
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
Expand Down
23 changes: 19 additions & 4 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ func TestRunLoopUntilCancel(t *testing.T) {
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
}
block := blocks[0]

atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
Expand Down Expand Up @@ -68,7 +73,7 @@ func TestRunLoopUntilCancel(t *testing.T) {

func TestProduceBlockNumbers(t *testing.T) {
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, _ models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error {
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
Expand Down Expand Up @@ -107,7 +112,12 @@ func TestProduceBlockNumbers(t *testing.T) {
func TestSendBlocks(t *testing.T) {
sentBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
}
block := blocks[0]

// DuneAPI must fail if it receives blocks out of order
if block.BlockNumber != sentBlockNumber+1 {
return errors.Errorf("blocks out of order")
Expand Down Expand Up @@ -157,7 +167,12 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) {
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
}
block := blocks[0]

// Test must fail if DuneAPI receives blocks out of order
require.Equal(t, block.BlockNumber, sentBlockNumber+1)

Expand Down
62 changes: 31 additions & 31 deletions mocks/duneapi/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a448231

Please sign in to comment.