Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor event subscriber #636

Merged
merged 6 commits into from
Oct 30, 2024
Merged

Conversation

janezpodhostnik
Copy link
Contributor

@janezpodhostnik janezpodhostnik commented Oct 29, 2024

Closes: #???

Description

A small refactor for the event subscriber.

I want to eventually switch this to a flowgo.Component, because of the easier and clearer error handling we get that way.


For contributor use:

  • Targeted PR against master branch
  • Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
  • Code follows the standards mentioned here.
  • Updated relevant documentation
  • Re-reviewed Files changed in the Github PR explorer
  • Added appropriate labels

Summary by CodeRabbit

  • New Features

    • Enhanced event subscription process by centralizing state management within the RPCEventSubscriber.
    • Streamlined command-line interface configuration for the EVM Gateway Node, improving clarity and usability.
  • Bug Fixes

    • Improved error handling during event subscription by removing references to the removed cadence height.
  • Tests

    • Updated test cases to reflect changes in event subscriber instantiation and method signatures, ensuring continued validation of event ingestion functionality.

Copy link
Contributor

coderabbitai bot commented Oct 29, 2024

Walkthrough

The changes in this pull request involve significant modifications to the event ingestion process within the Bootstrap and Engine structures, as well as updates to the EventSubscriber interface and its implementations. The primary alteration is the removal of the latestCadenceHeight parameter from various methods, simplifying the event subscription logic. Additionally, the RPCSubscriber has been renamed to RPCEventSubscriber, and its internal state management has been enhanced. These changes aim to streamline event handling and improve the clarity of the codebase.

Changes

File Path Change Summary
bootstrap/bootstrap.go Updated StartEventIngestion to instantiate RPCEventSubscriber instead of RPCSubscriber, adding latestCadenceHeight as an argument.
services/ingestion/engine.go Modified Run method to remove retrieval of the latest cadence height, simplifying logging and altering event subscription logic.
services/ingestion/engine_test.go Changed Subscribe method signature in tests to reflect removal of latest parameter, simplifying event subscription setup.
services/ingestion/event_subscriber.go Updated EventSubscriber interface and RPCEventSubscriber struct to remove height parameter from Subscribe method. Renamed RPCSubscriber to RPCEventSubscriber and introduced RPCEventSubscriberConfig.
services/ingestion/event_subscriber_test.go Modified test cases to use NewRPCEventSubscriber and updated Subscribe calls to reflect the new method signature, ensuring consistent usage across tests.
services/ingestion/mocks/EventSubscriber.go Changed Subscribe method signature in mock to accept only ctx context.Context, updating internal logic accordingly.
cmd/run/cmd.go Removed the heartbeat interval flag from CLI configuration, indicating a shift in event subscription management.
config/config.go Removed HeartbeatInterval field from Config struct, indicating that custom heartbeat intervals are no longer supported.

Possibly related PRs

Suggested labels

Improvement, EVM, Testing

Suggested reviewers

  • peterargue
  • zhangchiqing
  • m-Peter
  • ramtinms

Poem

🐇 In the garden where events do play,
A rabbit hops, brightening the day.
With subscriptions now so light and free,
We dance with joy, oh, can’t you see?
New paths we forge, with every leap,
In the world of code, our dreams we keep! 🌼


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (6)
services/ingestion/engine.go (2)

108-110: Consider adding more context to subscription error.

While the error message is clear, consider adding additional context such as the subscription ID or connection details to aid in debugging subscription failures.

-				"failure in event subscription with: %w",
+				"failure in event subscription (id: %s): %w",
+				e.subscriber.ID(),
 				events.Err,

Line range hint 101-117: LGTM! The changes align well with the architectural goals.

The simplified event subscription logic and error handling structure provide a good foundation for the eventual transition to flowgo.Component. The code maintains clear separation of concerns and follows Go best practices.

For the future transition to flowgo.Component, consider:

  1. Implementing a graceful shutdown mechanism in the Stop() method
  2. Adding component lifecycle hooks for initialization and cleanup
bootstrap/bootstrap.go (1)

120-120: Consider documenting the heartbeat interval parameter.

The addition of HeartbeatInterval parameter suggests monitoring capabilities. Consider adding a comment explaining its purpose and expected value range.

Add a comment above the subscriber initialization:

+// Initialize event subscriber with heartbeat monitoring.
+// HeartbeatInterval defines the frequency of health checks.
 subscriber := ingestion.NewRPCEventSubscriber(b.logger, b.client, b.config.FlowNetworkID, latestCadenceHeight, b.config.HeartbeatInterval)
