From 055d88b80f49b126947f231b274b1affc8db1ad5 Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Wed, 5 Jun 2024 16:06:22 +0100 Subject: [PATCH] mainloop basic tests pass --- cmd/main.go | 10 ++-- ingester/ingester.go | 9 ++-- ingester/mainloop.go | 24 ++++----- ingester/mainloop_test.go | 100 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 120 insertions(+), 23 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index f873a3c..ec97a66 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -64,13 +64,17 @@ func main() { rpcClient, duneClient, ingester.Config{ - PollInterval: cfg.PollInterval, - StartBlockHeight: cfg.BlockHeight, + PollInterval: cfg.PollInterval, + MaxBatchSize: 1, }, ) wg.Add(1) - ingester.Run(context.Background(), &wg) + go func() { + defer wg.Done() + err := ingester.Run(context.Background(), cfg.BlockHeight, 0 /* maxCount */) + logger.Info("Ingester finished", "err", err) + }() // TODO: add a metrics exporter or healthcheck http endpoint ? diff --git a/ingester/ingester.go b/ingester/ingester.go index 97d807f..462ad0d 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -3,7 +3,6 @@ package ingester import ( "context" "log/slog" - "sync" "time" "github.com/duneanalytics/blockchain-ingester/client/duneapi" @@ -12,7 +11,8 @@ import ( ) type Ingester interface { - Run(ctx context.Context, wg *sync.WaitGroup) error + // Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested + Run(ctx context.Context, startBlockNumber, maxCount int64) error // ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive. // If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain @@ -33,9 +33,8 @@ type Ingester interface { const defaultMaxBatchSize = 1 type Config struct { - MaxBatchSize int - StartBlockHeight int64 - PollInterval time.Duration + MaxBatchSize int + PollInterval time.Duration } type Info struct { diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 5d9b1e5..3420d2a 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -3,7 +3,6 @@ package ingester import ( "context" "fmt" - "sync" "sync/atomic" "time" @@ -12,30 +11,28 @@ import ( "golang.org/x/sync/errgroup" ) -func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error { - defer wg.Done() - +func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error { inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) defer close(inFlightChan) var err error - startBlockNumber := i.cfg.StartBlockHeight - if startBlockNumber <= 0 { + if startBlockNumber < 0 { startBlockNumber, err = i.node.LatestBlockNumber() if err != nil { return errors.Errorf("failed to get latest block number: %w", err) } } + i.log.Info("Starting ingester", "maxBatchSize", i.cfg.MaxBatchSize, - "startBlockHeight", i.cfg.StartBlockHeight, "startBlockNumber", startBlockNumber, + "maxCount", maxCount, ) errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { - return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, -1) + return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount) }) errGroup.Go(func() error { return i.SendBlocks(ctx, inFlightChan) @@ -44,9 +41,14 @@ func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error { return i.ReportProgress(ctx) }) - return errGroup.Wait() + if err := errGroup.Wait(); err != nil && err != errFinishedConsumeBlocks { + return err + } + return nil } +var errFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") + // ConsumeBlocks from the NPC Node func (i *ingester) ConsumeBlocks( ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64, @@ -66,7 +68,7 @@ func (i *ingester) ConsumeBlocks( return latestBlockNumber } - for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ { + for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber) startTime := time.Now() @@ -108,7 +110,7 @@ func (i *ingester) ConsumeBlocks( ) } } - return nil + return errFinishedConsumeBlocks } func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 6a57de6..c7a1f19 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -1,10 +1,23 @@ package ingester_test -import "testing" +import ( + "context" + "io" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/duneanalytics/blockchain-ingester/ingester" + duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi" + jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc" + "github.com/duneanalytics/blockchain-ingester/models" + "github.com/stretchr/testify/require" +) func TestBlockConsumptionLoop(t *testing.T) { testcases := []string{ - "we're very behind, trying to catch up", "we're up to date, following the head", "we're erroring systematically, the RPC node is broken, all API calls are failing", "we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken", @@ -30,6 +43,85 @@ func TestBlockSendingLoop(t *testing.T) { } } -func TestRunLoopHappyCase(t *testing.T) { - t.Skip("not implemented") +func TestRunLoopBaseCase(t *testing.T) { + testCases := []struct { + name string + i int64 + }{ + {name: "1 block", i: 1}, + {name: "100 blocks", i: 100}, + } + sentBlockNumber := int64(0) + producedBlockNumber := int64(0) + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlockFunc: func(block models.RPCBlock) error { + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) + return nil + }, + } + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + return 1000, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }) + + var wg sync.WaitGroup + wg.Add(1) + err := ing.Run(context.Background(), 0, tc.i) + require.NoError(t, err) + require.Equal(t, producedBlockNumber, tc.i) + require.Equal(t, sentBlockNumber, tc.i) + }) + } +} + +func TestRunLoopUntilCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + maxBlockNumber := int64(1000) + sentBlockNumber := int64(0) + producedBlockNumber := int64(0) + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlockFunc: func(block models.RPCBlock) error { + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) + if block.BlockNumber == maxBlockNumber { + // cancel execution when we send the last block + cancel() + } + return nil + }, + } + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + return maxBlockNumber + 1, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + } + ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }) + + err := ing.Run(ctx, 0, maxBlockNumber) + require.NoError(t, err) + require.Equal(t, producedBlockNumber, maxBlockNumber) + require.Equal(t, sentBlockNumber, maxBlockNumber) }