From e2b19de053383b30eff0d380d17fcf962b749f37 Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Wed, 5 Jun 2024 12:18:19 +0100 Subject: [PATCH 1/5] add mocks for tests --- Makefile | 9 ++- mocks/duneapi/client.go | 76 +++++++++++++++++++ mocks/jsonrpc/rpcnode.go | 157 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 mocks/duneapi/client.go create mode 100644 mocks/jsonrpc/rpcnode.go diff --git a/Makefile b/Makefile index 812c458..aba82aa 100644 --- a/Makefile +++ b/Makefile @@ -8,9 +8,11 @@ TEST_TIMEOUT := 10s all: lint test build -setup: bin/golangci-lint bin/gofumpt +setup: bin/golangci-lint bin/gofumpt bin/moq go mod download +bin/moq: + GOBIN=$(PWD)/bin go install github.com/matryer/moq@v0.3.4 bin/golangci-lint: GOBIN=$(PWD)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.59.0 bin/gofumpt: bin @@ -30,6 +32,11 @@ test: go mod tidy go test -timeout=$(TEST_TIMEOUT) -race -bench=. -benchmem -cover ./... +gen-mocks: bin/moq ./client/jsonrpc/ ./client/duneapi/ + ./bin/moq -pkg jsonrpc_mock -out ./mocks/jsonrpc/rpcnode.go ./client/jsonrpc BlockchainClient + ./bin/moq -pkg duneapi_mock -out ./mocks/duneapi/client.go ./client/duneapi BlockchainIngester + + image-build: @echo "# Building ingester docker image..." docker build -t $(APPLICATION) -f Dockerfile --build-arg GITHUB_TOKEN=${GITHUB_TOKEN} . diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go new file mode 100644 index 0000000..40d114c --- /dev/null +++ b/mocks/duneapi/client.go @@ -0,0 +1,76 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package duneapi_mock + +import ( + "github.com/duneanalytics/blockchain-ingester/client/duneapi" + "github.com/duneanalytics/blockchain-ingester/models" + "sync" +) + +// Ensure, that BlockchainIngesterMock does implement duneapi.BlockchainIngester. +// If this is not the case, regenerate this file with moq. +var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} + +// BlockchainIngesterMock is a mock implementation of duneapi.BlockchainIngester. +// +// func TestSomethingThatUsesBlockchainIngester(t *testing.T) { +// +// // make and configure a mocked duneapi.BlockchainIngester +// mockedBlockchainIngester := &BlockchainIngesterMock{ +// SendBlockFunc: func(payload models.RPCBlock) error { +// panic("mock out the SendBlock method") +// }, +// } +// +// // use mockedBlockchainIngester in code that requires duneapi.BlockchainIngester +// // and then make assertions. +// +// } +type BlockchainIngesterMock struct { + // SendBlockFunc mocks the SendBlock method. + SendBlockFunc func(payload models.RPCBlock) error + + // calls tracks calls to the methods. + calls struct { + // SendBlock holds details about calls to the SendBlock method. + SendBlock []struct { + // Payload is the payload argument value. + Payload models.RPCBlock + } + } + lockSendBlock sync.RWMutex +} + +// SendBlock calls SendBlockFunc. +func (mock *BlockchainIngesterMock) SendBlock(payload models.RPCBlock) error { + if mock.SendBlockFunc == nil { + panic("BlockchainIngesterMock.SendBlockFunc: method is nil but BlockchainIngester.SendBlock was just called") + } + callInfo := struct { + Payload models.RPCBlock + }{ + Payload: payload, + } + mock.lockSendBlock.Lock() + mock.calls.SendBlock = append(mock.calls.SendBlock, callInfo) + mock.lockSendBlock.Unlock() + return mock.SendBlockFunc(payload) +} + +// SendBlockCalls gets all the calls that were made to SendBlock. +// Check the length with: +// +// len(mockedBlockchainIngester.SendBlockCalls()) +func (mock *BlockchainIngesterMock) SendBlockCalls() []struct { + Payload models.RPCBlock +} { + var calls []struct { + Payload models.RPCBlock + } + mock.lockSendBlock.RLock() + calls = mock.calls.SendBlock + mock.lockSendBlock.RUnlock() + return calls +} diff --git a/mocks/jsonrpc/rpcnode.go b/mocks/jsonrpc/rpcnode.go new file mode 100644 index 0000000..abe7cb6 --- /dev/null +++ b/mocks/jsonrpc/rpcnode.go @@ -0,0 +1,157 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package jsonrpc_mock + +import ( + "context" + "github.com/duneanalytics/blockchain-ingester/client/jsonrpc" + "github.com/duneanalytics/blockchain-ingester/models" + "sync" +) + +// Ensure, that BlockchainClientMock does implement jsonrpc.BlockchainClient. +// If this is not the case, regenerate this file with moq. +var _ jsonrpc.BlockchainClient = &BlockchainClientMock{} + +// BlockchainClientMock is a mock implementation of jsonrpc.BlockchainClient. +// +// func TestSomethingThatUsesBlockchainClient(t *testing.T) { +// +// // make and configure a mocked jsonrpc.BlockchainClient +// mockedBlockchainClient := &BlockchainClientMock{ +// BlockByNumberFunc: func(ctx context.Context, blockNumber int64) (models.RPCBlock, error) { +// panic("mock out the BlockByNumber method") +// }, +// CloseFunc: func() error { +// panic("mock out the Close method") +// }, +// LatestBlockNumberFunc: func() (int64, error) { +// panic("mock out the LatestBlockNumber method") +// }, +// } +// +// // use mockedBlockchainClient in code that requires jsonrpc.BlockchainClient +// // and then make assertions. +// +// } +type BlockchainClientMock struct { + // BlockByNumberFunc mocks the BlockByNumber method. + BlockByNumberFunc func(ctx context.Context, blockNumber int64) (models.RPCBlock, error) + + // CloseFunc mocks the Close method. + CloseFunc func() error + + // LatestBlockNumberFunc mocks the LatestBlockNumber method. + LatestBlockNumberFunc func() (int64, error) + + // calls tracks calls to the methods. + calls struct { + // BlockByNumber holds details about calls to the BlockByNumber method. + BlockByNumber []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // BlockNumber is the blockNumber argument value. + BlockNumber int64 + } + // Close holds details about calls to the Close method. + Close []struct { + } + // LatestBlockNumber holds details about calls to the LatestBlockNumber method. + LatestBlockNumber []struct { + } + } + lockBlockByNumber sync.RWMutex + lockClose sync.RWMutex + lockLatestBlockNumber sync.RWMutex +} + +// BlockByNumber calls BlockByNumberFunc. +func (mock *BlockchainClientMock) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) { + if mock.BlockByNumberFunc == nil { + panic("BlockchainClientMock.BlockByNumberFunc: method is nil but BlockchainClient.BlockByNumber was just called") + } + callInfo := struct { + Ctx context.Context + BlockNumber int64 + }{ + Ctx: ctx, + BlockNumber: blockNumber, + } + mock.lockBlockByNumber.Lock() + mock.calls.BlockByNumber = append(mock.calls.BlockByNumber, callInfo) + mock.lockBlockByNumber.Unlock() + return mock.BlockByNumberFunc(ctx, blockNumber) +} + +// BlockByNumberCalls gets all the calls that were made to BlockByNumber. +// Check the length with: +// +// len(mockedBlockchainClient.BlockByNumberCalls()) +func (mock *BlockchainClientMock) BlockByNumberCalls() []struct { + Ctx context.Context + BlockNumber int64 +} { + var calls []struct { + Ctx context.Context + BlockNumber int64 + } + mock.lockBlockByNumber.RLock() + calls = mock.calls.BlockByNumber + mock.lockBlockByNumber.RUnlock() + return calls +} + +// Close calls CloseFunc. +func (mock *BlockchainClientMock) Close() error { + if mock.CloseFunc == nil { + panic("BlockchainClientMock.CloseFunc: method is nil but BlockchainClient.Close was just called") + } + callInfo := struct { + }{} + mock.lockClose.Lock() + mock.calls.Close = append(mock.calls.Close, callInfo) + mock.lockClose.Unlock() + return mock.CloseFunc() +} + +// CloseCalls gets all the calls that were made to Close. +// Check the length with: +// +// len(mockedBlockchainClient.CloseCalls()) +func (mock *BlockchainClientMock) CloseCalls() []struct { +} { + var calls []struct { + } + mock.lockClose.RLock() + calls = mock.calls.Close + mock.lockClose.RUnlock() + return calls +} + +// LatestBlockNumber calls LatestBlockNumberFunc. +func (mock *BlockchainClientMock) LatestBlockNumber() (int64, error) { + if mock.LatestBlockNumberFunc == nil { + panic("BlockchainClientMock.LatestBlockNumberFunc: method is nil but BlockchainClient.LatestBlockNumber was just called") + } + callInfo := struct { + }{} + mock.lockLatestBlockNumber.Lock() + mock.calls.LatestBlockNumber = append(mock.calls.LatestBlockNumber, callInfo) + mock.lockLatestBlockNumber.Unlock() + return mock.LatestBlockNumberFunc() +} + +// LatestBlockNumberCalls gets all the calls that were made to LatestBlockNumber. +// Check the length with: +// +// len(mockedBlockchainClient.LatestBlockNumberCalls()) +func (mock *BlockchainClientMock) LatestBlockNumberCalls() []struct { +} { + var calls []struct { + } + mock.lockLatestBlockNumber.RLock() + calls = mock.calls.LatestBlockNumber + mock.lockLatestBlockNumber.RUnlock() + return calls +} From 0dd64475daeba230fb2e1c002d80d4b8ca50b0e0 Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Wed, 5 Jun 2024 12:27:17 +0100 Subject: [PATCH 2/5] start working on the test --- ingester/mainloop_test.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index a96d103..6a57de6 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -3,13 +3,33 @@ package ingester_test import "testing" func TestBlockConsumptionLoop(t *testing.T) { - t.Skip("not implemented") + testcases := []string{ + "we're very behind, trying to catch up", + "we're up to date, following the head", + "we're erroring systematically, the RPC node is broken, all API calls are failing", + "we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken", + } + + for _, testcase := range testcases { + t.Run(testcase, func(t *testing.T) { + t.Skip("not implemented") + }) + } } func TestBlockSendingLoop(t *testing.T) { - t.Skip("not implemented") + testcases := []string{ + "we're up to date, following the head", + "we're failing intermittently, the Dune API is broken", + "we're erroring systematically, the Dune API is down", + } + for _, testcase := range testcases { + t.Run(testcase, func(t *testing.T) { + t.Skip("not implemented") + }) + } } -func TestRunLoop(t *testing.T) { +func TestRunLoopHappyCase(t *testing.T) { t.Skip("not implemented") } From f8374888bac0277489a74c1b2bca9d537a70be90 Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Wed, 5 Jun 2024 15:47:49 +0100 Subject: [PATCH 3/5] add testify dependency --- go.mod | 4 ++++ go.sum | 10 ++++++++++ 2 files changed, 14 insertions(+) diff --git a/go.mod b/go.mod index 72e0af7..c506fe2 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,14 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/jessevdk/go-flags v1.5.0 github.com/klauspost/compress v1.17.8 + github.com/stretchr/testify v1.9.0 golang.org/x/sync v0.7.0 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.20.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 08c0ff4..7ee828f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= @@ -16,8 +18,16 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 055d88b80f49b126947f231b274b1affc8db1ad5 Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Wed, 5 Jun 2024 16:06:22 +0100 Subject: [PATCH 4/5] mainloop basic tests pass --- cmd/main.go | 10 ++-- ingester/ingester.go | 9 ++-- ingester/mainloop.go | 24 ++++----- ingester/mainloop_test.go | 100 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 120 insertions(+), 23 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index f873a3c..ec97a66 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -64,13 +64,17 @@ func main() { rpcClient, duneClient, ingester.Config{ - PollInterval: cfg.PollInterval, - StartBlockHeight: cfg.BlockHeight, + PollInterval: cfg.PollInterval, + MaxBatchSize: 1, }, ) wg.Add(1) - ingester.Run(context.Background(), &wg) + go func() { + defer wg.Done() + err := ingester.Run(context.Background(), cfg.BlockHeight, 0 /* maxCount */) + logger.Info("Ingester finished", "err", err) + }() // TODO: add a metrics exporter or healthcheck http endpoint ? diff --git a/ingester/ingester.go b/ingester/ingester.go index 97d807f..462ad0d 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -3,7 +3,6 @@ package ingester import ( "context" "log/slog" - "sync" "time" "github.com/duneanalytics/blockchain-ingester/client/duneapi" @@ -12,7 +11,8 @@ import ( ) type Ingester interface { - Run(ctx context.Context, wg *sync.WaitGroup) error + // Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested + Run(ctx context.Context, startBlockNumber, maxCount int64) error // ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive. // If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain @@ -33,9 +33,8 @@ type Ingester interface { const defaultMaxBatchSize = 1 type Config struct { - MaxBatchSize int - StartBlockHeight int64 - PollInterval time.Duration + MaxBatchSize int + PollInterval time.Duration } type Info struct { diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 5d9b1e5..3420d2a 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -3,7 +3,6 @@ package ingester import ( "context" "fmt" - "sync" "sync/atomic" "time" @@ -12,30 +11,28 @@ import ( "golang.org/x/sync/errgroup" ) -func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error { - defer wg.Done() - +func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error { inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) defer close(inFlightChan) var err error - startBlockNumber := i.cfg.StartBlockHeight - if startBlockNumber <= 0 { + if startBlockNumber < 0 { startBlockNumber, err = i.node.LatestBlockNumber() if err != nil { return errors.Errorf("failed to get latest block number: %w", err) } } + i.log.Info("Starting ingester", "maxBatchSize", i.cfg.MaxBatchSize, - "startBlockHeight", i.cfg.StartBlockHeight, "startBlockNumber", startBlockNumber, + "maxCount", maxCount, ) errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { - return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, -1) + return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount) }) errGroup.Go(func() error { return i.SendBlocks(ctx, inFlightChan) @@ -44,9 +41,14 @@ func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error { return i.ReportProgress(ctx) }) - return errGroup.Wait() + if err := errGroup.Wait(); err != nil && err != errFinishedConsumeBlocks { + return err + } + return nil } +var errFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") + // ConsumeBlocks from the NPC Node func (i *ingester) ConsumeBlocks( ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64, @@ -66,7 +68,7 @@ func (i *ingester) ConsumeBlocks( return latestBlockNumber } - for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ { + for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber) startTime := time.Now() @@ -108,7 +110,7 @@ func (i *ingester) ConsumeBlocks( ) } } - return nil + return errFinishedConsumeBlocks } func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 6a57de6..c7a1f19 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -1,10 +1,23 @@ package ingester_test -import "testing" +import ( + "context" + "io" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/duneanalytics/blockchain-ingester/ingester" + duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi" + jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc" + "github.com/duneanalytics/blockchain-ingester/models" + "github.com/stretchr/testify/require" +) func TestBlockConsumptionLoop(t *testing.T) { testcases := []string{ - "we're very behind, trying to catch up", "we're up to date, following the head", "we're erroring systematically, the RPC node is broken, all API calls are failing", "we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken", @@ -30,6 +43,85 @@ func TestBlockSendingLoop(t *testing.T) { } } -func TestRunLoopHappyCase(t *testing.T) { - t.Skip("not implemented") +func TestRunLoopBaseCase(t *testing.T) { + testCases := []struct { + name string + i int64 + }{ + {name: "1 block", i: 1}, + {name: "100 blocks", i: 100}, + } + sentBlockNumber := int64(0) + producedBlockNumber := int64(0) + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlockFunc: func(block models.RPCBlock) error { + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) + return nil + }, + } + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + return 1000, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }) + + var wg sync.WaitGroup + wg.Add(1) + err := ing.Run(context.Background(), 0, tc.i) + require.NoError(t, err) + require.Equal(t, producedBlockNumber, tc.i) + require.Equal(t, sentBlockNumber, tc.i) + }) + } +} + +func TestRunLoopUntilCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + maxBlockNumber := int64(1000) + sentBlockNumber := int64(0) + producedBlockNumber := int64(0) + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlockFunc: func(block models.RPCBlock) error { + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) + if block.BlockNumber == maxBlockNumber { + // cancel execution when we send the last block + cancel() + } + return nil + }, + } + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + return maxBlockNumber + 1, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + } + ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }) + + err := ing.Run(ctx, 0, maxBlockNumber) + require.NoError(t, err) + require.Equal(t, producedBlockNumber, maxBlockNumber) + require.Equal(t, sentBlockNumber, maxBlockNumber) } From a9f12557d83f7de0814193c42b88842cdc1c386a Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Wed, 5 Jun 2024 16:34:50 +0100 Subject: [PATCH 5/5] start working on better tests around expected faults --- ingester/mainloop.go | 9 +++-- ingester/mainloop_test.go | 77 +++++++++++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 3420d2a..fe6aea1 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -41,13 +41,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er return i.ReportProgress(ctx) }) - if err := errGroup.Wait(); err != nil && err != errFinishedConsumeBlocks { + if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks { return err } return nil } -var errFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") +var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") // ConsumeBlocks from the NPC Node func (i *ingester) ConsumeBlocks( @@ -57,6 +57,7 @@ func (i *ingester) ConsumeBlocks( latestBlockNumber := i.tryUpdateLatestBlockNumber() waitForBlock := func(blockNumber, latestBlockNumber int64) int64 { + // TODO: handle cancellation here for blockNumber > latestBlockNumber { i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval), "blockNumber", blockNumber, @@ -94,7 +95,7 @@ func (i *ingester) ConsumeBlocks( select { case <-ctx.Done(): - return nil + return ctx.Err() case outChan <- block: } @@ -110,7 +111,7 @@ func (i *ingester) ConsumeBlocks( ) } } - return errFinishedConsumeBlocks + return ErrFinishedConsumeBlocks // FIXME: this is wrong } func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index c7a1f19..3d8fc96 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -4,7 +4,6 @@ import ( "context" "io" "log/slog" - "sync" "sync/atomic" "testing" "time" @@ -13,19 +12,75 @@ import ( duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi" jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc" "github.com/duneanalytics/blockchain-ingester/models" + "github.com/go-errors/errors" "github.com/stretchr/testify/require" ) -func TestBlockConsumptionLoop(t *testing.T) { - testcases := []string{ - "we're up to date, following the head", - "we're erroring systematically, the RPC node is broken, all API calls are failing", - "we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken", +func TestBlockConsumptionLoopErrors(t *testing.T) { + testcases := []struct { + name string + LatestIsBroken bool + BlockByNumberIsBroken bool + }{ + { + name: "we're up to date, following the head", + LatestIsBroken: false, + BlockByNumberIsBroken: false, + }, + { + name: "the RPC node is broken, all API calls are failing", + LatestIsBroken: true, + BlockByNumberIsBroken: true, + }, + { + name: "BlockByNumber, a specific jsonRPC on the RPC node is broken", + LatestIsBroken: false, + BlockByNumberIsBroken: true, + }, } - for _, testcase := range testcases { - t.Run(testcase, func(t *testing.T) { - t.Skip("not implemented") + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + if tc.LatestIsBroken { + t.Skip("latest block number is broken, we don't behave correctly yet") + } + ctx, cancel := context.WithCancel(context.Background()) + maxBlockNumber := int64(100) + producedBlockNumber := int64(0) + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + if tc.LatestIsBroken { + return 0, errors.New("latest block number is broken") + } + return maxBlockNumber, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + if tc.BlockByNumberIsBroken { + return models.RPCBlock{}, errors.New("block by number is broken") + } + if blockNumber > maxBlockNumber { + // end tests + cancel() + } + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + } + ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }) + + outCh := make(chan models.RPCBlock, maxBlockNumber+1) + defer close(outCh) + err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber) + require.Error(t, err) // this is expected + if tc.BlockByNumberIsBroken { + require.Equal(t, producedBlockNumber, int64(0)) + } }) } } @@ -78,8 +133,6 @@ func TestRunLoopBaseCase(t *testing.T) { PollInterval: 1000 * time.Millisecond, }) - var wg sync.WaitGroup - wg.Add(1) err := ing.Run(context.Background(), 0, tc.i) require.NoError(t, err) require.Equal(t, producedBlockNumber, tc.i) @@ -121,7 +174,7 @@ func TestRunLoopUntilCancel(t *testing.T) { }) err := ing.Run(ctx, 0, maxBlockNumber) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) require.Equal(t, producedBlockNumber, maxBlockNumber) require.Equal(t, sentBlockNumber, maxBlockNumber) }