From d83f853fa1ba65b54436a0db694c500664313773 Mon Sep 17 00:00:00 2001
From: Vegard Stikbakke
Date: Mon, 24 Jun 2024 13:13:19 +0200
Subject: [PATCH] Some improvements in mainloop.go (#34)

Implements some useful suggestions from @msf
https://github.com/duneanalytics/node-indexer/pull/32

- `ConsumeBlocks` -> `FetchBlockLoop`
- Extract `trySendCompletedBlocks` from `SendBlocks`
---
 config/config.go          |  4 +-
 ingester/ingester.go      |  7 +--
 ingester/mainloop.go      | 91 +++++++++++++++++++++++----------
 ingester/mainloop_test.go | 10 ++---
 4 files changed, 61 insertions(+), 51 deletions(-)

diff --git a/config/config.go b/config/config.go
index b0641e6..b0f71ce 100644
--- a/config/config.go
+++ b/config/config.go
@@ -39,8 +39,8 @@ type Config struct {
 	PollInterval           time.Duration   `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
 	ReportProgressInterval time.Duration   `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
 	RPCNode                RPCClient
-	RPCStack               models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
-	Concurrency            int             `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"` // nolint:lll
+	RPCStack               models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
+	Concurrency            int             `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
 }
 
 func (c Config) HasError() error {
diff --git a/ingester/ingester.go b/ingester/ingester.go
index 216464b..fbf3061 100644
--- a/ingester/ingester.go
+++ b/ingester/ingester.go
@@ -19,10 +19,10 @@ type Ingester interface {
 	// it will run continuously until the context is cancelled
 	ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error
 
-	// ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel.
+	// FetchBlockLoop fetches blocks for the block numbers sent on one channel and sends them on the other channel.
 	// It will run continuously until the context is cancelled, or the channel is closed.
 	// It can safely be run concurrently.
-	ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error
+	FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error
 
 	// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
 	// it will block until:
@@ -84,9 +84,6 @@ func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.Blockchai
 			DuneErrors: []ErrorInfo{},
 		},
 	}
-	if ing.cfg.MaxBatchSize == 0 {
-		ing.cfg.MaxBatchSize = defaultMaxBatchSize
-	}
 	if ing.cfg.PollInterval == 0 {
 		ing.cfg.PollInterval = defaultPollInterval
 	}
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index 74d0f3d..ee7a2bd 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -13,10 +13,10 @@ import (
 
 // Run fetches blocks from a node RPC and sends them in order to the Dune API.
 //
-// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
+// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune
 //
 // We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
-// and each concurrent ConsumeBlock goroutine gets a block number from that channel.
+// and each concurrent FetchBlockLoop goroutine gets a block number from that channel.
 // The SendBlocks goroutine receives all blocks on an unbuffered channel,
 // but buffers them in a map until they can be sent in order.
 func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
@@ -31,7 +31,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 	// Start MaxBatchSize goroutines to consume blocks concurrently
 	for range i.cfg.MaxBatchSize {
 		errGroup.Go(func() error {
-			return i.ConsumeBlocks(ctx, blockNumbers, blocks)
+			return i.FetchBlockLoop(ctx, blockNumbers, blocks)
 		})
 	}
 	errGroup.Go(func() error {
@@ -60,9 +60,9 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 	return errGroup.Wait()
 }
 
-var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
+var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop")
 
-// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
+// ProduceBlockNumbers produces block numbers to be consumed by multiple goroutines running FetchBlockLoop
 func (i *ingester) ProduceBlockNumbers(
 	ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
 ) error {
@@ -109,17 +109,17 @@ func (i *ingester) ProduceBlockNumbers(
 		}
 	}
 	i.log.Info("Finished producing block numbers")
-	return ErrFinishedConsumeBlocks
+	return ErrFinishedFetchBlockLoop
 }
 
-// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
-func (i *ingester) ConsumeBlocks(
+// FetchBlockLoop fetches blocks from the RPC node. It can be run in multiple goroutines to parallelize block fetching.
+func (i *ingester) FetchBlockLoop(
 	ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
 ) error {
 	for {
 		select {
 		case <-ctx.Done():
-			i.log.Info("ConsumeBlocks: context is done")
+			i.log.Info("FetchBlockLoop: context is done")
 			return ctx.Err()
 		case blockNumber := <-blockNumbers:
 			startTime := time.Now()
@@ -127,7 +127,7 @@ func (i *ingester) ConsumeBlocks(
 			block, err := i.node.BlockByNumber(ctx, blockNumber)
 			if err != nil {
 				if errors.Is(err, context.Canceled) {
-					i.log.Info("ConsumeBlocks: Context canceled, stopping")
+					i.log.Info("FetchBlockLoop: Context canceled, stopping")
 					return ctx.Err()
 				}
 
@@ -147,23 +147,25 @@ func (i *ingester) ConsumeBlocks(
 			atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
 			getBlockElapsed := time.Since(startTime)
-			i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
+			i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
+			startTime = time.Now()
 
 			select {
 			case <-ctx.Done():
-				i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
+				i.log.Info("FetchBlockLoop: Context is done, not sending block to channel", "blockNumber", block.BlockNumber)
 				return ctx.Err()
 			case blocks <- block:
+				i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime))
 			}
 		}
 	}
 }
 
-// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
+// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
 // We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
 func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
 	i.log.Info("SendBlocks: Starting to receive blocks")
-	blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
-	next := startBlockNumber
+	blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
+	nextNumberToSend := startBlockNumber
 	for {
 		select {
 		case <-ctx.Done():
@@ -175,33 +177,46 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
 				return nil
 			}
 
-			blockMap[block.BlockNumber] = block
-
-			// Send this block only if we have sent all previous blocks
-			for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
-				if err := i.dune.SendBlock(ctx, block); err != nil {
-					if errors.Is(err, context.Canceled) {
-						i.log.Info("SendBlocks: Context canceled, stopping")
-						return ctx.Err()
-					}
-					// TODO: implement DeadLetterQueue
-					// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
-					i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
-					i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
-						Timestamp:   time.Now(),
-						BlockNumber: block.BlockNumber,
-						Error:       err,
-					})
-				} else {
-					atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
-				}
+			blocks[block.BlockNumber] = block
+			i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))
+
+			nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
+			i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
+		}
+	}
+}
 
-				// We've sent block N, so increment the pointer
-				delete(blockMap, next)
-				next++
+// trySendCompletedBlocks sends in order all buffered blocks that are ready to be sent,
+// i.e. the consecutive run starting at nextNumberToSend.
+// It returns the next block number to send, so the caller can continue from there.
+func (i *ingester) trySendCompletedBlocks(
+	ctx context.Context,
+	blocks map[int64]models.RPCBlock,
+	nextNumberToSend int64,
+) int64 {
+	// Send this block only if we have sent all previous blocks
+	for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
+		if err := i.dune.SendBlock(ctx, block); err != nil {
+			if errors.Is(err, context.Canceled) {
+				i.log.Info("SendBlocks: Context canceled, stopping")
+				return nextNumberToSend
 			}
+			// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
+			i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
+			i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
+				Timestamp:   time.Now(),
+				BlockNumber: block.BlockNumber,
+				Error:       err,
+			})
+		} else {
+			i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
+			atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
 		}
+		// We've sent block N, so increment the pointer
+		delete(blocks, nextNumberToSend)
+		nextNumberToSend++
 	}
+	return nextNumberToSend
 }
 
 func (i *ingester) tryUpdateLatestBlockNumber() int64 {
diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
index 907c2af..366ae90 100644
--- a/ingester/mainloop_test.go
+++ b/ingester/mainloop_test.go
@@ -149,19 +149,17 @@ func TestSendBlocks(t *testing.T) {
 	require.Equal(t, int64(5), sentBlockNumber)
 }
 
-// TestRunLoopUntilBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
+// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
 // even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
-func TestRunLoopUntilBlocksOutOfOrder(t *testing.T) {
+func TestRunLoopBlocksOutOfOrder(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	maxBlockNumber := int64(1000)
 	sentBlockNumber := int64(0)
 	producedBlockNumber := int64(0)
 	duneapi := &duneapi_mock.BlockchainIngesterMock{
 		SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
-			// DuneAPI must fail if it receives blocks out of order
-			if block.BlockNumber != sentBlockNumber+1 {
-				return errors.Errorf("blocks out of order")
-			}
+			// Test must fail if DuneAPI receives blocks out of order
+			require.Equal(t, block.BlockNumber, sentBlockNumber+1)
 
 			atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
 			if block.BlockNumber == maxBlockNumber {
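Note for reviewers (not part of the patch): the heart of this change is the reordering
technique that trySendCompletedBlocks now isolates: out-of-order results are parked in a
map, and the contiguous run starting at the send cursor is flushed in order. Below is a
minimal, self-contained Go sketch of that technique; the names (result, flushInOrder,
send) are illustrative only and do not exist in this repository.

package main

import "fmt"

type result struct {
	number  int64
	payload string
}

// flushInOrder sends every buffered result whose number matches nextToSend,
// advancing the cursor past each one. It returns the new cursor so the
// caller can resume once the next gap is filled.
func flushInOrder(buffer map[int64]result, nextToSend int64, send func(result) error) int64 {
	for r, ok := buffer[nextToSend]; ok; r, ok = buffer[nextToSend] {
		if err := send(r); err != nil {
			return nextToSend // keep the buffer intact; the caller may retry from here
		}
		delete(buffer, nextToSend)
		nextToSend++
	}
	return nextToSend
}

func main() {
	buffer := map[int64]result{}
	next := int64(1)
	send := func(r result) error {
		fmt.Println("sent", r.number)
		return nil
	}
	// Result 2 arrives first and is held back until result 1 fills the gap.
	for _, r := range []result{{2, "b"}, {1, "a"}, {3, "c"}} {
		buffer[r.number] = r
		next = flushInOrder(buffer, next, send)
	}
	// Prints: sent 1, sent 2, sent 3 — in order despite out-of-order arrival.
}

Keeping the flush as a pure function over (buffer, cursor) is what makes the extracted
helper easy to test in isolation, which is the point of this refactor.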