Some improvements in mainloop.go (#34)
Implements some useful suggestions from @msf
#32
- `ConsumeBlocks` -> `FetchBlockLoop`
- Extract `trySendCompletedBlocks` from `SendBlocks`
vegarsti committed Jun 24, 2024
1 parent 6166f19 commit d83f853
Showing 4 changed files with 61 additions and 51 deletions.
4 changes: 2 additions & 2 deletions config/config.go
@@ -39,8 +39,8 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
}

func (c Config) HasError() error {
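The `long`/`env`/`description`/`default` struct tags above match the conventions of `github.com/jessevdk/go-flags`. Assuming that library (the entry point isn't part of this diff), a minimal sketch of how such a config struct gets populated:

```go
// A minimal sketch, assuming github.com/jessevdk/go-flags; the tag syntax
// (long/env/description/default) suggests it, but this diff does not show
// the parser. Precedence in go-flags: CLI flag, then env var, then default.
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/jessevdk/go-flags"
)

type Config struct {
	PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" default:"300ms"`
	Concurrency  int           `long:"concurrency" env:"CONCURRENCY" default:"5"`
}

func main() {
	var cfg Config
	if _, err := flags.Parse(&cfg); err != nil {
		os.Exit(1) // go-flags has already printed the error (or help text)
	}
	// With no flags or env vars set, the tag defaults apply: 300ms and 5.
	fmt.Printf("poll=%s concurrency=%d\n", cfg.PollInterval, cfg.Concurrency)
}
```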
7 changes: 2 additions & 5 deletions ingester/ingester.go
@@ -19,10 +19,10 @@ type Ingester interface {
// it will run continuously until the context is cancelled
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

// ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel.
// FetchBlockLoop fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error
FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
@@ -84,9 +84,6 @@ func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.Blockchai
DuneErrors: []ErrorInfo{},
},
}
if ing.cfg.MaxBatchSize == 0 {
ing.cfg.MaxBatchSize = defaultMaxBatchSize
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
}
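The deleted zero-check is the constructor-side defaulting pattern; the `default:"5"` tag added in config.go is the declarative alternative. A sketch contrasting the two styles, with simplified, hypothetical types (the diff doesn't show whether `MaxBatchSize` itself gained a tag default):

```go
// A sketch, under the assumption that defaults are moving from the
// constructor into flag tags (as with Concurrency's default:"5" above).
package main

import "fmt"

type Config struct {
	// With a tag default, the flag parser fills this in before New runs:
	// `long:"concurrency" default:"5"`
	Concurrency int
}

// New, old style: patch zero values at construction time, as the deleted
// MaxBatchSize check did.
func New(cfg Config) Config {
	if cfg.Concurrency == 0 {
		cfg.Concurrency = 5
	}
	return cfg
}

func main() {
	fmt.Println(New(Config{}).Concurrency) // prints 5
}
```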
91 changes: 53 additions & 38 deletions ingester/mainloop.go
@@ -13,10 +13,10 @@ import (

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune
//
// We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
// and each concurrent ConsumeBlock goroutine gets a block number from that channel.
// and each concurrent FetchBlockLoop goroutine gets a block number from that channel.
// The SendBlocks goroutine receives all blocks on an unbuffered channel,
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
@@ -31,7 +31,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, blockNumbers, blocks)
return i.FetchBlockLoop(ctx, blockNumbers, blocks)
})
}
errGroup.Go(func() error {
@@ -60,9 +60,9 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
return errGroup.Wait()
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop")

// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
// ProduceBlockNumbers to be consumed by multiple goroutines running FetchBlockLoop
func (i *ingester) ProduceBlockNumbers(
ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
) error {
@@ -109,25 +109,25 @@ func (i *ingester) ProduceBlockNumbers(
}
}
i.log.Info("Finished producing block numbers")
return ErrFinishedConsumeBlocks
return ErrFinishedFetchBlockLoop
}

// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) ConsumeBlocks(
// FetchBlockLoop fetches blocks from the RPC node. It can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) FetchBlockLoop(
ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
) error {
for {
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: context is done")
i.log.Info("FetchBlockLoop: context is done")
return ctx.Err()
case blockNumber := <-blockNumbers:
startTime := time.Now()

block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("ConsumeBlocks: Context canceled, stopping")
i.log.Info("FetchBlockLoop: Context canceled, stopping")
return ctx.Err()
}

@@ -147,23 +147,25 @@ func (i *ingester) ConsumeBlocks(

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
startTime = time.Now()
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime))
}
}
}
}

// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
next := startBlockNumber
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
nextNumberToSend := startBlockNumber
for {
select {
case <-ctx.Done():
@@ -175,33 +177,46 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
return nil
}

blockMap[block.BlockNumber] = block

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}
blocks[block.BlockNumber] = block
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))

nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
}
}
}

// We've sent block N, so increment the pointer
delete(blockMap, next)
next++
// trySendCompletedBlocks sends all blocks that can be sent in order from the blocks buffer,
// starting from nextNumberToSend.
// It returns the next number to send, so the caller can continue from there.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
blocks map[int64]models.RPCBlock,
nextNumberToSend int64,
) int64 {
// Send this block only if we have sent all previous blocks
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}
// We've sent block N, so increment the pointer
delete(blocks, nextNumberToSend)
nextNumberToSend++
}
return nextNumberToSend
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
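Pulling the loop into `trySendCompletedBlocks` isolates the reordering trick: buffer arrivals in a map, flush only while the next expected number is present. A standalone sketch of that technique (the `send` callback is a hypothetical stand-in for `dune.SendBlock`):

```go
// A standalone sketch of the reordering technique, with int64 standing in
// for models.RPCBlock and a hypothetical send callback in place of
// dune.SendBlock.
package main

import "fmt"

func trySendCompleted(blocks map[int64]int64, next int64, send func(int64)) int64 {
	// Flush only while the next expected number is buffered, so the
	// downstream consumer always sees a gap-free, ascending sequence.
	for b, ok := blocks[next]; ok; b, ok = blocks[next] {
		send(b)
		delete(blocks, next)
		next++
	}
	return next
}

func main() {
	blocks := map[int64]int64{}
	next := int64(1)
	for _, arrived := range []int64{3, 1, 2, 5, 4} { // out-of-order arrivals
		blocks[arrived] = arrived
		next = trySendCompleted(blocks, next, func(b int64) { fmt.Println("sent", b) })
	}
}
```

Running this prints 1 through 5 in order: nothing is flushed until block 1 arrives, and when 2 arrives, both 2 and the buffered 3 go out together.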
10 changes: 4 additions & 6 deletions ingester/mainloop_test.go
@@ -149,19 +149,17 @@ func TestSendBlocks(t *testing.T) {
require.Equal(t, int64(5), sentBlockNumber)
}

// TestRunLoopUntilBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunLoopUntilBlocksOutOfOrder(t *testing.T) {
func TestRunLoopBlocksOutOfOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
// DuneAPI must fail if it receives blocks out of order
if block.BlockNumber != sentBlockNumber+1 {
return errors.Errorf("blocks out of order")
}
// Test must fail if DuneAPI receives blocks out of order
require.Equal(t, block.BlockNumber, sentBlockNumber+1)

atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
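The `BlockchainIngesterMock` used here — a struct of `…Func` fields — follows the moq-style generated-mock pattern (an assumption; the generator isn't shown in this diff). A minimal hand-written version of the same shape:

```go
// A minimal hand-written version of the mock shape, with illustrative
// names; the real BlockchainIngesterMock and models.RPCBlock differ.
package main

import (
	"context"
	"fmt"
)

type BlockSender interface {
	SendBlock(ctx context.Context, blockNumber int64) error
}

// BlockSenderMock satisfies BlockSender by delegating to a pluggable
// function, so each test injects its own assertions or failures.
type BlockSenderMock struct {
	SendBlockFunc func(ctx context.Context, blockNumber int64) error
}

func (m *BlockSenderMock) SendBlock(ctx context.Context, blockNumber int64) error {
	return m.SendBlockFunc(ctx, blockNumber)
}

func main() {
	mock := &BlockSenderMock{
		SendBlockFunc: func(_ context.Context, n int64) error {
			fmt.Println("mock received block", n)
			return nil
		},
	}
	var s BlockSender = mock
	_ = s.SendBlock(context.Background(), 42)
}
```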
