Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a constant amount of memory for error slices and add error abstraction #57

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
Errors: errors,
DuneErrorCounts: progress.DuneErrorCounts,
RPCErrorCounts: progress.RPCErrorCounts,
Since: progress.Since,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
payload, err := json.Marshal(request)
Expand Down
5 changes: 4 additions & 1 deletion client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (p *GetBlockchainProgressResponse) String() string {
type PostBlockchainProgressRequest struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
Errors []BlockchainError `json:"errors,omitempty"`
Errors []BlockchainError `json:"errors"`
DuneErrorCounts int `json:"dune_error_counts"`
RPCErrorCounts int `json:"rpc_error_counts"`
Since time.Time `json:"since"`
}

type BlockchainError struct {
Expand Down
41 changes: 1 addition & 40 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,42 +52,6 @@ type Config struct {
SkipFailedBlocks bool
}

type Info struct {
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
}

// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting
func (info Info) Errors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors))
for _, e := range info.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Error error
}

type ingester struct {
log *slog.Logger
node jsonrpc.BlockchainClient
Expand All @@ -103,10 +67,7 @@ func New(
cfg Config,
progress *models.BlockchainIndexProgress,
) Ingester {
info := Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
}
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
Expand Down
31 changes: 7 additions & 24 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
fields := []interface{}{
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
"latestBlockNumber", latest,
Expand All @@ -203,31 +201,23 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
}
previousHoursToCatchUp = etaHours
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
if i.info.Errors.RPCErrorCount > 0 {
fields = append(fields, "rpcErrors", i.info.Errors.RPCErrorCount)
}
if duneErrors > 0 {
fields = append(fields, "duneErrors", duneErrors)
if i.info.Errors.DuneErrorCount > 0 {
fields = append(fields, "duneErrors", i.info.Errors.DuneErrorCount)
}

i.log.Info("PROGRESS REPORT", fields...)
previousIngested = lastIngested
previousTime = tNow

err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
Errors: i.info.Errors(),
})
err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport())
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
} else {
i.log.Debug("Posted progress report")
// Reset errors after reporting
i.info.RPCErrors = []ErrorInfo{}
i.info.DuneErrors = []ErrorInfo{}
i.info.ResetErrors()
}
}
}
Expand All @@ -238,14 +228,7 @@ func (i *ingester) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
i.log.Info("Sending final progress report")
err := i.dune.PostProgressReport(
ctx,
models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: i.info.IngestedBlockNumber,
LatestBlockNumber: i.info.LatestBlockNumber,
})
err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport())
i.log.Info("Closing node")
if err != nil {
_ = i.node.Close()
Expand Down
117 changes: 117 additions & 0 deletions ingester/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package ingester

import (
"time"

"github.com/duneanalytics/blockchain-ingester/models"
)

type Info struct {
BlockchainName string
Stack string
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
Errors ErrorState
Since time.Time
}

func NewInfo(blockchain string, stack string) Info {
return Info{
BlockchainName: blockchain,
Stack: stack,
Errors: ErrorState{
RPCErrors: make([]ErrorInfo, 0, 100),
DuneErrors: make([]ErrorInfo, 0, 100),
RPCErrorCount: 0,
DuneErrorCount: 0,
},
Since: time.Now(),
}
}

func (info *Info) ToProgressReport() models.BlockchainIndexProgress {
return models.BlockchainIndexProgress{
BlockchainName: info.BlockchainName,
EVMStack: info.Stack,
LastIngestedBlockNumber: info.IngestedBlockNumber,
LatestBlockNumber: info.LatestBlockNumber,
Errors: info.ProgressReportErrors(),
DuneErrorCounts: info.Errors.DuneErrorCount,
RPCErrorCounts: info.Errors.RPCErrorCount,
Since: info.Since,
}
}

func (info *Info) ResetErrors() {
info.Since = time.Now()
info.Errors.Reset()
}

type ErrorState struct {
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
RPCErrorCount int
DuneErrorCount int
}

// ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests
func (info Info) ProgressReportErrors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.Errors.RPCErrors)+len(info.Errors.DuneErrors))
for _, e := range info.Errors.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.Errors.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