services/ingestion/engine_test.go (1)

451-453: Consider adding .Once() expectation consistently across all test cases.

While this test case correctly uses .Once() to ensure Subscribe is called exactly once, this expectation is missing in the other test cases. Consider adding it consistently for better test reliability.

Apply this pattern to other test cases:

 subscriber.
-  On("Subscribe", mock.Anything).
+  On("Subscribe", mock.Anything).
+  Once().
   Return(func(ctx context.Context) <-chan models.BlockEvents {
     return eventsChan
   })
services/ingestion/event_subscriber.go (2)

112-113: Address the TODO regarding HeartbeatInterval

There's a TODO comment suggesting that the heartbeatInterval should always be 1 in this context. Consider updating the code to set HeartbeatInterval to 1, or provide an explanation if a different value is necessary.

Do you need assistance implementing this change or would you like me to open a GitHub issue to track this task?


Line range hint 263-290: Handle potential errors when fetching missing data

In the fetchMissingData method, there is an assumption that len(recoveredEvents) should be exactly 1. If this is not the case, an error is returned. Consider handling scenarios where multiple events might be returned or provide more context in the error message.

Consider updating the error handling as follows:

 if len(recoveredEvents) != 1 {
     return models.NewBlockEventsError(
         fmt.Errorf(
-            "received %d but expected 1 event for height %d",
+            "received %d events but expected 1 for height %d",
             len(recoveredEvents),
             blockEvents.Height,
         ),
     )
 }

This adjustment clarifies the error message.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between e95b413 and 197617c.

📒 Files selected for processing (6)
  • bootstrap/bootstrap.go (1 hunks)
  • services/ingestion/engine.go (1 hunks)
  • services/ingestion/engine_test.go (5 hunks)
  • services/ingestion/event_subscriber.go (10 hunks)
  • services/ingestion/event_subscriber_test.go (10 hunks)
  • services/ingestion/mocks/EventSubscriber.go (1 hunks)
🔇 Additional comments (15)
services/ingestion/mocks/EventSubscriber.go (1)

18-19: LGTM: Method signature update is consistent with the interface changes.

The removal of the height parameter from the Subscribe method aligns with the broader refactoring of the event subscriber component.

services/ingestion/engine.go (1)

105-111: Verify event ordering after removing latestCadenceHeight.

The removal of latestCadenceHeight from the subscription process simplifies the code, but we should verify that this doesn't affect event ordering or processing, especially during network partitions or reorgs.

Let's check the event subscriber implementation:

✅ Verification successful

Let me analyze the search results and generate one more verification to be thorough:


Based on the code analysis, I can now generate the final response:

Event ordering is maintained after latestCadenceHeight removal

The event subscriber implementation maintains proper event ordering through:

  • Sequential backfilling of events from previous sporks starting at the initial height
  • Tracking and incrementing height during backfill via ev.Events.CadenceHeight()
  • Ordered live subscription after backfill using subscribe() with heartbeat interval
  • Built-in height-based ordering from the Flow client's SubscribeEventsByBlockHeight

The removal of latestCadenceHeight parameter simplifies the API without affecting the core ordering guarantees provided by the underlying Flow client subscription.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any height-based ordering or processing logic in the event subscriber
rg -A 5 "Subscribe|ProcessEvent" --type go

Length of output: 22713


Script:

#!/bin/bash
# Let's check the implementation of the event subscriber to understand how events are ordered
ast-grep --pattern 'func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
  $$$
}'

Length of output: 3569

services/ingestion/event_subscriber_test.go (6)

46-48: LGTM: Constructor and subscription changes look good

The changes simplify the interface while maintaining the test's functionality. The parameter reordering in the constructor improves readability by grouping related parameters.


86-88: LGTM: Missing block handling test properly updated

Changes maintain consistency with the new interface while preserving the critical test case for missing block handling.


163-163: LGTM: Retry mechanism test properly updated

Changes maintain the test's ability to verify the backup event fetching mechanism while adapting to the new interface.

Also applies to: 188-190, 217-217


226-226: LGTM: Duplicate blocks handling test properly updated

Changes maintain consistency with the new interface while preserving the critical test case for duplicate block handling.

Also applies to: 251-253


289-289: LGTM: Empty blocks handling and mock setup properly updated

Changes maintain consistency with the new interface while preserving the critical test cases for empty blocks and proper mock setup for event range queries.

