diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 932e56a..79d8b6d 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -214,7 +214,7 @@ func (c *client) Close() error { } func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error { - var request BlockchainProgress + var request PostBlockchainProgressRequest var err error var responseStatus string var responseBody string @@ -234,14 +234,25 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch c.log.Info("Sent progress report", "lastIngestedBlockNumber", request.LastIngestedBlockNumber, "latestBlockNumber", request.LatestBlockNumber, + "errors", len(request.Errors), "duration", time.Since(start), ) } }() - request = BlockchainProgress{ + errors := make([]BlockchainError, len(progress.Errors)) + for i, e := range progress.Errors { + errors[i] = BlockchainError{ + Timestamp: e.Timestamp, + BlockNumbers: e.BlockNumbers, + Error: e.Error, + Source: e.Source, + } + } + request = PostBlockchainProgressRequest{ LastIngestedBlockNumber: progress.LastIngestedBlockNumber, LatestBlockNumber: progress.LatestBlockNumber, + Errors: errors, } url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) payload, err := json.Marshal(request) @@ -275,7 +286,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch } func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) { - var response BlockchainProgress + var response GetBlockchainProgressResponse var err error var responseStatus string start := time.Now() diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 2782bc5..f1e79a1 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" "strings" + "time" "github.com/duneanalytics/blockchain-ingester/models" ) @@ -58,11 +59,24 @@ type BlockchainIngestRequest struct { Payload []byte } -type BlockchainProgress struct { +type GetBlockchainProgressResponse struct { LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"` LatestBlockNumber int64 `json:"latest_block_number,omitempty"` } -func (p *BlockchainProgress) String() string { +func (p *GetBlockchainProgressResponse) String() string { return fmt.Sprintf("%+v", *p) } + +type PostBlockchainProgressRequest struct { + LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"` + LatestBlockNumber int64 `json:"latest_block_number,omitempty"` + Errors []BlockchainError `json:"errors,omitempty"` +} + +type BlockchainError struct { + Timestamp time.Time `json:"timestamp"` + BlockNumbers string `json:"block_numbers"` + Error string `json:"error"` + Source string `json:"source"` +} diff --git a/ingester/ingester.go b/ingester/ingester.go index e9eeea0..378b095 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -61,6 +61,28 @@ type Info struct { DuneErrors []ErrorInfo } +// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting +func (info Info) Errors() []models.BlockchainIndexError { + errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors)) + for _, e := range info.RPCErrors { + errors = append(errors, models.BlockchainIndexError{ + Timestamp: e.Timestamp, + BlockNumbers: e.BlockNumbers, + Error: e.Error.Error(), + Source: "rpc", + }) + } + for _, e := range info.DuneErrors { + errors = append(errors, models.BlockchainIndexError{ + Timestamp: e.Timestamp, + BlockNumbers: e.BlockNumbers, + Error: e.Error.Error(), + Source: "dune", + }) + } + return errors +} + type ErrorInfo struct { Timestamp time.Time BlockNumbers string diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 5c15d91..2bfa858 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -212,15 +212,20 @@ func (i *ingester) ReportProgress(ctx context.Context) error { previousIngested = lastIngested previousTime = tNow - // TODO: include errors in the report, reset the error list err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{ BlockchainName: i.cfg.BlockchainName, EVMStack: i.cfg.Stack.String(), LastIngestedBlockNumber: lastIngested, LatestBlockNumber: latest, + Errors: i.info.Errors(), }) if err != nil { i.log.Error("Failed to post progress report", "error", err) + } else { + i.log.Debug("Posted progress report") + // Reset errors after reporting + i.info.RPCErrors = []ErrorInfo{} + i.info.DuneErrors = []ErrorInfo{} } } } diff --git a/ingester/send.go b/ingester/send.go index f34ee0a..8d22df7 100644 --- a/ingester/send.go +++ b/ingester/send.go @@ -3,6 +3,7 @@ package ingester import ( "context" "fmt" + "strings" "sync/atomic" "time" @@ -122,7 +123,18 @@ func (i *ingester) trySendBlockBatch( i.log.Info("SendBlocks: Context canceled, stopping") return nextBlockToSend, nil } - // TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries + + // Store error for reporting + blocknumbers := make([]string, len(blockBatch)) + for i, block := range blockBatch { + blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber) + } + i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ + Timestamp: time.Now(), + Error: err, + BlockNumbers: strings.Join(blocknumbers, ","), + }) + err := errors.Errorf("failed to send batch: %w", err) i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err) return nextBlockToSend, err diff --git a/models/progress.go b/models/progress.go index 9727aa7..dc7d9c3 100644 --- a/models/progress.go +++ b/models/progress.go @@ -1,8 +1,20 @@ package models +import ( + "time" +) + type BlockchainIndexProgress struct { BlockchainName string EVMStack string LastIngestedBlockNumber int64 LatestBlockNumber int64 + Errors []BlockchainIndexError +} + +type BlockchainIndexError struct { + Timestamp time.Time + BlockNumbers string + Error string + Source string }