func (es *ErrorState) Reset() {
es.RPCErrors = es.RPCErrors[:0]
es.DuneErrors = es.DuneErrors[:0]
es.RPCErrorCount = 0
es.DuneErrorCount = 0
}

func (es *ErrorState) ObserveRPCError(err ErrorInfo) {
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
es.RPCErrorCount++
err.Timestamp = time.Now()

// If we have filled the slice, remove the oldest error
if len(es.RPCErrors) == cap(es.RPCErrors) {
tmp := make([]ErrorInfo, len(es.RPCErrors)-1, cap(es.RPCErrors))
copy(tmp, es.RPCErrors[1:])
es.RPCErrors = tmp
}
es.RPCErrors = append(es.RPCErrors, err)
}

func (es *ErrorState) ObserveDuneError(err ErrorInfo) {
es.DuneErrorCount++
err.Timestamp = time.Now()

// If we have filled the slice, remove the oldest error
if len(es.DuneErrors) == cap(es.DuneErrors) {
tmp := make([]ErrorInfo, len(es.DuneErrors)-1, cap(es.DuneErrors))
copy(tmp, es.DuneErrors[1:])
es.DuneErrors = tmp
}
es.DuneErrors = append(es.DuneErrors, err)
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Error error
}
52 changes: 52 additions & 0 deletions ingester/models_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ingester_test

import (
"testing"

"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/go-errors/errors"
"github.com/stretchr/testify/require"
)

// TestInfoErrors ensures that we never allow the error slices to grow indefinitely
func TestInfoErrors(t *testing.T) {
info := ingester.NewInfo("test", "test")
for j := 0; j < 2; j++ {
for i := 0; i < 200; i++ {
require.Len(t, info.Errors.RPCErrors, min(i, 100))
require.Len(t, info.Errors.DuneErrors, min(i, 100))
info.Errors.ObserveDuneError(ingester.ErrorInfo{})
info.Errors.ObserveRPCError(ingester.ErrorInfo{})
require.Equal(t, 100, cap(info.Errors.RPCErrors))
require.Equal(t, 100, cap(info.Errors.DuneErrors))
}
info.ResetErrors()
require.Len(t, info.Errors.RPCErrors, 0)
require.Len(t, info.Errors.DuneErrors, 0)
require.Equal(t, 100, cap(info.Errors.RPCErrors))
require.Equal(t, 100, cap(info.Errors.DuneErrors))
}
}

func TestProgressReportErrors(t *testing.T) {
info := ingester.NewInfo("test", "test")
info.Errors.ObserveDuneError(ingester.ErrorInfo{Error: errors.New("foo")})
info.Errors.ObserveRPCError(ingester.ErrorInfo{Error: errors.New("bar")})
errors := info.ProgressReportErrors()
require.Len(t, errors, 2)
}

func TestInfoToProgressReport(t *testing.T) {
info := ingester.NewInfo("test", "test")
info.IngestedBlockNumber = 1
info.LatestBlockNumber = 2
info.Errors.ObserveDuneError(ingester.ErrorInfo{Error: errors.New("foo")})
report := info.ToProgressReport()
require.Equal(t, "test", report.BlockchainName)
require.Equal(t, "test", report.EVMStack)
require.Equal(t, int64(1), report.LastIngestedBlockNumber)
require.Equal(t, int64(2), report.LatestBlockNumber)
require.Len(t, report.Errors, 1)
require.Equal(t, 1, report.DuneErrorCounts)
require.Equal(t, 0, report.RPCErrorCounts)
}
7 changes: 3 additions & 4 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
}

if block.Errored() {
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
i.info.Errors.ObserveRPCError(ErrorInfo{
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
Error: block.Error,
})
Expand Down Expand Up @@ -125,8 +124,8 @@ func (i *ingester) trySendBlockBatch(
for i, block := range blockBatch {
blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),

i.info.Errors.ObserveDuneError(ErrorInfo{
Error: err,
BlockNumbers: strings.Join(blocknumbers, ","),
})
Expand Down
3 changes: 3 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type BlockchainIndexProgress struct {
LastIngestedBlockNumber int64
LatestBlockNumber int64
Errors []BlockchainIndexError
DuneErrorCounts int
RPCErrorCounts int
Since time.Time
}

type BlockchainIndexError struct {
Expand Down
Loading