diff --git a/.gitignore b/.gitignore
index 9424251..f2eccb6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,4 +22,7 @@ go.work
 go.work.sum
 
 # Binary
-indexer
\ No newline at end of file
+indexer
+bin
+
+.idea
diff --git a/client/duneapi/client.go b/client/duneapi/client.go
index c75aa5c..29acf1e 100644
--- a/client/duneapi/client.go
+++ b/client/duneapi/client.go
@@ -33,6 +33,8 @@ type BlockchainIngester interface {
 	// PostProgressReport sends a progress report to DuneAPI
 	PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error
 
+	GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error)
+
 	// - API to discover the latest block number ingested
 	//   this can also provide "next block ranges" to push to DuneAPI
 	// - log/metrics on catching up/falling behind, distance from tip of chain
@@ -366,3 +368,76 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
 	}
 	return progress, nil
 }
+
+func (c *client) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
+	var response BlockchainGapsResponse
+	var err error
+	var responseStatus string
+	start := time.Now()
+
+	// Log response
+	defer func() {
+		if err != nil {
+			c.log.Error("Getting block gaps failed",
+				"error", err,
+				"statusCode", responseStatus,
+				"duration", time.Since(start),
+			)
+		} else {
+			c.log.Info("Got block gaps",
+				"blockGaps", response.String(),
+				"duration", time.Since(start),
+			)
+		}
+	}()
+
+	url := fmt.Sprintf("%s/api/beta/blockchain/%s/gaps", c.cfg.URL, c.cfg.BlockchainName)
+	c.log.Debug("Sending request", "url", url)
+	req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("x-dune-api-key", c.cfg.APIKey)
+	req = req.WithContext(ctx)
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	responseBody, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		bs, _ := io.ReadAll(resp.Body)
+		responseBody := string(bs)
+		// We mutate the global err here because we have deferred a log message where we check for non-nil err
+		err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
+		return nil, err
+	}
+
+	err = json.Unmarshal(responseBody, &response)
+	if err != nil {
+		return nil, err
+	}
+
+	gaps := &models.BlockchainGaps{
+		Gaps: mapSlice(response.Gaps, func(gap BlockGap) models.BlockGap {
+			return models.BlockGap{
+				FirstMissing: gap.FirstMissing,
+				LastMissing:  gap.LastMissing,
+			}
+		}),
+	}
+	return gaps, nil
+}
+
+func mapSlice[T any, U any](slice []T, mapper func(T) U) []U {
+	result := make([]U, len(slice))
+	for i, v := range slice {
+		result[i] = mapper(v)
+	}
+	return result
+}
diff --git a/client/duneapi/models.go b/client/duneapi/models.go
index d9ad542..0a1e12e 100644
--- a/client/duneapi/models.go
+++ b/client/duneapi/models.go
@@ -83,3 +83,17 @@ type BlockchainError struct {
 	Error  string `json:"error"`
 	Source string `json:"source"`
 }
+
+type BlockchainGapsResponse struct {
+	Gaps []BlockGap `json:"gaps"`
+}
+
+// BlockGap declares an inclusive range of missing block numbers
+type BlockGap struct {
+	FirstMissing int64 `json:"first_missing"`
+	LastMissing  int64 `json:"last_missing"`
+}
+
+func (b *BlockchainGapsResponse) String() string {
+	return fmt.Sprintf("%+v", *b)
+}
diff --git a/cmd/main.go b/cmd/main.go
index b1c7e86..c0070b2 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -5,6 +5,7 @@ package main
 
 import (
 	"context"
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
 	stdlog "log"
 	"log/slog"
 	"os"
@@ -101,6 +102,17 @@ func main() {
 		startBlockNumber = cfg.BlockHeight
 	}
 
+	dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(cfg.DLQRetryInterval))
+
+	if !cfg.DisableGapsQuery {
+		blockGaps, err := duneClient.GetBlockGaps(ctx)
+		if err != nil {
+			stdlog.Fatal(err)
+		} else {
+			ingester.AddBlockGaps(dlqBlockNumbers, blockGaps.Gaps)
+		}
+	}
+
 	maxCount := int64(0) // 0 means ingest until cancelled
 	ingester := ingester.New(
 		logger,
@@ -110,12 +122,14 @@ func main() {
 			MaxConcurrentRequests:  cfg.RPCConcurrency,
 			ReportProgressInterval: cfg.ReportProgressInterval,
 			PollInterval:           cfg.PollInterval,
+			PollDLQInterval:        cfg.PollDLQInterval,
 			Stack:                  cfg.RPCStack,
 			BlockchainName:         cfg.BlockchainName,
 			BlockSubmitInterval:    cfg.BlockSubmitInterval,
 			SkipFailedBlocks:       cfg.RPCNode.SkipFailedBlocks,
 		},
 		progress,
+		dlqBlockNumbers,
 	)
 
 	wg.Add(1)
diff --git a/config/config.go b/config/config.go
index 8259943..edd8d99 100644
--- a/config/config.go
+++ b/config/config.go
@@ -42,11 +42,14 @@ func (r RPCClient) HasError() error {
 }
 
 type Config struct {
-	BlockHeight            int64           `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"`                   // nolint:lll
-	BlockchainName          string          `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"`              // nolint:lll
-	DisableCompression      bool            `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
+	BlockHeight             int64           `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"`                       // nolint:lll
+	BlockchainName          string          `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"`                  // nolint:lll
+	DisableCompression      bool            `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"`   // nolint:lll
+	DisableGapsQuery        bool            `long:"disable-gaps-query" env:"DISABLE_GAPS_QUERY" description:"disable gaps query used to populate the initial DLQ"` // nolint:lll
 	Dune                    DuneClient
 	PollInterval            time.Duration   `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"`    // nolint:lll
+	PollDLQInterval         time.Duration   `long:"dlq-poll-interval" env:"DLQ_POLL_INTERVAL" description:"Interval to poll the dlq" default:"300ms"`                // nolint:lll
+	DLQRetryInterval        time.Duration   `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"`      // nolint:lll
 	ReportProgressInterval  time.Duration   `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
 	RPCNode                 RPCClient
 	RPCStack                models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
diff --git a/go.mod b/go.mod
index c506fe2..26973a4 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/duneanalytics/blockchain-ingester
 go 1.22.2
 
 require (
+	github.com/emirpasic/gods v1.18.1
 	github.com/go-errors/errors v1.5.1
 	github.com/hashicorp/go-retryablehttp v0.7.7
 	github.com/jessevdk/go-flags v1.5.0
diff --git a/go.sum b/go.sum
index 7ee828f..1835ce7 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
 github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
 github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
diff --git a/ingester/ingester.go b/ingester/ingester.go
index 82925fd..ce9f16c 100644
--- a/ingester/ingester.go
+++ b/ingester/ingester.go
@@ -2,6 +2,7 @@ package ingester
 
 import (
 	"context"
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
 	"log/slog"
 	"time"
 
@@ -31,20 +32,26 @@ type Ingester interface {
 	//	- a fatal error occurs
 	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error
 
-	// This is just a placeholder for now
-	Info() Info
+	ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.DLQItem[int64]) error
+
+	FetchBlockLoopDLQ(ctx context.Context,
+		blockNumbers <-chan dlq.DLQItem[int64],
+		blocks chan<- dlq.DLQItem[models.RPCBlock],
+	) error
+
+	SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.DLQItem[models.RPCBlock]) error
 
 	Close() error
 }
 
 const (
-	defaultMaxBatchSize           = 5
 	defaultReportProgressInterval = 30 * time.Second
 )
 
 type Config struct {
 	MaxConcurrentRequests   int
 	PollInterval            time.Duration
+	PollDLQInterval         time.Duration
 	ReportProgressInterval  time.Duration
 	Stack                   models.EVMStack
 	BlockchainName          string
@@ -58,6 +65,7 @@ type ingester struct {
 	dune duneapi.BlockchainIngester
 	cfg  Config
 	info Info
+	dlq  *dlq.DLQ[int64]
 }
 
 func New(
@@ -66,6 +74,7 @@ func New(
 	dune duneapi.BlockchainIngester,
 	cfg Config,
 	progress *models.BlockchainIndexProgress,
+	dlq *dlq.DLQ[int64],
 ) Ingester {
 	info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
 	if progress != nil {
@@ -78,13 +87,10 @@ func New(
 		dune: dune,
 		cfg:  cfg,
 		info: info,
+		dlq:  dlq,
 	}
 	if ing.cfg.ReportProgressInterval == 0 {
 		ing.cfg.ReportProgressInterval = defaultReportProgressInterval
 	}
 	return ing
 }
-
-func (i *ingester) Info() Info {
-	return i.info
-}
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index fb73172..4ea4896 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -3,6 +3,9 @@ package ingester
 import (
 	"context"
 	"fmt"
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
+	"github.com/emirpasic/gods/utils"
+	"slices"
 	"sync/atomic"
 	"time"
 
@@ -48,6 +51,23 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 		return i.SendBlocks(ctx, blocks, startBlockNumber)
 	})
 
+	// Start DLQ processing
+	blockNumbersDLQ := make(chan dlq.DLQItem[int64])
+	defer close(blockNumbersDLQ)
+
+	blocksDLQ := make(chan dlq.DLQItem[models.RPCBlock])
+	defer close(blocksDLQ)
+
+	errGroup.Go(func() error {
+		return i.SendBlocksDLQ(ctx, blocksDLQ)
+	})
+	errGroup.Go(func() error {
+		return i.FetchBlockLoopDLQ(ctx, blockNumbersDLQ, blocksDLQ)
+	})
+	errGroup.Go(func() error {
+		return i.ProduceBlockNumbersDLQ(ctx, blockNumbersDLQ)
+	})
+
 	// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
 	endBlockNumber := startBlockNumber - 1 + maxCount
 	i.log.Info("Starting ingester",
@@ -131,7 +151,7 @@ func (i *ingester) FetchBlockLoop(
 				i.log.Error("Failed to get block by number",
 					"blockNumber", blockNumber,
-					"continueing", i.cfg.SkipFailedBlocks,
+					"continuing", i.cfg.SkipFailedBlocks,
 					"elapsed", time.Since(startTime),
 					"error", err,
 				)
@@ -223,6 +243,104 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
 	}
 }
 
+func (i *ingester) ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.DLQItem[int64]) error {
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Debug("ProduceBlockNumbersDLQ: Context canceled, stopping")
+			return ctx.Err()
+		default:
+			block, ok := i.dlq.GetNextItem()
+			if ok {
+				i.log.Debug("ProduceBlockNumbersDLQ: Reprocessing block", "block", block)
+				outChan <- *block
+			} else {
+				i.log.Debug("ProduceBlockNumbersDLQ: No eligible blocks in the DLQ so sleeping ")
+				time.Sleep(i.cfg.PollDLQInterval) // Polling interval when DLQ is empty
+			}
+		}
+	}
+}
+
+func (i *ingester) FetchBlockLoopDLQ(ctx context.Context, blockNumbers <-chan dlq.DLQItem[int64],
+	blocks chan<- dlq.DLQItem[models.RPCBlock],
+) error {
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Info("FetchBlockLoopDLQ: context is done")
+			return ctx.Err()
+		case blockNumber := <-blockNumbers:
+			startTime := time.Now()
+			block, err := i.node.BlockByNumber(ctx, blockNumber.Value)
+			if err != nil {
+				if errors.Is(err, context.Canceled) {
+					i.log.Error("FetchBlockLoopDLQ: Context canceled, stopping")
+					return ctx.Err()
+				}
+				i.log.Error("FetchBlockLoopDLQ: Failed to get block by number",
+					"blockNumber", blockNumber,
+					"elapsed", time.Since(startTime),
+					"error", err,
+				)
+				blocks <- dlq.MapDLQItem(blockNumber, func(blockNumber int64) models.RPCBlock {
+					return models.RPCBlock{BlockNumber: blockNumber, Error: err}
+				})
+				continue
+			}
+			getBlockElapsed := time.Since(startTime)
+			select {
+			case <-ctx.Done():
+				i.log.Debug("FetchBlockLoopDLQ: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
+				return ctx.Err()
+			case blocks <- dlq.MapDLQItem(blockNumber, func(_ int64) models.RPCBlock {
+				return block
+			}):
+				i.log.Debug(
+					"FetchBlockLoopDLQ: Got and sent block",
+					"blockNumber", blockNumber,
+					"getBlockElapsed", getBlockElapsed,
+				)
+			}
+		}
+	}
+}
+
+func (i *ingester) SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.DLQItem[models.RPCBlock]) error {
+	i.log.Debug("SendBlocksDLQ: Starting to receive blocks")
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Debug("SendBlocksDLQ: Context canceled, stopping")
+			return ctx.Err()
+		case block, ok := <-blocks:
+			if !ok {
+				i.log.Debug("SendBlocksDLQ: Channel is closed, returning")
+				return nil
+			}
+			if block.Value.Errored() {
+				i.dlq.AddItem(block.Value.BlockNumber, block.Retries)
+				i.log.Error("Received FAILED block", "number", block.Value.BlockNumber)
+				// TODO: report error once ErrorState struct is made thread-safe
+			} else {
+				i.log.Debug(
+					"SendBlocksDLQ: Received block",
+					"blockNumber", block.Value.BlockNumber,
+				)
+				if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block.Value}); err != nil {
+					if errors.Is(err, context.Canceled) {
+						i.log.Info("SendBlocksDLQ: Context canceled, stopping")
+						return ctx.Err()
+					}
+					i.log.Error("SendBlocksDLQ: Failed to send block, requeueing...", "block", block.Value.BlockNumber, "error", err)
+					i.dlq.AddItem(block.Value.BlockNumber, block.Retries)
+					// TODO: report error once ErrorState struct is made thread-safe
+				}
+			}
+		}
+	}
+}
+
 func (i *ingester) Close() error {
 	// Send a final progress report to flush progress
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@@ -237,3 +355,16 @@ func (i *ingester) Close() error {
 
 	return i.node.Close()
 }
+
+func AddBlockGaps(dlq *dlq.DLQ[int64], gaps []models.BlockGap) {
+	// queue these in reverse so that recent blocks are retried first
+	slices.SortFunc(gaps, func(a, b models.BlockGap) int {
+		return -utils.Int64Comparator(a.FirstMissing, b.FirstMissing)
+	})
+
+	for _, gap := range gaps {
+		for i := gap.FirstMissing; i <= gap.LastMissing; i++ {
+			dlq.AddItem(i, 0)
+		}
+	}
+}
diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
index d65e6a4..d3a3220 100644
--- a/ingester/mainloop_test.go
+++ b/ingester/mainloop_test.go
@@ -2,6 +2,7 @@ package ingester_test
 
 import (
 	"context"
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
 	"io"
 	"log/slog"
 	"math/rand"
@@ -79,6 +80,7 @@ func TestRunUntilCancel(t *testing.T) {
 			SkipFailedBlocks:      false,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(ctx, 1, -1) // run until canceled
@@ -114,6 +116,7 @@ func TestProduceBlockNumbers(t *testing.T) {
 			BlockSubmitInterval: time.Nanosecond,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 	blockNumbers := make(chan int64)
 	var wg sync.WaitGroup
@@ -159,6 +162,7 @@ func TestSendBlocks(t *testing.T) {
 			BlockSubmitInterval: time.Nanosecond,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	blocks := make(chan models.RPCBlock)
@@ -253,6 +257,7 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
 			SkipFailedBlocks:      false,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(ctx, 1, -1) // run until canceled
@@ -300,6 +305,7 @@ func TestRunRPCNodeFails(t *testing.T) {
 			SkipFailedBlocks:      false,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(ctx, 1, -1) // run until canceled
@@ -317,8 +323,171 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
 			MaxConcurrentRequests: 0,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(context.Background(), 1, -1) // run until canceled
 	require.ErrorContains(t, err, "MaxConcurrentRequests must be > 0")
 }
+
+func TestRunWithDLQ(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	maxBlockNumber := int64(1000)
+	startBlockNumber := int64(10)
+
+	// Initial DLQ
+	dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(time.Duration(10) * time.Millisecond))
+	gaps := []models.BlockGap{
+		{
+			FirstMissing: 9,
+			LastMissing:  9,
+		}, {
+			FirstMissing: 3,
+			LastMissing:  7,
+		}, {
+			FirstMissing: 0,
+			LastMissing:  0,
+		},
+	}
+	ingester.AddBlockGaps(dlqBlockNumbers, gaps)
+
+	// blockNumber int64 -> timesSubmitted int
+	var blocksIndexed sync.Map
+	// Prepopulate expected blocks
+	for i := int64(0); i < maxBlockNumber; i++ {
+		blocksIndexed.Store(i, 0)
+	}
+	// Add those that aren't considered as previous gaps
+	incrementAndGet(&blocksIndexed, int64(1))
+	incrementAndGet(&blocksIndexed, int64(2))
+	incrementAndGet(&blocksIndexed, int64(8))
+
+	// Dune API Mocking
+	var sendBlocksRequests sync.Map
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
+			if len(blocks) == 0 {
+				return nil
+			}
+			// Count Requests by block number
+			for _, block := range blocks {
+				incrementAndGet(&sendBlocksRequests, block.BlockNumber)
+			}
+
+			// Fail if this batch contains a block number that hasn't been requested at least twice before this call
+			for _, block := range blocks {
+				requests, _ := sendBlocksRequests.Load(block.BlockNumber)
+				if requests.(int) <= 2 {
+					return errors.Errorf("failing batch due to %v having only been requested %v times",
+						block.BlockNumber, requests)
+				}
+			}
+
+			// Count blocks as indexed by block number
+			for _, block := range blocks {
+				incrementAndGet(&blocksIndexed, block.BlockNumber)
+			}
+
+			// Look for gaps
+			if !duneStoreContainsGaps(&blocksIndexed, maxBlockNumber) {
+				// cancel execution when we have sent all blocks
+				cancel()
+				return context.Canceled
+			}
+
+			return nil
+		},
+		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+			return nil
+		},
+	}
+
+	// RPC Mocking
+	var rpcBlocksRequests sync.Map
+	rpcClient := &jsonrpc_mock.BlockchainClientMock{
+		LatestBlockNumberFunc: func() (int64, error) {
+			return maxBlockNumber + 1, nil
+		},
+		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+			incrementAndGet(&rpcBlocksRequests, blockNumber)
+
+			// Fail every 10th block numbers the first 2 times
+			if blockNumber%10 == 0 {
+				requests, _ := rpcBlocksRequests.Load(blockNumber)
+				if requests.(int) <= 2 {
+					return models.RPCBlock{},
+						errors.Errorf("failing rpc request due to %v having only been requested %v times",
+							blockNumber, requests)
+				}
+			}
+
+			return models.RPCBlock{
+				BlockNumber: blockNumber,
+				Payload:     []byte(`block`),
+			}, nil
+		},
+		CloseFunc: func() error {
+			return nil
+		},
+	}
+	// Swap these to see logs
+	// logOutput := os.Stderr
+	logOutput := io.Discard
+	ing := ingester.New(
+		slog.New(slog.NewTextHandler(logOutput, nil)),
+		rpcClient,
+		duneapi,
+		ingester.Config{
+			BlockSubmitInterval:   time.Nanosecond,
+			MaxConcurrentRequests: 10,
+			SkipFailedBlocks:      true,
+		},
+		nil, // progress
+		dlqBlockNumbers,
+	)
+
+	err := ing.Run(ctx, startBlockNumber, -1) // run until canceled
+	require.False(t, duneStoreContainsGaps(&blocksIndexed, maxBlockNumber))
+	require.GreaterOrEqual(t, lenSyncMap(&blocksIndexed), int(maxBlockNumber))
+	require.ErrorIs(t, err, context.Canceled) // this is expected
+}
+
+func duneStoreContainsGaps(blocksIndexed *sync.Map, maxBlockNumber int64) bool {
+	containsGap := false
+	blocksIndexed.Range(func(key, value any) bool {
+		blockNumber := key.(int64)
+		count := value.(int)
+		if blockNumber <= maxBlockNumber && count < 1 {
+			containsGap = true
+			return false
+		}
+		return true
+	})
+	return containsGap
+}
+
+func incrementAndGet(m *sync.Map, key interface{}) int {
+	for {
+		// Load the current value associated with the key else initialise
+		currentValue, _ := m.LoadOrStore(key, 0)
+
+		// Increment the current value.
+		newValue := currentValue.(int) + 1
+
+		// Attempt to store the new value back into the sync.Map. Compare-and-swap (CAS) approach ensures atomicity.
+		if m.CompareAndSwap(key, currentValue, newValue) {
+			// If the swap succeeded, return the new value.
+			return newValue
+		}
+		// If the swap failed, it means the value was updated by another goroutine. Retry the operation.
+	}
+}
+
+func lenSyncMap(m *sync.Map) int {
+	length := 0
+	m.Range(func(_, _ interface{}) bool {
+		length++
+		return true
+	})
+	return length
+}
diff --git a/ingester/send.go b/ingester/send.go
index 8b69e01..9ca50c1 100644
--- a/ingester/send.go
+++ b/ingester/send.go
@@ -91,6 +91,9 @@ func (i *ingester) trySendBlockBatch(
 	for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
 		// Skip Failed block if we're configured to skip Failed blocks
 		if i.cfg.SkipFailedBlocks && block.Errored() {
+			i.log.Error("SendBlocks: RPCBlock has an error, requeueing...", "block", block.BlockNumber, "error", block.Error)
+			i.dlq.AddItem(block.BlockNumber, 0)
+			delete(collectedBlocks, nextBlockToSend)
 			nextBlockToSend++
 			continue
 		}
@@ -110,7 +113,7 @@ func (i *ingester) trySendBlockBatch(
 
 	// Send the batch
 	lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
-	if lastBlockNumber != nextBlockToSend-1 {
+	if !i.cfg.SkipFailedBlocks && lastBlockNumber != nextBlockToSend-1 {
 		panic("unexpected last block number")
 	}
 	if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
@@ -119,17 +122,25 @@ func (i *ingester) trySendBlockBatch(
 			return nextBlockToSend, nil
 		}
 
-		// Store error for reporting
-		blocknumbers := make([]string, len(blockBatch))
-		for i, block := range blockBatch {
-			blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
+		i.log.Error("SendBlocks: Failed to send batch, requeueing...",
+			"firstBlockInBatch", blockBatch[0].BlockNumber,
+			"lastBlockInBatch", lastBlockNumber, "error", err)
+		blockNumbers := make([]string, len(blockBatch))
+		for n, block := range blockBatch {
+			i.dlq.AddItem(block.BlockNumber, 0)
+			blockNumbers[n] = fmt.Sprintf("%d", block.BlockNumber)
 		}
 		i.info.Errors.ObserveDuneError(ErrorInfo{
 			Error:        err,
-			BlockNumbers: strings.Join(blocknumbers, ","),
+			BlockNumbers: strings.Join(blockNumbers, ","),
 		})
 
+		if i.cfg.SkipFailedBlocks {
+			i.log.Error("SendBlocks: Failed to send batch, continuing", "error", err)
+			return nextBlockToSend, nil
+		}
+
 		err := errors.Errorf("failed to send batch: %w", err)
 		i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
 		return nextBlockToSend, err
diff --git a/lib/dlq/dlq.go b/lib/dlq/dlq.go
new file mode 100644
index 0000000..e20fcee
--- /dev/null
+++ b/lib/dlq/dlq.go
@@ -0,0 +1,81 @@
+package dlq
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	pq "github.com/emirpasic/gods/queues/priorityqueue"
+	"github.com/emirpasic/gods/utils"
+)
+
+type DLQ[T any] struct {
+	priorityQueue pq.Queue // structure not thread safe
+	mutex         sync.Mutex
+	retryDelay    func(retries int) time.Duration
+}
+
+// DLQItem This is generic so that metadata about retries can be maintained in an envelope during processing for when
+// an item needs to make its way back onto the DLQ later
+type DLQItem[T any] struct {
+	Value       T
+	Retries     int
+	nextRunTime time.Time
+}
+
+func (b *DLQItem[T]) String() string {
+	return fmt.Sprintf("%+v", *b)
+}
+
+func MapDLQItem[T, U any](b DLQItem[T], mapper func(T) U) DLQItem[U] {
+	return DLQItem[U]{
+		Value:       mapper(b.Value),
+		Retries:     b.Retries,
+		nextRunTime: b.nextRunTime,
+	}
+}
+
+func NewDLQ[T any]() *DLQ[T] {
+	return NewDLQWithDelay[T](RetryDelayLinear(time.Minute))
+}
+
+func RetryDelayLinear(backoff time.Duration) func(retries int) time.Duration {
+	return func(retries int) time.Duration {
+		return time.Duration(retries) * backoff // retries must be converted to a Duration for multiplication
+	}
+}
+
+func NewDLQWithDelay[T any](retryDelay func(retries int) time.Duration) *DLQ[T] {
+	return &DLQ[T]{priorityQueue: *pq.NewWith(byNextRunTime), retryDelay: retryDelay}
+}
+
+// Comparator function (sort by nextRunTime in ascending order)
+func byNextRunTime(a, b interface{}) int {
+	return utils.TimeComparator(a.(DLQItem[int64]).nextRunTime, b.(DLQItem[int64]).nextRunTime)
+}
+
+func (dlq *DLQ[T]) AddItem(item T, retries int) {
+	nextRunTime := time.Now().Add(dlq.retryDelay(retries + 1))
+
+	dlq.mutex.Lock()
+	defer dlq.mutex.Unlock()
+
+	dlq.priorityQueue.Enqueue(DLQItem[T]{Value: item, Retries: retries + 1, nextRunTime: nextRunTime})
+}
+
+func (dlq *DLQ[T]) GetNextItem() (value *DLQItem[T], ok bool) {
+	dlq.mutex.Lock()
+	defer dlq.mutex.Unlock()
+
+	peek, ok := dlq.priorityQueue.Peek()
+	if !ok || peek.(DLQItem[T]).nextRunTime.After(time.Now()) {
+		return nil, false
+	}
+
+	item, ok := dlq.priorityQueue.Dequeue()
+	if ok {
+		itemCasted := item.(DLQItem[T])
+		return &itemCasted, ok
+	}
+	return nil, ok
+}
diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go
index 64d979a..973a3f7 100644
--- a/mocks/duneapi/client.go
+++ b/mocks/duneapi/client.go
@@ -20,6 +20,9 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{}
 //
 //	// make and configure a mocked duneapi.BlockchainIngester
 //	mockedBlockchainIngester := &BlockchainIngesterMock{
+//		GetBlockGapsFunc: func(ctx context.Context) (*models.BlockchainGaps, error) {
+//			panic("mock out the GetBlockGaps method")
+//		},
 //		GetProgressReportFunc: func(ctx context.Context) (*models.BlockchainIndexProgress, error) {
 //			panic("mock out the GetProgressReport method")
 //		},
@@ -36,6 +39,9 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{}
 //
 //	}
 type BlockchainIngesterMock struct {
+	// GetBlockGapsFunc mocks the GetBlockGaps method.
+	GetBlockGapsFunc func(ctx context.Context) (*models.BlockchainGaps, error)
+
 	// GetProgressReportFunc mocks the GetProgressReport method.
 	GetProgressReportFunc func(ctx context.Context) (*models.BlockchainIndexProgress, error)
 
@@ -47,6 +53,11 @@ type BlockchainIngesterMock struct {
 
 	// calls tracks calls to the methods.
 	calls struct {
+		// GetBlockGaps holds details about calls to the GetBlockGaps method.
+		GetBlockGaps []struct {
+			// Ctx is the ctx argument value.
+			Ctx context.Context
+		}
 		// GetProgressReport holds details about calls to the GetProgressReport method.
 		GetProgressReport []struct {
 			// Ctx is the ctx argument value.
@@ -67,11 +78,44 @@ type BlockchainIngesterMock struct {
 			Payloads []models.RPCBlock
 		}
 	}
+	lockGetBlockGaps       sync.RWMutex
 	lockGetProgressReport  sync.RWMutex
 	lockPostProgressReport sync.RWMutex
 	lockSendBlocks         sync.RWMutex
 }
 
+// GetBlockGaps calls GetBlockGapsFunc.
+func (mock *BlockchainIngesterMock) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
+	if mock.GetBlockGapsFunc == nil {
+		panic("BlockchainIngesterMock.GetBlockGapsFunc: method is nil but BlockchainIngester.GetBlockGaps was just called")
+	}
+	callInfo := struct {
+		Ctx context.Context
+	}{
+		Ctx: ctx,
+	}
+	mock.lockGetBlockGaps.Lock()
+	mock.calls.GetBlockGaps = append(mock.calls.GetBlockGaps, callInfo)
+	mock.lockGetBlockGaps.Unlock()
+	return mock.GetBlockGapsFunc(ctx)
+}
+
+// GetBlockGapsCalls gets all the calls that were made to GetBlockGaps.
+// Check the length with:
+//
+//	len(mockedBlockchainIngester.GetBlockGapsCalls())
+func (mock *BlockchainIngesterMock) GetBlockGapsCalls() []struct {
+	Ctx context.Context
+} {
+	var calls []struct {
+		Ctx context.Context
+	}
+	mock.lockGetBlockGaps.RLock()
+	calls = mock.calls.GetBlockGaps
+	mock.lockGetBlockGaps.RUnlock()
+	return calls
+}
+
 // GetProgressReport calls GetProgressReportFunc.
 func (mock *BlockchainIngesterMock) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) {
 	if mock.GetProgressReportFunc == nil {
diff --git a/models/gaps.go b/models/gaps.go
new file mode 100644
index 0000000..cc17645
--- /dev/null
+++ b/models/gaps.go
@@ -0,0 +1,10 @@
+package models
+
+type BlockchainGaps struct {
+	Gaps []BlockGap
+}
+
+type BlockGap struct {
+	FirstMissing int64
+	LastMissing  int64
+}
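A minimal sketch of how the DLQ introduced in lib/dlq/dlq.go is meant to be driven, mirroring what AddBlockGaps and ProduceBlockNumbersDLQ above do: seed one item per missing block number, then poll GetNextItem, which only yields an item once its linear-backoff delay has elapsed. The block range, the 100ms backoff, and the 10ms poll sleep below are illustrative values rather than configuration from this change; dlq.NewDLQWithDelay, dlq.RetryDelayLinear, AddItem, GetNextItem, Value, and Retries are the APIs defined in the new package.

package main

import (
	"fmt"
	"time"

	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
)

func main() {
	// Retry n of an item becomes eligible n*100ms after it is (re-)added (illustrative backoff).
	queue := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(100 * time.Millisecond))

	// Seed the queue the way AddBlockGaps does: one entry per missing block number, zero prior retries.
	for blockNumber := int64(3); blockNumber <= 7; blockNumber++ {
		queue.AddItem(blockNumber, 0)
	}

	// Drain the queue the way ProduceBlockNumbersDLQ does: GetNextItem returns false until an
	// item's next run time has passed, so sleep briefly and poll again.
	for drained := 0; drained < 5; {
		item, ok := queue.GetNextItem()
		if !ok {
			time.Sleep(10 * time.Millisecond)
			continue
		}
		fmt.Printf("retrying block %d (attempt %d)\n", item.Value, item.Retries)
		drained++
		// On failure the item would be re-queued with its retry count, pushing it further out:
		// queue.AddItem(item.Value, item.Retries)
	}
}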