Skip to content

Commit

Permalink
Add error reporting to progress report API call
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 28, 2024
1 parent 68b10f4 commit 594d404
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 7 deletions.
17 changes: 14 additions & 3 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (c *client) Close() error {
}

func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error {
var request BlockchainProgress
var request PostBlockchainProgressRequest
var err error
var responseStatus string
var responseBody string
Expand All @@ -234,14 +234,25 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
c.log.Info("Sent progress report",
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"errors", len(request.Errors),
"duration", time.Since(start),
)
}
}()

request = BlockchainProgress{
errors := make([]BlockchainError, len(progress.Errors))
for i, e := range progress.Errors {
errors[i] = BlockchainError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error,
Source: e.Source,
}
}
request = PostBlockchainProgressRequest{
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
Errors: errors,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
payload, err := json.Marshal(request)
Expand Down Expand Up @@ -275,7 +286,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
}

func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) {
var response BlockchainProgress
var response GetBlockchainProgressResponse
var err error
var responseStatus string
start := time.Now()
Expand Down
18 changes: 16 additions & 2 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
)
Expand Down Expand Up @@ -58,11 +59,24 @@ type BlockchainIngestRequest struct {
Payload []byte
}

type BlockchainProgress struct {
type GetBlockchainProgressResponse struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
}

func (p *BlockchainProgress) String() string {
func (p *GetBlockchainProgressResponse) String() string {
return fmt.Sprintf("%+v", *p)
}

type PostBlockchainProgressRequest struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
Errors []BlockchainError `json:"errors,omitempty"`
}

type BlockchainError struct {
Timestamp time.Time `json:"timestamp"`
BlockNumbers string `json:"block_numbers"`
Error string `json:"error"`
Source string `json:"source"`
}
22 changes: 22 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ type Info struct {
DuneErrors []ErrorInfo
}

// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting
func (info Info) Errors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors))
for _, e := range info.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Expand Down
7 changes: 6 additions & 1 deletion ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,20 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
previousIngested = lastIngested
previousTime = tNow

// TODO: include errors in the report, reset the error list
err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
Errors: i.info.Errors(),
})
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
} else {
i.log.Debug("Posted progress report")
// Reset errors after reporting
i.info.RPCErrors = []ErrorInfo{}
i.info.DuneErrors = []ErrorInfo{}
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingester
import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -122,7 +123,18 @@ func (i *ingester) trySendBlockBatch(
i.log.Info("SendBlocks: Context canceled, stopping")
return nextBlockToSend, nil
}
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries

// Store error for reporting
blocknumbers := make([]string, len(blockBatch))
for i, block := range blockBatch {
blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
Error: err,
BlockNumbers: strings.Join(blocknumbers, ","),
})

err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextBlockToSend, err
Expand Down
12 changes: 12 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
package models

import (
"time"
)

type BlockchainIndexProgress struct {
BlockchainName string
EVMStack string
LastIngestedBlockNumber int64
LatestBlockNumber int64
Errors []BlockchainIndexError
}

type BlockchainIndexError struct {
Timestamp time.Time
BlockNumbers string
Error string
Source string
}

0 comments on commit 594d404

Please sign in to comment.