Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/onflow/cadence"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go/fvm/evm/events"
evmTypes "github.com/onflow/flow-go/fvm/evm/types"
)

// isBlockExecutedEvent checks whether the given event contains block executed data.
Expand Down Expand Up @@ -119,6 +120,19 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height)
}

if e.block != nil {
txHashes := evmTypes.TransactionHashes{}
for _, tx := range e.transactions {
txHashes = append(txHashes, tx.Hash())
}
if e.block.TransactionHashRoot != txHashes.RootHash() {
return nil, fmt.Errorf(
"block %d references missing transaction/s",
e.block.Height,
)
}
}

return e, nil
}

Expand Down
97 changes: 95 additions & 2 deletions models/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,98 @@ func TestCadenceEvents_Block(t *testing.T) {
}
})
}

cadenceHeight := uint64(1)
txCount := 10
hashes := make([]gethCommon.Hash, txCount)
events := make([]flow.Event, 0)

// generate txs
for i := 0; i < txCount; i++ {
tx, _, txEvent, err := newTransaction(uint64(i))
require.NoError(t, err)
hashes[i] = tx.Hash()
events = append(events, txEvent)
}

t.Run("block with less transaction hashes", func(t *testing.T) {
// generate single block
_, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2])
require.NoError(t, err)

blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: cadenceHeight,
Events: events,
}

blockEvents.Events = append(blockEvents.Events, blockEvent)

_, err = NewCadenceEvents(blockEvents)
require.Error(t, err)
assert.ErrorContains(
t,
err,
"block 1 references missing transaction/s",
)
})

t.Run("block with equal transaction hashes", func(t *testing.T) {
// generate single block
_, blockEvent, err := newBlock(cadenceHeight, hashes)
require.NoError(t, err)

blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: cadenceHeight,
Events: events,
}

blockEvents.Events = append(blockEvents.Events, blockEvent)

_, err = NewCadenceEvents(blockEvents)
require.NoError(t, err)
})

t.Run("block with empty transaction hashes", func(t *testing.T) {
// generate single block
_, blockEvent, err := newBlock(cadenceHeight, []gethCommon.Hash{})
require.NoError(t, err)

blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: cadenceHeight,
}

blockEvents.Events = append(blockEvents.Events, blockEvent)

_, err = NewCadenceEvents(blockEvents)
require.NoError(t, err)
})

t.Run("block with more transaction hashes", func(t *testing.T) {
tx, _, _, err := newTransaction(1)
require.NoError(t, err)

// generate single block
_, blockEvent, err := newBlock(cadenceHeight, []gethCommon.Hash{tx.Hash()})
require.NoError(t, err)

blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: cadenceHeight,
}

blockEvents.Events = append(blockEvents.Events, blockEvent)

_, err = NewCadenceEvents(blockEvents)
require.Error(t, err)
assert.ErrorContains(
t,
err,
"block 1 references missing transaction/s",
)
})
}

func Test_EventDecoding(t *testing.T) {
Expand Down Expand Up @@ -174,17 +266,18 @@ func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error
return TransactionCall{Transaction: tx}, res, flowEvent, err
}