Also applies to: 313-315, 408-409, 416-417


Line range hint 1-462: Verify test coverage for the refactored event subscriber

While the tests have been properly updated and maintain good coverage of edge cases, let's verify that no test cases were missed during the refactor.

✅ Verification successful

From the results, I can now generate a final response. The verification shows that all public methods in the event subscriber implementation have corresponding test coverage:

Test coverage is comprehensive for the event subscriber implementation

All public methods are well-tested with both happy and unhappy paths:

  • Subscribe() - covered by Test_Subscribing
  • Error recovery and backfilling - covered by Test_MissingBlockEvent
  • Retry mechanisms - covered by three variations of Test_SubscribingWithRetryOnError:
    • Happy path with correct data
    • Unhappy path with duplicate blocks
    • Unhappy path with empty blocks
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any untested methods in the event subscriber implementation

# Look for any public methods in the event subscriber implementation
rg -A 2 "^func.*EventSubscriber" --type go

# Look for any test methods to ensure coverage
rg -A 2 "^func Test.*EventSubscriber" --type go

Length of output: 2323


Script:

#!/bin/bash
# Let's search for test files related to event subscriber and analyze test coverage

# Find all test files related to event subscriber
fd -e go -x grep -l "Test.*EventSubscriber" {}

# Look for test functions in event_subscriber_test.go
rg "^func Test_" services/ingestion/event_subscriber_test.go

# Look for any test helper functions that might be testing subscriber functionality
rg "^func.*test.*subscriber" --type go -i

Length of output: 426

bootstrap/bootstrap.go (1)

120-120: Verify HeartbeatInterval configuration.

The subscriber initialization looks good. However, please ensure that HeartbeatInterval is properly configured in the application configuration.

Let's verify the configuration setup:

✅ Verification successful

The verification results show that HeartbeatInterval is properly configured and used throughout the codebase:

  1. It's defined in the config package (config/config.go)
  2. Has a default value of 100 set via command-line flag in cmd/run/cmd.go
  3. Is properly used in the event subscriber implementation (services/ingestion/event_subscriber.go)
  4. Is correctly passed to the subscriber in bootstrap/bootstrap.go

HeartbeatInterval configuration is properly implemented

The HeartbeatInterval configuration is well-structured with:

  • Clear configuration definition with documentation
  • Default value of 100 via CLI flags
  • Proper usage in event subscription with Flow Access API
  • Consistent implementation across the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if HeartbeatInterval is defined in config files and verify its usage
# Test: Search for HeartbeatInterval configuration
rg -A 5 'HeartbeatInterval'

Length of output: 3312

services/ingestion/engine_test.go (3)

59-60: LGTM! Mock correctly updated for new Subscribe signature.

The mock setup properly reflects the simplified Subscribe method signature, maintaining the test's ability to verify sequential block processing.


249-250: LGTM! Mock setup maintains transaction ingestion test coverage.

The simplified Subscribe signature is correctly implemented while preserving the test's ability to verify both block and transaction event processing.


352-353: LGTM! Mock setup preserves out-of-order event testing.

The updated Subscribe signature maintains the test's ability to verify correct processing order of block and transaction events.

services/ingestion/event_subscriber.go (3)

75-120: ⚠️ Potential issue

Ensure thread-safe access to r.height

The field r.height is being read and modified within the goroutine started in the Subscribe method. If r.height is accessed by other goroutines concurrently, this could lead to data races.

Consider protecting r.height with a mutex or ensuring it is only accessed within this goroutine. To verify if r.height is accessed elsewhere concurrently, run the following script:

#!/bin/bash
# Description: Search for accesses to r.height outside of the Subscribe method's goroutine.

# Find all occurrences of r.height in the codebase
rg -U 'r\.height' --glob '*.go' | grep -v 'func\s\+(r\s\*\RPCEventSubscriber)\sSubscribe'

# Specifically check for writes to r.height outside the goroutine
rg -U '(r\.height\s*=)' --glob '*.go' | grep -v 'func\s\+(r\s\*\RPCEventSubscriber)\sSubscribe'

This script helps identify potential concurrent accesses to r.height.


27-27: Ensure all implementations of EventSubscriber are updated

The Subscribe method signature in the EventSubscriber interface has been changed by removing the height parameter. Please verify that all implementations of this interface have been updated to match the new signature to prevent any runtime issues.

Run the following script to find all implementations of EventSubscriber and check their Subscribe method signatures:

This script helps ensure that all implementations have been properly updated.

✅ Verification successful

