From 1f601695b22fd49c13d594292a7ffa5dd907695b Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 6 Sep 2024 15:45:47 +0300 Subject: [PATCH 01/11] Check data integrity for EVM events --- models/events.go | 15 ++++++++++++ models/events_test.go | 39 +++++++++++++++++++++++++++++-- services/ingestion/engine_test.go | 25 ++++++++++---------- 3 files changed, 65 insertions(+), 14 deletions(-) diff --git a/models/events.go b/models/events.go index 5d1896970..8995e6c3d 100644 --- a/models/events.go +++ b/models/events.go @@ -7,6 +7,8 @@ import ( "github.com/onflow/cadence" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/events" + evmTypes "github.com/onflow/flow-go/fvm/evm/types" + gethCommon "github.com/onflow/go-ethereum/common" ) // isBlockExecutedEvent checks whether the given event contains block executed data. @@ -119,6 +121,19 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height) } + if e.block != nil && len(e.transactions) > 0 { + txHashes := evmTypes.TransactionHashes([]gethCommon.Hash{}) + for _, tx := range e.transactions { + txHashes = append(txHashes, tx.Hash()) + } + if e.block.TransactionHashRoot != txHashes.RootHash() { + return nil, fmt.Errorf( + "block %d references missing transaction/s", + e.block.Height, + ) + } + } + return e, nil } diff --git a/models/events_test.go b/models/events_test.go index 5c82bf46d..3a6aa8c3f 100644 --- a/models/events_test.go +++ b/models/events_test.go @@ -61,6 +61,40 @@ func TestCadenceEvents_Block(t *testing.T) { } }) } + + cadenceHeight := uint64(1) + txCount := 10 + txEvents := make([]flow.Event, txCount) + txs := make([]Transaction, txCount) + hashes := make([]gethCommon.Hash, txCount) + results := make([]*types.Result, txCount) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + } + + // generate txs + for i := 0; i < txCount; i++ { + var err error + txs[i], results[i], txEvents[i], err = newTransaction(uint64(i)) + require.NoError(t, err) + hashes[i] = txs[i].Hash() + blockEvents.Events = append(blockEvents.Events, txEvents[i]) + } + + // generate single block + _, blockEvent, err := newBlock(1, hashes[0:txCount-2]) + require.NoError(t, err) + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.Error(t, err) + assert.ErrorContains( + t, + err, + "block 1 references missing transaction/s", + ) } func Test_EventDecoding(t *testing.T) { @@ -174,7 +208,7 @@ func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error return TransactionCall{Transaction: tx}, res, flowEvent, err } -func newBlock(height uint64, hashes []gethCommon.Hash) (*Block, flow.Event, error) { +func newBlock(height uint64, txHashes []gethCommon.Hash) (*Block, flow.Event, error) { gethBlock := types.NewBlock( gethCommon.HexToHash("0x01"), height, @@ -182,9 +216,10 @@ func newBlock(height uint64, hashes []gethCommon.Hash) (*Block, flow.Event, erro big.NewInt(100), gethCommon.HexToHash("0x15"), ) + gethBlock.TransactionHashRoot = types.TransactionHashes(txHashes).RootHash() evmBlock := &Block{ Block: gethBlock, - TransactionHashes: hashes, + TransactionHashes: txHashes, } ev := events.NewBlockEvent(gethBlock) diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index bbeba52e9..a2b2651ab 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -556,6 +556,7 @@ func newBlock(height uint64, txHashes []gethCommon.Hash) (cadence.Event, *models TotalSupply: big.NewInt(100), ReceiptRoot: gethCommon.HexToHash("0x2"), } + gethBlock.TransactionHashRoot = types.TransactionHashes(txHashes).RootHash() block := &models.Block{ Block: gethBlock, TransactionHashes: txHashes, @@ -567,6 +568,17 @@ func newBlock(height uint64, txHashes []gethCommon.Hash) (cadence.Event, *models } func newTransaction(height uint64) (cadence.Event, *events.Event, models.Transaction, *types.Result, error) { + txEncoded, err := hex.DecodeString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653") + if err != nil { + return cadence.Event{}, nil, nil, nil, err + } + + tx := &gethTypes.Transaction{} + err = tx.UnmarshalBinary(txEncoded) + if err != nil { + return cadence.Event{}, nil, nil, nil, err + } + res := &types.Result{ VMError: nil, TxType: 1, @@ -580,18 +592,7 @@ func newTransaction(height uint64) (cadence.Event, *events.Event, models.Transac Address: gethCommon.Address{0x3, 0x5}, Topics: []gethCommon.Hash{{0x2, 0x66}, {0x7, 0x1}}, }}, - TxHash: gethCommon.HexToHash("0x33"), - } - - txEncoded, err := hex.DecodeString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653") - if err != nil { - return cadence.Event{}, nil, nil, nil, err - } - - tx := &gethTypes.Transaction{} - err = tx.UnmarshalBinary(txEncoded) - if err != nil { - return cadence.Event{}, nil, nil, nil, err + TxHash: tx.Hash(), } ev := events.NewTransactionEvent( From 67855f0bcf56e8111b0e8807207b7de5fe0789b6 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 6 Sep 2024 17:05:56 +0300 Subject: [PATCH 02/11] Check data integrity even for empty EVM transactions --- models/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/events.go b/models/events.go index 8995e6c3d..1effbad0f 100644 --- a/models/events.go +++ b/models/events.go @@ -121,7 +121,7 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height) } - if e.block != nil && len(e.transactions) > 0 { + if e.block != nil { txHashes := evmTypes.TransactionHashes([]gethCommon.Hash{}) for _, tx := range e.transactions { txHashes = append(txHashes, tx.Hash()) From 75361ce35abc5406647451290cb67b1da0bceeb9 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 6 Sep 2024 17:17:59 +0300 Subject: [PATCH 03/11] Add recovery logic for failures in block events parsing --- services/ingestion/subscriber.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 6b7b76481..731d880d3 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -146,6 +146,33 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac return } + evts := models.NewBlockEvents(blockEvents) + if evts.Err != nil { + blkEvents := flow.BlockEvents{ + BlockID: blockEvents.BlockID, + Height: blockEvents.Height, + BlockTimestamp: blockEvents.BlockTimestamp, + } + for _, eventFilter := range r.blocksFilter().EventTypes { + recoveredEvents, err := r.client.GetEventsForHeightRange( + ctx, + eventFilter, + blockEvents.Height, + blockEvents.Height, + ) + if err != nil { + events <- models.NewBlockEventsError(err) + return + } + for _, blockEvent := range recoveredEvents { + blkEvents.Events = append(blkEvents.Events, blockEvent.Events...) + } + } + + events <- models.NewBlockEvents(blkEvents) + return + } + events <- models.NewBlockEvents(blockEvents) case err, ok := <-errChan: From 9666919837f2d17014aec2f864df76fcf1bc13a1 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 6 Sep 2024 17:59:18 +0300 Subject: [PATCH 04/11] Extract fetchBlockEvents into its own method --- services/ingestion/subscriber.go | 53 +++++++++++++++++++------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 731d880d3..af9d9127a 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -148,28 +148,7 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac evts := models.NewBlockEvents(blockEvents) if evts.Err != nil { - blkEvents := flow.BlockEvents{ - BlockID: blockEvents.BlockID, - Height: blockEvents.Height, - BlockTimestamp: blockEvents.BlockTimestamp, - } - for _, eventFilter := range r.blocksFilter().EventTypes { - recoveredEvents, err := r.client.GetEventsForHeightRange( - ctx, - eventFilter, - blockEvents.Height, - blockEvents.Height, - ) - if err != nil { - events <- models.NewBlockEventsError(err) - return - } - for _, blockEvent := range recoveredEvents { - blkEvents.Events = append(blkEvents.Events, blockEvent.Events...) - } - } - - events <- models.NewBlockEvents(blkEvents) + events <- r.fetchBlockEvents(ctx, blockEvents) return } @@ -279,3 +258,33 @@ func (r *RPCSubscriber) blocksFilter() flow.EventFilter { }, } } + +func (r *RPCSubscriber) fetchBlockEvents( + ctx context.Context, + blockEvents flow.BlockEvents, +) models.BlockEvents { + blkEvents := flow.BlockEvents{ + BlockID: blockEvents.BlockID, + Height: blockEvents.Height, + BlockTimestamp: blockEvents.BlockTimestamp, + } + for _, eventFilter := range r.blocksFilter().EventTypes { + recoveredEvents, err := r.client.GetEventsForHeightRange( + ctx, + eventFilter, + blockEvents.Height, + blockEvents.Height, + ) + if err != nil { + return models.NewBlockEventsError(err) + } + if len(recoveredEvents) > 1 { + return models.NewBlockEventsError( + fmt.Errorf("received unexpected Flow block events for height: %d", blockEvents.Height), + ) + } + blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...) + } + + return models.NewBlockEvents(blkEvents) +} From 2be38023eb11a022893f01ea2c69fb9875f9058e Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 6 Sep 2024 18:08:01 +0300 Subject: [PATCH 05/11] Add warn log when failing to parse Flow block events --- services/ingestion/subscriber.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index af9d9127a..b867882c0 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -148,6 +148,10 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac evts := models.NewBlockEvents(blockEvents) if evts.Err != nil { + r.logger.Warn().Err(err).Msgf( + "failed to parse EVM block events for Flow height: %d, retrying with gRPC API...", + blockEvents.Height, + ) events <- r.fetchBlockEvents(ctx, blockEvents) return } From 8d6f49fae334b447ab649eef062b597f205b5b3c Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 9 Sep 2024 10:49:57 +0300 Subject: [PATCH 06/11] Add more test cases for detecting missing block transactions --- models/events.go | 3 +- models/events_test.go | 104 ++++++++++++++++++++++++++++++++---------- 2 files changed, 82 insertions(+), 25 deletions(-) diff --git a/models/events.go b/models/events.go index 1effbad0f..fb44878c4 100644 --- a/models/events.go +++ b/models/events.go @@ -8,7 +8,6 @@ import ( "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/events" evmTypes "github.com/onflow/flow-go/fvm/evm/types" - gethCommon "github.com/onflow/go-ethereum/common" ) // isBlockExecutedEvent checks whether the given event contains block executed data. @@ -122,7 +121,7 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { } if e.block != nil { - txHashes := evmTypes.TransactionHashes([]gethCommon.Hash{}) + txHashes := evmTypes.TransactionHashes{} for _, tx := range e.transactions { txHashes = append(txHashes, tx.Hash()) } diff --git a/models/events_test.go b/models/events_test.go index 3a6aa8c3f..8c8299738 100644 --- a/models/events_test.go +++ b/models/events_test.go @@ -64,37 +64,95 @@ func TestCadenceEvents_Block(t *testing.T) { cadenceHeight := uint64(1) txCount := 10 - txEvents := make([]flow.Event, txCount) - txs := make([]Transaction, txCount) hashes := make([]gethCommon.Hash, txCount) - results := make([]*types.Result, txCount) - - blockEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - } + events := make([]flow.Event, 0) // generate txs for i := 0; i < txCount; i++ { - var err error - txs[i], results[i], txEvents[i], err = newTransaction(uint64(i)) + tx, _, txEvent, err := newTransaction(uint64(i)) require.NoError(t, err) - hashes[i] = txs[i].Hash() - blockEvents.Events = append(blockEvents.Events, txEvents[i]) + hashes[i] = tx.Hash() + events = append(events, txEvent) } - // generate single block - _, blockEvent, err := newBlock(1, hashes[0:txCount-2]) - require.NoError(t, err) - blockEvents.Events = append(blockEvents.Events, blockEvent) + t.Run("block with less transaction hashes", func(t *testing.T) { + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) + require.NoError(t, err) - _, err = NewCadenceEvents(blockEvents) - require.Error(t, err) - assert.ErrorContains( - t, - err, - "block 1 references missing transaction/s", - ) + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + Events: events, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.Error(t, err) + assert.ErrorContains( + t, + err, + "block 1 references missing transaction/s", + ) + }) + + t.Run("block with equal transaction hashes", func(t *testing.T) { + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, hashes) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + Events: events, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.NoError(t, err) + }) + + t.Run("block with empty transaction hashes", func(t *testing.T) { + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, []gethCommon.Hash{}) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.NoError(t, err) + }) + + t.Run("block with more transaction hashes", func(t *testing.T) { + tx, _, _, err := newTransaction(1) + require.NoError(t, err) + + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, []gethCommon.Hash{tx.Hash()}) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.Error(t, err) + assert.ErrorContains( + t, + err, + "block 1 references missing transaction/s", + ) + }) } func Test_EventDecoding(t *testing.T) { From f0d488086ea5509293a1566f98b670ac83e44a08 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 9 Sep 2024 12:20:27 +0300 Subject: [PATCH 07/11] Add unit test for retrying EVM event fetching after inconsistent data --- services/ingestion/subscriber.go | 25 +- services/ingestion/subscriber_test.go | 414 ++++++++++++++++++++++++++ services/testutils/mock_client.go | 26 +- 3 files changed, 446 insertions(+), 19 deletions(-) diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index b867882c0..6f4e3fcdb 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -153,11 +153,10 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac blockEvents.Height, ) events <- r.fetchBlockEvents(ctx, blockEvents) - return + } else { + events <- models.NewBlockEvents(blockEvents) } - events <- models.NewBlockEvents(blockEvents) - case err, ok := <-errChan: if !ok { var err error @@ -272,21 +271,35 @@ func (r *RPCSubscriber) fetchBlockEvents( Height: blockEvents.Height, BlockTimestamp: blockEvents.BlockTimestamp, } - for _, eventFilter := range r.blocksFilter().EventTypes { + for _, eventType := range r.blocksFilter().EventTypes { recoveredEvents, err := r.client.GetEventsForHeightRange( ctx, - eventFilter, + eventType, blockEvents.Height, blockEvents.Height, ) if err != nil { return models.NewBlockEventsError(err) } + if len(recoveredEvents) > 1 { return models.NewBlockEventsError( - fmt.Errorf("received unexpected Flow block events for height: %d", blockEvents.Height), + fmt.Errorf( + "received multiple Flow block events for height: %d", + blockEvents.Height, + ), + ) + } + + if len(recoveredEvents) == 0 { + return models.NewBlockEventsError( + fmt.Errorf( + "received empty Flow block events for height: %d", + blockEvents.Height, + ), ) } + blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...) } diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index 40278db4b..ffab428b9 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -3,8 +3,11 @@ package ingestion import ( "context" "testing" + "time" + "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" + gethCommon "github.com/onflow/go-ethereum/common" errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester" @@ -12,6 +15,8 @@ import ( flowGo "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -59,3 +64,412 @@ func Test_Subscribing(t *testing.T) { // this makes sure we indexed all the events require.Equal(t, uint64(endHeight), prevHeight) } + +func Test_SubscribingWithRetryOnError(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) + + cadenceHeight := uint64(5) + txCount := 10 + hashes := make([]gethCommon.Hash, txCount) + flowEvents := make([]flow.Event, 0) + + // generate txs + for i := 0; i < txCount; i++ { + cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) + require.NoError(t, err) + hashes[i] = tx.Hash() + flowEvent := flow.Event{ + Type: string(txEvent.Etype), + Value: cdcEvent, + } + flowEvents = append(flowEvents, flowEvent) + } + + evmTxEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: flowEvents, + } + + // generate single block + cdcEvent, evmBlock, blockEvent, err := newBlock(cadenceHeight, hashes) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + evmBlockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: []flow.Event{flowEvent}, + } + + currentClient.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.BlockExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{evmBlockEvents}, nil).Once() + + currentClient.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.TransactionExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + + currentClient.SubscribeEventsByBlockHeightFunc = func( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, + ) (<-chan flow.BlockEvents, <-chan error, error) { + events := make(chan flow.BlockEvents) + errors := make(chan error) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: evmTxEvents.Events, + } + + // generate single block + cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + blockEvents.Events = append(blockEvents.Events, flowEvent) + + go func() { + defer close(events) + + for i := startHeight; i <= endHeight; i++ { + if i == cadenceHeight { + events <- blockEvents + } else { + events <- flow.BlockEvents{ + Height: i, + } + } + } + }() + + return events, errors, nil + } + + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + require.NoError(t, ev.Err) + + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + + if eventHeight == cadenceHeight { + assert.Equal(t, evmBlock, ev.Events.Block()) + for i := 0; i < txCount; i++ { + tx := ev.Events.Transactions()[i] + assert.Equal(t, hashes[i], tx.Hash()) + } + } + } + + // this makes sure we indexed all the events + require.Equal(t, uint64(endHeight), prevHeight) +} + +func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) + + cadenceHeight := uint64(5) + txCount := 10 + hashes := make([]gethCommon.Hash, txCount) + flowEvents := make([]flow.Event, 0) + + // generate txs + for i := 0; i < txCount; i++ { + cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) + require.NoError(t, err) + hashes[i] = tx.Hash() + flowEvent := flow.Event{ + Type: string(txEvent.Etype), + Value: cdcEvent, + } + flowEvents = append(flowEvents, flowEvent) + } + + evmTxEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: flowEvents, + } + + // generate single block + cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + evmBlockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: []flow.Event{flowEvent}, + } + + currentClient.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.BlockExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{evmBlockEvents, evmBlockEvents}, nil).Once() + + currentClient.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.TransactionExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + + currentClient.SubscribeEventsByBlockHeightFunc = func( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, + ) (<-chan flow.BlockEvents, <-chan error, error) { + events := make(chan flow.BlockEvents) + errors := make(chan error) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: evmTxEvents.Events, + } + + // generate single block + cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + blockEvents.Events = append(blockEvents.Events, flowEvent) + + go func() { + defer close(events) + + for i := startHeight; i <= endHeight; i++ { + if i == cadenceHeight { + events <- blockEvents + } else { + events <- flow.BlockEvents{ + Height: i, + } + } + } + }() + + return events, errors, nil + } + + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + if prevHeight+1 == cadenceHeight { + require.Error(t, ev.Err) + assert.ErrorContains( + t, + ev.Err, + "received multiple Flow block events for height: 5", + ) + prevHeight = cadenceHeight + } else { + require.NoError(t, ev.Err) + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + } + } + + require.Equal(t, endHeight, prevHeight) +} + +func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) + + cadenceHeight := uint64(5) + txCount := 10 + hashes := make([]gethCommon.Hash, txCount) + flowEvents := make([]flow.Event, 0) + + // generate txs + for i := 0; i < txCount; i++ { + cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) + require.NoError(t, err) + hashes[i] = tx.Hash() + flowEvent := flow.Event{ + Type: string(txEvent.Etype), + Value: cdcEvent, + } + flowEvents = append(flowEvents, flowEvent) + } + + evmTxEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: flowEvents, + } + + currentClient.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.BlockExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{}, nil).Once() + + currentClient.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.TransactionExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + + currentClient.SubscribeEventsByBlockHeightFunc = func( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, + ) (<-chan flow.BlockEvents, <-chan error, error) { + events := make(chan flow.BlockEvents) + errors := make(chan error) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: evmTxEvents.Events, + } + + // generate single block + cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + blockEvents.Events = append(blockEvents.Events, flowEvent) + + go func() { + defer close(events) + + for i := startHeight; i <= endHeight; i++ { + if i == cadenceHeight { + events <- blockEvents + } else { + events <- flow.BlockEvents{ + Height: i, + } + } + } + }() + + return events, errors, nil + } + + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + if prevHeight+1 == cadenceHeight { + require.Error(t, ev.Err) + assert.ErrorContains( + t, + ev.Err, + "received empty Flow block events for height: 5", + ) + prevHeight = cadenceHeight + } else { + require.NoError(t, ev.Err) + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + } + } + + require.Equal(t, endHeight, prevHeight) +} diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index eece1c9df..ad1fb1144 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -11,22 +11,22 @@ import ( type MockClient struct { *mocks.Client - getLatestBlockHeaderFunc func(context.Context, bool) (*flow.BlockHeader, error) - getBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) - subscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) - getNodeVersionInfoFunc func(ctx context.Context) (*flow.NodeVersionInfo, error) + GetLatestBlockHeaderFunc func(context.Context, bool) (*flow.BlockHeader, error) + GetBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) + SubscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) + GetNodeVersionInfoFunc func(ctx context.Context) (*flow.NodeVersionInfo, error) } func (c *MockClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*flow.BlockHeader, error) { - return c.getBlockHeaderByHeightFunc(ctx, height) + return c.GetBlockHeaderByHeightFunc(ctx, height) } func (c *MockClient) GetLatestBlockHeader(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { - return c.getLatestBlockHeaderFunc(ctx, sealed) + return c.GetLatestBlockHeaderFunc(ctx, sealed) } func (c *MockClient) GetNodeVersionInfo(ctx context.Context) (*flow.NodeVersionInfo, error) { - return c.getNodeVersionInfoFunc(ctx) + return c.GetNodeVersionInfoFunc(ctx) } func (c *MockClient) SubscribeEventsByBlockHeight( @@ -35,18 +35,18 @@ func (c *MockClient) SubscribeEventsByBlockHeight( filter flow.EventFilter, opts ...access.SubscribeOption, ) (<-chan flow.BlockEvents, <-chan error, error) { - return c.subscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) + return c.SubscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) } -func SetupClientForRange(startHeight uint64, endHeight uint64) access.Client { +func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient { return &MockClient{ Client: &mocks.Client{}, - getLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { + GetLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { return &flow.BlockHeader{ Height: endHeight, }, nil }, - getBlockHeaderByHeightFunc: func(ctx context.Context, height uint64) (*flow.BlockHeader, error) { + GetBlockHeaderByHeightFunc: func(ctx context.Context, height uint64) (*flow.BlockHeader, error) { if height < startHeight || height > endHeight { return nil, storage.ErrNotFound } @@ -55,12 +55,12 @@ func SetupClientForRange(startHeight uint64, endHeight uint64) access.Client { Height: height, }, nil }, - getNodeVersionInfoFunc: func(ctx context.Context) (*flow.NodeVersionInfo, error) { + GetNodeVersionInfoFunc: func(ctx context.Context) (*flow.NodeVersionInfo, error) { return &flow.NodeVersionInfo{ NodeRootBlockHeight: startHeight, }, nil }, - subscribeEventsByBlockHeightFunc: func( + SubscribeEventsByBlockHeightFunc: func( ctx context.Context, startHeight uint64, filter flow.EventFilter, From 6ff9cb554e41b11d3cc50c3f8890ed3715b77b33 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Wed, 11 Sep 2024 11:50:21 +0300 Subject: [PATCH 08/11] Update logic of block event count in fetchBlockEvents method Co-authored-by: Gregor G. <75445744+sideninja@users.noreply.github.com> --- services/ingestion/subscriber.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 6f4e3fcdb..2abcdd4a3 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -282,19 +282,11 @@ func (r *RPCSubscriber) fetchBlockEvents( return models.NewBlockEventsError(err) } - if len(recoveredEvents) > 1 { - return models.NewBlockEventsError( + if len(recoveredEvents) != 1 { + return models.NewBlockEventsError( fmt.Errorf( - "received multiple Flow block events for height: %d", - blockEvents.Height, - ), - ) - } - - if len(recoveredEvents) == 0 { - return models.NewBlockEventsError( - fmt.Errorf( - "received empty Flow block events for height: %d", + "received %d but expected 1 event for height %d", + len(recoveredEvents), blockEvents.Height, ), ) From 2b1ee9185efb07c1e5a57463ee7742862a137aa3 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Wed, 11 Sep 2024 11:52:45 +0300 Subject: [PATCH 09/11] Update tests for fetchBlockEvents --- services/ingestion/subscriber_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index ffab428b9..440cd4290 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -332,7 +332,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { assert.ErrorContains( t, ev.Err, - "received multiple Flow block events for height: 5", + "received 2 but expected 1 event for height 5", ) prevHeight = cadenceHeight } else { @@ -459,7 +459,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { assert.ErrorContains( t, ev.Err, - "received empty Flow block events for height: 5", + "received 0 but expected 1 event for height 5", ) prevHeight = cadenceHeight } else { From 76d544c7e08a528858e034b4e99bf965a4361830 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Wed, 11 Sep 2024 12:12:11 +0300 Subject: [PATCH 10/11] Add comments describing the usage of fetchBlockEvents method --- services/ingestion/subscriber.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 2abcdd4a3..7b2ca1fba 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -152,6 +152,9 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac "failed to parse EVM block events for Flow height: %d, retrying with gRPC API...", blockEvents.Height, ) + // call the `GetEventsForHeightRange` gRPC API endpoint to fetch + // the EVM-related events, when event streaming returned an + // inconsistent response. events <- r.fetchBlockEvents(ctx, blockEvents) } else { events <- models.NewBlockEvents(blockEvents) @@ -262,6 +265,13 @@ func (r *RPCSubscriber) blocksFilter() flow.EventFilter { } } +// fetchBlockEvents is used as a backup mechanism for fetching EVM-related +// events, when the event streaming API returns an inconsistent response. +// An inconsistent response could be an EVM block that references EVM +// transactions which are not present in the response. +// Under the hood, it uses the `GetEventsForHeightRange` gRPC API endpoint, +// making sure that we receive the expected events length for each event type +// and Flow height. func (r *RPCSubscriber) fetchBlockEvents( ctx context.Context, blockEvents flow.BlockEvents, @@ -283,7 +293,7 @@ func (r *RPCSubscriber) fetchBlockEvents( } if len(recoveredEvents) != 1 { - return models.NewBlockEventsError( + return models.NewBlockEventsError( fmt.Errorf( "received %d but expected 1 event for height %d", len(recoveredEvents), From 3c9ead3de7e2a226d0244c2860a2aafa8342e7a5 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Wed, 11 Sep 2024 13:14:15 +0300 Subject: [PATCH 11/11] Refactor subscriber tests for back-up fetching of EVM events --- services/ingestion/subscriber_test.go | 354 +++++++++----------------- 1 file changed, 126 insertions(+), 228 deletions(-) diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index 440cd4290..c867e9a12 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go-sdk/access" gethCommon "github.com/onflow/go-ethereum/common" + "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester" "github.com/onflow/flow-evm-gateway/services/testutils" @@ -65,106 +66,28 @@ func Test_Subscribing(t *testing.T) { require.Equal(t, uint64(endHeight), prevHeight) } +// Test that back-up fetching of EVM events is triggered when the +// Event Streaming API returns an inconsistent response. +// This scenario tests the happy path, when the back-up fetching of +// EVM events through the gRPC API, returns the correct data. func Test_SubscribingWithRetryOnError(t *testing.T) { endHeight := uint64(10) sporkClients := []access.Client{} currentClient := testutils.SetupClientForRange(1, endHeight) cadenceHeight := uint64(5) - txCount := 10 - hashes := make([]gethCommon.Hash, txCount) - flowEvents := make([]flow.Event, 0) - - // generate txs - for i := 0; i < txCount; i++ { - cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) - require.NoError(t, err) - hashes[i] = tx.Hash() - flowEvent := flow.Event{ - Type: string(txEvent.Etype), - Value: cdcEvent, - } - flowEvents = append(flowEvents, flowEvent) - } - - evmTxEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - BlockTimestamp: time.Now(), - Events: flowEvents, - } - - // generate single block - cdcEvent, evmBlock, blockEvent, err := newBlock(cadenceHeight, hashes) - require.NoError(t, err) - flowEvent := flow.Event{ - Type: string(blockEvent.Etype), - Value: cdcEvent, - } - evmBlockEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - BlockTimestamp: time.Now(), - Events: []flow.Event{flowEvent}, - } - - currentClient.On( - "GetEventsForHeightRange", - mock.AnythingOfType("context.backgroundCtx"), - "A.b6763b4399a888c8.EVM.BlockExecuted", - uint64(cadenceHeight), - uint64(cadenceHeight), - ).Return([]flow.BlockEvents{evmBlockEvents}, nil).Once() - - currentClient.On( - "GetEventsForHeightRange", - mock.AnythingOfType("context.backgroundCtx"), - "A.b6763b4399a888c8.EVM.TransactionExecuted", - uint64(cadenceHeight), - uint64(cadenceHeight), - ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() - - currentClient.SubscribeEventsByBlockHeightFunc = func( - ctx context.Context, - startHeight uint64, - filter flow.EventFilter, - opts ...access.SubscribeOption, - ) (<-chan flow.BlockEvents, <-chan error, error) { - events := make(chan flow.BlockEvents) - errors := make(chan error) - - blockEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - BlockTimestamp: time.Now(), - Events: evmTxEvents.Events, - } - - // generate single block - cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) - require.NoError(t, err) - flowEvent := flow.Event{ - Type: string(blockEvent.Etype), - Value: cdcEvent, - } - blockEvents.Events = append(blockEvents.Events, flowEvent) + evmTxEvents, txHashes := generateEvmTxEvents(t, cadenceHeight) + evmBlock, evmBlockEvents := generateEvmBlock(t, cadenceHeight, txHashes) - go func() { - defer close(events) - - for i := startHeight; i <= endHeight; i++ { - if i == cadenceHeight { - events <- blockEvents - } else { - events <- flow.BlockEvents{ - Height: i, - } - } - } - }() - - return events, errors, nil - } + setupClientForBackupEventFetching( + t, + currentClient, + cadenceHeight, + []flow.BlockEvents{evmBlockEvents}, + evmTxEvents, + txHashes, + endHeight, + ) client, err := requester.NewCrossSporkClient( currentClient, @@ -195,9 +118,9 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { if eventHeight == cadenceHeight { assert.Equal(t, evmBlock, ev.Events.Block()) - for i := 0; i < txCount; i++ { + for i := 0; i < len(txHashes); i++ { tx := ev.Events.Transactions()[i] - assert.Equal(t, hashes[i], tx.Hash()) + assert.Equal(t, txHashes[i], tx.Hash()) } } } @@ -206,106 +129,90 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { require.Equal(t, uint64(endHeight), prevHeight) } +// Test that back-up fetching of EVM events is triggered when the +// Event Streaming API returns an inconsistent response. +// This scenario tests the unhappy path, when the back-up fetching +// of EVM events through the gRPC API, returns duplicate EVM blocks. func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { endHeight := uint64(10) sporkClients := []access.Client{} currentClient := testutils.SetupClientForRange(1, endHeight) cadenceHeight := uint64(5) - txCount := 10 - hashes := make([]gethCommon.Hash, txCount) - flowEvents := make([]flow.Event, 0) - - // generate txs - for i := 0; i < txCount; i++ { - cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) - require.NoError(t, err) - hashes[i] = tx.Hash() - flowEvent := flow.Event{ - Type: string(txEvent.Etype), - Value: cdcEvent, - } - flowEvents = append(flowEvents, flowEvent) - } + evmTxEvents, txHashes := generateEvmTxEvents(t, cadenceHeight) + _, evmBlockEvents := generateEvmBlock(t, cadenceHeight, txHashes) - evmTxEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - BlockTimestamp: time.Now(), - Events: flowEvents, - } + setupClientForBackupEventFetching( + t, + currentClient, + cadenceHeight, + []flow.BlockEvents{evmBlockEvents, evmBlockEvents}, // return the same EVM block twice + evmTxEvents, + txHashes, + endHeight, + ) - // generate single block - cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes) + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) require.NoError(t, err) - flowEvent := flow.Event{ - Type: string(blockEvent.Etype), - Value: cdcEvent, - } - evmBlockEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - BlockTimestamp: time.Now(), - Events: []flow.Event{flowEvent}, - } - currentClient.On( - "GetEventsForHeightRange", - mock.AnythingOfType("context.backgroundCtx"), - "A.b6763b4399a888c8.EVM.BlockExecuted", - uint64(cadenceHeight), - uint64(cadenceHeight), - ).Return([]flow.BlockEvents{evmBlockEvents, evmBlockEvents}, nil).Once() + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) - currentClient.On( - "GetEventsForHeightRange", - mock.AnythingOfType("context.backgroundCtx"), - "A.b6763b4399a888c8.EVM.TransactionExecuted", - uint64(cadenceHeight), - uint64(cadenceHeight), - ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + events := subscriber.Subscribe(context.Background(), 1) - currentClient.SubscribeEventsByBlockHeightFunc = func( - ctx context.Context, - startHeight uint64, - filter flow.EventFilter, - opts ...access.SubscribeOption, - ) (<-chan flow.BlockEvents, <-chan error, error) { - events := make(chan flow.BlockEvents) - errors := make(chan error) + var prevHeight uint64 - blockEvents := flow.BlockEvents{ - BlockID: flow.Identifier{0x1}, - Height: cadenceHeight, - BlockTimestamp: time.Now(), - Events: evmTxEvents.Events, + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break } - // generate single block - cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) - require.NoError(t, err) - flowEvent := flow.Event{ - Type: string(blockEvent.Etype), - Value: cdcEvent, + if prevHeight+1 == cadenceHeight { + require.Error(t, ev.Err) + assert.ErrorContains( + t, + ev.Err, + "received 2 but expected 1 event for height 5", + ) + prevHeight = cadenceHeight + } else { + require.NoError(t, ev.Err) + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight } - blockEvents.Events = append(blockEvents.Events, flowEvent) + } - go func() { - defer close(events) + require.Equal(t, endHeight, prevHeight) +} - for i := startHeight; i <= endHeight; i++ { - if i == cadenceHeight { - events <- blockEvents - } else { - events <- flow.BlockEvents{ - Height: i, - } - } - } - }() +// Test that back-up fetching of EVM events is triggered when the +// Event Streaming API returns an inconsistent response. +// This scenario tests the unhappy path, when the back-up fetching +// of EVM events through the gRPC API, returns no EVM blocks. +func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) - return events, errors, nil - } + cadenceHeight := uint64(5) + evmTxEvents, txHashes := generateEvmTxEvents(t, cadenceHeight) + + setupClientForBackupEventFetching( + t, + currentClient, + cadenceHeight, + []flow.BlockEvents{}, + evmTxEvents, + txHashes, + endHeight, + ) client, err := requester.NewCrossSporkClient( currentClient, @@ -332,7 +239,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { assert.ErrorContains( t, ev.Err, - "received 2 but expected 1 event for height 5", + "received 0 but expected 1 event for height 5", ) prevHeight = cadenceHeight } else { @@ -347,12 +254,10 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { require.Equal(t, endHeight, prevHeight) } -func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { - endHeight := uint64(10) - sporkClients := []access.Client{} - currentClient := testutils.SetupClientForRange(1, endHeight) - - cadenceHeight := uint64(5) +func generateEvmTxEvents(t *testing.T, cadenceHeight uint64) ( + flow.BlockEvents, + []gethCommon.Hash, +) { txCount := 10 hashes := make([]gethCommon.Hash, txCount) flowEvents := make([]flow.Event, 0) @@ -369,22 +274,54 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { flowEvents = append(flowEvents, flowEvent) } - evmTxEvents := flow.BlockEvents{ + return flow.BlockEvents{ BlockID: flow.Identifier{0x1}, Height: cadenceHeight, BlockTimestamp: time.Now(), Events: flowEvents, + }, hashes +} + +func generateEvmBlock( + t *testing.T, + cadenceHeight uint64, + txHashes []gethCommon.Hash, +) (*models.Block, flow.BlockEvents) { + // generate single block + cdcEvent, evmBlock, blockEvent, err := newBlock(cadenceHeight, txHashes) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + evmBlockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: []flow.Event{flowEvent}, } - currentClient.On( + return evmBlock, evmBlockEvents +} + +func setupClientForBackupEventFetching( + t *testing.T, + client *testutils.MockClient, + cadenceHeight uint64, + evmBlockEvents []flow.BlockEvents, + evmTxEvents flow.BlockEvents, + txHashes []gethCommon.Hash, + endHeight uint64, +) { + client.On( "GetEventsForHeightRange", mock.AnythingOfType("context.backgroundCtx"), "A.b6763b4399a888c8.EVM.BlockExecuted", uint64(cadenceHeight), uint64(cadenceHeight), - ).Return([]flow.BlockEvents{}, nil).Once() + ).Return(evmBlockEvents, nil).Once() - currentClient.On( + client.On( "GetEventsForHeightRange", mock.AnythingOfType("context.backgroundCtx"), "A.b6763b4399a888c8.EVM.TransactionExecuted", @@ -392,7 +329,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { uint64(cadenceHeight), ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() - currentClient.SubscribeEventsByBlockHeightFunc = func( + client.SubscribeEventsByBlockHeightFunc = func( ctx context.Context, startHeight uint64, filter flow.EventFilter, @@ -409,7 +346,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { } // generate single block - cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) + cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, txHashes[:len(txHashes)-2]) require.NoError(t, err) flowEvent := flow.Event{ Type: string(blockEvent.Etype), @@ -433,43 +370,4 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { return events, errors, nil } - - client, err := requester.NewCrossSporkClient( - currentClient, - sporkClients, - zerolog.Nop(), - flowGo.Previewnet, - ) - require.NoError(t, err) - - subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) - - events := subscriber.Subscribe(context.Background(), 1) - - var prevHeight uint64 - - for ev := range events { - if prevHeight == endHeight { - require.ErrorIs(t, ev.Err, errs.ErrDisconnected) - break - } - - if prevHeight+1 == cadenceHeight { - require.Error(t, ev.Err) - assert.ErrorContains( - t, - ev.Err, - "received 0 but expected 1 event for height 5", - ) - prevHeight = cadenceHeight - } else { - require.NoError(t, ev.Err) - // this makes sure all the event heights are sequential - eventHeight := ev.Events.CadenceHeight() - require.Equal(t, prevHeight+1, eventHeight) - prevHeight = eventHeight - } - } - - require.Equal(t, endHeight, prevHeight) }