func newBlock(height uint64, hashes []gethCommon.Hash) (*Block, flow.Event, error) {
func newBlock(height uint64, txHashes []gethCommon.Hash) (*Block, flow.Event, error) {
gethBlock := types.NewBlock(
gethCommon.HexToHash("0x01"),
height,
uint64(1337),
big.NewInt(100),
gethCommon.HexToHash("0x15"),
)
gethBlock.TransactionHashRoot = types.TransactionHashes(txHashes).RootHash()
evmBlock := &Block{
Block: gethBlock,
TransactionHashes: hashes,
TransactionHashes: txHashes,
}

ev := events.NewBlockEvent(gethBlock)
Expand Down
25 changes: 13 additions & 12 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func newBlock(height uint64, txHashes []gethCommon.Hash) (cadence.Event, *models
TotalSupply: big.NewInt(100),
ReceiptRoot: gethCommon.HexToHash("0x2"),
}
gethBlock.TransactionHashRoot = types.TransactionHashes(txHashes).RootHash()

This comment was marked as outdated.

block := &models.Block{
Block: gethBlock,
TransactionHashes: txHashes,
Expand All @@ -567,6 +568,17 @@ func newBlock(height uint64, txHashes []gethCommon.Hash) (cadence.Event, *models
}

func newTransaction(height uint64) (cadence.Event, *events.Event, models.Transaction, *types.Result, error) {
txEncoded, err := hex.DecodeString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653")
if err != nil {
return cadence.Event{}, nil, nil, nil, err
}

tx := &gethTypes.Transaction{}
err = tx.UnmarshalBinary(txEncoded)
if err != nil {
return cadence.Event{}, nil, nil, nil, err
}

res := &types.Result{
VMError: nil,
TxType: 1,
Expand All @@ -580,18 +592,7 @@ func newTransaction(height uint64) (cadence.Event, *events.Event, models.Transac
Address: gethCommon.Address{0x3, 0x5},
Topics: []gethCommon.Hash{{0x2, 0x66}, {0x7, 0x1}},
}},
TxHash: gethCommon.HexToHash("0x33"),
}

txEncoded, err := hex.DecodeString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653")
if err != nil {
return cadence.Event{}, nil, nil, nil, err
}

tx := &gethTypes.Transaction{}
err = tx.UnmarshalBinary(txEncoded)
if err != nil {
return cadence.Event{}, nil, nil, nil, err
TxHash: tx.Hash(),
}

ev := events.NewTransactionEvent(
Expand Down
57 changes: 56 additions & 1 deletion services/ingestion/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,19 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac
return
}

events <- models.NewBlockEvents(blockEvents)
evts := models.NewBlockEvents(blockEvents)
if evts.Err != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should test this in the subscriber_test I think ti shouldn't be hard

r.logger.Warn().Err(err).Msgf(
"failed to parse EVM block events for Flow height: %d, retrying with gRPC API...",
blockEvents.Height,
)
// call the `GetEventsForHeightRange` gRPC API endpoint to fetch
// the EVM-related events, when event streaming returned an
// inconsistent response.
events <- r.fetchBlockEvents(ctx, blockEvents)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a commnet something like:

// call an alternative grpc request to fetch events in case the event streaming failed

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 76d544c

} else {
events <- models.NewBlockEvents(blockEvents)
}

case err, ok := <-errChan:
if !ok {
Expand Down Expand Up @@ -252,3 +264,46 @@ func (r *RPCSubscriber) blocksFilter() flow.EventFilter {
},
}
}

// fetchBlockEvents is used as a backup mechanism for fetching EVM-related
// events, when the event streaming API returns an inconsistent response.
// An inconsistent response could be an EVM block that references EVM
// transactions which are not present in the response.
// Under the hood, it uses the `GetEventsForHeightRange` gRPC API endpoint,
// making sure that we receive the expected events length for each event type
// and Flow height.
func (r *RPCSubscriber) fetchBlockEvents(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment explaining what this function does and how it is used for backup

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point 👍
Added in 76d544c

ctx context.Context,
blockEvents flow.BlockEvents,
) models.BlockEvents {
blkEvents := flow.BlockEvents{
BlockID: blockEvents.BlockID,
Height: blockEvents.Height,
BlockTimestamp: blockEvents.BlockTimestamp,
}
for _, eventType := range r.blocksFilter().EventTypes {
recoveredEvents, err := r.client.GetEventsForHeightRange(
ctx,
eventType,
blockEvents.Height,
blockEvents.Height,
)
if err != nil {
return models.NewBlockEventsError(err)
}

if len(recoveredEvents) != 1 {
return models.NewBlockEventsError(
fmt.Errorf(
"received %d but expected 1 event for height %d",
len(recoveredEvents),
blockEvents.Height,
),
)
}

blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...)
}

return models.NewBlockEvents(blkEvents)
}
Loading