Based on the results, I can generate the final response as we have found all implementations of the Subscribe method in the codebase:

All implementations of EventSubscriber are correctly updated

The codebase scan shows that both implementations of the Subscribe method (in RPCEventSubscriber and the mock) have the correct signature (ctx context.Context) <-chan models.BlockEvents, and there are no instances of the old signature that included the height parameter.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that all implementations of EventSubscriber have the updated Subscribe method signature.

# Search for methods implementing Subscribe in structs that implement EventSubscriber
rg -U 'type\s+\w+\s+struct\s*\{[^}]*\}\s*\n[^/]*\nfunc\s+\(\w+\s+\*\w+\)\s+Subscribe\(' --glob '*.go' -A 3

# Specifically look for occurrences of the old signature
rg -U 'func\s+\([^)]+\)\s+Subscribe\s*\(\s*ctx\s+context\.Context\s*,\s*height\s+uint64\s*\)\s*<-\s*chan\s+models\.BlockEvents' --glob '*.go'

# Verify updated method signatures
rg -U 'func\s+\([^)]+\)\s+Subscribe\s*\(\s*ctx\s+context\.Context\s*\)\s*<-\s*chan\s+models\.BlockEvents' --glob '*.go'

Length of output: 631


49-55: Update usages of NewRPCEventSubscriber constructor

The constructor function has been renamed to NewRPCEventSubscriber and now requires startHeight and heartbeatInterval as parameters. Please ensure that all calls to this constructor throughout the codebase have been updated accordingly to prevent compilation errors.

Run the following script to find and review all constructor invocations:

This script assists in identifying any outdated constructor usage that needs updating.

✅ Verification successful

Let me generate additional scripts to find the constructor usages, as the previous results didn't show them:


Let me search for any potential old constructor name to ensure we haven't missed anything:


All constructor usages have been properly updated

Based on the search results, all usages of the constructor have been updated to use the new NewRPCEventSubscriber name with the correct parameters:

  • 5 test cases in services/ingestion/event_subscriber_test.go
  • 1 production usage in bootstrap/bootstrap.go

All calls include the required startHeight and heartbeatInterval parameters. No instances of the old constructor name were found.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find usages of the old and new constructor functions.

# Search for the old constructor name
rg -U 'NewRPCSubscriber\(' --glob '*.go' -C 2

# Search for the new constructor name to verify correct usage
rg -U 'NewRPCEventSubscriber\(' --glob '*.go' -C 2

Length of output: 927


Script:

#!/bin/bash
# Search for both old and new constructor patterns using ast-grep for more accurate results
ast-grep --pattern 'NewRPCEventSubscriber($$$)'

# Also search with ripgrep for any potential string matches we might have missed
rg -U 'NewRPCEventSubscriber\s*\(' --glob '*.go' -C 3

Length of output: 4077


Script:

#!/bin/bash
# Search for the old constructor name pattern
ast-grep --pattern 'NewRPCSubscriber($$$)'

# Also search with ripgrep for any potential string matches
rg -U 'NewRPCSubscriber\s*\(' --glob '*.go' -C 3

Length of output: 95

Comment on lines +27 to +28
if rf, ok := ret.Get(0).(func(context.Context) <-chan models.BlockEvents); ok {
r0 = rf(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Interface implementation requires attention

The verification reveals inconsistencies in the codebase:

  • The EventSubscriber interface defines Subscribe(ctx context.Context) <-chan models.BlockEvents
  • However, RPCEventSubscriber still contains internal methods using the height parameter:
    • subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption)
    • backfill(ctx context.Context, height uint64)
  • The mock client in services/testutils/mock_client.go still references the old signature with height parameter

These findings suggest that while the interface has been updated, some internal implementations and test utilities still need to be aligned with the new signature.

🔗 Analysis chain

Verify interface implementation consistency.

Let's ensure this interface change is consistently implemented across all subscribers and tests.


Let me gather more information to verify the interface implementation consistency.


Let me check the interface definition and its implementation to make a final determination.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that all implementations and usages of Subscribe method
# have been updated to the new signature without the height parameter

# Search for any Subscribe method definitions or calls that might still use the height parameter
rg -g '!vendor/*' -g '!mocks/*' 'Subscribe.*context.*uint64.*BlockEvents'

# Search for all EventSubscriber interface implementations
ast-grep --pattern 'type $_ struct {
  $$$
}

func \($_\) Subscribe\(ctx context.Context\) <-chan models.BlockEvents'

