Skip to content

Commit

Permalink
move sending side code to send.go
Browse files Browse the repository at this point in the history
  • Loading branch information
msf committed Jun 27, 2024
1 parent d1df9d4 commit 68b10f4
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 129 deletions.
127 changes: 0 additions & 127 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"golang.org/x/sync/errgroup"
)

const maxBatchSize = 100

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune
Expand Down Expand Up @@ -159,131 +157,6 @@ func (i *ingester) FetchBlockLoop(
}
}

// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error {
// Buffer for temporarily storing blocks that have arrived out of order
collectedBlocks := make(map[int64]models.RPCBlock)
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
defer batchTimer.Stop()

i.log.Debug("SendBlocks: Starting to receive blocks")
for {
// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
select {
case <-ctx.Done():
i.log.Debug("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocks:
if !ok {
i.log.Debug("SendBlocks: Channel is closed, returning")
return nil
}

if block.Errored() {
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
Error: block.Error,
})

i.log.Error("Received FAILED block", "number", block.BlockNumber)
}

collectedBlocks[block.BlockNumber] = block
i.log.Debug(
"SendBlocks: Received block",
"blockNumber", block.BlockNumber,
"bufferSize", len(collectedBlocks),
)
case <-batchTimer.C:
var err error
nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend)
if err != nil {
return errors.Errorf("send blocks: %w", err)
}
}
}
}

// trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap.
// Once we have sent all blocks, if any, we return with the nextNumberToSend.
// We return the next numberToSend such that the caller can continue from there.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
collectedBlocks map[int64]models.RPCBlock,
nextBlockToSend int64,
) (int64, error) {
for {
if len(collectedBlocks) < maxBatchSize/10 {
// if we have very little extra to send, wait a bit before sending to avoid tiny batches impacting throughput
return nextBlockToSend, nil
}
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
}
nextBlockToSend = nextBlock
}
}

func (i *ingester) trySendBlockBatch(
ctx context.Context,
collectedBlocks map[int64]models.RPCBlock,
nextBlockToSend int64,
maxBatchSize int,
) (int64, error) {
startTime := time.Now()

// Collect a blocks of blocks to send, only send those which are in order
// Collect a batch to send, only send those which are in order
blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
// Skip Failed block if we're configured to skip Failed blocks
if i.cfg.SkipFailedBlocks && block.Errored() {
nextBlockToSend++
continue
}

blockBatch = append(blockBatch, block)
delete(collectedBlocks, nextBlockToSend)
nextBlockToSend++

if len(blockBatch) == maxBatchSize {
break
}
}

if len(blockBatch) == 0 {
return nextBlockToSend, nil
}

// Send the batch
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
if lastBlockNumber != nextBlockToSend-1 {
panic("unexpected last block number")
}
if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextBlockToSend, nil
}
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextBlockToSend, err
}
atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
i.log.Info(
"Sent blocks to DuneAPI",
"batchSize", len(blockBatch),
"nextBlockToSend", nextBlockToSend,
"elapsed", time.Since(startTime),
)
return nextBlockToSend, nil
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
latest, err := i.node.LatestBlockNumber()
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func TestSendBlocks(t *testing.T) {
})

// Send blocks except the next block, ensure none are sent to the API
for _, n := range []int64{2, 3, 4, 5, 10} {
// NOTE: this size and maxBatchSize are related, because we optimize for not sending tiny batches
for _, n := range []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 19} {
select {
case <-ctx.Done(): // if error group fails, its context is canceled
require.Fail(t, "context was canceled")
Expand All @@ -187,7 +188,7 @@ func TestSendBlocks(t *testing.T) {
require.NoError(t, group.Wait())

// Ensure the last correct block was sent
require.Equal(t, int64(5), sentBlockNumber)
require.Equal(t, int64(10), sentBlockNumber)
}

func TestRunBlocksUseBatching(t *testing.T) {
Expand Down
138 changes: 138 additions & 0 deletions ingester/send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package ingester

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
"github.com/go-errors/errors"
)

const maxBatchSize = 100

// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error {
// Buffer for temporarily storing blocks that have arrived out of order
collectedBlocks := make(map[int64]models.RPCBlock, maxBatchSize)
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
defer batchTimer.Stop()

i.log.Debug("SendBlocks: Starting to receive blocks")
for {
// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
select {
case <-ctx.Done():
i.log.Debug("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocks:
if !ok {
i.log.Debug("SendBlocks: Channel is closed, returning")
return nil
}

if block.Errored() {
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
Error: block.Error,
})

i.log.Error("Received FAILED block", "number", block.BlockNumber)
}

collectedBlocks[block.BlockNumber] = block
i.log.Debug(
"SendBlocks: Received block",
"blockNumber", block.BlockNumber,
"bufferSize", len(collectedBlocks),
)
case <-batchTimer.C:
var err error
nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend)
if err != nil {
return errors.Errorf("send blocks: %w", err)
}
}
}
}

// trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap.
// Once we have sent all blocks, if any, we return with the nextNumberToSend.
// We return the next numberToSend such that the caller can continue from there.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
collectedBlocks map[int64]models.RPCBlock,
nextBlockToSend int64,
) (int64, error) {
for {
if len(collectedBlocks) < maxBatchSize/10 {
// if we have very little to send, wait for next tick to avoid tiny batches impacting throughput
return nextBlockToSend, nil
}
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
}
nextBlockToSend = nextBlock
}
}

func (i *ingester) trySendBlockBatch(
ctx context.Context,
collectedBlocks map[int64]models.RPCBlock,
nextBlockToSend int64,
maxBatchSize int,
) (int64, error) {
startTime := time.Now()

// Collect a blocks of blocks to send, only send those which are in order
// Collect a batch to send, only send those which are in order
blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
// Skip Failed block if we're configured to skip Failed blocks
if i.cfg.SkipFailedBlocks && block.Errored() {
nextBlockToSend++
continue
}

blockBatch = append(blockBatch, block)
delete(collectedBlocks, nextBlockToSend)
nextBlockToSend++

if len(blockBatch) == maxBatchSize {
break
}
}

if len(blockBatch) == 0 {
return nextBlockToSend, nil
}

// Send the batch
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
if lastBlockNumber != nextBlockToSend-1 {
panic("unexpected last block number")
}
if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextBlockToSend, nil
}
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextBlockToSend, err
}
atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
i.log.Info(
"Sent blocks to DuneAPI",
"batchSize", len(blockBatch),
"nextBlockToSend", nextBlockToSend,
"elapsed", time.Since(startTime),
)
return nextBlockToSend, nil
}

0 comments on commit 68b10f4

Please sign in to comment.