Skip to content

Commit

Permalink
improve some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
msf committed Jun 27, 2024
1 parent 6730e76 commit 7ff628a
Showing 1 changed file with 77 additions and 21 deletions.
98 changes: 77 additions & 21 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ingester_test

import (
"context"
"fmt"
"io"
"log/slog"
"math/rand"
Expand Down Expand Up @@ -34,9 +33,7 @@ func TestRunUntilCancel(t *testing.T) {
next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
if block.BlockNumber != next {
panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber))
}
require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
next++
}

Expand Down Expand Up @@ -142,9 +139,7 @@ func TestSendBlocks(t *testing.T) {
next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
if block.BlockNumber != next {
panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber))
}
require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
next++
}

Expand Down Expand Up @@ -195,9 +190,7 @@ func TestSendBlocks(t *testing.T) {
require.Equal(t, int64(5), sentBlockNumber)
}

// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunBlocksOutOfOrder(t *testing.T) {
func TestRunBlocksUseBatching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
Expand All @@ -209,16 +202,81 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
}

// Fail if we're not sending a batch of blocks
if len(blocks) == 1 {
panic("expected batch of blocks, got 1")
require.Len(t, blocks, 1)

next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
next++
}

lastBlockNumber := blocks[len(blocks)-1].BlockNumber
atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
if lastBlockNumber >= maxBlockNumber {
// cancel execution when we have sent the last block
cancel()
return context.Canceled
}

return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return maxBlockNumber + 1, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
// Get blocks out of order by sleeping for a random amount of time
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
},
CloseFunc: func() error {
return nil
},
}
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
rpcClient,
duneapi,
ingester.Config{
MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines
// big enough compared to the time spent in block by number to ensure batching. We panic
// in the mocked Dune client if we don't get a batch of blocks (more than one block).
BlockSubmitInterval: 50 * time.Millisecond,
SkipFailedBlocks: false,
},
nil, // progress
)

err := ing.Run(ctx, 1, -1) // run until canceled
require.ErrorIs(t, err, context.Canceled) // this is expected
require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
}

// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunBlocksOutOfOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) == 0 {
return nil
}

next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
if block.BlockNumber != next {
panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber))
}
require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
next++
}

Expand Down Expand Up @@ -258,7 +316,7 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
rpcClient,
duneapi,
ingester.Config{
MaxConcurrentRequests: 10, // fetch blocks in multiple goroutines
MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines
// big enough compared to the time spent in block by number to ensure batching. We panic
// in the mocked Dune client if we don't get a batch of blocks (more than one block).
BlockSubmitInterval: 50 * time.Millisecond,
Expand All @@ -277,11 +335,10 @@ func TestRunRPCNodeFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxBlockNumber := int64(1000)
producedBlockNumber := int64(0)
someRPCError := errors.Errorf("some RPC error")
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error {
return someRPCError
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
Expand All @@ -291,11 +348,10 @@ func TestRunRPCNodeFails(t *testing.T) {
LatestBlockNumberFunc: func() (int64, error) {
return maxBlockNumber + 1, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
BlockByNumberFunc: func(_ context.Context, _ int64) (models.RPCBlock, error) {
// Get blocks out of order by sleeping for a random amount of time
time.Sleep(time.Duration(rand.Intn(10)) * time.Nanosecond)
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
return models.RPCBlock{}, someRPCError
},
CloseFunc: func() error {
return nil
Expand Down

0 comments on commit 7ff628a

Please sign in to comment.