Length of output: 700


Script:

#!/bin/bash
# Search for EventSubscriber interface definition
rg -g '!vendor/*' 'type EventSubscriber interface'

# Search for Subscribe method calls to see usage patterns
rg -g '!vendor/*' '\.Subscribe\(.*\)'

# Look for all files containing BlockEvents to understand the full context
rg -g '!vendor/*' 'BlockEvents' -l

Length of output: 1333


Script:

#!/bin/bash
# Get the EventSubscriber interface definition
rg -g '!vendor/*' -A 5 'type EventSubscriber interface'

# Get the RPCEventSubscriber implementation
rg -g '!vendor/*' -A 5 'type RPCEventSubscriber struct'

# Get the Subscribe method implementation
rg -g '!vendor/*' -A 10 'func \(r \*RPCEventSubscriber\) Subscribe'

Length of output: 1872

Comment on lines +347 to +365
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify the blocksFilter function

The blocksFilter function constructs event type IDs using common.NewAddressLocation, which can be streamlined. Consider simplifying the creation of event type identifiers for better readability and maintainability.

Apply the following diff to simplify the code:

 func blocksFilter(chainId flowGo.ChainID) flow.EventFilter {
-    evmAddress := common.Address(systemcontracts.SystemContractsForChain(chainId).EVMContract.Address)
-
-    blockExecutedEvent := common.NewAddressLocation(
-        nil,
-        evmAddress,
-        string(events.EventTypeBlockExecuted),
-    ).ID()
-
-    transactionExecutedEvent := common.NewAddressLocation(
-        nil,
-        evmAddress,
-        string(events.EventTypeTransactionExecuted),
-    ).ID()
+    evmAddress := systemcontracts.
+        SystemContractsForChain(chainId).
+        EVMContract.
+        Address.
+        String()
+
+    blockExecutedEvent := fmt.Sprintf(
+        "A.%s.%s",
+        evmAddress,
+        events.EventTypeBlockExecuted,
+    )
+
+    transactionExecutedEvent := fmt.Sprintf(
+        "A.%s.%s",
+        evmAddress,
+        events.EventTypeTransactionExecuted,
+    )

     return flow.EventFilter{
         EventTypes: []string{
             blockExecutedEvent,
             transactionExecutedEvent,
         },
     }
 }

This refactoring enhances clarity by directly formatting the event type strings.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()
transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()
return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
}
evmAddress := systemcontracts.
SystemContractsForChain(chainId).
EVMContract.
Address.
String()
blockExecutedEvent := fmt.Sprintf(
"A.%s.%s",
evmAddress,
events.EventTypeBlockExecuted,
)
transactionExecutedEvent := fmt.Sprintf(
"A.%s.%s",
evmAddress,
events.EventTypeTransactionExecuted,
)
return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
}

Copy link
Collaborator

@m-Peter m-Peter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! 👍

b.config.FlowNetworkID,
b.logger,
)
subscriber := ingestion.NewRPCEventSubscriber(b.logger, b.client, b.config.FlowNetworkID, latestCadenceHeight, b.config.HeartbeatInterval)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: tidy this the way it was initially.

Copy link
Member

@ramtinms ramtinms left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
services/ingestion/event_subscriber.go (1)

43-56: Consider adding validation for startHeight parameter.

While the constructor changes align well with the new design, consider adding validation to ensure startHeight is not zero or negative (if applicable to your domain).

 func NewRPCEventSubscriber(
   logger zerolog.Logger,
   client *requester.CrossSporkClient,
   chainID flowGo.ChainID,
   startHeight uint64,
 ) *RPCEventSubscriber {
+  if startHeight == 0 {
+    // adjust condition based on your domain requirements
+    panic("startHeight must be greater than 0")
+  }
   logger = logger.With().Str("component", "subscriber").Logger()
   return &RPCEventSubscriber{
     logger: logger,
     client: client,
     chain:  chainID,
     height: startHeight,
   }
 }
services/ingestion/event_subscriber_test.go (1)

408-409: Consider simplifying duplicate height parameters

