diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 3420d2a..fe6aea1 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -41,13 +41,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er return i.ReportProgress(ctx) }) - if err := errGroup.Wait(); err != nil && err != errFinishedConsumeBlocks { + if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks { return err } return nil } -var errFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") +var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") // ConsumeBlocks from the NPC Node func (i *ingester) ConsumeBlocks( @@ -57,6 +57,7 @@ func (i *ingester) ConsumeBlocks( latestBlockNumber := i.tryUpdateLatestBlockNumber() waitForBlock := func(blockNumber, latestBlockNumber int64) int64 { + // TODO: handle cancellation here for blockNumber > latestBlockNumber { i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval), "blockNumber", blockNumber, @@ -94,7 +95,7 @@ func (i *ingester) ConsumeBlocks( select { case <-ctx.Done(): - return nil + return ctx.Err() case outChan <- block: } @@ -110,7 +111,7 @@ func (i *ingester) ConsumeBlocks( ) } } - return errFinishedConsumeBlocks + return ErrFinishedConsumeBlocks // FIXME: this is wrong } func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index c7a1f19..3d8fc96 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -4,7 +4,6 @@ import ( "context" "io" "log/slog" - "sync" "sync/atomic" "testing" "time" @@ -13,19 +12,75 @@ import ( duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi" jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc" "github.com/duneanalytics/blockchain-ingester/models" + "github.com/go-errors/errors" "github.com/stretchr/testify/require" ) -func TestBlockConsumptionLoop(t *testing.T) { - testcases := []string{ - "we're up to date, following the head", - "we're erroring systematically, the RPC node is broken, all API calls are failing", - "we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken", +func TestBlockConsumptionLoopErrors(t *testing.T) { + testcases := []struct { + name string + LatestIsBroken bool + BlockByNumberIsBroken bool + }{ + { + name: "we're up to date, following the head", + LatestIsBroken: false, + BlockByNumberIsBroken: false, + }, + { + name: "the RPC node is broken, all API calls are failing", + LatestIsBroken: true, + BlockByNumberIsBroken: true, + }, + { + name: "BlockByNumber, a specific jsonRPC on the RPC node is broken", + LatestIsBroken: false, + BlockByNumberIsBroken: true, + }, } - for _, testcase := range testcases { - t.Run(testcase, func(t *testing.T) { - t.Skip("not implemented") + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + if tc.LatestIsBroken { + t.Skip("latest block number is broken, we don't behave correctly yet") + } + ctx, cancel := context.WithCancel(context.Background()) + maxBlockNumber := int64(100) + producedBlockNumber := int64(0) + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + if tc.LatestIsBroken { + return 0, errors.New("latest block number is broken") + } + return maxBlockNumber, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + if tc.BlockByNumberIsBroken { + return models.RPCBlock{}, errors.New("block by number is broken") + } + if blockNumber > maxBlockNumber { + // end tests + cancel() + } + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + } + ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }) + + outCh := make(chan models.RPCBlock, maxBlockNumber+1) + defer close(outCh) + err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber) + require.Error(t, err) // this is expected + if tc.BlockByNumberIsBroken { + require.Equal(t, producedBlockNumber, int64(0)) + } }) } } @@ -78,8 +133,6 @@ func TestRunLoopBaseCase(t *testing.T) { PollInterval: 1000 * time.Millisecond, }) - var wg sync.WaitGroup - wg.Add(1) err := ing.Run(context.Background(), 0, tc.i) require.NoError(t, err) require.Equal(t, producedBlockNumber, tc.i) @@ -121,7 +174,7 @@ func TestRunLoopUntilCancel(t *testing.T) { }) err := ing.Run(ctx, 0, maxBlockNumber) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) require.Equal(t, producedBlockNumber, maxBlockNumber) require.Equal(t, sentBlockNumber, maxBlockNumber) }