diff --git a/cmd/main.go b/cmd/main.go index 8f3268f..059b691 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -147,6 +147,7 @@ func main() { ingester.Config{ MaxConcurrentRequests: cfg.BlockConcurrency, MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency, + MaxBatchSize: cfg.MaxBatchSize, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, PollDLQInterval: cfg.PollDLQInterval, diff --git a/config/config.go b/config/config.go index 8216d72..f3494fb 100644 --- a/config/config.go +++ b/config/config.go @@ -60,6 +60,7 @@ type Config struct { DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll + MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send in a single batch" default:"128"` // nolint:lll } func (c Config) HasError() error { diff --git a/ingester/ingester.go b/ingester/ingester.go index 16dec59..4b80a2a 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -67,6 +67,7 @@ type Config struct { BlockSubmitInterval time.Duration SkipFailedBlocks bool DLQOnly bool + MaxBatchSize int } type ingester struct { @@ -105,5 +106,10 @@ func New( if ing.cfg.ReportProgressInterval == 0 { ing.cfg.ReportProgressInterval = defaultReportProgressInterval } + if ing.cfg.MaxBatchSize == 0 { + ing.cfg.MaxBatchSize = maxBatchSize + } else if ing.cfg.MaxBatchSize > maxBatchSize { + ing.cfg.MaxBatchSize = maxBatchSize + } return ing } diff --git a/ingester/send.go b/ingester/send.go index fbb225d..f197011 100644 --- a/ingester/send.go +++ b/ingester/send.go @@ -17,7 +17,7 @@ const maxBatchSize = 256 // We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune. func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error { // Buffer for temporarily storing blocks that have arrived out of order - collectedBlocks := make(map[int64]models.RPCBlock, maxBatchSize) + collectedBlocks := make(map[int64]models.RPCBlock, i.cfg.MaxBatchSize) nextNumberToSend := startBlockNumber batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval) defer batchTimer.Stop() @@ -69,7 +69,7 @@ func (i *ingester) trySendCompletedBlocks( nextBlockToSend int64, ) (int64, error) { for { - nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize) + nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, i.cfg.MaxBatchSize) if err != nil || nextBlock == nextBlockToSend { return nextBlock, err }