The setup function uses the same height value twice for both event types. Consider extracting these into a single parameter to reduce duplication and make future updates easier.

 func setupClientForBackupEventFetching(
 	t *testing.T,
 	client *testutils.MockClient,
 	cadenceHeight uint64,
 	evmBlockEvents []flow.BlockEvents,
 	evmTxEvents flow.BlockEvents,
 	txHashes []gethCommon.Hash,
 	endHeight uint64,
 ) {
 	client.On(
 		"GetEventsForHeightRange",
 		mock.AnythingOfType("context.backgroundCtx"),
 		"A.b6763b4399a888c8.EVM.BlockExecuted",
-		cadenceHeight,
-		cadenceHeight,
+		cadenceHeight, // Single height for block range
+		cadenceHeight,
 	).Return(evmBlockEvents, nil).Once()
 
 	client.On(
 		"GetEventsForHeightRange",
 		mock.AnythingOfType("context.backgroundCtx"),
 		"A.b6763b4399a888c8.EVM.TransactionExecuted",
-		cadenceHeight,
-		cadenceHeight,
+		cadenceHeight, // Single height for transaction range
+		cadenceHeight,
 	).Return([]flow.BlockEvents{evmTxEvents}, nil).Once()

Also applies to: 416-417

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 197617c and d3996e4.

📒 Files selected for processing (5)
  • bootstrap/bootstrap.go (1 hunks)
  • cmd/run/cmd.go (0 hunks)
  • config/config.go (0 hunks)
  • services/ingestion/event_subscriber.go (10 hunks)
  • services/ingestion/event_subscriber_test.go (10 hunks)
💤 Files with no reviewable changes (2)
  • cmd/run/cmd.go
  • config/config.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • bootstrap/bootstrap.go
🔇 Additional comments (8)
services/ingestion/event_subscriber.go (3)

27-37: LGTM! Good encapsulation of height state.

The removal of the height parameter from the Subscribe method and its encapsulation within the RPCEventSubscriber struct improves the interface design and maintainability.


338-359: Reference existing review comment.

The previous review comment about simplifying the blocksFilter function is still valid and should be addressed.


64-108: ⚠️ Potential issue

Consider thread safety for height updates.

The height field is accessed and modified from within a goroutine. Consider using atomic operations or mutex to ensure thread-safe updates to height, especially if this subscriber might be used concurrently.

 type RPCEventSubscriber struct {
   logger zerolog.Logger
   client *requester.CrossSporkClient
   chain  flowGo.ChainID
-  height uint64
+  height atomic.Uint64
   recovery        bool
   recoveredEvents []flow.Event
 }
services/ingestion/event_subscriber_test.go (5)

46-46: LGTM: Constructor and subscription changes look good

The updates correctly reflect the new RPCEventSubscriber interface with simplified parameter ordering and subscription method.

Also applies to: 48-48


86-86: LGTM: Missing block test updates are correct

The changes maintain the test's integrity while adapting to the new subscriber interface. The core functionality for handling missing blocks remains properly tested.

Also applies to: 88-88


163-163: LGTM: Error retry test updates are appropriate

The changes correctly adapt the test to the new interface while maintaining proper verification of the error retry mechanism. The sporkClients declaration change is more idiomatic Go.

Also applies to: 188-188, 190-190, 217-217


226-226: LGTM: Multiple blocks error test updates are consistent

The changes properly adapt the test to the new interface while maintaining thorough verification of duplicate block handling scenarios.

Also applies to: 251-251, 253-253


289-289: LGTM: Empty blocks error test updates are appropriate

The changes correctly adapt the test to the new interface while maintaining proper verification of empty block handling.

Also applies to: 313-313, 315-315

Comment on lines +125 to +131
// we always use heartbeat interval of 1 to have the least amount of delay from the access node
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
ctx,
height,
blocksFilter(r.chain),
access.WithHeartbeatInterval(1),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider making heartbeat interval configurable.

The heartbeat interval is hardcoded to 1 for minimal delay. Consider making this configurable through RPCEventSubscriberConfig to allow for different trade-offs between latency and resource usage in different environments.

 type RPCEventSubscriberConfig struct {
+  HeartbeatInterval uint // defaults to 1 for minimal delay
 }

 func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
   // ...
   eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
     ctx,
     height,
     blocksFilter(r.chain),
-    access.WithHeartbeatInterval(1),
+    access.WithHeartbeatInterval(r.config.HeartbeatInterval),
   )
   // ...
 }

Committable suggestion was skipped due to low confidence.

Copy link
Collaborator

@m-Peter m-Peter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! 🚀

@janezpodhostnik janezpodhostnik changed the base branch from main to feature/local-tx-reexecution October 30, 2024 15:04
@janezpodhostnik janezpodhostnik merged commit 15abb84 into feature/local-tx-reexecution Oct 30, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: ✅ Done
Development

Successfully merging this pull request may close these issues.

3 participants