Skip to content

Commit

Permalink
mainloop: increase throughput and simplify the trySendCompletedBlocks
Browse files Browse the repository at this point in the history
default RPCConcurrency to 25
also, increase throughput by avoiding very small batches
  • Loading branch information
msf committed Jun 27, 2024
1 parent 17be443 commit d1df9d4
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 44 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Config struct {
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
SkipFailedBlocks bool `long:"skip-failed-blocks" env:"SKIP_FAILED_BLOCKS" description:"Skip failed blocks when submitting to Dune"` // nolint:lll
}
Expand Down
100 changes: 57 additions & 43 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,54 +215,69 @@ func (i *ingester) trySendCompletedBlocks(
collectedBlocks map[int64]models.RPCBlock,
nextBlockToSend int64,
) (int64, error) {
initialNextBlockToSend := nextBlockToSend
for {
if len(collectedBlocks) < maxBatchSize/10 {
// If fewer than maxBatchSize/10 blocks are buffered, return without sending so that a later call can send a larger batch; tiny batches hurt throughput.
return nextBlockToSend, nil
}
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
}
nextBlockToSend = nextBlock
}
}

func (i *ingester) trySendBlockBatch(
ctx context.Context,
collectedBlocks map[int64]models.RPCBlock,
nextBlockToSend int64,
maxBatchSize int,
) (int64, error) {
startTime := time.Now()
// Outer loop: We might need to send multiple batch requests if our buffer is too big
for _, ok := collectedBlocks[nextBlockToSend]; ok; _, ok = collectedBlocks[nextBlockToSend] {
// Collect a blocks of blocks to send, only send those which are in order
// Collect a batch to send, only send those which are in order
blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
// Skip Failed block if we're configured to skip Failed blocks
if i.cfg.SkipFailedBlocks && block.Errored() {
nextBlockToSend++
continue
}

blockBatch = append(blockBatch, block)
delete(collectedBlocks, nextBlockToSend)
// Collect a blocks of blocks to send, only send those which are in order
// Collect a batch to send, only send those which are in order
blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
// Skip Failed block if we're configured to skip Failed blocks
if i.cfg.SkipFailedBlocks && block.Errored() {
nextBlockToSend++

if len(blockBatch) == maxBatchSize {
break
}
continue
}

if len(blockBatch) == 0 {
return nextBlockToSend, nil
}
blockBatch = append(blockBatch, block)
delete(collectedBlocks, nextBlockToSend)
nextBlockToSend++

// Send the batch
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
if lastBlockNumber != nextBlockToSend-1 {
panic("unexpected last block number")
if len(blockBatch) == maxBatchSize {
break
}
if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextBlockToSend, nil
}
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextBlockToSend, err
}

if len(blockBatch) == 0 {
return nextBlockToSend, nil
}

// Send the batch
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
if lastBlockNumber != nextBlockToSend-1 {
panic("unexpected last block number")
}
if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextBlockToSend, nil
}
atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextBlockToSend, err
}
atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
i.log.Info(
"Sent blocks to DuneAPI",
"blocksSent", nextBlockToSend-initialNextBlockToSend,
"bufferSize", len(collectedBlocks),
"batchSize", len(blockBatch),
"nextBlockToSend", nextBlockToSend,
"elapsed", time.Since(startTime),
)
Expand All @@ -284,7 +299,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
defer timer.Stop()

previousTime := time.Now()
previousDistance := int64(0)
previousHoursToCatchUp := float64(0)
previousIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)

for {
Expand All @@ -297,7 +312,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error {

blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make this more stable

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
Expand All @@ -306,12 +320,13 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
"latestBlockNumber", latest,
"ingestedBlockNumber", lastIngested,
}
if fallingBehind {
fields = append(fields, "fallingBehind", fallingBehind)
}
if newDistance > 1 {
etaHours := time.Duration(float64(newDistance) / blocksPerSec * float64(time.Second)).Hours()
fields = append(fields, "hoursToCatchUp", fmt.Sprintf("%.1f", etaHours))
if previousHoursToCatchUp < (0.8 * etaHours) {
fields = append(fields, "fallingBehind", true)
}
previousHoursToCatchUp = etaHours
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
Expand All @@ -322,7 +337,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error {

i.log.Info("PROGRESS REPORT", fields...)
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow

// TODO: include errors in the report, reset the error list
Expand Down

0 comments on commit d1df9d4

Please sign in to comment.