Send batch of blocks from the main loop (#37)
This PR changes the node indexer from sending one block at a time to
sending a batch of blocks. We had earlier implemented concurrent block
fetching with buffering (#32). On a configurable interval (defaulting to
every second), we now check the buffer and send every block that can be
sent in order.

We also add a flag for specifying whether to skip a failed block. It is
off by default, which means that if all retries to the RPC node fail for
a given block, the indexer crashes. This guarantees there are no block gaps.
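
For illustration, the new knobs can be set like this when running the published container (a sketch based on the README invocation; the environment variable names come from the flags added in `config.go` in this diff, and the values shown are simply the defaults):

```bash
# Illustrative sketch only. The env var names map to the flags added in
# config.go in this PR; the values shown are their defaults.
#   RPC_CONCURRENCY       - number of concurrent requests to the RPC node
#   BLOCK_SUBMIT_INTERVAL - how often buffered blocks are submitted to Dune
#   SKIP_FAILED_BLOCKS    - off by default: a block whose RPC retries all
#                           fail crashes the indexer, so there are no gaps
docker run \
  -e BLOCKCHAIN_NAME='foo' \
  -e RPC_NODE_URL='http://localhost:8545' \
  -e DUNE_API_KEY='your-key-here' \
  -e RPC_CONCURRENCY=10 \
  -e BLOCK_SUBMIT_INTERVAL=1s \
  -e SKIP_FAILED_BLOCKS=false \
  duneanalytics/node-indexer
```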

---------

Co-authored-by: Miguel Filipe <[email protected]>
vegarsti and msf committed Jun 27, 2024
1 parent a448231 commit a4ba018
Showing 8 changed files with 382 additions and 126 deletions.
5 changes: 0 additions & 5 deletions README.md
@@ -22,7 +22,6 @@ You can use our public docker container image and run it as such:

```bash
docker run -e BLOCKCHAIN_NAME='foo' -e RPC_NODE_URL='http://localhost:8545' -e DUNE_API_KEY='your-key-here' duneanalytics/node-indexer

```


@@ -36,15 +35,11 @@ Build the binary for your OS:
$ make build

$ BLOCKCHAIN_NAME='foo' RPC_NODE_URL='http://localhost:8545' DUNE_API_KEY='your-key-here' ./indexer

```

## Configuration Options

You can see all the configuration options by using the `--help` argument:
```bash
docker run duneanalytics/node-indexer ./indexer --help

```


15 changes: 7 additions & 8 deletions client/duneapi/client.go
@@ -59,7 +59,11 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) {
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
if resp == nil {
log.Warn("Retrying request", "error", err)
} else {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
}
}
return yes, err2
}
@@ -191,11 +195,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
return err
}

err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

return nil
}

@@ -220,15 +219,15 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
defer func() {
if err != nil {
c.log.Error("Sending progress report failed",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
"responseBody", responseBody,
)
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"duration", time.Since(start),
)
8 changes: 6 additions & 2 deletions cmd/main.go
@@ -65,8 +65,9 @@ func main() {
// Get stored progress unless config indicates we should start from 0
var startBlockNumber int64
// Default to -1 to start where the ingester left off
var progress *models.BlockchainIndexProgress
if cfg.BlockHeight == -1 {
progress, err := duneClient.GetProgressReport(ctx)
progress, err = duneClient.GetProgressReport(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
@@ -82,12 +83,15 @@
rpcClient,
duneClient,
ingester.Config{
MaxBatchSize: cfg.Concurrency,
MaxConcurrentRequests: cfg.RPCConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.SkipFailedBlocks,
},
progress,
)

wg.Add(1)
6 changes: 4 additions & 2 deletions config/config.go
@@ -39,8 +39,10 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"1s"` // nolint:lll
SkipFailedBlocks bool `long:"skip-failed-blocks" env:"SKIP_FAILED_BLOCKS" description:"Skip failed blocks when submitting to Dune"` // nolint:lll
}

func (c Config) HasError() error {
31 changes: 22 additions & 9 deletions ingester/ingester.go
@@ -44,11 +44,13 @@ const (
)

type Config struct {
MaxBatchSize int
MaxConcurrentRequests int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
}

type Info struct {
@@ -60,9 +62,9 @@ type Info struct {
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumber int64
Error error
Timestamp time.Time
BlockNumbers string
Error error
}

type ingester struct {
@@ -73,16 +75,27 @@ type ingester struct {
info Info
}

func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
func New(
log *slog.Logger,
node jsonrpc.BlockchainClient,
dune duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
) Ingester {
info := Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
}
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
}
ing := &ingester{
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
info: Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
},
info: info,
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
151 changes: 105 additions & 46 deletions ingester/mainloop.go
@@ -11,6 +11,8 @@ import (
"golang.org/x/sync/errgroup"
)

const maxBatchSize = 100

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune
@@ -21,15 +23,22 @@ import (
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errGroup, ctx := errgroup.WithContext(ctx)

blockNumbers := make(chan int64)
defer close(blockNumbers)
blocks := make(chan models.RPCBlock)

// We buffer the block channel so that RPC requests can be made concurrently with sending blocks to Dune.
// We limit the buffer size to the same number of concurrent requests, so we exert some backpressure.
blocks := make(chan models.RPCBlock, maxBatchSize)
defer close(blocks)

// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
if i.cfg.MaxConcurrentRequests <= 0 {
return errors.Errorf("MaxConcurrentRequests must be > 0")
}
for range i.cfg.MaxConcurrentRequests {
errGroup.Go(func() error {
return i.FetchBlockLoop(ctx, blockNumbers, blocks)
})
@@ -44,11 +53,10 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount
i.log.Info("Starting ingester",
"max_batch_size", i.cfg.MaxBatchSize,
"run_forever", maxCount <= 0,
"start_block_number", startBlockNumber,
"end_block_number", endBlockNumber,
"batch_size", i.cfg.MaxBatchSize,
"runForever", maxCount <= 0,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxConcurrency", i.cfg.MaxConcurrentRequests,
)

// Produce block numbers in the main goroutine
@@ -131,57 +139,81 @@ func (i *ingester) FetchBlockLoop(
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
i.log.Error("Failed to get block by number",
"blockNumber", blockNumber,
"continueing", i.cfg.SkipFailedBlocks,
"elapsed", time.Since(startTime),
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should we sleep (backoff) here?
if !i.cfg.SkipFailedBlocks {
return err
}
blocks <- models.RPCBlock{BlockNumber: blockNumber, Error: err}
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
startTime = time.Now()
select {
case <-ctx.Done():
i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime))
i.log.Info(
"FetchBlockLoop: Got and sent block",
"blockNumber", blockNumber,
"getBlockElapsed", getBlockElapsed,
)
}
}
}
}

// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error {
// Buffer for temporarily storing blocks that have arrived out of order
collectedBlocks := make(map[int64]models.RPCBlock)
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
defer batchTimer.Stop()

i.log.Info("SendBlocks: Starting to receive blocks")
for {
// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocksCh:
case block, ok := <-blocks:
if !ok {
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blocks[block.BlockNumber] = block
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))
if block.Errored() {
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
Error: block.Error,
})

i.log.Info("Received FAILED block", "number", block.BlockNumber)
}

nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
collectedBlocks[block.BlockNumber] = block
i.log.Info(
"SendBlocks: Received block",
"blockNumber", block.BlockNumber,
"bufferSize", len(collectedBlocks),
)
case <-batchTimer.C:
var err error
nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend)
if err != nil {
return errors.Errorf("send blocks: %w", err)
}
i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
}
}
}
@@ -191,32 +223,59 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
// We return the next numberToSend such that the caller can continue from there.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
blocks map[int64]models.RPCBlock,
collectedBlocks map[int64]models.RPCBlock,
nextNumberToSend int64,
) int64 {
// Send this block only if we have sent all previous blocks
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil {
) (int64, error) {
// Outer loop: We might need to send multiple batch requests if our buffer is too big
for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] {
// Collect a blocks of blocks to send, only send those which are in order
// Collect a batch to send, only send those which are in order
blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] {
// Skip Failed block if we're configured to skip Failed blocks
if i.cfg.SkipFailedBlocks && block.Errored() {
nextNumberToSend++
continue
}

blockBatch = append(blockBatch, block)
delete(collectedBlocks, nextNumberToSend)
nextNumberToSend++

if len(blockBatch) == maxBatchSize {
break
}
}

if len(blockBatch) == 0 {
return nextNumberToSend, nil
}

// Send the batch
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
if lastBlockNumber != nextNumberToSend-1 {
panic("unexpected last block number")
}
if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
return nextNumberToSend, nil
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextNumberToSend, err
}
// We've sent block N, so increment the pointer
delete(blocks, nextNumberToSend)
nextNumberToSend++
i.log.Info(
"SendBlocks: Sent batch, updating latest ingested block number",
"blockNumberFirst", blockBatch[0].BlockNumber,
"blockNumberLast", lastBlockNumber,
"batchSize", len(blockBatch),
)
atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
}
return nextNumberToSend

return nextNumberToSend, nil
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {