diff --git a/models/events.go b/models/events.go index 5d1896970..fb44878c4 100644 --- a/models/events.go +++ b/models/events.go @@ -7,6 +7,7 @@ import ( "github.com/onflow/cadence" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/events" + evmTypes "github.com/onflow/flow-go/fvm/evm/types" ) // isBlockExecutedEvent checks whether the given event contains block executed data. @@ -119,6 +120,19 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height) } + if e.block != nil { + txHashes := evmTypes.TransactionHashes{} + for _, tx := range e.transactions { + txHashes = append(txHashes, tx.Hash()) + } + if e.block.TransactionHashRoot != txHashes.RootHash() { + return nil, fmt.Errorf( + "block %d references missing transaction/s", + e.block.Height, + ) + } + } + return e, nil } diff --git a/models/events_test.go b/models/events_test.go index 5c82bf46d..8c8299738 100644 --- a/models/events_test.go +++ b/models/events_test.go @@ -61,6 +61,98 @@ func TestCadenceEvents_Block(t *testing.T) { } }) } + + cadenceHeight := uint64(1) + txCount := 10 + hashes := make([]gethCommon.Hash, txCount) + events := make([]flow.Event, 0) + + // generate txs + for i := 0; i < txCount; i++ { + tx, _, txEvent, err := newTransaction(uint64(i)) + require.NoError(t, err) + hashes[i] = tx.Hash() + events = append(events, txEvent) + } + + t.Run("block with less transaction hashes", func(t *testing.T) { + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + Events: events, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.Error(t, err) + assert.ErrorContains( + t, + err, + "block 1 references missing transaction/s", + ) + }) + + t.Run("block with equal transaction hashes", func(t *testing.T) { + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, hashes) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + Events: events, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.NoError(t, err) + }) + + t.Run("block with empty transaction hashes", func(t *testing.T) { + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, []gethCommon.Hash{}) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.NoError(t, err) + }) + + t.Run("block with more transaction hashes", func(t *testing.T) { + tx, _, _, err := newTransaction(1) + require.NoError(t, err) + + // generate single block + _, blockEvent, err := newBlock(cadenceHeight, []gethCommon.Hash{tx.Hash()}) + require.NoError(t, err) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + } + + blockEvents.Events = append(blockEvents.Events, blockEvent) + + _, err = NewCadenceEvents(blockEvents) + require.Error(t, err) + assert.ErrorContains( + t, + err, + "block 1 references missing transaction/s", + ) + }) } func Test_EventDecoding(t *testing.T) { @@ -174,7 +266,7 @@ func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error return TransactionCall{Transaction: tx}, res, flowEvent, err } -func newBlock(height uint64, hashes []gethCommon.Hash) (*Block, flow.Event, error) { +func newBlock(height uint64, txHashes []gethCommon.Hash) (*Block, flow.Event, error) { gethBlock := types.NewBlock( gethCommon.HexToHash("0x01"), height, @@ -182,9 +274,10 @@ func newBlock(height uint64, hashes []gethCommon.Hash) (*Block, flow.Event, erro big.NewInt(100), gethCommon.HexToHash("0x15"), ) + gethBlock.TransactionHashRoot = types.TransactionHashes(txHashes).RootHash() evmBlock := &Block{ Block: gethBlock, - TransactionHashes: hashes, + TransactionHashes: txHashes, } ev := events.NewBlockEvent(gethBlock) diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index bbeba52e9..a2b2651ab 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -556,6 +556,7 @@ func newBlock(height uint64, txHashes []gethCommon.Hash) (cadence.Event, *models TotalSupply: big.NewInt(100), ReceiptRoot: gethCommon.HexToHash("0x2"), } + gethBlock.TransactionHashRoot = types.TransactionHashes(txHashes).RootHash() block := &models.Block{ Block: gethBlock, TransactionHashes: txHashes, @@ -567,6 +568,17 @@ func newBlock(height uint64, txHashes []gethCommon.Hash) (cadence.Event, *models } func newTransaction(height uint64) (cadence.Event, *events.Event, models.Transaction, *types.Result, error) { + txEncoded, err := hex.DecodeString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653") + if err != nil { + return cadence.Event{}, nil, nil, nil, err + } + + tx := &gethTypes.Transaction{} + err = tx.UnmarshalBinary(txEncoded) + if err != nil { + return cadence.Event{}, nil, nil, nil, err + } + res := &types.Result{ VMError: nil, TxType: 1, @@ -580,18 +592,7 @@ func newTransaction(height uint64) (cadence.Event, *events.Event, models.Transac Address: gethCommon.Address{0x3, 0x5}, Topics: []gethCommon.Hash{{0x2, 0x66}, {0x7, 0x1}}, }}, - TxHash: gethCommon.HexToHash("0x33"), - } - - txEncoded, err := hex.DecodeString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653") - if err != nil { - return cadence.Event{}, nil, nil, nil, err - } - - tx := &gethTypes.Transaction{} - err = tx.UnmarshalBinary(txEncoded) - if err != nil { - return cadence.Event{}, nil, nil, nil, err + TxHash: tx.Hash(), } ev := events.NewTransactionEvent( diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 6b7b76481..7b2ca1fba 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -146,7 +146,19 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac return } - events <- models.NewBlockEvents(blockEvents) + evts := models.NewBlockEvents(blockEvents) + if evts.Err != nil { + r.logger.Warn().Err(err).Msgf( + "failed to parse EVM block events for Flow height: %d, retrying with gRPC API...", + blockEvents.Height, + ) + // call the `GetEventsForHeightRange` gRPC API endpoint to fetch + // the EVM-related events, when event streaming returned an + // inconsistent response. + events <- r.fetchBlockEvents(ctx, blockEvents) + } else { + events <- models.NewBlockEvents(blockEvents) + } case err, ok := <-errChan: if !ok { @@ -252,3 +264,46 @@ func (r *RPCSubscriber) blocksFilter() flow.EventFilter { }, } } + +// fetchBlockEvents is used as a backup mechanism for fetching EVM-related +// events, when the event streaming API returns an inconsistent response. +// An inconsistent response could be an EVM block that references EVM +// transactions which are not present in the response. +// Under the hood, it uses the `GetEventsForHeightRange` gRPC API endpoint, +// making sure that we receive the expected events length for each event type +// and Flow height. +func (r *RPCSubscriber) fetchBlockEvents( + ctx context.Context, + blockEvents flow.BlockEvents, +) models.BlockEvents { + blkEvents := flow.BlockEvents{ + BlockID: blockEvents.BlockID, + Height: blockEvents.Height, + BlockTimestamp: blockEvents.BlockTimestamp, + } + for _, eventType := range r.blocksFilter().EventTypes { + recoveredEvents, err := r.client.GetEventsForHeightRange( + ctx, + eventType, + blockEvents.Height, + blockEvents.Height, + ) + if err != nil { + return models.NewBlockEventsError(err) + } + + if len(recoveredEvents) != 1 { + return models.NewBlockEventsError( + fmt.Errorf( + "received %d but expected 1 event for height %d", + len(recoveredEvents), + blockEvents.Height, + ), + ) + } + + blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...) + } + + return models.NewBlockEvents(blkEvents) +} diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index 40278db4b..c867e9a12 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -3,15 +3,21 @@ package ingestion import ( "context" "testing" + "time" + "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" + gethCommon "github.com/onflow/go-ethereum/common" + "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester" "github.com/onflow/flow-evm-gateway/services/testutils" flowGo "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -59,3 +65,309 @@ func Test_Subscribing(t *testing.T) { // this makes sure we indexed all the events require.Equal(t, uint64(endHeight), prevHeight) } + +// Test that back-up fetching of EVM events is triggered when the +// Event Streaming API returns an inconsistent response. +// This scenario tests the happy path, when the back-up fetching of +// EVM events through the gRPC API, returns the correct data. +func Test_SubscribingWithRetryOnError(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) + + cadenceHeight := uint64(5) + evmTxEvents, txHashes := generateEvmTxEvents(t, cadenceHeight) + evmBlock, evmBlockEvents := generateEvmBlock(t, cadenceHeight, txHashes) + + setupClientForBackupEventFetching( + t, + currentClient, + cadenceHeight, + []flow.BlockEvents{evmBlockEvents}, + evmTxEvents, + txHashes, + endHeight, + ) + + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + require.NoError(t, ev.Err) + + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + + if eventHeight == cadenceHeight { + assert.Equal(t, evmBlock, ev.Events.Block()) + for i := 0; i < len(txHashes); i++ { + tx := ev.Events.Transactions()[i] + assert.Equal(t, txHashes[i], tx.Hash()) + } + } + } + + // this makes sure we indexed all the events + require.Equal(t, uint64(endHeight), prevHeight) +} + +// Test that back-up fetching of EVM events is triggered when the +// Event Streaming API returns an inconsistent response. +// This scenario tests the unhappy path, when the back-up fetching +// of EVM events through the gRPC API, returns duplicate EVM blocks. +func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) + + cadenceHeight := uint64(5) + evmTxEvents, txHashes := generateEvmTxEvents(t, cadenceHeight) + _, evmBlockEvents := generateEvmBlock(t, cadenceHeight, txHashes) + + setupClientForBackupEventFetching( + t, + currentClient, + cadenceHeight, + []flow.BlockEvents{evmBlockEvents, evmBlockEvents}, // return the same EVM block twice + evmTxEvents, + txHashes, + endHeight, + ) + + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + if prevHeight+1 == cadenceHeight { + require.Error(t, ev.Err) + assert.ErrorContains( + t, + ev.Err, + "received 2 but expected 1 event for height 5", + ) + prevHeight = cadenceHeight + } else { + require.NoError(t, ev.Err) + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + } + } + + require.Equal(t, endHeight, prevHeight) +} + +// Test that back-up fetching of EVM events is triggered when the +// Event Streaming API returns an inconsistent response. +// This scenario tests the unhappy path, when the back-up fetching +// of EVM events through the gRPC API, returns no EVM blocks. +func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { + endHeight := uint64(10) + sporkClients := []access.Client{} + currentClient := testutils.SetupClientForRange(1, endHeight) + + cadenceHeight := uint64(5) + evmTxEvents, txHashes := generateEvmTxEvents(t, cadenceHeight) + + setupClientForBackupEventFetching( + t, + currentClient, + cadenceHeight, + []flow.BlockEvents{}, + evmTxEvents, + txHashes, + endHeight, + ) + + client, err := requester.NewCrossSporkClient( + currentClient, + sporkClients, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + if prevHeight+1 == cadenceHeight { + require.Error(t, ev.Err) + assert.ErrorContains( + t, + ev.Err, + "received 0 but expected 1 event for height 5", + ) + prevHeight = cadenceHeight + } else { + require.NoError(t, ev.Err) + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + } + } + + require.Equal(t, endHeight, prevHeight) +} + +func generateEvmTxEvents(t *testing.T, cadenceHeight uint64) ( + flow.BlockEvents, + []gethCommon.Hash, +) { + txCount := 10 + hashes := make([]gethCommon.Hash, txCount) + flowEvents := make([]flow.Event, 0) + + // generate txs + for i := 0; i < txCount; i++ { + cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) + require.NoError(t, err) + hashes[i] = tx.Hash() + flowEvent := flow.Event{ + Type: string(txEvent.Etype), + Value: cdcEvent, + } + flowEvents = append(flowEvents, flowEvent) + } + + return flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: flowEvents, + }, hashes +} + +func generateEvmBlock( + t *testing.T, + cadenceHeight uint64, + txHashes []gethCommon.Hash, +) (*models.Block, flow.BlockEvents) { + // generate single block + cdcEvent, evmBlock, blockEvent, err := newBlock(cadenceHeight, txHashes) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + evmBlockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: []flow.Event{flowEvent}, + } + + return evmBlock, evmBlockEvents +} + +func setupClientForBackupEventFetching( + t *testing.T, + client *testutils.MockClient, + cadenceHeight uint64, + evmBlockEvents []flow.BlockEvents, + evmTxEvents flow.BlockEvents, + txHashes []gethCommon.Hash, + endHeight uint64, +) { + client.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.BlockExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return(evmBlockEvents, nil).Once() + + client.On( + "GetEventsForHeightRange", + mock.AnythingOfType("context.backgroundCtx"), + "A.b6763b4399a888c8.EVM.TransactionExecuted", + uint64(cadenceHeight), + uint64(cadenceHeight), + ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + + client.SubscribeEventsByBlockHeightFunc = func( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, + ) (<-chan flow.BlockEvents, <-chan error, error) { + events := make(chan flow.BlockEvents) + errors := make(chan error) + + blockEvents := flow.BlockEvents{ + BlockID: flow.Identifier{0x1}, + Height: cadenceHeight, + BlockTimestamp: time.Now(), + Events: evmTxEvents.Events, + } + + // generate single block + cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, txHashes[:len(txHashes)-2]) + require.NoError(t, err) + flowEvent := flow.Event{ + Type: string(blockEvent.Etype), + Value: cdcEvent, + } + blockEvents.Events = append(blockEvents.Events, flowEvent) + + go func() { + defer close(events) + + for i := startHeight; i <= endHeight; i++ { + if i == cadenceHeight { + events <- blockEvents + } else { + events <- flow.BlockEvents{ + Height: i, + } + } + } + }() + + return events, errors, nil + } +} diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index eece1c9df..ad1fb1144 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -11,22 +11,22 @@ import ( type MockClient struct { *mocks.Client - getLatestBlockHeaderFunc func(context.Context, bool) (*flow.BlockHeader, error) - getBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) - subscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) - getNodeVersionInfoFunc func(ctx context.Context) (*flow.NodeVersionInfo, error) + GetLatestBlockHeaderFunc func(context.Context, bool) (*flow.BlockHeader, error) + GetBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) + SubscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) + GetNodeVersionInfoFunc func(ctx context.Context) (*flow.NodeVersionInfo, error) } func (c *MockClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*flow.BlockHeader, error) { - return c.getBlockHeaderByHeightFunc(ctx, height) + return c.GetBlockHeaderByHeightFunc(ctx, height) } func (c *MockClient) GetLatestBlockHeader(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { - return c.getLatestBlockHeaderFunc(ctx, sealed) + return c.GetLatestBlockHeaderFunc(ctx, sealed) } func (c *MockClient) GetNodeVersionInfo(ctx context.Context) (*flow.NodeVersionInfo, error) { - return c.getNodeVersionInfoFunc(ctx) + return c.GetNodeVersionInfoFunc(ctx) } func (c *MockClient) SubscribeEventsByBlockHeight( @@ -35,18 +35,18 @@ func (c *MockClient) SubscribeEventsByBlockHeight( filter flow.EventFilter, opts ...access.SubscribeOption, ) (<-chan flow.BlockEvents, <-chan error, error) { - return c.subscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) + return c.SubscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) } -func SetupClientForRange(startHeight uint64, endHeight uint64) access.Client { +func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient { return &MockClient{ Client: &mocks.Client{}, - getLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { + GetLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { return &flow.BlockHeader{ Height: endHeight, }, nil }, - getBlockHeaderByHeightFunc: func(ctx context.Context, height uint64) (*flow.BlockHeader, error) { + GetBlockHeaderByHeightFunc: func(ctx context.Context, height uint64) (*flow.BlockHeader, error) { if height < startHeight || height > endHeight { return nil, storage.ErrNotFound } @@ -55,12 +55,12 @@ func SetupClientForRange(startHeight uint64, endHeight uint64) access.Client { Height: height, }, nil }, - getNodeVersionInfoFunc: func(ctx context.Context) (*flow.NodeVersionInfo, error) { + GetNodeVersionInfoFunc: func(ctx context.Context) (*flow.NodeVersionInfo, error) { return &flow.NodeVersionInfo{ NodeRootBlockHeight: startHeight, }, nil }, - subscribeEventsByBlockHeightFunc: func( + SubscribeEventsByBlockHeightFunc: func( ctx context.Context, startHeight uint64, filter flow.EventFilter,