Skip to content

Commit

Permalink
Always send a batch when possible, even if it's small
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jul 1, 2024
1 parent 2f77329 commit 55e8dae
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 74 deletions.
70 changes: 0 additions & 70 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,76 +190,6 @@ func TestSendBlocks(t *testing.T) {
require.Equal(t, int64(10), sentBlockNumber)
}

func TestRunBlocksUseBatching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) == 0 {
return nil
}

// Fail if we're not sending a batch of blocks
require.Greater(t, len(blocks), 1)

next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
next++
}

lastBlockNumber := blocks[len(blocks)-1].BlockNumber
atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
if lastBlockNumber >= maxBlockNumber {
// cancel execution when we have sent the last block
cancel()
return context.Canceled
}

return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return maxBlockNumber + 1, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
// Get blocks out of order by sleeping for a random amount of time
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
},
CloseFunc: func() error {
return nil
},
}
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
rpcClient,
duneapi,
ingester.Config{
MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines
// big enough compared to the time spent in block by number to ensure batching. We panic
// in the mocked Dune client if we don't get a batch of blocks (more than one block).
BlockSubmitInterval: 50 * time.Millisecond,
SkipFailedBlocks: false,
},
nil, // progress
)

err := ing.Run(ctx, 1, -1) // run until canceled
require.ErrorIs(t, err, context.Canceled) // this is expected
require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
}

// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunBlocksOutOfOrder(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ func (i *ingester) trySendCompletedBlocks(
nextBlockToSend int64,
) (int64, error) {
for {
if len(collectedBlocks) < maxBatchSize/10 {
// if we have very little to send, wait for next tick to avoid tiny batches impacting throughput
return nextBlockToSend, nil
}
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
Expand Down

0 comments on commit 55e8dae

Please sign in to comment.