-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Limit the number of errors stored and reported
- Loading branch information
Showing
8 changed files
with
165 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package ingester | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/duneanalytics/blockchain-ingester/models" | ||
) | ||
|
||
type Info struct { | ||
BlockchainName string | ||
Stack string | ||
LatestBlockNumber int64 | ||
IngestedBlockNumber int64 | ||
ConsumedBlockNumber int64 | ||
Errors ErrorState | ||
Since time.Time | ||
} | ||
|
||
func NewInfo(blockchain string, stack string) Info { | ||
return Info{ | ||
BlockchainName: blockchain, | ||
Stack: stack, | ||
Errors: ErrorState{ | ||
RPCErrors: make([]ErrorInfo, 0, 100), | ||
DuneErrors: make([]ErrorInfo, 0, 100), | ||
RPCErrorCount: 0, | ||
DuneErrorCount: 0, | ||
}, | ||
Since: time.Now(), | ||
} | ||
} | ||
|
||
func (info *Info) ToProgressReport() models.BlockchainIndexProgress { | ||
return models.BlockchainIndexProgress{ | ||
BlockchainName: info.BlockchainName, | ||
EVMStack: info.Stack, | ||
LastIngestedBlockNumber: info.IngestedBlockNumber, | ||
LatestBlockNumber: info.LatestBlockNumber, | ||
Errors: info.ProgressReportErrors(), | ||
DuneErrorCounts: info.Errors.DuneErrorCount, | ||
RPCErrorCounts: info.Errors.DuneErrorCount, | ||
Since: info.Since, | ||
} | ||
} | ||
|
||
func (i *Info) ResetErrors() { | ||
i.Since = time.Now() | ||
i.Errors.Reset() | ||
} | ||
|
||
type ErrorState struct { | ||
RPCErrors []ErrorInfo | ||
DuneErrors []ErrorInfo | ||
RPCErrorCount int | ||
DuneErrorCount int | ||
} | ||
|
||
// ToProgressReportErrors returns a combined list of errors from RPC requests and Dune requests | ||
func (info Info) ProgressReportErrors() []models.BlockchainIndexError { | ||
errors := make([]models.BlockchainIndexError, 0, len(info.Errors.RPCErrors)+len(info.Errors.DuneErrors)) | ||
for _, e := range info.Errors.RPCErrors { | ||
errors = append(errors, models.BlockchainIndexError{ | ||
Timestamp: e.Timestamp, | ||
BlockNumbers: e.BlockNumbers, | ||
Error: e.Error.Error(), | ||
Source: "rpc", | ||
}) | ||
} | ||
for _, e := range info.Errors.DuneErrors { | ||
errors = append(errors, models.BlockchainIndexError{ | ||
Timestamp: e.Timestamp, | ||
BlockNumbers: e.BlockNumbers, | ||
Error: e.Error.Error(), | ||
Source: "dune", | ||
}) | ||
} | ||
return errors | ||
} | ||
|
||
func (es *ErrorState) Reset() { | ||
es.RPCErrors = es.RPCErrors[:0] | ||
es.DuneErrors = es.DuneErrors[:0] | ||
es.RPCErrorCount = 0 | ||
es.DuneErrorCount = 0 | ||
} | ||
|
||
func (es *ErrorState) ObserveRPCError(err ErrorInfo) { | ||
es.RPCErrorCount++ | ||
err.Timestamp = time.Now() | ||
|
||
// Store bounded number of error messages | ||
if len(es.RPCErrors) == cap(es.RPCErrors) { | ||
// Remove oldest error, so we can insert the new one at the end | ||
tmp := es.RPCErrors | ||
es.RPCErrors = make([]ErrorInfo, 0, len(es.RPCErrors)) | ||
copy(es.RPCErrors, tmp) | ||
} | ||
es.RPCErrors = append(es.RPCErrors, err) | ||
} | ||
|
||
func (es *ErrorState) ObserveDuneError(err ErrorInfo) { | ||
es.DuneErrorCount++ | ||
err.Timestamp = time.Now() | ||
|
||
// Store bounded number of error messages | ||
if len(es.DuneErrors) == cap(es.DuneErrors) { | ||
// Remove oldest error, so we can insert the new one at the end | ||
tmp := es.DuneErrors | ||
es.DuneErrors = make([]ErrorInfo, 0, len(es.DuneErrors)) | ||
copy(es.DuneErrors, tmp) | ||
} | ||
es.DuneErrors = append(es.DuneErrors, err) | ||
} | ||
|
||
type ErrorInfo struct { | ||
Timestamp time.Time | ||
BlockNumbers string | ||
Error error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package ingester_test | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/duneanalytics/blockchain-ingester/ingester" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// TestInfoErrors ensures that we never allow the error slices to grow indefinitely | ||
func TestInfoErrors(t *testing.T) { | ||
info := ingester.NewInfo("test", "test") | ||
for i := 0; i < 200; i++ { | ||
fmt.Printf("i %d len %d cap %d\n", i, len(info.Errors.DuneErrors), cap(info.Errors.DuneErrors)) | ||
info.Errors.ObserveDuneError(ingester.ErrorInfo{}) | ||
info.Errors.ObserveRPCError(ingester.ErrorInfo{}) | ||
require.Equal(t, 100, cap(info.Errors.RPCErrors)) | ||
require.Equal(t, 100, cap(info.Errors.DuneErrors)) | ||
} | ||
info.ResetErrors() | ||
require.Len(t, info.Errors.RPCErrors, 0) | ||
require.Len(t, info.Errors.DuneErrors, 0) | ||
require.Equal(t, 100, cap(info.Errors.RPCErrors)) | ||
require.Equal(t, 100, cap(info.Errors.DuneErrors)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters