diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 857dc4d..c75aa5c 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -260,6 +260,9 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch LastIngestedBlockNumber: progress.LastIngestedBlockNumber, LatestBlockNumber: progress.LatestBlockNumber, Errors: errors, + DuneErrorCounts: progress.DuneErrorCounts, + RPCErrorCounts: progress.RPCErrorCounts, + Since: progress.Since, } url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) payload, err := json.Marshal(request) diff --git a/client/duneapi/models.go b/client/duneapi/models.go index f1e79a1..d9ad542 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -71,7 +71,10 @@ func (p *GetBlockchainProgressResponse) String() string { type PostBlockchainProgressRequest struct { LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"` LatestBlockNumber int64 `json:"latest_block_number,omitempty"` - Errors []BlockchainError `json:"errors,omitempty"` + Errors []BlockchainError `json:"errors"` + DuneErrorCounts int `json:"dune_error_counts"` + RPCErrorCounts int `json:"rpc_error_counts"` + Since time.Time `json:"since"` } type BlockchainError struct { diff --git a/ingester/ingester.go b/ingester/ingester.go index 9754453..82925fd 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -52,42 +52,6 @@ type Config struct { SkipFailedBlocks bool } -type Info struct { - LatestBlockNumber int64 - IngestedBlockNumber int64 - ConsumedBlockNumber int64 - RPCErrors []ErrorInfo - DuneErrors []ErrorInfo -} - -// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting -func (info Info) Errors() []models.BlockchainIndexError { - errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors)) - for _, e := range info.RPCErrors { - errors = append(errors, models.BlockchainIndexError{ - Timestamp: e.Timestamp, - BlockNumbers: e.BlockNumbers, - Error: e.Error.Error(), - Source: "rpc", - }) - } - for _, e := range info.DuneErrors { - errors = append(errors, models.BlockchainIndexError{ - Timestamp: e.Timestamp, - BlockNumbers: e.BlockNumbers, - Error: e.Error.Error(), - Source: "dune", - }) - } - return errors -} - -type ErrorInfo struct { - Timestamp time.Time - BlockNumbers string - Error error -} - type ingester struct { log *slog.Logger node jsonrpc.BlockchainClient @@ -103,10 +67,7 @@ func New( cfg Config, progress *models.BlockchainIndexProgress, ) Ingester { - info := Info{ - RPCErrors: []ErrorInfo{}, - DuneErrors: []ErrorInfo{}, - } + info := NewInfo(cfg.BlockchainName, cfg.Stack.String()) if progress != nil { info.LatestBlockNumber = progress.LatestBlockNumber info.IngestedBlockNumber = progress.LastIngestedBlockNumber diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 509496b..fb73172 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -188,8 +188,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error { blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds() newDistance := latest - lastIngested - rpcErrors := len(i.info.RPCErrors) - duneErrors := len(i.info.DuneErrors) fields := []interface{}{ "blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec), "latestBlockNumber", latest, @@ -203,31 +201,23 @@ func (i *ingester) ReportProgress(ctx context.Context) error { } previousHoursToCatchUp = etaHours } - if rpcErrors > 0 { - fields = append(fields, "rpcErrors", rpcErrors) + if i.info.Errors.RPCErrorCount > 0 { + fields = append(fields, "rpcErrors", i.info.Errors.RPCErrorCount) } - if duneErrors > 0 { - fields = append(fields, "duneErrors", duneErrors) + if i.info.Errors.DuneErrorCount > 0 { + fields = append(fields, "duneErrors", i.info.Errors.DuneErrorCount) } i.log.Info("PROGRESS REPORT", fields...) previousIngested = lastIngested previousTime = tNow - err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{ - BlockchainName: i.cfg.BlockchainName, - EVMStack: i.cfg.Stack.String(), - LastIngestedBlockNumber: lastIngested, - LatestBlockNumber: latest, - Errors: i.info.Errors(), - }) + err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport()) if err != nil { i.log.Error("Failed to post progress report", "error", err) } else { i.log.Debug("Posted progress report") - // Reset errors after reporting - i.info.RPCErrors = []ErrorInfo{} - i.info.DuneErrors = []ErrorInfo{} + i.info.ResetErrors() } } } @@ -238,14 +228,7 @@ func (i *ingester) Close() error { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() i.log.Info("Sending final progress report") - err := i.dune.PostProgressReport( - ctx, - models.BlockchainIndexProgress{ - BlockchainName: i.cfg.BlockchainName, - EVMStack: i.cfg.Stack.String(), - LastIngestedBlockNumber: i.info.IngestedBlockNumber, - LatestBlockNumber: i.info.LatestBlockNumber, - }) + err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport()) i.log.Info("Closing node") if err != nil { _ = i.node.Close() diff --git a/ingester/models.go b/ingester/models.go new file mode 100644 index 0000000..9d596bc --- /dev/null +++ b/ingester/models.go @@ -0,0 +1,117 @@ +package ingester + +import ( + "time" + + "github.com/duneanalytics/blockchain-ingester/models" +) + +type Info struct { + BlockchainName string + Stack string + LatestBlockNumber int64 + IngestedBlockNumber int64 + ConsumedBlockNumber int64 + Errors ErrorState + Since time.Time +} + +func NewInfo(blockchain string, stack string) Info { + return Info{ + BlockchainName: blockchain, + Stack: stack, + Errors: ErrorState{ + RPCErrors: make([]ErrorInfo, 0, 100), + DuneErrors: make([]ErrorInfo, 0, 100), + RPCErrorCount: 0, + DuneErrorCount: 0, + }, + Since: time.Now(), + } +} + +func (info *Info) ToProgressReport() models.BlockchainIndexProgress { + return models.BlockchainIndexProgress{ + BlockchainName: info.BlockchainName, + EVMStack: info.Stack, + LastIngestedBlockNumber: info.IngestedBlockNumber, + LatestBlockNumber: info.LatestBlockNumber, + Errors: info.ProgressReportErrors(), + DuneErrorCounts: info.Errors.DuneErrorCount, + RPCErrorCounts: info.Errors.RPCErrorCount, + Since: info.Since, + } +} + +func (info *Info) ResetErrors() { + info.Since = time.Now() + info.Errors.Reset() +} + +type ErrorState struct { + RPCErrors []ErrorInfo + DuneErrors []ErrorInfo + RPCErrorCount int + DuneErrorCount int +} + +// ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests +func (info Info) ProgressReportErrors() []models.BlockchainIndexError { + errors := make([]models.BlockchainIndexError, 0, len(info.Errors.RPCErrors)+len(info.Errors.DuneErrors)) + for _, e := range info.Errors.RPCErrors { + errors = append(errors, models.BlockchainIndexError{ + Timestamp: e.Timestamp, + BlockNumbers: e.BlockNumbers, + Error: e.Error.Error(), + Source: "rpc", + }) + } + for _, e := range info.Errors.DuneErrors { + errors = append(errors, models.BlockchainIndexError{ + Timestamp: e.Timestamp, + BlockNumbers: e.BlockNumbers, + Error: e.Error.Error(), + Source: "dune", + }) + } + return errors +} + +func (es *ErrorState) Reset() { + es.RPCErrors = es.RPCErrors[:0] + es.DuneErrors = es.DuneErrors[:0] + es.RPCErrorCount = 0 + es.DuneErrorCount = 0 +} + +func (es *ErrorState) ObserveRPCError(err ErrorInfo) { + es.RPCErrorCount++ + err.Timestamp = time.Now() + + // If we have filled the slice, remove the oldest error + if len(es.RPCErrors) == cap(es.RPCErrors) { + tmp := make([]ErrorInfo, len(es.RPCErrors)-1, cap(es.RPCErrors)) + copy(tmp, es.RPCErrors[1:]) + es.RPCErrors = tmp + } + es.RPCErrors = append(es.RPCErrors, err) +} + +func (es *ErrorState) ObserveDuneError(err ErrorInfo) { + es.DuneErrorCount++ + err.Timestamp = time.Now() + + // If we have filled the slice, remove the oldest error + if len(es.DuneErrors) == cap(es.DuneErrors) { + tmp := make([]ErrorInfo, len(es.DuneErrors)-1, cap(es.DuneErrors)) + copy(tmp, es.DuneErrors[1:]) + es.DuneErrors = tmp + } + es.DuneErrors = append(es.DuneErrors, err) +} + +type ErrorInfo struct { + Timestamp time.Time + BlockNumbers string + Error error +} diff --git a/ingester/models_test.go b/ingester/models_test.go new file mode 100644 index 0000000..0c84c53 --- /dev/null +++ b/ingester/models_test.go @@ -0,0 +1,52 @@ +package ingester_test + +import ( + "testing" + + "github.com/duneanalytics/blockchain-ingester/ingester" + "github.com/go-errors/errors" + "github.com/stretchr/testify/require" +) + +// TestInfoErrors ensures that we never allow the error slices to grow indefinitely +func TestInfoErrors(t *testing.T) { + info := ingester.NewInfo("test", "test") + for j := 0; j < 2; j++ { + for i := 0; i < 200; i++ { + require.Len(t, info.Errors.RPCErrors, min(i, 100)) + require.Len(t, info.Errors.DuneErrors, min(i, 100)) + info.Errors.ObserveDuneError(ingester.ErrorInfo{}) + info.Errors.ObserveRPCError(ingester.ErrorInfo{}) + require.Equal(t, 100, cap(info.Errors.RPCErrors)) + require.Equal(t, 100, cap(info.Errors.DuneErrors)) + } + info.ResetErrors() + require.Len(t, info.Errors.RPCErrors, 0) + require.Len(t, info.Errors.DuneErrors, 0) + require.Equal(t, 100, cap(info.Errors.RPCErrors)) + require.Equal(t, 100, cap(info.Errors.DuneErrors)) + } +} + +func TestProgressReportErrors(t *testing.T) { + info := ingester.NewInfo("test", "test") + info.Errors.ObserveDuneError(ingester.ErrorInfo{Error: errors.New("foo")}) + info.Errors.ObserveRPCError(ingester.ErrorInfo{Error: errors.New("bar")}) + errors := info.ProgressReportErrors() + require.Len(t, errors, 2) +} + +func TestInfoToProgressReport(t *testing.T) { + info := ingester.NewInfo("test", "test") + info.IngestedBlockNumber = 1 + info.LatestBlockNumber = 2 + info.Errors.ObserveDuneError(ingester.ErrorInfo{Error: errors.New("foo")}) + report := info.ToProgressReport() + require.Equal(t, "test", report.BlockchainName) + require.Equal(t, "test", report.EVMStack) + require.Equal(t, int64(1), report.LastIngestedBlockNumber) + require.Equal(t, int64(2), report.LatestBlockNumber) + require.Len(t, report.Errors, 1) + require.Equal(t, 1, report.DuneErrorCounts) + require.Equal(t, 0, report.RPCErrorCounts) +} diff --git a/ingester/send.go b/ingester/send.go index ca5f5f7..8b69e01 100644 --- a/ingester/send.go +++ b/ingester/send.go @@ -36,8 +36,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock } if block.Errored() { - i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{ - Timestamp: time.Now(), + i.info.Errors.ObserveRPCError(ErrorInfo{ BlockNumbers: fmt.Sprintf("%d", block.BlockNumber), Error: block.Error, }) @@ -125,8 +124,8 @@ func (i *ingester) trySendBlockBatch( for i, block := range blockBatch { blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber) } - i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ - Timestamp: time.Now(), + + i.info.Errors.ObserveDuneError(ErrorInfo{ Error: err, BlockNumbers: strings.Join(blocknumbers, ","), }) diff --git a/models/progress.go b/models/progress.go index dc7d9c3..03e7247 100644 --- a/models/progress.go +++ b/models/progress.go @@ -10,6 +10,9 @@ type BlockchainIndexProgress struct { LastIngestedBlockNumber int64 LatestBlockNumber int64 Errors []BlockchainIndexError + DuneErrorCounts int + RPCErrorCounts int + Since time.Time } type BlockchainIndexError struct {