diff --git a/config/config.go b/config/config.go index 42ce28a..7e2dc43 100644 --- a/config/config.go +++ b/config/config.go @@ -40,7 +40,7 @@ type Config struct { ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll + RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll SkipFailedBlocks bool `long:"skip-failed-blocks" env:"SKIP_FAILED_BLOCKS" description:"Skip failed blocks when submitting to Dune"` // nolint:lll } diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 6277f65..11618b9 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -215,54 +215,69 @@ func (i *ingester) trySendCompletedBlocks( collectedBlocks map[int64]models.RPCBlock, nextBlockToSend int64, ) (int64, error) { - initialNextBlockToSend := nextBlockToSend + for { + if len(collectedBlocks) < maxBatchSize/10 { + // if we have very little extra to send, wait a bit before sending to avoid tiny batches impacting throughput + return nextBlockToSend, nil + } + nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize) + if err != nil || nextBlock == nextBlockToSend { + return nextBlock, err + } + nextBlockToSend = nextBlock + } +} + +func (i *ingester) trySendBlockBatch( + ctx context.Context, + collectedBlocks map[int64]models.RPCBlock, + nextBlockToSend int64, + maxBatchSize int, +) (int64, error) { startTime := time.Now() - // Outer loop: We might need to send multiple batch requests if our buffer is too big - for _, ok := collectedBlocks[nextBlockToSend]; ok; _, ok = collectedBlocks[nextBlockToSend] { - // Collect a blocks of blocks to send, only send those which are in order - // Collect a batch to send, only send those which are in order - blockBatch := make([]models.RPCBlock, 0, maxBatchSize) - for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] { - // Skip Failed block if we're configured to skip Failed blocks - if i.cfg.SkipFailedBlocks && block.Errored() { - nextBlockToSend++ - continue - } - blockBatch = append(blockBatch, block) - delete(collectedBlocks, nextBlockToSend) + // Collect a blocks of blocks to send, only send those which are in order + // Collect a batch to send, only send those which are in order + blockBatch := make([]models.RPCBlock, 0, maxBatchSize) + for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] { + // Skip Failed block if we're configured to skip Failed blocks + if i.cfg.SkipFailedBlocks && block.Errored() { nextBlockToSend++ - - if len(blockBatch) == maxBatchSize { - break - } + continue } - if len(blockBatch) == 0 { - return nextBlockToSend, nil - } + blockBatch = append(blockBatch, block) + delete(collectedBlocks, nextBlockToSend) + nextBlockToSend++ - // Send the batch - lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber - if lastBlockNumber != nextBlockToSend-1 { - panic("unexpected last block number") + if len(blockBatch) == maxBatchSize { + break } - if err := i.dune.SendBlocks(ctx, blockBatch); err != nil { - if errors.Is(err, context.Canceled) { - i.log.Info("SendBlocks: Context canceled, stopping") - return nextBlockToSend, nil - } - // TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries - err := errors.Errorf("failed to send batch: %w", err) - i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err) - return nextBlockToSend, err + } + + if len(blockBatch) == 0 { + return nextBlockToSend, nil + } + + // Send the batch + lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber + if lastBlockNumber != nextBlockToSend-1 { + panic("unexpected last block number") + } + if err := i.dune.SendBlocks(ctx, blockBatch); err != nil { + if errors.Is(err, context.Canceled) { + i.log.Info("SendBlocks: Context canceled, stopping") + return nextBlockToSend, nil } - atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber) + // TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries + err := errors.Errorf("failed to send batch: %w", err) + i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err) + return nextBlockToSend, err } + atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber) i.log.Info( "Sent blocks to DuneAPI", - "blocksSent", nextBlockToSend-initialNextBlockToSend, - "bufferSize", len(collectedBlocks), + "batchSize", len(blockBatch), "nextBlockToSend", nextBlockToSend, "elapsed", time.Since(startTime), ) @@ -284,7 +299,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error { defer timer.Stop() previousTime := time.Now() - previousDistance := int64(0) + previousHoursToCatchUp := float64(0) previousIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber) for { @@ -297,7 +312,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error { blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds() newDistance := latest - lastIngested - fallingBehind := newDistance > (previousDistance + 1) // TODO: make this more stable rpcErrors := len(i.info.RPCErrors) duneErrors := len(i.info.DuneErrors) @@ -306,12 +320,13 @@ func (i *ingester) ReportProgress(ctx context.Context) error { "latestBlockNumber", latest, "ingestedBlockNumber", lastIngested, } - if fallingBehind { - fields = append(fields, "fallingBehind", fallingBehind) - } if newDistance > 1 { etaHours := time.Duration(float64(newDistance) / blocksPerSec * float64(time.Second)).Hours() fields = append(fields, "hoursToCatchUp", fmt.Sprintf("%.1f", etaHours)) + if previousHoursToCatchUp < (0.8 * etaHours) { + fields = append(fields, "fallingBehind", true) + } + previousHoursToCatchUp = etaHours } if rpcErrors > 0 { fields = append(fields, "rpcErrors", rpcErrors) @@ -322,7 +337,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error { i.log.Info("PROGRESS REPORT", fields...) previousIngested = lastIngested - previousDistance = newDistance previousTime = tNow // TODO: include errors in the report, reset the error list