Check data integrity for EVM events#529
Conversation
This comment was marked as outdated.
This comment was marked as outdated.
| return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height) | ||
| } | ||
|
|
||
| if e.block != nil && len(e.transactions) > 0 { |
There was a problem hiding this comment.
at this point if block is nil should be separate check and return error
There was a problem hiding this comment.
Currently, we have the assumptions that we can have at most one EVM Block, so the block can also be nil, and we should check against that.
| } | ||
|
|
||
| if e.block != nil && len(e.transactions) > 0 { | ||
| txHashes := evmTypes.TransactionHashes([]gethCommon.Hash{}) |
There was a problem hiding this comment.
then after here I don't think you need to check transactions length... because you should do that even for empty slice of txs... so it should match empty root
There was a problem hiding this comment.
Removed the check for transactions length in: 1e18078
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- models/events.go (2 hunks)
Files skipped from review as they are similar to previous changes (1)
- models/events.go
90612d1 to
e7b7901
Compare
| blkEvents := flow.BlockEvents{ | ||
| BlockID: blockEvents.BlockID, | ||
| Height: blockEvents.Height, | ||
| BlockTimestamp: blockEvents.BlockTimestamp, | ||
| } | ||
| for _, eventFilter := range r.blocksFilter().EventTypes { | ||
| recoveredEvents, err := r.client.GetEventsForHeightRange( | ||
| ctx, | ||
| eventFilter, | ||
| blockEvents.Height, | ||
| blockEvents.Height, | ||
| ) | ||
| if err != nil { | ||
| events <- models.NewBlockEventsError(err) | ||
| return | ||
| } | ||
| for _, blockEvent := range recoveredEvents { | ||
| blkEvents.Events = append(blkEvents.Events, blockEvent.Events...) | ||
| } | ||
| } | ||
|
|
||
| events <- models.NewBlockEvents(blkEvents) | ||
| return |
There was a problem hiding this comment.
please extract this into a method
| events <- models.NewBlockEventsError(err) | ||
| return | ||
| } | ||
| for _, blockEvent := range recoveredEvents { |
There was a problem hiding this comment.
we should only get back one right, because range is 1, it would be better to check length and then just use that 1 item instead of a for loop, this would be another safety check.
| } | ||
|
|
||
| evts := models.NewBlockEvents(blockEvents) | ||
| if evts.Err != nil { |
There was a problem hiding this comment.
we should test this in the subscriber_test I think ti shouldn't be hard
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- models/events.go (2 hunks)
- models/events_test.go (2 hunks)
Files skipped from review as they are similar to previous changes (2)
- models/events.go
- models/events_test.go
There was a problem hiding this comment.
Actionable comments posted: 3
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- services/ingestion/subscriber.go (2 hunks)
- services/ingestion/subscriber_test.go (2 hunks)
- services/testutils/mock_client.go (3 hunks)
Additional comments not posted (6)
services/testutils/mock_client.go (6)
14-17: Approved: Exported function fields inMockClient.The renaming of function fields to start with uppercase letters correctly exports them, allowing for external access. This is useful for testing and extending the functionality of
MockClient.
21-21: Approved: Updated method implementation.The method
GetBlockHeaderByHeightcorrectly uses the newly exported function field name, maintaining consistency with the structural changes in theMockClient.
25-25: Approved: Updated method implementation.The method
GetLatestBlockHeadercorrectly uses the newly exported function field name, ensuring consistency with the structural changes in theMockClient.
29-29: Approved: Updated method implementation.The method
GetNodeVersionInfocorrectly uses the newly exported function field name, aligning with the structural changes in theMockClient.
38-38: Approved: Updated method implementation.The method
SubscribeEventsByBlockHeightcorrectly uses the newly exported function field name, ensuring consistency with the structural changes in theMockClient.
Line range hint
41-63: Approved: Updated initialization inSetupClientForRange.The function
SetupClientForRangehas been correctly updated to initialize theMockClientstruct with the newly renamed function fields. This ensures that the mock client can be set up correctly with the updated function signatures.
| func (r *RPCSubscriber) fetchBlockEvents( | ||
| ctx context.Context, | ||
| blockEvents flow.BlockEvents, | ||
| ) models.BlockEvents { | ||
| blkEvents := flow.BlockEvents{ | ||
| BlockID: blockEvents.BlockID, | ||
| Height: blockEvents.Height, | ||
| BlockTimestamp: blockEvents.BlockTimestamp, | ||
| } | ||
| for _, eventType := range r.blocksFilter().EventTypes { | ||
| recoveredEvents, err := r.client.GetEventsForHeightRange( | ||
| ctx, | ||
| eventType, | ||
| blockEvents.Height, | ||
| blockEvents.Height, | ||
| ) | ||
| if err != nil { | ||
| return models.NewBlockEventsError(err) | ||
| } | ||
|
|
||
| if len(recoveredEvents) > 1 { | ||
| return models.NewBlockEventsError( | ||
| fmt.Errorf( | ||
| "received unexpected Flow block events for height: %d", | ||
| blockEvents.Height, | ||
| ), | ||
| ) | ||
| } | ||
|
|
||
| if len(recoveredEvents) == 0 { | ||
| models.NewBlockEvents(blkEvents) | ||
| } | ||
|
|
||
| blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...) | ||
| } | ||
|
|
||
| return models.NewBlockEvents(blkEvents) | ||
| } |
There was a problem hiding this comment.
Well-implemented error handling in fetchBlockEvents, consider enhancing logging and handling of no-event scenarios.
The fetchBlockEvents method is a robust addition to the RPCSubscriber class, enhancing error handling and data integrity during event fetching. The method's checks for errors and unexpected multiple events are well-placed.
Suggestions:
- Consider adding logging for successful event retrieval to improve traceability and debugging.
- Handle cases where no events are retrieved more explicitly, perhaps by logging or returning a specific error.
Would you like a code snippet to implement these logging enhancements?
ccd1601 to
cad404e
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- services/ingestion/subscriber.go (2 hunks)
- services/ingestion/subscriber_test.go (2 hunks)
- services/testutils/mock_client.go (3 hunks)
Files skipped from review as they are similar to previous changes (1)
- services/testutils/mock_client.go
| evts := models.NewBlockEvents(blockEvents) | ||
| if evts.Err != nil { | ||
| r.logger.Warn().Err(err).Msgf( | ||
| "failed to parse EVM block events for Flow height: %d, retrying with gRPC API...", | ||
| blockEvents.Height, | ||
| ) | ||
| events <- r.fetchBlockEvents(ctx, blockEvents) | ||
| } else { | ||
| events <- models.NewBlockEvents(blockEvents) | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
| func (r *RPCSubscriber) fetchBlockEvents( | ||
| ctx context.Context, | ||
| blockEvents flow.BlockEvents, | ||
| ) models.BlockEvents { | ||
| blkEvents := flow.BlockEvents{ | ||
| BlockID: blockEvents.BlockID, | ||
| Height: blockEvents.Height, | ||
| BlockTimestamp: blockEvents.BlockTimestamp, | ||
| } | ||
| for _, eventType := range r.blocksFilter().EventTypes { | ||
| recoveredEvents, err := r.client.GetEventsForHeightRange( | ||
| ctx, | ||
| eventType, | ||
| blockEvents.Height, | ||
| blockEvents.Height, | ||
| ) | ||
| if err != nil { | ||
| return models.NewBlockEventsError(err) | ||
| } | ||
|
|
||
| if len(recoveredEvents) > 1 { | ||
| return models.NewBlockEventsError( | ||
| fmt.Errorf( | ||
| "received multiple Flow block events for height: %d", | ||
| blockEvents.Height, | ||
| ), | ||
| ) | ||
| } | ||
|
|
||
| if len(recoveredEvents) == 0 { | ||
| return models.NewBlockEventsError( | ||
| fmt.Errorf( | ||
| "received empty Flow block events for height: %d", | ||
| blockEvents.Height, | ||
| ), | ||
| ) | ||
| } | ||
|
|
||
| blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...) | ||
| } | ||
|
|
||
| return models.NewBlockEvents(blkEvents) | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
| func Test_SubscribingWithRetryOnError(t *testing.T) { | ||
| endHeight := uint64(10) | ||
| sporkClients := []access.Client{} | ||
| currentClient := testutils.SetupClientForRange(1, endHeight) | ||
|
|
||
| cadenceHeight := uint64(5) | ||
| txCount := 10 | ||
| hashes := make([]gethCommon.Hash, txCount) | ||
| flowEvents := make([]flow.Event, 0) | ||
|
|
||
| // generate txs | ||
| for i := 0; i < txCount; i++ { | ||
| cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) | ||
| require.NoError(t, err) | ||
| hashes[i] = tx.Hash() | ||
| flowEvent := flow.Event{ | ||
| Type: string(txEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| flowEvents = append(flowEvents, flowEvent) | ||
| } | ||
|
|
||
| evmTxEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: flowEvents, | ||
| } | ||
|
|
||
| // generate single block | ||
| cdcEvent, evmBlock, blockEvent, err := newBlock(cadenceHeight, hashes) | ||
| require.NoError(t, err) | ||
| flowEvent := flow.Event{ | ||
| Type: string(blockEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| evmBlockEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: []flow.Event{flowEvent}, | ||
| } | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", | ||
| mock.AnythingOfType("context.backgroundCtx"), | ||
| "A.b6763b4399a888c8.EVM.BlockExecuted", | ||
| uint64(cadenceHeight), | ||
| uint64(cadenceHeight), | ||
| ).Return([]flow.BlockEvents{evmBlockEvents}, nil).Once() | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", | ||
| mock.AnythingOfType("context.backgroundCtx"), | ||
| "A.b6763b4399a888c8.EVM.TransactionExecuted", | ||
| uint64(cadenceHeight), | ||
| uint64(cadenceHeight), | ||
| ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() | ||
|
|
||
| currentClient.SubscribeEventsByBlockHeightFunc = func( | ||
| ctx context.Context, | ||
| startHeight uint64, | ||
| filter flow.EventFilter, | ||
| opts ...access.SubscribeOption, | ||
| ) (<-chan flow.BlockEvents, <-chan error, error) { | ||
| events := make(chan flow.BlockEvents) | ||
| errors := make(chan error) | ||
|
|
||
| blockEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: evmTxEvents.Events, | ||
| } | ||
|
|
||
| // generate single block | ||
| cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) | ||
| require.NoError(t, err) | ||
| flowEvent := flow.Event{ | ||
| Type: string(blockEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| blockEvents.Events = append(blockEvents.Events, flowEvent) | ||
|
|
||
| go func() { | ||
| defer close(events) | ||
|
|
||
| for i := startHeight; i <= endHeight; i++ { | ||
| if i == cadenceHeight { | ||
| events <- blockEvents | ||
| } else { | ||
| events <- flow.BlockEvents{ | ||
| Height: i, | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return events, errors, nil | ||
| } | ||
|
|
||
| client, err := requester.NewCrossSporkClient( | ||
| currentClient, | ||
| sporkClients, | ||
| zerolog.Nop(), | ||
| flowGo.Previewnet, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) | ||
|
|
||
| events := subscriber.Subscribe(context.Background(), 1) | ||
|
|
||
| var prevHeight uint64 | ||
|
|
||
| for ev := range events { | ||
| if prevHeight == endHeight { | ||
| require.ErrorIs(t, ev.Err, errs.ErrDisconnected) | ||
| break | ||
| } | ||
|
|
||
| require.NoError(t, ev.Err) | ||
|
|
||
| // this makes sure all the event heights are sequential | ||
| eventHeight := ev.Events.CadenceHeight() | ||
| require.Equal(t, prevHeight+1, eventHeight) | ||
| prevHeight = eventHeight | ||
|
|
||
| if eventHeight == cadenceHeight { | ||
| assert.Equal(t, evmBlock, ev.Events.Block()) | ||
| for i := 0; i < txCount; i++ { | ||
| tx := ev.Events.Transactions()[i] | ||
| assert.Equal(t, hashes[i], tx.Hash()) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // this makes sure we indexed all the events | ||
| require.Equal(t, uint64(endHeight), prevHeight) | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
| func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { | ||
| endHeight := uint64(10) | ||
| sporkClients := []access.Client{} | ||
| currentClient := testutils.SetupClientForRange(1, endHeight) | ||
|
|
||
| cadenceHeight := uint64(5) | ||
| txCount := 10 | ||
| hashes := make([]gethCommon.Hash, txCount) | ||
| flowEvents := make([]flow.Event, 0) | ||
|
|
||
| // generate txs | ||
| for i := 0; i < txCount; i++ { | ||
| cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) | ||
| require.NoError(t, err) | ||
| hashes[i] = tx.Hash() | ||
| flowEvent := flow.Event{ | ||
| Type: string(txEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| flowEvents = append(flowEvents, flowEvent) | ||
| } | ||
|
|
||
| evmTxEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: flowEvents, | ||
| } | ||
|
|
||
| // generate single block | ||
| cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes) | ||
| require.NoError(t, err) | ||
| flowEvent := flow.Event{ | ||
| Type: string(blockEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| evmBlockEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: []flow.Event{flowEvent}, | ||
| } | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", | ||
| mock.AnythingOfType("context.backgroundCtx"), | ||
| "A.b6763b4399a888c8.EVM.BlockExecuted", | ||
| uint64(cadenceHeight), | ||
| uint64(cadenceHeight), | ||
| ).Return([]flow.BlockEvents{evmBlockEvents, evmBlockEvents}, nil).Once() | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", | ||
| mock.AnythingOfType("context.backgroundCtx"), | ||
| "A.b6763b4399a888c8.EVM.TransactionExecuted", | ||
| uint64(cadenceHeight), | ||
| uint64(cadenceHeight), | ||
| ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() | ||
|
|
||
| currentClient.SubscribeEventsByBlockHeightFunc = func( | ||
| ctx context.Context, | ||
| startHeight uint64, | ||
| filter flow.EventFilter, | ||
| opts ...access.SubscribeOption, | ||
| ) (<-chan flow.BlockEvents, <-chan error, error) { | ||
| events := make(chan flow.BlockEvents) | ||
| errors := make(chan error) | ||
|
|
||
| blockEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: evmTxEvents.Events, | ||
| } | ||
|
|
||
| // generate single block | ||
| cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) | ||
| require.NoError(t, err) | ||
| flowEvent := flow.Event{ | ||
| Type: string(blockEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| blockEvents.Events = append(blockEvents.Events, flowEvent) | ||
|
|
||
| go func() { | ||
| defer close(events) | ||
|
|
||
| for i := startHeight; i <= endHeight; i++ { | ||
| if i == cadenceHeight { | ||
| events <- blockEvents | ||
| } else { | ||
| events <- flow.BlockEvents{ | ||
| Height: i, | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return events, errors, nil | ||
| } | ||
|
|
||
| client, err := requester.NewCrossSporkClient( | ||
| currentClient, | ||
| sporkClients, | ||
| zerolog.Nop(), | ||
| flowGo.Previewnet, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) | ||
|
|
||
| events := subscriber.Subscribe(context.Background(), 1) | ||
|
|
||
| var prevHeight uint64 | ||
|
|
||
| for ev := range events { | ||
| if prevHeight == endHeight { | ||
| require.ErrorIs(t, ev.Err, errs.ErrDisconnected) | ||
| break | ||
| } | ||
|
|
||
| if prevHeight+1 == cadenceHeight { | ||
| require.Error(t, ev.Err) | ||
| assert.ErrorContains( | ||
| t, | ||
| ev.Err, | ||
| "received multiple Flow block events for height: 5", | ||
| ) | ||
| prevHeight = cadenceHeight | ||
| } else { | ||
| require.NoError(t, ev.Err) | ||
| // this makes sure all the event heights are sequential | ||
| eventHeight := ev.Events.CadenceHeight() | ||
| require.Equal(t, prevHeight+1, eventHeight) | ||
| prevHeight = eventHeight | ||
| } | ||
| } | ||
|
|
||
| require.Equal(t, endHeight, prevHeight) | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
| func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { | ||
| endHeight := uint64(10) | ||
| sporkClients := []access.Client{} | ||
| currentClient := testutils.SetupClientForRange(1, endHeight) | ||
|
|
||
| cadenceHeight := uint64(5) | ||
| txCount := 10 | ||
| hashes := make([]gethCommon.Hash, txCount) | ||
| flowEvents := make([]flow.Event, 0) | ||
|
|
||
| // generate txs | ||
| for i := 0; i < txCount; i++ { | ||
| cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) | ||
| require.NoError(t, err) | ||
| hashes[i] = tx.Hash() | ||
| flowEvent := flow.Event{ | ||
| Type: string(txEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| flowEvents = append(flowEvents, flowEvent) | ||
| } | ||
|
|
||
| evmTxEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: flowEvents, | ||
| } | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", | ||
| mock.AnythingOfType("context.backgroundCtx"), | ||
| "A.b6763b4399a888c8.EVM.BlockExecuted", | ||
| uint64(cadenceHeight), | ||
| uint64(cadenceHeight), | ||
| ).Return([]flow.BlockEvents{}, nil).Once() | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", | ||
| mock.AnythingOfType("context.backgroundCtx"), | ||
| "A.b6763b4399a888c8.EVM.TransactionExecuted", | ||
| uint64(cadenceHeight), | ||
| uint64(cadenceHeight), | ||
| ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() | ||
|
|
||
| currentClient.SubscribeEventsByBlockHeightFunc = func( | ||
| ctx context.Context, | ||
| startHeight uint64, | ||
| filter flow.EventFilter, | ||
| opts ...access.SubscribeOption, | ||
| ) (<-chan flow.BlockEvents, <-chan error, error) { | ||
| events := make(chan flow.BlockEvents) | ||
| errors := make(chan error) | ||
|
|
||
| blockEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: evmTxEvents.Events, | ||
| } | ||
|
|
||
| // generate single block | ||
| cdcEvent, _, blockEvent, err := newBlock(cadenceHeight, hashes[:txCount-2]) | ||
| require.NoError(t, err) | ||
| flowEvent := flow.Event{ | ||
| Type: string(blockEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| blockEvents.Events = append(blockEvents.Events, flowEvent) | ||
|
|
||
| go func() { | ||
| defer close(events) | ||
|
|
||
| for i := startHeight; i <= endHeight; i++ { | ||
| if i == cadenceHeight { | ||
| events <- blockEvents | ||
| } else { | ||
| events <- flow.BlockEvents{ | ||
| Height: i, | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return events, errors, nil | ||
| } | ||
|
|
||
| client, err := requester.NewCrossSporkClient( | ||
| currentClient, | ||
| sporkClients, | ||
| zerolog.Nop(), | ||
| flowGo.Previewnet, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) | ||
|
|
||
| events := subscriber.Subscribe(context.Background(), 1) | ||
|
|
||
| var prevHeight uint64 | ||
|
|
||
| for ev := range events { | ||
| if prevHeight == endHeight { | ||
| require.ErrorIs(t, ev.Err, errs.ErrDisconnected) | ||
| break | ||
| } | ||
|
|
||
| if prevHeight+1 == cadenceHeight { | ||
| require.Error(t, ev.Err) | ||
| assert.ErrorContains( | ||
| t, | ||
| ev.Err, | ||
| "received empty Flow block events for height: 5", | ||
| ) | ||
| prevHeight = cadenceHeight | ||
| } else { | ||
| require.NoError(t, ev.Err) | ||
| // this makes sure all the event heights are sequential | ||
| eventHeight := ev.Events.CadenceHeight() | ||
| require.Equal(t, prevHeight+1, eventHeight) | ||
| prevHeight = eventHeight | ||
| } | ||
| } | ||
|
|
||
| require.Equal(t, endHeight, prevHeight) | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
| } | ||
| } | ||
|
|
||
| func (r *RPCSubscriber) fetchBlockEvents( |
There was a problem hiding this comment.
add a comment explaining what this function does and how it is used for backup
| "failed to parse EVM block events for Flow height: %d, retrying with gRPC API...", | ||
| blockEvents.Height, | ||
| ) | ||
| events <- r.fetchBlockEvents(ctx, blockEvents) |
There was a problem hiding this comment.
add a commnet something like:
// call an alternative grpc request to fetch events in case the event streaming failed
| for i := 0; i < txCount; i++ { | ||
| cdcEvent, txEvent, tx, _, err := newTransaction(cadenceHeight) | ||
| require.NoError(t, err) | ||
| hashes[i] = tx.Hash() | ||
| flowEvent := flow.Event{ | ||
| Type: string(txEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| flowEvents = append(flowEvents, flowEvent) | ||
| } | ||
|
|
||
| evmTxEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: flowEvents, | ||
| } | ||
|
|
||
| // generate single block | ||
| cdcEvent, evmBlock, blockEvent, err := newBlock(cadenceHeight, hashes) | ||
| require.NoError(t, err) | ||
| flowEvent := flow.Event{ | ||
| Type: string(blockEvent.Etype), | ||
| Value: cdcEvent, | ||
| } | ||
| evmBlockEvents := flow.BlockEvents{ | ||
| BlockID: flow.Identifier{0x1}, | ||
| Height: cadenceHeight, | ||
| BlockTimestamp: time.Now(), | ||
| Events: []flow.Event{flowEvent}, | ||
| } | ||
|
|
||
| currentClient.On( | ||
| "GetEventsForHeightRange", |
There was a problem hiding this comment.
could we extract all this setup section into a helper so all tests don't have to implement it
There was a problem hiding this comment.
Very good point 👍 It was a lot of duplicated code.
Updated in 3c9ead3
| require.Equal(t, uint64(endHeight), prevHeight) | ||
| } | ||
|
|
||
| func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { |
There was a problem hiding this comment.
can you add above each test a one-liner comment explaining what it tests
Co-authored-by: Gregor G. <75445744+sideninja@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 0
Outside diff range and nitpick comments (3)
services/ingestion/subscriber.go (3)
149-158: Integration offetchBlockEventsenhances error handling.The modifications to the
Subscribemethod effectively integrate the newfetchBlockEventsmethod, enhancing the robustness of event handling. The method's comprehensive handling of disconnection errors and context cancellations is commendable.Suggestion:
- Add more detailed comments explaining the error handling logic within the
Subscribemethod to improve maintainability and clarity for future developers.
265-299: Well-implementedfetchBlockEventsmethod, consider enhancing logging and handling of no-event scenarios.The
fetchBlockEventsmethod is a robust addition to theRPCSubscriberclass, enhancing error handling and data integrity during event fetching. The method's checks for errors and unexpected multiple events are well-placed.Suggestions:
- Consider adding logging for successful event retrieval to improve traceability and debugging.
- Handle cases where no events are retrieved more explicitly, perhaps by logging or returning a specific error.
285-293: Unexpected multiple events check is crucial, consider logging the error.The check for more than one event at lines 285-293 is crucial for ensuring data integrity.
Consider logging this error for better traceability.
4c1c822 to
2b1ee91
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
Outside diff range and nitpick comments (1)
services/ingestion/subscriber.go (1)
265-299: Robust fallback mechanism for fetching block events.The new
fetchBlockEventsmethod provides a reliable way to fetch block events using the gRPC API as a fallback when the primary method fails. It constructs the necessaryBlockEventsobject, iterates through the defined event types, and retrieves the events for the specified block height.The error handling is appropriate, with the method returning a
BlockEventsErrorif an error occurs during event fetching. The check for more than one event is a good safeguard to ensure data integrity.Suggestion:
- Consider adding more detailed logging statements within the
fetchBlockEventsmethod to provide better visibility into the fetching process and any errors that may occur. This can aid in debugging and monitoring.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (6)
- models/events.go (2 hunks)
- models/events_test.go (2 hunks)
- services/ingestion/engine_test.go (3 hunks)
- services/ingestion/subscriber.go (2 hunks)
- services/ingestion/subscriber_test.go (2 hunks)
- services/testutils/mock_client.go (3 hunks)
Files skipped from review as they are similar to previous changes (2)
- models/events.go
- services/ingestion/engine_test.go
Additional comments not posted (14)
services/testutils/mock_client.go (6)
14-17: LGTM!The changes to export the function fields by renaming them from lowercase to uppercase are approved. This enhances the usability of
MockClientby allowing external packages to access these function fields directly.
21-21: LGTM!The change to use the new exported name
GetBlockHeaderByHeightFuncfor the function field is approved. It is consistent with the renaming of the function field.
25-25: LGTM!The change to use the new exported name
GetLatestBlockHeaderFuncfor the function field is approved. It is consistent with the renaming of the function field.
29-29: LGTM!The change to use the new exported name
GetNodeVersionInfoFuncfor the function field is approved. It is consistent with the renaming of the function field.
38-38: LGTM!The change to use the new exported name
SubscribeEventsByBlockHeightFuncfor the function field is approved. It is consistent with the renaming of the function field.
Line range hint
41-84: LGTM!The changes to initialize the
MockClientstruct with the newly renamed function fields are approved. They are consistent with the renaming of the function fields in theMockClientstruct.models/events_test.go (4)
64-155: The past review comment is still valid and applicable to the current code segment. Skipping generating a similar comment.
Line range hint
157-268: LGTM!The test function
Test_EventDecodingis well-constructed and effectively tests the event decoding logic. It covers various aspects such as transaction hashes, block hash, cumulative gas used, transaction index, and log index.The code changes are approved.
269-276: There are no changes to thenewTransactionfunction. Skipping generating a comment.
277-280: LGTM!The changes to the
newBlockfunction signature and the assignment of transaction hashes to theTransactionHashesfield improve the clarity and readability of the code. The new naming convention is consistently applied throughout the function.The code changes are approved.
services/ingestion/subscriber.go (1)
149-158: Robust error handling and fallback mechanism.The modifications to the
subscribemethod enhance its resilience by introducing a fallback mechanism to fetch block events using the gRPC API when parsing EVM block events fails. This ensures that the subscriber can continue processing events even if the primary method encounters issues.The changes are well-integrated, and the logging of the error provides visibility into any issues that occur.
services/ingestion/subscriber_test.go (3)
68-207: ****
209-348: ****
350-475: ****
Description
For contributor use:
masterbranchFiles changedin the Github PR explorerSummary by CodeRabbit
New Features
Bug Fixes
Refactor