Skip to content

Commit

Permalink
Limit the number of errors stored and reported
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jul 2, 2024
1 parent e247c53 commit a5bfdd2
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 61 deletions.
3 changes: 3 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
Errors: errors,
DuneErrorCounts: progress.DuneErrorCounts,
RPCErrorCounts: progress.RPCErrorCounts,
Since: progress.Since,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
payload, err := json.Marshal(request)
Expand Down
5 changes: 4 additions & 1 deletion client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (p *GetBlockchainProgressResponse) String() string {
type PostBlockchainProgressRequest struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
Errors []BlockchainError `json:"errors,omitempty"`
Errors []BlockchainError `json:"errors"`
DuneErrorCounts int `json:"dune_error_counts"`
RPCErrorCounts int `json:"rpc_error_counts"`
Since time.Time `json:"since"`
}

type BlockchainError struct {
Expand Down
41 changes: 1 addition & 40 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,42 +52,6 @@ type Config struct {
SkipFailedBlocks bool
}

type Info struct {
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
}

// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting
func (info Info) Errors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors))
for _, e := range info.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Error error
}

type ingester struct {
log *slog.Logger
node jsonrpc.BlockchainClient
Expand All @@ -103,10 +67,7 @@ func New(
cfg Config,
progress *models.BlockchainIndexProgress,
) Ingester {
info := Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
}
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
Expand Down
22 changes: 6 additions & 16 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
fields := []interface{}{
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
"latestBlockNumber", latest,
Expand All @@ -203,31 +201,23 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
}
previousHoursToCatchUp = etaHours
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
if i.info.Errors.RPCErrorCount > 0 {
fields = append(fields, "rpcErrors", i.info.Errors.RPCErrorCount)
}
if duneErrors > 0 {
fields = append(fields, "duneErrors", duneErrors)
if i.info.Errors.DuneErrorCount > 0 {
fields = append(fields, "duneErrors", i.info.Errors.DuneErrorCount)
}

i.log.Info("PROGRESS REPORT", fields...)
previousIngested = lastIngested
previousTime = tNow

err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
Errors: i.info.Errors(),
})
err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport())
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
} else {
i.log.Debug("Posted progress report")
// Reset errors after reporting
i.info.RPCErrors = []ErrorInfo{}
i.info.DuneErrors = []ErrorInfo{}
i.info.ResetErrors()
}
}
}
Expand Down
119 changes: 119 additions & 0 deletions ingester/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package ingester

import (
"time"

"github.com/duneanalytics/blockchain-ingester/models"
)

type Info struct {
BlockchainName string
Stack string
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
Errors ErrorState
Since time.Time
}

func NewInfo(blockchain string, stack string) Info {
return Info{
BlockchainName: blockchain,
Stack: stack,
Errors: ErrorState{
RPCErrors: make([]ErrorInfo, 0, 100),
DuneErrors: make([]ErrorInfo, 0, 100),
RPCErrorCount: 0,
DuneErrorCount: 0,
},
Since: time.Now(),
}
}

func (info *Info) ToProgressReport() models.BlockchainIndexProgress {
return models.BlockchainIndexProgress{
BlockchainName: info.BlockchainName,
EVMStack: info.Stack,
LastIngestedBlockNumber: info.IngestedBlockNumber,
LatestBlockNumber: info.LatestBlockNumber,
Errors: info.ProgressReportErrors(),
DuneErrorCounts: info.Errors.DuneErrorCount,
RPCErrorCounts: info.Errors.DuneErrorCount,
Since: info.Since,
}
}

func (info *Info) ResetErrors() {
info.Since = time.Now()
info.Errors.Reset()
}

type ErrorState struct {
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
RPCErrorCount int
DuneErrorCount int
}

// ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests
func (info Info) ProgressReportErrors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.Errors.RPCErrors)+len(info.Errors.DuneErrors))
for _, e := range info.Errors.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.Errors.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

func (es *ErrorState) Reset() {
es.RPCErrors = es.RPCErrors[:0]
es.DuneErrors = es.DuneErrors[:0]
es.RPCErrorCount = 0
es.DuneErrorCount = 0
}

func (es *ErrorState) ObserveRPCError(err ErrorInfo) {
es.RPCErrorCount++
err.Timestamp = time.Now()

// Store bounded number of error messages
if len(es.RPCErrors) == cap(es.RPCErrors) {
// Remove oldest error, so we can insert the new one at the end
tmp := es.RPCErrors
es.RPCErrors = make([]ErrorInfo, 0, len(es.RPCErrors))
copy(es.RPCErrors, tmp)
}
es.RPCErrors = append(es.RPCErrors, err)
}

func (es *ErrorState) ObserveDuneError(err ErrorInfo) {
es.DuneErrorCount++
err.Timestamp = time.Now()

// Store bounded number of error messages
if len(es.DuneErrors) == cap(es.DuneErrors) {
// Remove oldest error, so we can insert the new one at the end
tmp := es.DuneErrors
es.DuneErrors = make([]ErrorInfo, 0, len(es.DuneErrors))
copy(es.DuneErrors, tmp)
}
es.DuneErrors = append(es.DuneErrors, err)
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Error error
}
26 changes: 26 additions & 0 deletions ingester/models_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ingester_test

import (
"testing"

"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/stretchr/testify/require"
)

// TestInfoErrors ensures that we never allow the error slices to grow indefinitely
func TestInfoErrors(t *testing.T) {
info := ingester.NewInfo("test", "test")
for j := 0; j < 2; j++ {
for i := 0; i < 200; i++ {
info.Errors.ObserveDuneError(ingester.ErrorInfo{})
info.Errors.ObserveRPCError(ingester.ErrorInfo{})
require.Equal(t, 100, cap(info.Errors.RPCErrors))
require.Equal(t, 100, cap(info.Errors.DuneErrors))
}
info.ResetErrors()
require.Len(t, info.Errors.RPCErrors, 0)
require.Len(t, info.Errors.DuneErrors, 0)
require.Equal(t, 100, cap(info.Errors.RPCErrors))
require.Equal(t, 100, cap(info.Errors.DuneErrors))
}
}
7 changes: 3 additions & 4 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
}

if block.Errored() {
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
i.info.Errors.ObserveRPCError(ErrorInfo{
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
Error: block.Error,
})
Expand Down Expand Up @@ -125,8 +124,8 @@ func (i *ingester) trySendBlockBatch(
for i, block := range blockBatch {
blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),

i.info.Errors.ObserveDuneError(ErrorInfo{
Error: err,
BlockNumbers: strings.Join(blocknumbers, ","),
})
Expand Down
3 changes: 3 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type BlockchainIndexProgress struct {
LastIngestedBlockNumber int64
LatestBlockNumber int64
Errors []BlockchainIndexError
DuneErrorCounts int
RPCErrorCounts int
Since time.Time
}

type BlockchainIndexError struct {
Expand Down

0 comments on commit a5bfdd2

Please sign in to comment.