
cleanup: simplify the RPC concurrency configuration #72

Merged · 1 commit · Jul 30, 2024
12 changes: 10 additions & 2 deletions README.md
@@ -47,8 +47,16 @@ Also, we mention some of the options here:
The `log` flag (environment variable `LOG`) controls the log level. Use `--log debug`/`LOG=debug` to emit more logs than the default `info` level. To emit fewer logs, use `warn` or `error` (least).

### Tuning RPC concurrency
The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines)
to run concurrently to perform RPC node requests. Default is 25.
The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines) to run concurrently to perform RPC node requests. See `--help` for the up-to-date default value.

### Tuning for throughput
Throughput depends on the latency and request rate between RPC node <-> node indexer <-> DuneAPI, and can be tuned via a combination of:
1. `RPC_CONCURRENCY`: higher values feed more blocks into the node indexer to process.
1. `MAX_BATCH_SIZE`: higher values send more blocks per request to DuneAPI.
1. `BLOCK_SUBMIT_INTERVAL`: the interval at which batched blocks are submitted to DuneAPI.

See `--help` for up-to-date default values.
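Putting the three knobs together, an invocation might look like the following sketch. The binary name `node-indexer` is an assumption; the flag and environment variable names are the ones documented above:

```shell
# Hypothetical invocation; the binary name is an assumption.
# 80 concurrent JSON-RPC requests, 128 blocks per Dune batch,
# batches submitted every 500ms.
RPC_CONCURRENCY=80 \
MAX_BATCH_SIZE=128 \
BLOCK_SUBMIT_INTERVAL=500ms \
./node-indexer --log info
```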
Contributor commented on lines +52 to +57:
I like this section!




### RPC poll interval
The flag `--rpc-poll-interval` (environment variable `RPC_POLL_INTERVAL`) specifies the duration to wait before checking
26 changes: 14 additions & 12 deletions cmd/main.go
@@ -95,7 +95,7 @@ func main() {
EVMStack: cfg.RPCStack,
// real max request concurrency to RPC node
// each block requires multiple RPC requests
TotalRPCConcurrency: cfg.BlockConcurrency * 4,
TotalRPCConcurrency: cfg.RPCConcurrency,
})
if err != nil {
stdlog.Fatal(err)
@@ -145,17 +145,19 @@ func main() {
duneClient,
duneClientDLQ,
ingester.Config{
MaxConcurrentRequests: cfg.BlockConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency,
MaxBatchSize: cfg.MaxBatchSize,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
DLQOnly: cfg.DLQOnly,
// OpStack does 3 requests per block, ArbitrumNova is variable
// leave some room for other requests
MaxConcurrentBlocks: cfg.RPCConcurrency / 4,
DLQMaxConcurrentBlocks: cfg.DLQBlockConcurrency,
MaxBatchSize: cfg.MaxBatchSize,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
DLQOnly: cfg.DLQOnly,
},
progress,
dlqBlockNumbers,
13 changes: 6 additions & 7 deletions config/config.go
@@ -54,13 +54,12 @@ type Config struct {
DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
// kept the old cmdline arg names and env variables for backwards compatibility
BlockConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of maximum concurrent jsonRPC requests to the RPC node" default:"80"` // nolint:lll
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll
}
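The `long:`/`env:` struct tags above suggest a go-flags-style parser that resolves each option from the command line, then the environment, then the tag's `default`. A minimal stdlib sketch of that precedence for the integer options — the `envInt` helper is illustrative, not part of the repository, which uses struct tags instead:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envInt reads an integer environment variable, falling back to def.
// This mirrors how RPC_CONCURRENCY-style overrides behave; the real
// code declares the default in the Config struct tag.
func envInt(key string, def int) int {
	if v, ok := os.LookupEnv(key); ok {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return def
}

func main() {
	rpcConcurrency := envInt("RPC_CONCURRENCY", 80)
	// Per the PR's main.go, block fetching gets a quarter of the total
	// RPC budget, since each block costs multiple JSON-RPC requests.
	// With the defaults this prints the pair 80 and 20.
	fmt.Println(rpcConcurrency, rpcConcurrency/4)
}
```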

func (c Config) HasError() error {
22 changes: 11 additions & 11 deletions ingester/ingester.go
@@ -57,17 +57,17 @@ const (
)

type Config struct {
MaxConcurrentRequests int
MaxConcurrentRequestsDLQ int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
MaxBatchSize int
MaxConcurrentBlocks int
DLQMaxConcurrentBlocks int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
MaxBatchSize int
}

type ingester struct {
16 changes: 8 additions & 8 deletions ingester/mainloop.go
@@ -28,13 +28,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
registerIngesterMetrics(i)

if i.cfg.DLQOnly {
i.cfg.MaxConcurrentRequests = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0
i.cfg.MaxConcurrentBlocks = 0 // if running in DLQ-only mode, ignore MaxConcurrentBlocks and set it to 0
} else {
if i.cfg.MaxConcurrentRequests <= 0 {
if i.cfg.MaxConcurrentBlocks <= 0 {
return errors.Errorf("MaxConcurrentBlocks must be > 0")
}
}
if i.cfg.MaxConcurrentRequestsDLQ <= 0 {
if i.cfg.DLQMaxConcurrentBlocks <= 0 {
return errors.Errorf("DLQMaxConcurrentBlocks must be > 0")
}

@@ -52,8 +52,8 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
blocks := make(chan models.RPCBlock, 2*maxBatchSize)
defer close(blocks)

// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxConcurrentRequests {
// Start MaxConcurrentBlocks goroutines to consume blocks concurrently
for range i.cfg.MaxConcurrentBlocks {
errGroup.Go(func() error {
return i.FetchBlockLoop(ctx, blockNumbers, blocks)
})
@@ -70,13 +70,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
blockNumbersDLQ := make(chan dlq.Item[int64])
defer close(blockNumbersDLQ)

blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.MaxConcurrentRequestsDLQ+1)
blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.DLQMaxConcurrentBlocks+1)
defer close(blocksDLQ)

errGroup.Go(func() error {
return i.SendBlocksDLQ(ctx, blocksDLQ)
})
for range i.cfg.MaxConcurrentRequestsDLQ {
for range i.cfg.DLQMaxConcurrentBlocks {
errGroup.Go(func() error {
return i.FetchBlockLoopDLQ(ctx, blockNumbersDLQ, blocksDLQ)
})
@@ -91,7 +91,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
"runForever", maxCount <= 0,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxConcurrency", i.cfg.MaxConcurrentRequests,
"maxConcurrency", i.cfg.MaxConcurrentBlocks,
)

// Produce block numbers in the main goroutine
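The Run loop's fan-out — start `MaxConcurrentBlocks` goroutines that drain a channel of block numbers and emit fetched blocks on another channel — can be reduced to a small self-contained sketch. The real code uses `errgroup.Group` and issues RPC requests; this stand-in uses `sync.WaitGroup` and squares the numbers so it runs without a node:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut mirrors the ingester's Run loop in miniature: `workers`
// goroutines (MaxConcurrentBlocks in the real code) consume block
// numbers from one channel and push results onto another, while the
// caller produces the numbers and drains the results.
func fanOut(workers int, blockNumbers []int64) int64 {
	in := make(chan int64)
	out := make(chan int64)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range in {
				out <- n * n // stand-in for FetchBlockLoop's RPC call
			}
		}()
	}
	// Close the results channel once every worker has exited.
	go func() { wg.Wait(); close(out) }()

	// Produce block numbers, as Run does in its main goroutine.
	go func() {
		for _, n := range blockNumbers {
			in <- n
		}
		close(in)
	}()

	var sum int64
	for b := range out {
		sum += b
	}
	return sum
}

func main() {
	// 1+4+9+16+25 = 55, regardless of worker count.
	fmt.Println(fanOut(4, []int64{1, 2, 3, 4, 5}))
}
```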
36 changes: 18 additions & 18 deletions ingester/mainloop_test.go
@@ -80,10 +80,10 @@ func TestRunUntilCancel(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 2,
SkipFailedBlocks: false,
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 2,
SkipFailedBlocks: false,
},
nil, // progress
dlq.NewDLQ[int64](),
@@ -262,8 +262,8 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
MaxConcurrentRequests: 20,
MaxConcurrentRequestsDLQ: 2, // fetch blocks in multiple goroutines
MaxConcurrentBlocks: 20,
DLQMaxConcurrentBlocks: 2, // fetch blocks in multiple goroutines
// big enough compared to the time spent in block by number to ensure batching. We panic
// in the mocked Dune client if we don't get a batch of blocks (more than one block).
BlockSubmitInterval: 50 * time.Millisecond,
@@ -315,10 +315,10 @@ func TestRunRPCNodeFails(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 2,
BlockSubmitInterval: time.Millisecond,
SkipFailedBlocks: false,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 2,
BlockSubmitInterval: time.Millisecond,
SkipFailedBlocks: false,
},
nil, // progress
dlq.NewDLQ[int64](),
@@ -338,7 +338,7 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
nil,
nil,
ingester.Config{
MaxConcurrentRequests: 0,
MaxConcurrentBlocks: 0,
},
nil, // progress
dlq.NewDLQ[int64](),
@@ -357,8 +357,8 @@ func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) {
nil,
nil,
ingester.Config{
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 0,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 0,
},
nil, // progress
dlq.NewDLQ[int64](),
@@ -478,11 +478,11 @@ func TestRunWithDLQ(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 1,
DLQOnly: false,
SkipFailedBlocks: true,
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 1,
DLQOnly: false,
SkipFailedBlocks: true,
},
nil, // progress
dlqBlockNumbers,