diff --git a/.gitignore b/.gitignore
index 9424251..f2eccb6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,4 +22,7 @@ go.work
 go.work.sum
 
 # Binary
-indexer
\ No newline at end of file
+indexer
+bin
+
+.idea
diff --git a/client/duneapi/client.go b/client/duneapi/client.go
index c75aa5c..29acf1e 100644
--- a/client/duneapi/client.go
+++ b/client/duneapi/client.go
@@ -33,6 +33,9 @@ type BlockchainIngester interface {
 	// PostProgressReport sends a progress report to DuneAPI
 	PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error
 
+	// GetBlockGaps fetches the ranges of block numbers that Dune is missing for this blockchain
+	GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error)
+
 	// - API to discover the latest block number ingested
 	//   this can also provide "next block ranges" to push to DuneAPI
 	// - log/metrics on catching up/falling behind, distance from tip of chain
@@ -366,3 +368,76 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
 	}
 	return progress, nil
 }
+
+func (c *client) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
+	var response BlockchainGapsResponse
+	var err error
+	var responseStatus string
+	start := time.Now()
+
+	// Log the outcome of the call on the way out
+	defer func() {
+		if err != nil {
+			c.log.Error("Getting block gaps failed",
+				"error", err,
+				"statusCode", responseStatus,
+				"duration", time.Since(start),
+			)
+		} else {
+			c.log.Info("Got block gaps",
+				"blockGaps", response.String(),
+				"duration", time.Since(start),
+			)
+		}
+	}()
+
+	url := fmt.Sprintf("%s/api/beta/blockchain/%s/gaps", c.cfg.URL, c.cfg.BlockchainName)
+	c.log.Debug("Sending request", "url", url)
+	req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("x-dune-api-key", c.cfg.APIKey)
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	responseStatus = resp.Status // captured for the deferred error log above
+
+	responseBody, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		// The body was already consumed above, so reuse responseBody rather than reading resp.Body again.
+		// We mutate the outer err here because the deferred log message checks for a non-nil err.
+		err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, string(responseBody))
+		return nil, err
+	}
+
+	err = json.Unmarshal(responseBody, &response)
+	if err != nil {
+		return nil, err
+	}
+
+	gaps := &models.BlockchainGaps{
+		Gaps: mapSlice(response.Gaps, func(gap BlockGap) models.BlockGap {
+			return models.BlockGap{
+				FirstMissing: gap.FirstMissing,
+				LastMissing:  gap.LastMissing,
+			}
+		}),
+	}
+	return gaps, nil
+}
+
+// mapSlice returns a new slice containing mapper applied to every element of slice
+func mapSlice[T any, U any](slice []T, mapper func(T) U) []U {
+	result := make([]U, len(slice))
+	for i, v := range slice {
+		result[i] = mapper(v)
+	}
+	return result
+}
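For reference, the decode path above is a thin mapping: the endpoint returns a JSON object with a single `gaps` array, which is unmarshalled into BlockchainGapsResponse and then converted into the transport-agnostic models type via mapSlice. A small sketch (the payload values are hypothetical; the field names come from the json tags in client/duneapi/models.go below):

	var response BlockchainGapsResponse
	payload := []byte(`{"gaps":[{"first_missing":100,"last_missing":104}]}`)
	_ = json.Unmarshal(payload, &response)
	// response.Gaps[0] == BlockGap{FirstMissing: 100, LastMissing: 104},
	// i.e. blocks 100 through 104 inclusive are missing from Dune.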
diff --git a/client/duneapi/models.go b/client/duneapi/models.go
index d9ad542..0a1e12e 100644
--- a/client/duneapi/models.go
+++ b/client/duneapi/models.go
@@ -83,3 +83,17 @@ type BlockchainError struct {
 	Error     string `json:"error"`
 	Source    string `json:"source"`
 }
+
+type BlockchainGapsResponse struct {
+	Gaps []BlockGap `json:"gaps"`
+}
+
+// BlockGap declares an inclusive range of missing block numbers
+type BlockGap struct {
+	FirstMissing int64 `json:"first_missing"`
+	LastMissing  int64 `json:"last_missing"`
+}
+
+func (b *BlockchainGapsResponse) String() string {
+	return fmt.Sprintf("%+v", *b)
+}
diff --git a/cmd/main.go b/cmd/main.go
index b1c7e86..dd15eae 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -18,6 +18,7 @@ import (
 	"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
 	"github.com/duneanalytics/blockchain-ingester/config"
 	"github.com/duneanalytics/blockchain-ingester/ingester"
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
 	"github.com/duneanalytics/blockchain-ingester/models"
 )
 
@@ -59,6 +60,19 @@ func main() {
 	}
 	defer duneClient.Close()
 
+	// Create a second Dune API client for DLQ processing, since the client is not yet safe for concurrent use
+	duneClientDLQ, err := duneapi.New(logger, duneapi.Config{
+		APIKey:             cfg.Dune.APIKey,
+		URL:                cfg.Dune.URL,
+		BlockchainName:     cfg.BlockchainName,
+		Stack:              cfg.RPCStack,
+		DisableCompression: cfg.DisableCompression,
+	})
+	if err != nil {
+		stdlog.Fatal(err)
+	}
+	defer duneClientDLQ.Close()
+
 	var wg stdsync.WaitGroup
 	var rpcClient jsonrpc.BlockchainClient
 
@@ -101,21 +115,37 @@ func main() {
 		startBlockNumber = cfg.BlockHeight
 	}
 
+	dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(cfg.DLQRetryInterval))
+
+	if !cfg.DisableGapsQuery {
+		blockGaps, err := duneClient.GetBlockGaps(ctx)
+		if err != nil {
+			stdlog.Fatal(err)
+		} else {
+			ingester.AddBlockGaps(dlqBlockNumbers, blockGaps.Gaps)
+		}
+	}
+
 	maxCount := int64(0) // 0 means ingest until cancelled
 	ingester := ingester.New(
 		logger,
 		rpcClient,
 		duneClient,
+		duneClientDLQ,
 		ingester.Config{
-			MaxConcurrentRequests:  cfg.RPCConcurrency,
-			ReportProgressInterval: cfg.ReportProgressInterval,
-			PollInterval:           cfg.PollInterval,
-			Stack:                  cfg.RPCStack,
-			BlockchainName:         cfg.BlockchainName,
-			BlockSubmitInterval:    cfg.BlockSubmitInterval,
-			SkipFailedBlocks:       cfg.RPCNode.SkipFailedBlocks,
+			MaxConcurrentRequests:    cfg.RPCConcurrency,
+			MaxConcurrentRequestsDLQ: cfg.DLQConcurrency,
+			ReportProgressInterval:   cfg.ReportProgressInterval,
+			PollInterval:             cfg.PollInterval,
+			PollDLQInterval:          cfg.PollDLQInterval,
+			Stack:                    cfg.RPCStack,
+			BlockchainName:           cfg.BlockchainName,
+			BlockSubmitInterval:      cfg.BlockSubmitInterval,
+			SkipFailedBlocks:         cfg.RPCNode.SkipFailedBlocks,
+			DLQOnly:                  cfg.DLQOnly,
 		},
 		progress,
+		dlqBlockNumbers,
 	)
 
 	wg.Add(1)
diff --git a/config/config.go b/config/config.go
index 8259943..94d2f9d 100644
--- a/config/config.go
+++ b/config/config.go
@@ -42,15 +42,20 @@ func (r RPCClient) HasError() error {
 }
 
 type Config struct {
-	BlockHeight            int64  `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"`                     // nolint:lll
-	BlockchainName         string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"`                // nolint:lll
-	DisableCompression     bool   `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
+	BlockHeight        int64  `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"`                           // nolint:lll
+	BlockchainName     string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"`                      // nolint:lll
+	DisableCompression bool   `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"`       // nolint:lll
+	DisableGapsQuery   bool   `long:"disable-gaps-query" env:"DISABLE_GAPS_QUERY" description:"disable gaps query used to populate the initial DLQ"`   // nolint:lll
+	DLQOnly            bool   `long:"dlq-only" env:"DLQ_ONLY" description:"Runs just the DLQ processing on its own"`                                   // nolint:lll
 	Dune                   DuneClient
 	PollInterval           time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"`    // nolint:lll
+	PollDLQInterval        time.Duration `long:"dlq-poll-interval" env:"DLQ_POLL_INTERVAL" description:"Interval to poll the DLQ" default:"300ms"`                // nolint:lll
+	DLQRetryInterval       time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in the DLQ" default:"1m"`   // nolint:lll
 	ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
 	RPCNode                RPCClient
 	RPCStack               models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"`                                             // nolint:lll
 	RPCConcurrency         int             `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"`                 // nolint:lll
+	DLQConcurrency         int             `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
 	BlockSubmitInterval    time.Duration   `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
 	LogLevel               string          `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"`                     // nolint:lll
 }
diff --git a/go.mod b/go.mod
index c506fe2..26973a4 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/duneanalytics/blockchain-ingester
 go 1.22.2
 
 require (
+	github.com/emirpasic/gods v1.18.1
 	github.com/go-errors/errors v1.5.1
 	github.com/hashicorp/go-retryablehttp v0.7.7
 	github.com/jessevdk/go-flags v1.5.0
diff --git a/go.sum b/go.sum
index 7ee828f..1835ce7 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
 github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
 github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
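With these flags in place, a DLQ-only run can be configured entirely from the environment. For illustration (values are hypothetical, the API key and RPC node settings are elided, and `indexer` is the binary name listed in .gitignore):

	BLOCKCHAIN_NAME=ethereum \
	DLQ_ONLY=true \
	DLQ_CONCURRENCY=4 \
	DLQ_POLL_INTERVAL=1s \
	DLQ_RETRY_INTERVAL=5m \
	./indexer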
diff --git a/ingester/ingester.go b/ingester/ingester.go
index 82925fd..16dec59 100644
--- a/ingester/ingester.go
+++ b/ingester/ingester.go
@@ -5,6 +5,8 @@ import (
 	"log/slog"
 	"time"
 
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
+
 	"github.com/duneanalytics/blockchain-ingester/client/duneapi"
 	"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
 	"github.com/duneanalytics/blockchain-ingester/models"
@@ -24,48 +26,67 @@ type Ingester interface {
 	// It can safely be run concurrently.
 	FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error
 
-	// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
+	// SendBlocks consumes RPCBlocks from the channel, reorders them, and sends batches to DuneAPI in an endless loop
 	// it will block until:
 	//	- the context is cancelled
 	//  - channel is closed
 	//  - a fatal error occurs
 	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error
 
-	// This is just a placeholder for now
-	Info() Info
+	// ProduceBlockNumbersDLQ sends block numbers from the DLQ to outChan.
+	// It will run continuously until the context is cancelled.
+	// When the DLQ does not return an eligible next block, it waits for PollDLQInterval before trying again.
+	ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error
+
+	// FetchBlockLoopDLQ fetches the block for each block number received on blockNumbers and sends the result on blocks.
+	// It will run continuously until the context is cancelled, or the channel is closed.
+	// It can safely be run concurrently.
+	FetchBlockLoopDLQ(ctx context.Context,
+		blockNumbers <-chan dlq.Item[int64],
+		blocks chan<- dlq.Item[models.RPCBlock],
+	) error
+
+	// SendBlocksDLQ pushes one RPCBlock at a time to DuneAPI in the order they are received
+	SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error
 
 	Close() error
 }
 
 const (
-	defaultMaxBatchSize           = 5
 	defaultReportProgressInterval = 30 * time.Second
 )
 
 type Config struct {
-	MaxConcurrentRequests  int
-	PollInterval           time.Duration
-	ReportProgressInterval time.Duration
-	Stack                  models.EVMStack
-	BlockchainName         string
-	BlockSubmitInterval    time.Duration
-	SkipFailedBlocks       bool
+	MaxConcurrentRequests    int
+	MaxConcurrentRequestsDLQ int
+	PollInterval             time.Duration
+	PollDLQInterval          time.Duration
+	ReportProgressInterval   time.Duration
+	Stack                    models.EVMStack
+	BlockchainName           string
+	BlockSubmitInterval      time.Duration
+	SkipFailedBlocks         bool
+	DLQOnly                  bool
 }
 
 type ingester struct {
-	log  *slog.Logger
-	node jsonrpc.BlockchainClient
-	dune duneapi.BlockchainIngester
-	cfg  Config
-	info Info
+	log     *slog.Logger
+	node    jsonrpc.BlockchainClient
+	dune    duneapi.BlockchainIngester
+	duneDLQ duneapi.BlockchainIngester
+	cfg     Config
+	info    Info
+	dlq     *dlq.DLQ[int64]
 }
 
 func New(
 	log *slog.Logger,
 	node jsonrpc.BlockchainClient,
 	dune duneapi.BlockchainIngester,
+	duneDLQ duneapi.BlockchainIngester,
 	cfg Config,
 	progress *models.BlockchainIndexProgress,
+	dlq *dlq.DLQ[int64],
 ) Ingester {
 	info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
 	if progress != nil {
@@ -73,18 +94,16 @@ func New(
 		info.IngestedBlockNumber = progress.LastIngestedBlockNumber
 	}
 	ing := &ingester{
-		log:  log.With("module", "ingester"),
-		node: node,
-		dune: dune,
-		cfg:  cfg,
-		info: info,
+		log:     log.With("module", "ingester"),
+		node:    node,
+		dune:    dune,
+		duneDLQ: duneDLQ,
+		cfg:     cfg,
+		info:    info,
+		dlq:     dlq,
 	}
 	if ing.cfg.ReportProgressInterval == 0 {
 		ing.cfg.ReportProgressInterval = defaultReportProgressInterval
 	}
 	return ing
 }
-
-func (i *ingester) Info() Info {
-	return i.info
-}
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index fb73172..b417d38 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -3,9 +3,14 @@ package ingester
 import (
 	"context"
 	"fmt"
+	"log/slog"
+	"slices"
 	"sync/atomic"
 	"time"
 
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
+	"github.com/emirpasic/gods/utils"
+
 	"github.com/duneanalytics/blockchain-ingester/models"
 	"github.com/go-errors/errors"
 	"golang.org/x/sync/errgroup"
@@ -20,6 +25,18 @@ import (
 // The SendBlocks goroutine receives all blocks on an unbuffered channel,
 // but buffers them in a map until they can be sent in order.
 func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
+	// Validate the configured concurrency limits before starting any goroutines
+	if i.cfg.DLQOnly {
+		i.cfg.MaxConcurrentRequests = 0 // in DLQ-only mode, set this to 0 so no main-loop fetchers are started
+	} else {
+		if i.cfg.MaxConcurrentRequests <= 0 {
+			return errors.Errorf("MaxConcurrentRequests must be > 0")
+		}
+	}
+	if i.cfg.MaxConcurrentRequestsDLQ <= 0 {
+		return errors.Errorf("MaxConcurrentRequestsDLQ must be > 0")
+	}
+
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	errGroup, ctx := errgroup.WithContext(ctx)
@@ -33,9 +50,6 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 	defer close(blocks)
 
 	// Start MaxBatchSize goroutines to consume blocks concurrently
-	if i.cfg.MaxConcurrentRequests <= 0 {
-		return errors.Errorf("MaxConcurrentRequests must be > 0")
-	}
 	for range i.cfg.MaxConcurrentRequests {
 		errGroup.Go(func() error {
 			return i.FetchBlockLoop(ctx, blockNumbers, blocks)
@@ -48,6 +62,26 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 		return i.SendBlocks(ctx, blocks, startBlockNumber)
 	})
 
+	// Start DLQ processing
+
+	blockNumbersDLQ := make(chan dlq.Item[int64])
+	defer close(blockNumbersDLQ)
+
+	blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.MaxConcurrentRequestsDLQ+1)
+	defer close(blocksDLQ)
+
+	errGroup.Go(func() error {
+		return i.SendBlocksDLQ(ctx, blocksDLQ)
+	})
+	for range i.cfg.MaxConcurrentRequestsDLQ {
+		errGroup.Go(func() error {
+			return i.FetchBlockLoopDLQ(ctx, blockNumbersDLQ, blocksDLQ)
+		})
+	}
+	errGroup.Go(func() error {
+		return i.ProduceBlockNumbersDLQ(ctx, blockNumbersDLQ)
+	})
+
 	// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
 	endBlockNumber := startBlockNumber - 1 + maxCount
 	i.log.Info("Starting ingester",
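For orientation, the DLQ pipeline started above mirrors the shape of the main pipeline: one producer draining the queue, a configurable number of fetchers, and a single sender, all tied to one errgroup so a fatal error or cancellation tears the whole set down together. Stripped of the ingester specifics, the wiring pattern is (a schematic with hypothetical produce/fetch/send helpers and worker count nWorkers, not the ingester's actual code):

	g, ctx := errgroup.WithContext(ctx)
	in := make(chan dlq.Item[int64])                        // unbuffered: the producer waits for a free fetcher
	out := make(chan dlq.Item[models.RPCBlock], nWorkers+1) // small buffer so fetchers rarely stall on the sender
	g.Go(func() error { return produce(ctx, in) })
	for range nWorkers {
		g.Go(func() error { return fetch(ctx, in, out) })
	}
	g.Go(func() error { return send(ctx, out) })
	err := g.Wait() // the first non-nil error cancels ctx for every goroutine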
@@ -131,14 +165,19 @@ func (i *ingester) FetchBlockLoop(
 			i.log.Error("Failed to get block by number",
 				"blockNumber", blockNumber,
-				"continueing", i.cfg.SkipFailedBlocks,
+				"continuing", i.cfg.SkipFailedBlocks,
 				"elapsed", time.Since(startTime),
 				"error", err,
 			)
 			if !i.cfg.SkipFailedBlocks {
 				return err
 			}
-			blocks <- models.RPCBlock{BlockNumber: blockNumber, Error: err}
+			select {
+			case <-ctx.Done():
+				i.log.Debug("FetchBlockLoop: Context canceled, not sending failed block", "blockNumber", blockNumber)
+				return ctx.Err()
+			case blocks <- models.RPCBlock{BlockNumber: blockNumber, Error: err}:
+			}
 			continue
 		}
@@ -223,6 +262,127 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
 	}
 }
 
+func (i *ingester) ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error {
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Debug("ProduceBlockNumbersDLQ: Context canceled, stopping")
+			return ctx.Err()
+		default:
+			block, ok := i.dlq.GetNextItem()
+
+			if ok {
+				if i.log.Enabled(ctx, slog.LevelDebug) {
+					i.log.Debug("ProduceBlockNumbersDLQ: Reprocessing block", "block", block,
+						"dlqSize", i.dlq.Size())
+				}
+				select {
+				case outChan <- *block:
+					// Successfully sent the block to the out channel
+				case <-ctx.Done():
+					i.log.Debug("ProduceBlockNumbersDLQ: Context canceled while sending block, stopping")
+					return ctx.Err()
+				}
+			} else {
+				if i.log.Enabled(ctx, slog.LevelDebug) {
+					i.log.Debug("ProduceBlockNumbersDLQ: No eligible blocks in the DLQ so sleeping",
+						"dlqSize", i.dlq.Size())
+				}
+				select {
+				case <-time.After(i.cfg.PollDLQInterval): // Polling interval when DLQ is empty
+				case <-ctx.Done():
+					i.log.Debug("ProduceBlockNumbersDLQ: Context canceled while sleeping, stopping")
+					return ctx.Err()
+				}
+			}
+		}
+	}
+}
+
+func (i *ingester) FetchBlockLoopDLQ(ctx context.Context, blockNumbers <-chan dlq.Item[int64],
+	blocks chan<- dlq.Item[models.RPCBlock],
+) error {
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Info("FetchBlockLoopDLQ: context is done")
+			return ctx.Err()
+		case blockNumber := <-blockNumbers:
+			startTime := time.Now()
+			block, err := i.node.BlockByNumber(ctx, blockNumber.Value)
+			if err != nil {
+				if errors.Is(err, context.Canceled) {
+					i.log.Error("FetchBlockLoopDLQ: Context canceled, stopping")
+					return ctx.Err()
+				}
+				i.log.Error("FetchBlockLoopDLQ: Failed to get block by number",
+					"blockNumber", blockNumber.Value,
+					"elapsed", time.Since(startTime),
+					"error", err,
+				)
+				select {
+				case <-ctx.Done():
+					i.log.Debug("FetchBlockLoopDLQ: Context canceled, not sending failed block", "blockNumber", blockNumber.Value)
+					return ctx.Err()
+				case blocks <- dlq.MapItem(blockNumber, func(blockNumber int64) models.RPCBlock {
+					return models.RPCBlock{BlockNumber: blockNumber, Error: err}
+				}):
+				}
+				continue
+			}
+			getBlockElapsed := time.Since(startTime)
+			select {
+			case <-ctx.Done():
+				i.log.Debug("FetchBlockLoopDLQ: Context canceled, not sending block", "blockNumber", block.BlockNumber)
+				return ctx.Err()
+			case blocks <- dlq.MapItem(blockNumber, func(_ int64) models.RPCBlock {
+				return block
+			}):
+				i.log.Debug(
+					"FetchBlockLoopDLQ: Got and sent block",
+					"blockNumber", block.BlockNumber,
+					"getBlockElapsed", getBlockElapsed,
+				)
+			}
+		}
+	}
+}
+
+func (i *ingester) SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error {
+	i.log.Debug("SendBlocksDLQ: Starting to receive blocks")
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Debug("SendBlocksDLQ: Context canceled, stopping")
+			return ctx.Err()
+		case block, ok := <-blocks:
+			if !ok {
+				i.log.Debug("SendBlocksDLQ: Channel is closed, returning")
+				return nil
+			}
+			if block.Value.Errored() {
+				i.dlq.AddItem(block.Value.BlockNumber, block.Retries)
+				i.log.Error("SendBlocksDLQ: Received failed block, requeueing", "blockNumber", block.Value.BlockNumber)
+				// TODO: report error once ErrorState struct is made thread-safe
+			} else {
+				i.log.Debug(
+					"SendBlocksDLQ: Received block",
+					"blockNumber", block.Value.BlockNumber,
+				)
+				if err := i.duneDLQ.SendBlocks(ctx, []models.RPCBlock{block.Value}); err != nil {
+					if errors.Is(err, context.Canceled) {
+						i.log.Info("SendBlocksDLQ: Context canceled, stopping")
+						return ctx.Err()
+					}
+					i.log.Error("SendBlocksDLQ: Failed to send block, requeueing...", "blockNumber", block.Value.BlockNumber, "error", err)
+					i.dlq.AddItem(block.Value.BlockNumber, block.Retries)
+					// TODO: report error once ErrorState struct is made thread-safe
+				}
+			}
+		}
+	}
+}
+
 func (i *ingester) Close() error {
 	// Send a final progress report to flush progress
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@@ -237,3 +397,17 @@ func (i *ingester) Close() error {
 
 	return i.node.Close()
 }
+
+// AddBlockGaps seeds the DLQ with every block number in the given gap ranges
+func AddBlockGaps(dlq *dlq.DLQ[int64], gaps []models.BlockGap) {
+	// queue these in reverse so that recent blocks are retried first
+	slices.SortFunc(gaps, func(a, b models.BlockGap) int {
+		return -utils.Int64Comparator(a.FirstMissing, b.FirstMissing)
+	})
+
+	for _, gap := range gaps {
+		for i := gap.FirstMissing; i <= gap.LastMissing; i++ {
+			dlq.AddItem(i, 0)
+		}
+	}
+}
diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
index d65e6a4..8209f1e 100644
--- a/ingester/mainloop_test.go
+++ b/ingester/mainloop_test.go
@@ -10,6 +10,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/duneanalytics/blockchain-ingester/lib/dlq"
+
"github.com/duneanalytics/blockchain-ingester/lib/dlq" + "github.com/duneanalytics/blockchain-ingester/ingester" duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi" jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc" @@ -73,12 +75,15 @@ func TestRunUntilCancel(t *testing.T) { slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, + duneapi, ingester.Config{ - BlockSubmitInterval: time.Nanosecond, - MaxConcurrentRequests: 10, - SkipFailedBlocks: false, + BlockSubmitInterval: time.Nanosecond, + MaxConcurrentRequests: 10, + MaxConcurrentRequestsDLQ: 2, + SkipFailedBlocks: false, }, nil, // progress + dlq.NewDLQ[int64](), ) err := ing.Run(ctx, 1, -1) // run until canceled @@ -110,10 +115,12 @@ func TestProduceBlockNumbers(t *testing.T) { slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, + duneapi, ingester.Config{ BlockSubmitInterval: time.Nanosecond, }, nil, // progress + dlq.NewDLQ[int64](), ) blockNumbers := make(chan int64) var wg sync.WaitGroup @@ -155,10 +162,12 @@ func TestSendBlocks(t *testing.T) { slog.New(slog.NewTextHandler(logOutput, nil)), nil, // node client isn't used in this unit test duneapi, + duneapi, ingester.Config{ BlockSubmitInterval: time.Nanosecond, }, nil, // progress + dlq.NewDLQ[int64](), ) blocks := make(chan models.RPCBlock) @@ -245,14 +254,17 @@ func TestRunBlocksOutOfOrder(t *testing.T) { slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, + duneapi, ingester.Config{ - MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines + MaxConcurrentRequests: 20, + MaxConcurrentRequestsDLQ: 2, // fetch blocks in multiple goroutines // big enough compared to the time spent in block by number to ensure batching. We panic // in the mocked Dune client if we don't get a batch of blocks (more than one block). 
 			BlockSubmitInterval: 50 * time.Millisecond,
 			SkipFailedBlocks:    false,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(ctx, 1, -1) // run until canceled
@@ -294,12 +306,15 @@ func TestRunRPCNodeFails(t *testing.T) {
 		slog.New(slog.NewTextHandler(logOutput, nil)),
 		rpcClient,
 		duneapi,
+		duneapi,
 		ingester.Config{
-			MaxConcurrentRequests: 10,
-			BlockSubmitInterval:   time.Millisecond,
-			SkipFailedBlocks:      false,
+			MaxConcurrentRequests:    10,
+			MaxConcurrentRequestsDLQ: 2,
+			BlockSubmitInterval:      time.Millisecond,
+			SkipFailedBlocks:         false,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(ctx, 1, -1) // run until canceled
@@ -313,12 +328,198 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
 		slog.New(slog.NewTextHandler(logOutput, nil)),
 		nil,
 		nil,
+		nil,
 		ingester.Config{
 			MaxConcurrentRequests: 0,
 		},
 		nil, // progress
+		dlq.NewDLQ[int64](),
 	)
 
 	err := ing.Run(context.Background(), 1, -1) // run until canceled
 	require.ErrorContains(t, err, "MaxConcurrentRequests must be > 0")
 }
+
+func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) {
+	logOutput := io.Discard
+	ing := ingester.New(
+		slog.New(slog.NewTextHandler(logOutput, nil)),
+		nil,
+		nil,
+		nil,
+		ingester.Config{
+			MaxConcurrentRequests:    10,
+			MaxConcurrentRequestsDLQ: 0,
+		},
+		nil, // progress
+		dlq.NewDLQ[int64](),
+	)
+
+	err := ing.Run(context.Background(), 1, -1) // run until canceled
+	require.ErrorContains(t, err, "MaxConcurrentRequestsDLQ must be > 0")
+}
+
+func TestRunWithDLQ(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	maxBlockNumber := int64(1000)
+	startBlockNumber := int64(10)
+
+	// Initial DLQ
+	dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(time.Duration(10) * time.Millisecond))
+	gaps := []models.BlockGap{
+		{
+			FirstMissing: 9,
+			LastMissing:  9,
+		}, {
+			FirstMissing: 3,
+			LastMissing:  7,
+		}, {
+			FirstMissing: 0,
+			LastMissing:  0,
+		},
+	}
+	ingester.AddBlockGaps(dlqBlockNumbers, gaps)
+
+	// blockNumber int64 -> timesSubmitted int
+	var blocksIndexed sync.Map
+	// Prepopulate expected blocks
+	for i := int64(0); i < maxBlockNumber; i++ {
+		blocksIndexed.Store(i, 0)
+	}
+	// Mark blocks 1, 2 and 8 as already indexed: they sit below startBlockNumber but are not part of the gaps above
+	incrementAndGet(&blocksIndexed, int64(1))
+	incrementAndGet(&blocksIndexed, int64(2))
+	incrementAndGet(&blocksIndexed, int64(8))
+
+	// Dune API Mocking
+	var sendBlocksRequests sync.Map
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
+			if len(blocks) == 0 {
+				return nil
+			}
+			// Count requests by block number
+			for _, block := range blocks {
+				incrementAndGet(&sendBlocksRequests, block.BlockNumber)
+			}
+
+			// Fail if this batch contains a block number that hasn't been requested at least twice before this call
+			for _, block := range blocks {
+				requests, _ := sendBlocksRequests.Load(block.BlockNumber)
+				if requests.(int) <= 2 {
+					return errors.Errorf("failing batch due to %v having only been requested %v times",
+						block.BlockNumber, requests)
+				}
+			}
+
+			// Count blocks as indexed by block number
+			for _, block := range blocks {
+				incrementAndGet(&blocksIndexed, block.BlockNumber)
+			}
+
+			// Look for gaps
+			if !duneStoreContainsGaps(&blocksIndexed, maxBlockNumber) {
+				// cancel execution when we have sent all blocks
+				cancel()
+				return context.Canceled
+			}
+
+			return nil
+		},
+		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+			return nil
+		},
+	}
+
+	// RPC Mocking
+	var rpcBlocksRequests sync.Map
+	rpcClient := &jsonrpc_mock.BlockchainClientMock{
+		LatestBlockNumberFunc: func() (int64, error) {
+			return maxBlockNumber + 1, nil
+		},
+		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+			incrementAndGet(&rpcBlocksRequests, blockNumber)
+
+			// Fail every 10th block number the first 2 times it is requested
+			if blockNumber%10 == 0 {
+				requests, _ := rpcBlocksRequests.Load(blockNumber)
+				if requests.(int) <= 2 {
+					return models.RPCBlock{},
+						errors.Errorf("failing rpc request due to %v having only been requested %v times",
+							blockNumber, requests)
+				}
+			}
+
+			return models.RPCBlock{
+				BlockNumber: blockNumber,
+				Payload:     []byte(`block`),
+			}, nil
+		},
+		CloseFunc: func() error {
+			return nil
+		},
+	}
+	// Swap these to see logs
+	// logOutput := os.Stderr
+	logOutput := io.Discard
+	ing := ingester.New(
+		slog.New(slog.NewTextHandler(logOutput, nil)),
+		rpcClient,
+		duneapi,
+		duneapi,
+		ingester.Config{
+			BlockSubmitInterval:      time.Nanosecond,
+			MaxConcurrentRequests:    10,
+			MaxConcurrentRequestsDLQ: 1,
+			DLQOnly:                  false,
+			SkipFailedBlocks:         true,
+		},
+		nil, // progress
+		dlqBlockNumbers,
+	)
+
+	err := ing.Run(ctx, startBlockNumber, -1) // run until canceled
+	require.False(t, duneStoreContainsGaps(&blocksIndexed, maxBlockNumber))
+	require.GreaterOrEqual(t, lenSyncMap(&blocksIndexed), int(maxBlockNumber))
+	require.ErrorIs(t, err, context.Canceled) // this is expected
+}
+
+func duneStoreContainsGaps(blocksIndexed *sync.Map, maxBlockNumber int64) bool {
+	containsGap := false
+	blocksIndexed.Range(func(key, value any) bool {
+		blockNumber := key.(int64)
+		count := value.(int)
+		if blockNumber <= maxBlockNumber && count < 1 {
+			containsGap = true
+			return false
+		}
+		return true
+	})
+	return containsGap
+}
+
+func incrementAndGet(m *sync.Map, key interface{}) int {
+	for {
+		// Load the current value associated with the key, else initialise it
+		currentValue, _ := m.LoadOrStore(key, 0)
+
+		// Increment the current value.
+		newValue := currentValue.(int) + 1
+
+		// Attempt to store the new value back into the sync.Map. The compare-and-swap (CAS) approach ensures atomicity.
+		if m.CompareAndSwap(key, currentValue, newValue) {
+			// If the swap succeeded, return the new value.
+			return newValue
+		}
+		// If the swap failed, it means the value was updated by another goroutine. Retry the operation.
+	}
+}
+
+func lenSyncMap(m *sync.Map) int {
+	length := 0
+	m.Range(func(_, _ interface{}) bool {
+		length++
+		return true
+	})
+	return length
+}
diff --git a/ingester/send.go b/ingester/send.go
index 8b69e01..50a90ac 100644
--- a/ingester/send.go
+++ b/ingester/send.go
@@ -91,6 +91,9 @@ func (i *ingester) trySendBlockBatch(
 	for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
 		// Skip Failed block if we're configured to skip Failed blocks
 		if i.cfg.SkipFailedBlocks && block.Errored() {
+			i.log.Error("SendBlocks: RPCBlock has an error, requeueing...", "block", block.BlockNumber, "error", block.Error)
+			i.dlq.AddItemHighPriority(block.BlockNumber)
+			delete(collectedBlocks, nextBlockToSend)
 			nextBlockToSend++
 			continue
 		}
@@ -110,7 +113,7 @@ func (i *ingester) trySendBlockBatch(
 
 	// Send the batch
 	lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
-	if lastBlockNumber != nextBlockToSend-1 {
+	if !i.cfg.SkipFailedBlocks && lastBlockNumber != nextBlockToSend-1 {
 		panic("unexpected last block number")
 	}
 	if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
@@ -119,17 +122,25 @@ func (i *ingester) trySendBlockBatch(
 		return nextBlockToSend, nil
 	}
 
-	// Store error for reporting
-	blocknumbers := make([]string, len(blockBatch))
-	for i, block := range blockBatch {
-		blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
+	i.log.Error("SendBlocks: Failed to send batch, requeueing...",
+		"firstBlockInBatch", blockBatch[0].BlockNumber,
+		"lastBlockInBatch", lastBlockNumber, "error", err)
+	blockNumbers := make([]string, len(blockBatch))
+	for n, block := range blockBatch {
+		i.dlq.AddItemHighPriority(block.BlockNumber)
+		blockNumbers[n] = fmt.Sprintf("%d", block.BlockNumber)
 	}
 	i.info.Errors.ObserveDuneError(ErrorInfo{
 		Error:        err,
-		BlockNumbers: strings.Join(blocknumbers, ","),
+		BlockNumbers: strings.Join(blockNumbers, ","),
 	})
 
+	if i.cfg.SkipFailedBlocks {
+		i.log.Error("SendBlocks: Failed to send batch, continuing", "error", err)
+		return nextBlockToSend, nil
+	}
+
+	err = errors.Errorf("failed to send batch: %w", err)
 	i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
 	return nextBlockToSend, err
diff --git a/lib/dlq/dlq.go b/lib/dlq/dlq.go
new file mode 100644
index 0000000..34ab7d9
--- /dev/null
+++ b/lib/dlq/dlq.go
@@ -0,0 +1,95 @@
+package dlq
+
+import (
+	"sync"
+	"time"
+
+	pq "github.com/emirpasic/gods/queues/priorityqueue"
+	"github.com/emirpasic/gods/utils"
+)
+
+type DLQ[T any] struct {
+	priorityQueue pq.Queue // the underlying queue is not thread-safe, so all access is guarded by mutex
+	mutex         sync.Mutex
+	retryDelay    RetryStrategy
+}
+
+// RetryStrategy takes the number of retries and returns a Duration.
+// This allows the caller to implement any strategy they like, be it constant, linear, exponential, etc.
+type RetryStrategy func(retries int) time.Duration
+
+// Item wraps a value in an envelope carrying retry metadata, so that an item which fails during
+// processing can be re-added to the DLQ later with its retry count preserved
+type Item[T any] struct {
+	Value       T
+	Retries     int
+	nextRunTime time.Time
+}
+
+func MapItem[T, U any](b Item[T], mapper func(T) U) Item[U] {
+	return Item[U]{
+		Value:       mapper(b.Value),
+		Retries:     b.Retries,
+		nextRunTime: b.nextRunTime,
+	}
+}
+
+func NewDLQ[T any]() *DLQ[T] {
+	return NewDLQWithDelay[T](RetryDelayLinear(time.Minute))
+}
+
+func RetryDelayLinear(backoff time.Duration) RetryStrategy {
+	return func(retries int) time.Duration {
+		return time.Duration(retries) * backoff // retries must be converted to a Duration for multiplication
+	}
+}
+
+func NewDLQWithDelay[T any](retryDelay RetryStrategy) *DLQ[T] {
+	return &DLQ[T]{priorityQueue: *pq.NewWith(byNextRunTime[T]), retryDelay: retryDelay}
+}
+
+// Comparator function (sort by nextRunTime in ascending order)
+func byNextRunTime[T any](a, b interface{}) int {
+	return utils.TimeComparator(a.(Item[T]).nextRunTime, b.(Item[T]).nextRunTime)
+}
+
+func (dlq *DLQ[T]) AddItem(item T, retries int) {
+	nextRunTime := time.Now().Add(dlq.retryDelay(retries))
+
+	dlq.mutex.Lock()
+	defer dlq.mutex.Unlock()
+
+	dlq.priorityQueue.Enqueue(Item[T]{Value: item, Retries: retries + 1, nextRunTime: nextRunTime})
+}
+
+// AddItemHighPriority enqueues an item with a zero nextRunTime, so it sorts ahead of every delayed item
+func (dlq *DLQ[T]) AddItemHighPriority(item T) {
+	dlq.mutex.Lock()
+	defer dlq.mutex.Unlock()
+
+	dlq.priorityQueue.Enqueue(Item[T]{Value: item, Retries: 0, nextRunTime: time.Time{}})
+}
+
+func (dlq *DLQ[T]) GetNextItem() (value *Item[T], ok bool) {
+	dlq.mutex.Lock()
+	defer dlq.mutex.Unlock()
+
+	peek, ok := dlq.priorityQueue.Peek()
+	if !ok || peek.(Item[T]).nextRunTime.After(time.Now()) {
+		return nil, false
+	}
+
+	item, ok := dlq.priorityQueue.Dequeue()
+	if ok {
+		itemCasted := item.(Item[T])
+		return &itemCasted, ok
+	}
+	return nil, ok
+}
+
+func (dlq *DLQ[T]) Size() int {
+	dlq.mutex.Lock()
+	defer dlq.mutex.Unlock()
+
+	return dlq.priorityQueue.Size()
+}
diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go
index 64d979a..973a3f7 100644
--- a/mocks/duneapi/client.go
+++ b/mocks/duneapi/client.go
@@ -20,6 +20,9 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{}
 //
 //		// make and configure a mocked duneapi.BlockchainIngester
 //		mockedBlockchainIngester := &BlockchainIngesterMock{
+//			GetBlockGapsFunc: func(ctx context.Context) (*models.BlockchainGaps, error) {
+//				panic("mock out the GetBlockGaps method")
+//			},
 //			GetProgressReportFunc: func(ctx context.Context) (*models.BlockchainIndexProgress, error) {
 //				panic("mock out the GetProgressReport method")
 //			},
@@ -36,6 +39,9 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{}
 //
 //	}
 type BlockchainIngesterMock struct {
+	// GetBlockGapsFunc mocks the GetBlockGaps method.
+	GetBlockGapsFunc func(ctx context.Context) (*models.BlockchainGaps, error)
+
 	// GetProgressReportFunc mocks the GetProgressReport method.
 	GetProgressReportFunc func(ctx context.Context) (*models.BlockchainIndexProgress, error)
 
@@ -47,6 +53,11 @@ type BlockchainIngesterMock struct {
 	// calls tracks calls to the methods.
 	calls struct {
+		// GetBlockGaps holds details about calls to the GetBlockGaps method.
+		GetBlockGaps []struct {
+			// Ctx is the ctx argument value.
+			Ctx context.Context
+		}
 		// GetProgressReport holds details about calls to the GetProgressReport method.
GetProgressReport []struct { // Ctx is the ctx argument value. @@ -67,11 +78,44 @@ type BlockchainIngesterMock struct { Payloads []models.RPCBlock } } + lockGetBlockGaps sync.RWMutex lockGetProgressReport sync.RWMutex lockPostProgressReport sync.RWMutex lockSendBlocks sync.RWMutex } +// GetBlockGaps calls GetBlockGapsFunc. +func (mock *BlockchainIngesterMock) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) { + if mock.GetBlockGapsFunc == nil { + panic("BlockchainIngesterMock.GetBlockGapsFunc: method is nil but BlockchainIngester.GetBlockGaps was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockGetBlockGaps.Lock() + mock.calls.GetBlockGaps = append(mock.calls.GetBlockGaps, callInfo) + mock.lockGetBlockGaps.Unlock() + return mock.GetBlockGapsFunc(ctx) +} + +// GetBlockGapsCalls gets all the calls that were made to GetBlockGaps. +// Check the length with: +// +// len(mockedBlockchainIngester.GetBlockGapsCalls()) +func (mock *BlockchainIngesterMock) GetBlockGapsCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockGetBlockGaps.RLock() + calls = mock.calls.GetBlockGaps + mock.lockGetBlockGaps.RUnlock() + return calls +} + // GetProgressReport calls GetProgressReportFunc. func (mock *BlockchainIngesterMock) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) { if mock.GetProgressReportFunc == nil { diff --git a/models/gaps.go b/models/gaps.go new file mode 100644 index 0000000..cc17645 --- /dev/null +++ b/models/gaps.go @@ -0,0 +1,10 @@ +package models + +type BlockchainGaps struct { + Gaps []BlockGap +} + +type BlockGap struct { + FirstMissing int64 + LastMissing int64 +}
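Taken together, lib/dlq is small enough to exercise on its own. A minimal sketch of the retry lifecycle (values are hypothetical; this mirrors how main.go seeds the queue and how the mainloop requeues failures):

	d := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(time.Minute))
	ingester.AddBlockGaps(d, []models.BlockGap{{FirstMissing: 3, LastMissing: 7}}) // enqueues 3..7, eligible immediately
	if item, ok := d.GetNextItem(); ok {
		// Process item.Value here; on failure, requeue it with the accumulated retry count:
		d.AddItem(item.Value, item.Retries)
		// With RetryDelayLinear(time.Minute), the item becomes eligible again after
		// item.Retries * 1m, so delays grow 1m, 2m, 3m, ... across successive failures.
	}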
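Because RetryStrategy is an ordinary function type, other backoff curves drop in without touching the queue itself. A capped exponential strategy could look like this (a sketch, not part of this change; the name and cap are illustrative):

	// RetryDelayExponential doubles the delay on every retry, up to max.
	func RetryDelayExponential(base, max time.Duration) dlq.RetryStrategy {
		return func(retries int) time.Duration {
			d := base << uint(retries) // base * 2^retries
			if d <= 0 || d > max {     // guard against overflow and cap the delay
				return max
			}
			return d
		}
	}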