diff --git a/README.md b/README.md index 39243e4..0c9c3dc 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,16 @@ Also, we mention some of the options here: The `log` flag (environment variable `LOG`) controls the log level. Use `--log debug`/`LOG=debug` to emit more logs than the default `info` level. To emit less logs, use `warn`, or `error` (least). ### Tuning RPC concurrency -The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines) -to run concurrently to perform RPC node requests. Default is 25. +The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines) to run concurrently to perform RPC node requests. See `--help` for up to date default value. + +### Tuning for throughput +Throughput depends on: latency & request rate between RPC <-> Node Indexer <--> DuneAPI and can be tuned via a combination of: +1. RPC_CONCURRENCY, higher values feed more blocks into the node indexer to process +1. MAX_BATCH_SIZE, higher values send more blocks per request to DuneAPI +1. BLOCK_SUBMIT_INTERVAL, the interval at which blocks to DuneAPI +See `--help` for up to date default values. + + ### RPC poll interval The flag `--rpc-poll-interval` (environment variable `RPC_POLL_INTERVAL`) specifies the duration to wait before checking diff --git a/cmd/main.go b/cmd/main.go index 059b691..542a9a1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -95,7 +95,7 @@ func main() { EVMStack: cfg.RPCStack, // real max request concurrency to RPP node // each block requires multiple RPC requests - TotalRPCConcurrency: cfg.BlockConcurrency * 4, + TotalRPCConcurrency: cfg.RPCConcurrency, }) if err != nil { stdlog.Fatal(err) @@ -145,17 +145,19 @@ func main() { duneClient, duneClientDLQ, ingester.Config{ - MaxConcurrentRequests: cfg.BlockConcurrency, - MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency, - MaxBatchSize: cfg.MaxBatchSize, - ReportProgressInterval: cfg.ReportProgressInterval, - PollInterval: cfg.PollInterval, - PollDLQInterval: cfg.PollDLQInterval, - Stack: cfg.RPCStack, - BlockchainName: cfg.BlockchainName, - BlockSubmitInterval: cfg.BlockSubmitInterval, - SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks, - DLQOnly: cfg.DLQOnly, + // OpStack does 3 requests per block, ArbitrumNova is variable + // leave some room for other requests + MaxConcurrentBlocks: cfg.RPCConcurrency / 4, + DLQMaxConcurrentBlocks: cfg.DLQBlockConcurrency, + MaxBatchSize: cfg.MaxBatchSize, + ReportProgressInterval: cfg.ReportProgressInterval, + PollInterval: cfg.PollInterval, + PollDLQInterval: cfg.PollDLQInterval, + Stack: cfg.RPCStack, + BlockchainName: cfg.BlockchainName, + BlockSubmitInterval: cfg.BlockSubmitInterval, + SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks, + DLQOnly: cfg.DLQOnly, }, progress, dlqBlockNumbers, diff --git a/config/config.go b/config/config.go index 7461a36..0170f17 100644 --- a/config/config.go +++ b/config/config.go @@ -54,13 +54,12 @@ type Config struct { DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - // kept the old cmdline arg names and env variables for backwards compatibility - BlockConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll - DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll - BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll - LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll - MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of maximum concurrent jsonRPC requests to the RPC node" default:"80"` // nolint:lll + DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll + BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll + LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll + MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll } func (c Config) HasError() error { diff --git a/ingester/ingester.go b/ingester/ingester.go index 4b80a2a..9242ddc 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -57,17 +57,17 @@ const ( ) type Config struct { - MaxConcurrentRequests int - MaxConcurrentRequestsDLQ int - PollInterval time.Duration - PollDLQInterval time.Duration - ReportProgressInterval time.Duration - Stack models.EVMStack - BlockchainName string - BlockSubmitInterval time.Duration - SkipFailedBlocks bool - DLQOnly bool - MaxBatchSize int + MaxConcurrentBlocks int + DLQMaxConcurrentBlocks int + PollInterval time.Duration + PollDLQInterval time.Duration + ReportProgressInterval time.Duration + Stack models.EVMStack + BlockchainName string + BlockSubmitInterval time.Duration + SkipFailedBlocks bool + DLQOnly bool + MaxBatchSize int } type ingester struct { diff --git a/ingester/mainloop.go b/ingester/mainloop.go index a91c448..4d44806 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -28,13 +28,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int registerIngesterMetrics(i) if i.cfg.DLQOnly { - i.cfg.MaxConcurrentRequests = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0 + i.cfg.MaxConcurrentBlocks = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0 } else { - if i.cfg.MaxConcurrentRequests <= 0 { + if i.cfg.MaxConcurrentBlocks <= 0 { return errors.Errorf("MaxConcurrentRequests must be > 0") } } - if i.cfg.MaxConcurrentRequestsDLQ <= 0 { + if i.cfg.DLQMaxConcurrentBlocks <= 0 { return errors.Errorf("MaxConcurrentRequestsDLQ must be > 0") } @@ -52,8 +52,8 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int blocks := make(chan models.RPCBlock, 2*maxBatchSize) defer close(blocks) - // Start MaxBatchSize goroutines to consume blocks concurrently - for range i.cfg.MaxConcurrentRequests { + // Start MaxConcurrentBlocks goroutines to consume blocks concurrently + for range i.cfg.MaxConcurrentBlocks { errGroup.Go(func() error { return i.FetchBlockLoop(ctx, blockNumbers, blocks) }) @@ -70,13 +70,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int blockNumbersDLQ := make(chan dlq.Item[int64]) defer close(blockNumbersDLQ) - blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.MaxConcurrentRequestsDLQ+1) + blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.DLQMaxConcurrentBlocks+1) defer close(blocksDLQ) errGroup.Go(func() error { return i.SendBlocksDLQ(ctx, blocksDLQ) }) - for range i.cfg.MaxConcurrentRequestsDLQ { + for range i.cfg.DLQMaxConcurrentBlocks { errGroup.Go(func() error { return i.FetchBlockLoopDLQ(ctx, blockNumbersDLQ, blocksDLQ) }) @@ -91,7 +91,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int "runForever", maxCount <= 0, "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber, - "maxConcurrency", i.cfg.MaxConcurrentRequests, + "maxConcurrency", i.cfg.MaxConcurrentBlocks, ) // Produce block numbers in the main goroutine diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index b4a6261..a3c375f 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -80,10 +80,10 @@ func TestRunUntilCancel(t *testing.T) { duneapi, duneapi, ingester.Config{ - BlockSubmitInterval: time.Nanosecond, - MaxConcurrentRequests: 10, - MaxConcurrentRequestsDLQ: 2, - SkipFailedBlocks: false, + BlockSubmitInterval: time.Nanosecond, + MaxConcurrentBlocks: 10, + DLQMaxConcurrentBlocks: 2, + SkipFailedBlocks: false, }, nil, // progress dlq.NewDLQ[int64](), @@ -262,8 +262,8 @@ func TestRunBlocksOutOfOrder(t *testing.T) { duneapi, duneapi, ingester.Config{ - MaxConcurrentRequests: 20, - MaxConcurrentRequestsDLQ: 2, // fetch blocks in multiple goroutines + MaxConcurrentBlocks: 20, + DLQMaxConcurrentBlocks: 2, // fetch blocks in multiple goroutines // big enough compared to the time spent in block by number to ensure batching. We panic // in the mocked Dune client if we don't get a batch of blocks (more than one block). BlockSubmitInterval: 50 * time.Millisecond, @@ -315,10 +315,10 @@ func TestRunRPCNodeFails(t *testing.T) { duneapi, duneapi, ingester.Config{ - MaxConcurrentRequests: 10, - MaxConcurrentRequestsDLQ: 2, - BlockSubmitInterval: time.Millisecond, - SkipFailedBlocks: false, + MaxConcurrentBlocks: 10, + DLQMaxConcurrentBlocks: 2, + BlockSubmitInterval: time.Millisecond, + SkipFailedBlocks: false, }, nil, // progress dlq.NewDLQ[int64](), @@ -338,7 +338,7 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) { nil, nil, ingester.Config{ - MaxConcurrentRequests: 0, + MaxConcurrentBlocks: 0, }, nil, // progress dlq.NewDLQ[int64](), @@ -357,8 +357,8 @@ func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) { nil, nil, ingester.Config{ - MaxConcurrentRequests: 10, - MaxConcurrentRequestsDLQ: 0, + MaxConcurrentBlocks: 10, + DLQMaxConcurrentBlocks: 0, }, nil, // progress dlq.NewDLQ[int64](), @@ -478,11 +478,11 @@ func TestRunWithDLQ(t *testing.T) { duneapi, duneapi, ingester.Config{ - BlockSubmitInterval: time.Nanosecond, - MaxConcurrentRequests: 10, - MaxConcurrentRequestsDLQ: 1, - DLQOnly: false, - SkipFailedBlocks: true, + BlockSubmitInterval: time.Nanosecond, + MaxConcurrentBlocks: 10, + DLQMaxConcurrentBlocks: 1, + DLQOnly: false, + SkipFailedBlocks: true, }, nil, // progress dlqBlockNumbers,