Skip to content

Commit

Permalink
start working on better tests around expected faults
Browse files Browse the repository at this point in the history
  • Loading branch information
msf committed Jun 6, 2024
1 parent 055d88b commit a9f1255
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 16 deletions.
9 changes: 5 additions & 4 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er
return i.ReportProgress(ctx)
})

if err := errGroup.Wait(); err != nil && err != errFinishedConsumeBlocks {
if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}
return nil
}

var errFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
Expand All @@ -57,6 +57,7 @@ func (i *ingester) ConsumeBlocks(
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
// TODO: handle cancellation here
for blockNumber > latestBlockNumber {
i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
"blockNumber", blockNumber,
Expand Down Expand Up @@ -94,7 +95,7 @@ func (i *ingester) ConsumeBlocks(

select {
case <-ctx.Done():
return nil
return ctx.Err()
case outChan <- block:
}

Expand All @@ -110,7 +111,7 @@ func (i *ingester) ConsumeBlocks(
)
}
}
return errFinishedConsumeBlocks
return ErrFinishedConsumeBlocks // FIXME: this is wrong
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
Expand Down
77 changes: 65 additions & 12 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"io"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -13,19 +12,75 @@ import (
duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi"
jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/go-errors/errors"
"github.com/stretchr/testify/require"
)

func TestBlockConsumptionLoop(t *testing.T) {
testcases := []string{
"we're up to date, following the head",
"we're erroring systematically, the RPC node is broken, all API calls are failing",
"we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken",
func TestBlockConsumptionLoopErrors(t *testing.T) {
testcases := []struct {
name string
LatestIsBroken bool
BlockByNumberIsBroken bool
}{
{
name: "we're up to date, following the head",
LatestIsBroken: false,
BlockByNumberIsBroken: false,
},
{
name: "the RPC node is broken, all API calls are failing",
LatestIsBroken: true,
BlockByNumberIsBroken: true,
},
{
name: "BlockByNumber, a specific jsonRPC on the RPC node is broken",
LatestIsBroken: false,
BlockByNumberIsBroken: true,
},
}

for _, testcase := range testcases {
t.Run(testcase, func(t *testing.T) {
t.Skip("not implemented")
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if tc.LatestIsBroken {
t.Skip("latest block number is broken, we don't behave correctly yet")
}
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(100)
producedBlockNumber := int64(0)
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
if tc.LatestIsBroken {
return 0, errors.New("latest block number is broken")
}
return maxBlockNumber, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
if tc.BlockByNumberIsBroken {
return models.RPCBlock{}, errors.New("block by number is broken")
}
if blockNumber > maxBlockNumber {
// end tests
cancel()
}
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: []byte(`block`),
}, nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

outCh := make(chan models.RPCBlock, maxBlockNumber+1)
defer close(outCh)
err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber)
require.Error(t, err) // this is expected
if tc.BlockByNumberIsBroken {
require.Equal(t, producedBlockNumber, int64(0))
}
})
}
}
Expand Down Expand Up @@ -78,8 +133,6 @@ func TestRunLoopBaseCase(t *testing.T) {
PollInterval: 1000 * time.Millisecond,
})

var wg sync.WaitGroup
wg.Add(1)
err := ing.Run(context.Background(), 0, tc.i)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, tc.i)
Expand Down Expand Up @@ -121,7 +174,7 @@ func TestRunLoopUntilCancel(t *testing.T) {
})

err := ing.Run(ctx, 0, maxBlockNumber)
require.NoError(t, err)
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, producedBlockNumber, maxBlockNumber)
require.Equal(t, sentBlockNumber, maxBlockNumber)
}

0 comments on commit a9f1255

Please sign in to comment.