Skip to content

Commit

Permalink
add config option: max-batch-size
Browse files Browse the repository at this point in the history
Allow command-line configuration of the maximum batch size used for DuneAPI requests.

This is useful for testing different batch sizes and comparing their throughput and error rates.

For example, large batches increase throughput, but they also have higher latency
and can lead to request timeouts.
  • Loading branch information
msf committed Jul 25, 2024
1 parent 2d406c3 commit edc19bf
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func main() {
ingester.Config{
MaxConcurrentRequests: cfg.BlockConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency,
MaxBatchSize: cfg.MaxBatchSize,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Config struct {
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send in a single batch" default:"128"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
6 changes: 6 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Config struct {
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
MaxBatchSize int
}

type ingester struct {
Expand Down Expand Up @@ -105,5 +106,10 @@ func New(
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
if ing.cfg.MaxBatchSize == 0 {
ing.cfg.MaxBatchSize = maxBatchSize
} else if ing.cfg.MaxBatchSize > maxBatchSize {
ing.cfg.MaxBatchSize = maxBatchSize
}
return ing
}
4 changes: 2 additions & 2 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const maxBatchSize = 256
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error {
// Buffer for temporarily storing blocks that have arrived out of order
collectedBlocks := make(map[int64]models.RPCBlock, maxBatchSize)
collectedBlocks := make(map[int64]models.RPCBlock, i.cfg.MaxBatchSize)
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
defer batchTimer.Stop()
Expand Down Expand Up @@ -69,7 +69,7 @@ func (i *ingester) trySendCompletedBlocks(
nextBlockToSend int64,
) (int64, error) {
for {
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, i.cfg.MaxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
}
Expand Down

0 comments on commit edc19bf

Please sign in to comment.